Commit 8409818c authored by sfritschi's avatar sfritschi
Browse files

Added correct producer-consumer parallel processing

parent 9e5f5e49
......@@ -15,7 +15,7 @@ def main():
print("Min. number of throats per pore: %d" % min(pore_throat_counts))
print("Avg. number of throats per pore: %f" % (sum(pore_throat_counts) / len(pore_throat_counts)))
target = [5, 5, 5]
target = [3, 3, 3]
cutoff = 0.5 * max([basenet.ub[i] - basenet.lb[i] \
for i in range(len(basenet.ub))])
......
......@@ -102,58 +102,6 @@ class Network:
pore1.throats.add(throat); pore2.throats.add(throat)
return throat
class Ranking:
"""Ranking class maintaining ordered list of best candidates.
"""
def __init__(self, referenceVals: List[float]):
self.referenceVals = referenceVals
self.size = len(referenceVals)
self.rankings = [[] for _ in range(self.size)]
# Insert new candidate in appropriate place(s) of ranking
def insert(self, value: float, index: int):
for i in range(self.size):
ranking = self.rankings[i]
ref = self.referenceVals[i]
diff = abs(value - ref)
for j in range(len(ranking) + 1):
# Check if no match found
if j == len(ranking):
# Check if there's still space left
if j < i + 1:
ranking.append((value, index))
# Find correct placement for value
elif diff < abs(ranking[j][0] - ref):
ranking.insert(j, (value, index))
if len(ranking) > i + 1:
# Remove last element in ranking
ranking.pop()
# Correct placement found; done
break
# Extract best (unique) candidates from ranking
def extract_best(self) -> List[Tuple[float, int]]:
candidates = []
# Keep track of already extracted candidates
touched = set()
for i in range(self.size):
ranking = self.rankings[i]
# Special case: Not enough values
if len(ranking) < i + 1:
break # Nothing left to do
candidate = ranking[0]
k = 1
# Find best candidate among those
# not already chosen
while candidate[1] in touched:
candidate = ranking[k]
k += 1
candidates.append(candidate)
touched.add(candidate[1])
return candidates
class CellList:
"""CellList class dividing physical domain into equally-sized cells
......@@ -205,11 +153,8 @@ class CellList:
return 0 <= idx < self.totalCells
# Compute dictionary of all nbor candidates of given pore
def find_candidates(self, poreIdx: int, targetLens: List[float]) \
-> List[Tuple[float, int]]:
# Initialize ranking
ranking = Ranking(targetLens)
def find_candidates(self, poreIdx: int) -> Dict[int, float]:
candidates = {}
porePos = self.fetch_pos(poreIdx)
cellIdxTrip = self.pore_to_triplet(poreIdx)
......@@ -218,8 +163,7 @@ class CellList:
if (nborIdx == poreIdx): continue
l = distance(self.fetch_pos(nborIdx), porePos)
if (l < self.Lmax):
# Insert neighbor pore in ranking
ranking.insert(l, nborIdx)
candidates[nborIdx] = l
# TODO: Speed up finding of suitable candidates (e.g. multiprocessing)
for n in range(self.dim):
......@@ -227,34 +171,42 @@ class CellList:
cellIdx = self.flatten(cellIdxTrip[0] + i * (n == 0), \
cellIdxTrip[1] + i * (n == 1), \
cellIdxTrip[2] + i * (n == 2))
# Check if valid cell index
#assert(self.is_valid_index(nborIdx))
# Check all pores within current neighbor-cell
for nborIdx in self.poresSorted[cellIdx]:
l = distance(self.fetch_pos(nborIdx), porePos)
if (l < self.Lmax):
ranking.insert(l, nborIdx)
# Return list of best candidates for each throat
return ranking.extract_best()
candidates[nborIdx] = l
return candidates
def fetch_pos(self, poreIdx: int) -> List[float]:
lb = self.dim * poreIdx
ub = self.dim + lb
return self.poresPos[lb:ub]
def par_find_candidates(self, in_q: mp.Queue, out_q: mp.Queue):
def par_find_candidates(self, in_q: mp.SimpleQueue, out_q: mp.SimpleQueue):
while True:
# Retrieve payload from in-queue
payload = in_q.get()
if payload is None:
break # Terminate daemonic process
conpores = payload[0]
poreIdx = payload[1]
nborCellIdx = payload[2]
neighbors = payload[2]
lt = payload[3]
porePos = self.fetch_pos(poreIdx)
bestIdx = -1
bestLen = self.Lmax
bestRank = self.Lmax
for nborIdx in self.poresSorted[nborCellIdx]:
if (nborIdx in conpores or nborIdx == poreIdx): continue
l = distance(self.fetch_pos(poreIdx), self.fetch_pos(nborIdx))
for nborIdx in neighbors:
if (nborIdx in conpores or nborIdx == poreIdx):
continue
l = distance(porePos, self.fetch_pos(nborIdx))
rank = abs(lt - l)
if (l < self.Lmax and rank < bestRank):
bestIdx = nborIdx
......@@ -264,8 +216,8 @@ class CellList:
out_q.put((bestIdx, bestLen, bestRank))
# Only consider subset of all neighboring cells (closest 7)
def nbor_indices(self, pore: Pore) -> List[int]:
cellIdx = self.pore_to_triplet(pore)
def nbor_indices(self, poreIdx: int) -> List[int]:
cellIdx = self.pore_to_triplet(poreIdx)
# i, j, k; i +- 1, j, k; i, j +- 1, k; i, j, k +- 1;
neighborhood = [self.flatten(*cellIdx)] # Cell containing the pore
neighborhood += [self.flatten(cellIdx[0] + i * (n == 0), \
......@@ -771,79 +723,29 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
k = 0 # count of unrealised throats
# DEBUG
"""
nthreads = 4
# Spawn daemonic worker threads
in_q = mp.Queue()
out_q = mp.Queue()
in_q = mp.SimpleQueue()
out_q = mp.SimpleQueue()
for _ in range(nthreads):
worker = mp.Process(target=cellList.par_find_candidates, args=(in_q, out_q))
worker.daemon = True
worker.start()
"""
max_throat_diff = 0.
avg_throat_diff = 0.
count = 0
for poreIdx, pore in enumerate(pores[:n]):
# (re)run triangulation if maximal number of neighbors exceeds
# threshold given by 2x init. maximum (large nnbrs -> expensive search)
# check throats and neighborhood
if (not mute):
print(f"progress {((poreIdx+1) / n)*100.:.1f}%", end="\r", flush=True)
if (len(pore.throats) == 0): continue # no further connection needed
# Compute list of target throat lengths
targetLens = [distance(*throat_ends(throat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)]))
for throat in pore.throats]
throatR = [throat.r for throat in pore.throats]
# find candidates in neighborhood and respective throat lengths
# TODO: Search neighboring cells in parallel
#candidates = cellList.find_candidates(pore.index, targetLens)
candidates = cellList.find_candidates(poreIdx, targetLens)
#candidates = cellList.find_candidates(poreIdx)
neighborhood = cellList.nbor_indices(poreIdx)
# establish throat connections between pore and best candidate
# TODO: Perform this for all throats simultaneously
for i, candidate in enumerate(candidates):
lt = candidate[0]
nborIdx = candidate[1]
# DEBUG: Update maximum difference between
# realised throat length and target throat length
diff = abs(lt - targetLens[i])
if (max_throat_diff < diff):
max_throat_diff = diff
nbor = pores[nborIdx]
nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore
if (nbor.label != LABELS[0]):
j, lbl = nbor.label.split(' ',1)
nbor = pores[int(j)]
else:
lbl = LABELS[0]
# connect pore to nbor
throats.append([pore, nbor, lbl, throatR[i]])
# remove most similar throat of nbor
nborRanking = [(distance(*throat_ends(nthroat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])),
nthroat) for nthroat in nbor.throats]
_, nthroat = min(nborRanking, key=lambda lth: abs(lth[0] - lt))
nbor.throats.remove(nthroat)
# remove fully connected nbor from search neighborhood
if (len(nbor.throats) == 0):
cellList.expel(nbor)
for cpore in copies[nbor]:
cellList.expel(cpore)
# update count of unrealised throats
k = k + (len(pore.throats) - len(candidates))
# remove fully connected pore from search neighborhood
cellList.expel(pore)
for cpore in copies[pore]:
cellList.expel(cpore)
"""
conpores = set()
while (len(pore.throats) > 0):
#if (not mute):
# print("\b"*12 + ", throat {0:3d}".format(len(pore.throats)),
......@@ -854,7 +756,7 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
# Distribute cell indices of neighbor
for nborCellIdx in neighborhood:
in_q.put((conpores, pore.index, nborCellIdx, lt))
in_q.put((conpores, poreIdx, cellList.poresSorted[nborCellIdx], lt))
# Wait for results to arrive and compute best candidate
bestIdx = -1
......@@ -871,25 +773,49 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
bestLen = nborLen
bestRank = nborRank
print("%d: %e" % (bestIdx, bestRank))
#print("%d: %e" % (bestIdx, bestRank))
# find best candidate
if (bestIdx == -1):
k = k+1; continue # no candidates left, give up
nbor = pores[bestIdx]
if (len(nbor.throats) == 0):
k = k+1; continue
avg_throat_diff += abs(lt - bestLen)
count += 1
lt = bestLen
# Update connected pores
conpores.add(bestIdx)
nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore
if (nbor.label != LABELS[0]):
j, lbl = nbor.label.split(' ',1)
nbor = pores[int(j)]
else:
lbl = LABELS[0]
# connect pore to nbor
throats.append([pore, nbor, lbl, throat.r])
# remove most similar throat of nbor
ranking = [(distance(*throat_ends(nthroat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])),
nthroat) for nthroat in nbor.throats]
_, nthroat = min(ranking, key=lambda lth: abs(lth[0] - lt))
nbor.throats.remove(nthroat)
# remove fully connected nbor from search neighborhood
if (len(nbor.throats) == 0):
cellList.expel(nbor)
for cpore in copies[nbor]:
cellList.expel(cpore)
"""
if (len(candidates) == 0):
k = k+1; continue # no candidates left
nborIdx, ltc = min(candidates.items(), key=lambda pl: abs(pl[1] - lt))
nbor = pores[nborIdx]
diff = abs(lt - ltc)
if (max_throat_diff < diff):
max_throat_diff = diff
avg_throat_diff += diff
count += 1
lt = ltc
# Remove nbor from candidates
del candidates[nborIdx]
......@@ -915,16 +841,23 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
for cpore in copies[nbor]:
cellList.expel(cpore)
"""
# remove fully connected pore from search neighborhood
cellList.expel(pore)
for cpore in copies[pore]:
cellList.expel(cpore)
"""
cellList.expel(cpore)
# Terminate worker threads
for _ in range(nthreads):
in_q.put(None)
percent = k / (k + len(throats)) * 100.
print("\b"*24 + "{0:d} throats in total, {1:d} unrealised ({2:.1f}%)".\
format(len(throats), k, percent))
print("Max. throat length difference: %e" % max_throat_diff)
avg_throat_diff /= count
print("Avg. throat length difference: %e" % avg_throat_diff)
print("Relative to Lmax: %.1f%%" % (avg_throat_diff / basenet.Lmax * 100.))
# assemble and return network
network = Network(lb=[0.0 for k in range(d)],
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment