Commit dc20015c authored by sfritschi's avatar sfritschi
Browse files

Added examples for netgen parallelization

parent e5559ad5
import sys
sys.path.append('../')
import matplotlib.pyplot as plt
from netflow import *
import multiprocessing as mp
def main():
basenet = netflow.load_network_from('../netflow/network/network.h5')
print("Throats: %d" % len(basenet.throats))
target = [1, 1, 1]
cutoff = 0.5 * max([basenet.ub[i] - basenet.lb[i] \
for i in range(len(basenet.ub))])
dendro = netflow.generate_dendrogram(basenet=basenet, targetsize=target, \
cutoff=cutoff, sd=42, mute=False)
_ = netgen.plot_network(dendro)
plt.savefig('plots/dendro.png')
if __name__ == '__main__':
main()
import time
import random
import multiprocessing as mp
class Dummy:
def __init__(self, n):
self.values = [i for i in range(n)]
def producer(dummys, queue):
random.seed(0)
# Put dummys in queue
for dummy in dummys:
# Do work
time.sleep(random.randint(0, 5))
queue.put(dummy)
def consumer(tid, queue):
random.seed(tid)
# Infinite loop
while True:
# Do work
time.sleep(random.randint(0, 5))
# Get dummy object
dummy = queue.get()
# Debug
if (tid == 1):
print(dummy.values)
# Modify dummy
dummy.values.pop()
def main():
queue = mp.Queue()
n = 10
dummys = [Dummy(4) for _ in range(n)]
prod = mp.Process(target=producer, args=(dummys, queue))
prod.start()
nconsumer = 3
consumers = []
for tid in range(1, nconsumer + 1):
p = mp.Process(target=consumer, args=(tid, queue), daemon=True)
p.start()
consumers.append(p)
prod.join()
prod.close()
# Verify results
for dummy in dummys:
assert(len(dummy.values) == 3)
if __name__ == '__main__':
main()
import sys
sys.path.append('../')
from netflow import *
import multiprocessing as mp
import time
def func(l, tid, nlocal, p_sums):
start = tid * nlocal
end = start + nlocal
p_sum = 0
for i in range(start, end):
p_sum += l[i]
p_sums[tid] = p_sum
def sum_serial(l):
s = 0
for i in range(len(l)):
s += l[i]
return s
def main():
with mp.Manager() as manager:
n = 100000
#l = manager.list([i for i in range(n)])
l = [i for i in range(n)]
nthreads = 4
assert(n % nthreads == 0)
nlocal = n // nthreads # assume n divides nthreads
# typecode: 'L' -> unsigned 64bit integer
p_sums = mp.RawArray('L', nthreads)
jobs = []
start = time.time()
for tid in range(nthreads):
proc = mp.Process(target=func, args=(l, tid, nlocal, p_sums))
proc.start()
jobs.append(proc)
for j in jobs:
j.join()
result = 0
for tid in range(nthreads):
result += p_sums[tid]
end = time.time()
elapsed = end - start
print("Result: %d, Time: %e s" % (result, elapsed))
start = time.time()
result_serial = sum_serial(l)
end = time.time()
elapsed_serial = end - start
print("Result serial: %d, Time serial: %e s" % (result_serial, elapsed_serial))
print("Speedup: %e" % (elapsed_serial / elapsed))
if __name__ == '__main__':
main()
......@@ -507,99 +507,98 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
del(neighborhood[outsider])
return nnbrs
"""
PARALELLIZATION START
"""
# throats, connect pores
if (not mute): print("\b"*21 + "connecting")
throats = []
k = 0 # count of unrealised throats
nnbrs0 = 0; nnbrs = 0; # (initial) maximum number of neighbors
for i, pore in enumerate(pores[:n]):
# (re)run triangulation if maximal number of neighbors exceeds
# threshold given by 2x init. maximum (large nnbrs -> expensive search)
if ((i == 0) or (nnbrs > 2*nnbrs0)):
if (not mute): print(" triangulation", end="", flush=True)
import scipy.spatial.qhull as qh
try:
neighborhood = triangulation(pores)
nnbrs = max([len(nl) for nl in neighborhood.values()])
except qh.QhullError as err:
print("Error: triangulation failed,", err)
if (i == 0): raise
if (not mute): print("\b"*14 + " "*14 + "\b"*14, end="", flush=True)
# check throats and neighborhood
if (not mute):
print("\b"*24 + f"pore {i:7d}" + " "*12, end="", flush=True)
if (len(pore.throats) == 0): continue # no further connection needed
if neighborhood == {}: break # no pores left to connect to
if (i == 0): nnbrs0 = nnbrs
# lists with front and candidate pores
front = neighborhood[pore]
front = dict(zip(front,
[distance(pore.pos, nbor.pos) for nbor in front]))
candidates = {p:l for p, l in front.items()} # copy front dict
conpores = set() # pores that pore is connected to
# establish throat connections between pore and best candidate
while (len(pore.throats) > 0):
def connect_pores(pores, mute):
if (not mute): print("\b"*21 + "connecting")
throats = []
n_unrealized = 0 # count of unrealised throats
nnbrs0 = 0; nnbrs = 0; # (initial) maximum number of neighbors
for i, pore in enumerate(pores[:n]):
# (re)run triangulation if maximal number of neighbors exceeds
# threshold given by 2x init. maximum (large nnbrs -> expensive search)
if ((i == 0) or (nnbrs > 2*nnbrs0)):
if (not mute): print(" triangulation", end="", flush=True)
import scipy.spatial.qhull as qh
try:
neighborhood = triangulation(pores)
nnbrs = max([len(nl) for nl in neighborhood.values()])
except qh.QhullError as err:
print("Error: triangulation failed,", err)
if (i == 0): raise
if (not mute): print("\b"*14 + " "*14 + "\b"*14, end="", flush=True)
# check throats and neighborhood
if (not mute):
print("\b"*12 + ", throat {0:3d}".format(len(pore.throats)),
end="", flush=True)
throat = pore.throats.pop()
lt = distance(*throat_ends(throat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])) # target len.
# gather candidates
while (len(front) > 0):
if (min(front.values()) >= lt): break
# grow search region with candidates
newfront = {}
for frontpore in front:
for nbor in neighborhood[frontpore]:
if (nbor == pore): continue # no self connections
l = distance(pore.pos, nbor.pos)
# distinguish between new front-/candidate-pores
if (not nbor in candidates): newfront[nbor] = l
candidates[nbor] = l
front = {p:l for p, l in newfront.items()} # copy
# find best candidate
# avoid double connections and throats longer than Lmax
ranking = [(l,p) for p, l in candidates.items() \
if (not p in conpores) and (l < basenet.Lmax)]
if (len(ranking) == 0):
k = k+1; continue # no candidates left, give up
ranking.sort(key=lambda lp: abs(lp[0]-lt))
nbor = ranking[0][1] # pore
nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore
if (nbor.label != LABELS[0]):
j, lbl = nbor.label.split(' ',1)
nbor = pores[int(j)]
else:
lbl = LABELS[0]
# connect pore to nbor
throats.append([pore, nbor, lbl, throat.r])
conpores.add(nborc) # update list of connected pores
# remove most similar throat of nbor
lt = ranking[0][0] # actual throat length
ranking = [(distance(*throat_ends(nthroat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])),
nthroat) for nthroat in nbor.throats]
ranking.sort(key=lambda lth: abs(lth[0]-lt))
nbor.throats.remove(ranking[0][1])
# remove fully connected nbor from search neighborhood
if (len(nbor.throats) == 0):
nnbrs = expel(pore, nbor, neighborhood, front, candidates, nnbrs)
for cpore in copies[nbor]:
nnbrs = expel(pore, cpore, neighborhood, front, candidates, nnbrs)
# remove fully connected pore from search neighborhood
nnbrs = expel(pore, pore, neighborhood, front, candidates, nnbrs)
for cpore in copies[pore]:
nnbrs = expel(pore, cpore, neighborhood, front, candidates, nnbrs)
"""
PARALELLIZATION END
"""
print("\b"*24 + f"pore {i:7d}" + " "*12, end="", flush=True)
if (len(pore.throats) == 0): continue # no further connection needed
if neighborhood == {}: break # no pores left to connect to
if (i == 0): nnbrs0 = nnbrs
# lists with front and candidate pores
front = neighborhood[pore]
front = dict(zip(front,
[distance(pore.pos, nbor.pos) for nbor in front]))
candidates = {p:l for p, l in front.items()} # copy front dict
conpores = set() # pores that pore is connected to
# establish throat connections between pore and best candidate
while (len(pore.throats) > 0):
if (not mute):
print("\b"*12 + ", throat {0:3d}".format(len(pore.throats)),
end="", flush=True)
throat = pore.throats.pop()
lt = distance(*throat_ends(throat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])) # target len.
# gather candidates
while (len(front) > 0):
if (min(front.values()) >= lt): break
# grow search region with candidates
newfront = {}
for frontpore in front:
for nbor in neighborhood[frontpore]:
if (nbor == pore): continue # no self connections
l = distance(pore.pos, nbor.pos)
# distinguish between new front-/candidate-pores
if (not nbor in candidates): newfront[nbor] = l
candidates[nbor] = l
front = {p:l for p, l in newfront.items()} # copy
# find best candidate
# avoid double connections and throats longer than Lmax
ranking = [(l,p) for p, l in candidates.items() \
if (not p in conpores) and (l < basenet.Lmax)]
if (len(ranking) == 0):
n_unrealized += 1; continue # no candidates left, give up
ranking.sort(key=lambda lp: abs(lp[0]-lt))
nbor = ranking[0][1] # pore
nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore
if (nbor.label != LABELS[0]):
j, lbl = nbor.label.split(' ',1)
nbor = pores[int(j)]
else:
lbl = LABELS[0]
# connect pore to nbor
throats.append([pore, nbor, lbl, throat.r])
conpores.add(nborc) # update list of connected pores
# remove most similar throat of nbor
lt = ranking[0][0] # actual throat length
ranking = [(distance(*throat_ends(nthroat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])),
nthroat) for nthroat in nbor.throats]
ranking.sort(key=lambda lth: abs(lth[0]-lt))
nbor.throats.remove(ranking[0][1])
# remove fully connected nbor from search neighborhood
if (len(nbor.throats) == 0):
nnbrs = expel(pore, nbor, neighborhood, front, candidates, nnbrs)
for cpore in copies[nbor]:
nnbrs = expel(pore, cpore, neighborhood, front, candidates, nnbrs)
# remove fully connected pore from search neighborhood
nnbrs = expel(pore, pore, neighborhood, front, candidates, nnbrs)
for cpore in copies[pore]:
nnbrs = expel(pore, cpore, neighborhood, front, candidates, nnbrs)
return (throats, n_unrealized)
throats, n_unrealized = connect_pores(pores, mute)
print("\b"*24 + "{0:d} throats in total, {1:d} unrealised".\
format(len(throats), k))
format(len(throats), n_unrealized))
# assemble and return network
network = Network(lb=[0.0 for k in range(d)],
......
from mpi4py import MPI
import sys
sys.path.append("../")
sys.path.append('../')
from netflow import *
from math import sqrt
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment