Commit 81827b35 authored by sfritschi's avatar sfritschi
Browse files

Parallelized finding candidates; less communication

parent 96fcecc8
......@@ -193,27 +193,18 @@ class CellList:
if payload is None:
break # Terminate daemonic process
conpores = payload[0]
poreIdx = payload[1]
neighbors = payload[2]
lt = payload[3]
poreIdx = payload[0]
neighbors = payload[1]
porePos = self.fetch_pos(poreIdx)
bestIdx = -1
bestLen = self.Lmax
bestRank = self.Lmax
candidates = {}
for nborIdx in neighbors:
if (nborIdx in conpores or nborIdx == poreIdx):
continue
if (nborIdx == poreIdx): continue
l = distance(porePos, self.fetch_pos(nborIdx))
rank = abs(lt - l)
if (l < self.Lmax and rank < bestRank):
bestIdx = nborIdx
bestLen = l
bestRank = rank
if (l < self.Lmax):
candidates[nborIdx] = l
# Put result in out-queue
out_q.put((bestIdx, bestLen, bestRank))
out_q.put(candidates)
# Only consider subset of all neighboring cells (closest 7)
def nbor_indices(self, poreIdx: int) -> List[int]:
......@@ -224,6 +215,7 @@ class CellList:
cellIdx[1] + i * (n == 1), \
cellIdx[2] + i * (n == 2))
for n in range(self.dim) for i in [-1, 1]]
return neighborhood
def nbor_cell_counts(self, neighborhood: List[int]) -> List[int]:
......@@ -722,15 +714,17 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
throats = []
k = 0 # count of unrealised throats
# DEBUG
nthreads = 4
# Spawn daemonic worker threads
in_q = mp.SimpleQueue()
out_q = mp.SimpleQueue()
workers = []
for _ in range(nthreads):
worker = mp.Process(target=cellList.par_find_candidates, args=(in_q, out_q))
worker.daemon = True
worker.start()
workers.append(worker)
avg_throat_diff = 0.
count = 0
......@@ -743,71 +737,23 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
# TODO: Search neighboring cells in parallel
#candidates = cellList.find_candidates(poreIdx)
neighborhood = cellList.nbor_indices(poreIdx)
for nborCellIdx in neighborhood:
in_q.put((poreIdx, cellList.poresSorted[nborCellIdx]))
candidates = {}
for _ in range(len(neighborhood)):
cellCandidates = out_q.get()
# Add cell candidates to global candidates
candidates = {**candidates, **cellCandidates}
# establish throat connections between pore and best candidate
# TODO: Perform this for all throats simultaneously
conpores = set()
#conpores = set()
while (len(pore.throats) > 0):
#if (not mute):
# print("\b"*12 + ", throat {0:3d}".format(len(pore.throats)),
# end="", flush=True)
throat = pore.throats.pop()
lt = distance(*throat_ends(throat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])) # target len.
# Distribute cell indices of neighbor
for nborCellIdx in neighborhood:
in_q.put((conpores, poreIdx, cellList.poresSorted[nborCellIdx], lt))
# Wait for results to arrive and compute best candidate
bestIdx = -1
bestLen = basenet.Lmax
bestRank = basenet.Lmax
for _ in range(len(neighborhood)):
results = out_q.get()
nborIdx = results[0]
nborLen = results[1]
nborRank = results[2]
if (nborRank < bestRank):
bestIdx = nborIdx
bestLen = nborLen
bestRank = nborRank
#print("%d: %e" % (bestIdx, bestRank))
# find best candidate
if (bestIdx == -1):
k = k+1; continue # no candidates left, give up
nbor = pores[bestIdx]
avg_throat_diff += abs(lt - bestLen)
count += 1
lt = bestLen
# Update connected pores
conpores.add(bestIdx)
nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore
if (nbor.label != LABELS[0]):
j, lbl = nbor.label.split(' ',1)
nbor = pores[int(j)]
else:
lbl = LABELS[0]
# connect pore to nbor
throats.append([pore, nbor, lbl, throat.r])
# remove most similar throat of nbor
ranking = [(distance(*throat_ends(nthroat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])),
nthroat) for nthroat in nbor.throats]
_, nthroat = min(ranking, key=lambda lth: abs(lth[0] - lt))
nbor.throats.remove(nthroat)
# remove fully connected nbor from search neighborhood
if (len(nbor.throats) == 0):
cellList.expel(nbor)
for cpore in copies[nbor]:
cellList.expel(cpore)
"""
if (len(candidates) == 0):
k = k+1; continue # no candidates left
nborIdx, ltc = min(candidates.items(), key=lambda pl: abs(pl[1] - lt))
......@@ -841,7 +787,6 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
for cpore in copies[nbor]:
cellList.expel(cpore)
"""
# remove fully connected pore from search neighborhood
cellList.expel(pore)
......@@ -851,7 +796,9 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
# Terminate worker threads
for _ in range(nthreads):
in_q.put(None)
for worker in workers:
worker.join()
percent = k / (k + len(throats)) * 100.
print("\b"*24 + "{0:d} throats in total, {1:d} unrealised ({2:.1f}%)".\
format(len(throats), k, percent))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment