Commit 9b2b4264 authored by sfritschi's avatar sfritschi
Browse files

Updated thesis; Tested producer-consumer pattern; rename folder

parent dc20015c
......@@ -4,20 +4,77 @@ import matplotlib.pyplot as plt
from netflow import *
import time
import random
import multiprocessing as mp
def process_throat(throat_queue_in, pore_queue_out):
while True:
# Retrieve throat when ready (blocking)
throat = throat_queue_in.get()
# do work
time.sleep(random.random() * 1e-5)
# put result in queue
pore_queue_out.put(throat.pore1)
def process_throats_parallel(network):
n = len(network.pores)
throat_queue_in = mp.Queue() # Throats to be processed
pore_queue_out = mp.Queue() # Throats to remove from network
# Spawn worker threads (only once)
nworkers = 3
workers = []
for tid in range(nworkers):
worker = mp.Process(target=process_throat, args=(throat_queue_in, pore_queue_out), daemon=True)
worker.start()
workers.append(worker)
for i, pore in enumerate(network.pores):
print("Processing pore %d/%d" % (i+1, n))
nresults = len(pore.throats)
while (len(pore.throats) > 0):
throat = pore.throats.pop()
# Put in queue
throat_queue_in.put(throat)
# do work
time.sleep(random.random() * 1e-6)
# Receive results from processes
while (nresults > 0):
pore = pore_queue_out.get()
nresults -= 1
# do work
time.sleep(random.random() * 1e-6)
def main():
basenet = netflow.load_network_from('../netflow/network/network.h5')
print("Network statistics:")
print("Throats: %d" % len(basenet.throats))
print("Max. number of throats per pore: %d" % max([len(pore.throats) for pore in basenet.pores]))
print("Min. number of throats per pore: %d" % min([len(pore.throats) for pore in basenet.pores]))
print("Avg. number of throats per pore: %f" % (sum([len(pore.throats) for pore in basenet.pores]) / len(basenet.pores)))
# Modify recursion limit to allow mp.Queue() to pickle recursive
# Throat object
"""
sys.setrecursionlimit(8000)
process_throats_parallel(basenet)
# Verify results
for pore in basenet.pores:
assert(len(pore.throats) == 0)
"""
target = [1, 1, 1]
cutoff = 0.5 * max([basenet.ub[i] - basenet.lb[i] \
for i in range(len(basenet.ub))])
dendro = netflow.generate_dendrogram(basenet=basenet, targetsize=target, \
cutoff=cutoff, sd=42, mute=False)
cutoff=cutoff, sd=42, mute=True)
_ = netgen.plot_network(dendro)
plt.savefig('plots/dendro.png')
if __name__ == '__main__':
main()
gen/plots/dendro.png

143 KB | W: | H:

gen/plots/dendro.png

142 KB | W: | H:

gen/plots/dendro.png
gen/plots/dendro.png
gen/plots/dendro.png
gen/plots/dendro.png
  • 2-up
  • Swipe
  • Onion skin
......@@ -33,7 +33,7 @@ def main():
n = 10
dummys = [Dummy(4) for _ in range(n)]
prod = mp.Process(target=producer, args=(dummys, queue))
prod.start()
......
......@@ -8,6 +8,8 @@ from typing import List, Dict, Set, Tuple # for type hints in argument lists
LABELS = ('', 'in', 'out', 'cut') # don't change the order
import multiprocessing as mp
class Pore:
"""Class of a pore connected to other pores via throats."""
id = 0
......@@ -22,6 +24,7 @@ class Pore:
self.throats = throats # throats set
self.r = r # radius
self.label = label # label string like '', 'in', or 'out'
#self.is_connected = mp.Value('i', 0)
def __repr__(self): return str(self.__class__) + ': ' + \
str({k:self.__dict__[k] for k in self.__dict__ if k != 'throats'}) + \
' {0:d} throats'.format(len(self.throats)) # dont flush throat objects
......@@ -506,97 +509,133 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
nnbrs = max(nnbrs, len(neighborhood[nbor])) # update max. number of neighbors
del(neighborhood[outsider])
return nnbrs
# throats, connect pores
def connect_pores(pores, mute):
if (not mute): print("\b"*21 + "connecting")
throats = []
n_unrealized = 0 # count of unrealised throats
nnbrs0 = 0; nnbrs = 0; # (initial) maximum number of neighbors
for i, pore in enumerate(pores[:n]):
# (re)run triangulation if maximal number of neighbors exceeds
# threshold given by 2x init. maximum (large nnbrs -> expensive search)
if ((i == 0) or (nnbrs > 2*nnbrs0)):
if (not mute): print(" triangulation", end="", flush=True)
import scipy.spatial.qhull as qh
try:
neighborhood = triangulation(pores)
nnbrs = max([len(nl) for nl in neighborhood.values()])
except qh.QhullError as err:
print("Error: triangulation failed,", err)
if (i == 0): raise
if (not mute): print("\b"*14 + " "*14 + "\b"*14, end="", flush=True)
# check throats and neighborhood
if (not mute):
print("\b"*24 + f"pore {i:7d}" + " "*12, end="", flush=True)
if (len(pore.throats) == 0): continue # no further connection needed
if neighborhood == {}: break # no pores left to connect to
if (i == 0): nnbrs0 = nnbrs
# lists with front and candidate pores
front = neighborhood[pore]
front = dict(zip(front,
[distance(pore.pos, nbor.pos) for nbor in front]))
candidates = {p:l for p, l in front.items()} # copy front dict
conpores = set() # pores that pore is connected to
# establish throat connections between pore and best candidate
while (len(pore.throats) > 0):
if (not mute):
print("\b"*12 + ", throat {0:3d}".format(len(pore.throats)),
end="", flush=True)
throat = pore.throats.pop()
lt = distance(*throat_ends(throat,
# Worker/consumer function
def process_throat(input_queue, results_queue):
# Infinite work-loop
while True:
# Receive throat (blocking)
input_ = input_queue.get()
throat = input_[0]
front = input_[1]
neighborhood = input_[2]
candidates = input_[3]
lt = distance(*throat_ends(throat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])) # target len.
# gather candidates
while (len(front) > 0):
if (min(front.values()) >= lt): break
# grow search region with candidates
newfront = {}
for frontpore in front:
for nbor in neighborhood[frontpore]:
if (nbor == pore): continue # no self connections
l = distance(pore.pos, nbor.pos)
# distinguish between new front-/candidate-pores
if (not nbor in candidates): newfront[nbor] = l
candidates[nbor] = l
front = {p:l for p, l in newfront.items()} # copy
# find best candidate
# avoid double connections and throats longer than Lmax
ranking = [(l,p) for p, l in candidates.items() \
if (not p in conpores) and (l < basenet.Lmax)]
if (len(ranking) == 0):
n_unrealized += 1; continue # no candidates left, give up
ranking.sort(key=lambda lp: abs(lp[0]-lt))
nbor = ranking[0][1] # pore
nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore
if (nbor.label != LABELS[0]):
j, lbl = nbor.label.split(' ',1)
nbor = pores[int(j)]
else:
lbl = LABELS[0]
# connect pore to nbor
throats.append([pore, nbor, lbl, throat.r])
conpores.add(nborc) # update list of connected pores
# remove most similar throat of nbor
lt = ranking[0][0] # actual throat length
ranking = [(distance(*throat_ends(nthroat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])),
nthroat) for nthroat in nbor.throats]
ranking.sort(key=lambda lth: abs(lth[0]-lt))
nbor.throats.remove(ranking[0][1])
# remove fully connected nbor from search neighborhood
if (len(nbor.throats) == 0):
nnbrs = expel(pore, nbor, neighborhood, front, candidates, nnbrs)
for cpore in copies[nbor]:
nnbrs = expel(pore, cpore, neighborhood, front, candidates, nnbrs)
# remove fully connected pore from search neighborhood
nnbrs = expel(pore, pore, neighborhood, front, candidates, nnbrs)
for cpore in copies[pore]:
nnbrs = expel(pore, cpore, neighborhood, front, candidates, nnbrs)
return (throats, n_unrealized)
# gather candidates
while (len(front) > 0):
if (min(front.values()) >= lt): break
# grow search region with candidates
newfront = {}
for frontpore in front:
for nbor in neighborhood[frontpore]:
if (nbor == pore): continue # no self connections
l = distance(pore.pos, nbor.pos)
# distinguish between new front-/candidate-pores
if (not nbor in candidates): newfront[nbor] = l
candidates[nbor] = l
front = {p:l for p, l in newfront.items()} # copy
# find best candidate
# avoid double connections and throats longer than Lmax
ranking = [(l,p) for p, l in candidates.items() \
if (True) and (l < basenet.Lmax)]
if (len(ranking) == 0):
continue # no candidates left, give up
ranking.sort(key=lambda lp: abs(lp[0]-lt))
nbor = ranking[0][1] # pore
nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore
if (nbor.label != LABELS[0]):
j, lbl = nbor.label.split(' ',1)
nbor = pores[int(j)]
else:
lbl = LABELS[0]
# remove most similar throat of nbor
lt = ranking[0][0] # actual throat length
ranking = [(distance(*throat_ends(nthroat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])),
nthroat) for nthroat in nbor.throats]
ranking.sort(key=lambda lth: abs(lth[0]-lt))
# Put results in queue
results_queue.put([nbor, lbl, throat.r, ranking[0][1]])
# throats, connect pores
if (not mute): print("\b"*21 + "connecting")
throats = []
n_unrealized = 0 # count of unrealised throats
nnbrs0 = 0; nnbrs = 0; # (initial) maximum number of neighbors
# Spawn processes to process individual throats
import sys
# Required for pickling recursive Throat object
sys.setrecursionlimit(8000)
import multiprocessing as mp
throats, n_unrealized = connect_pores(pores, mute)
input_queue = mp.Queue() # Queue for sending throats to workers
results_queue = mp.Queue() # Queue for sending results back
nworkers = 4
for _ in range(nworkers):
proc = mp.Process(target=process_throat, args=(input_queue, results_queue), daemon=True)
proc.start()
for i, pore in enumerate(pores[:n]):
print("Processing pore: %d/%d" % (i, n+1), end="\r", flush=True)
# (re)run triangulation if maximal number of neighbors exceeds
# threshold given by 2x init. maximum (large nnbrs -> expensive search)
if ((i == 0) or (nnbrs > 2*nnbrs0)):
if (not mute): print(" triangulation", end="", flush=True)
import scipy.spatial.qhull as qh
try:
neighborhood = triangulation(pores)
nnbrs = max([len(nl) for nl in neighborhood.values()])
except qh.QhullError as err:
print("Error: triangulation failed,", err)
if (i == 0): raise
if (not mute): print("\b"*14 + " "*14 + "\b"*14, end="", flush=True)
# check throats and neighborhood
if (not mute):
print("\b"*24 + f"pore {i:7d}" + " "*12, end="", flush=True)
if (len(pore.throats) == 0): continue # no further connection needed
if neighborhood == {}: break # no pores left to connect to
if (i == 0): nnbrs0 = nnbrs
# lists with front and candidate pores
front = neighborhood[pore]
front = dict(zip(front,
[distance(pore.pos, nbor.pos) for nbor in front]))
candidates = {p:l for p, l in front.items()} # copy front dict
#conpores = set() # pores that pore is connected to
# establish throat connections between pore and best candidate
nthroats = len(pore.throats)
while (len(pore.throats) > 0):
if (not mute):
print("\b"*12 + ", throat {0:3d}".format(len(pore.throats)),
end="", flush=True)
throat = pore.throats.pop()
# Put throat in queue to be processed by workers
input_queue.put([throat, front, neighborhood, candidates])
for _ in range(nthroats):
# Fetch results (blocking)
results = results_queue.get()
nbor = results[0]
lbl = results[1]
throat_r = results[2]
nbor_throat = results[3]
# connect pore to nbor
throats.append([pore, nbor, lbl, throat_r])
#conpores.add(nborc) # update list of connected pores
nbor.throats.remove(nbor_throat)
# remove fully connected nbor from search neighborhood
if (len(nbor.throats) == 0):
nnbrs = expel(pore, nbor, neighborhood, front, candidates, nnbrs)
for cpore in copies[nbor]:
nnbrs = expel(pore, cpore, neighborhood, front, candidates, nnbrs)
# remove fully connected pore from search neighborhood
nnbrs = expel(pore, pore, neighborhood, front, candidates, nnbrs)
for cpore in copies[pore]:
nnbrs = expel(pore, cpore, neighborhood, front, candidates, nnbrs)
print("\b"*24 + "{0:d} throats in total, {1:d} unrealised".\
format(len(throats), n_unrealized))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment