Commit afcf470b authored by sfritschi's avatar sfritschi
Browse files

Fully parallelized best match search

parent 6f832140
...@@ -7,6 +7,11 @@ from netflow import * ...@@ -7,6 +7,11 @@ from netflow import *
from time import perf_counter from time import perf_counter
def main(): def main():
if len(sys.argv) != 2:
raise AssertionError("Usage: python3 generate.py <nthreads>")
nthreads = int(sys.argv[1])
print("Using %d threads" % nthreads)
basenet = netflow.load_network_from('../netflow/network/network.h5') basenet = netflow.load_network_from('../netflow/network/network.h5')
print("Network statistics:") print("Network statistics:")
print("Throats: %d" % len(basenet.throats)) print("Throats: %d" % len(basenet.throats))
...@@ -22,7 +27,7 @@ def main(): ...@@ -22,7 +27,7 @@ def main():
start = perf_counter() start = perf_counter()
dendro = netflow.generate_dendrogram(basenet=basenet, targetsize=target, \ dendro = netflow.generate_dendrogram(basenet=basenet, targetsize=target, \
cutoff=cutoff, sd=42, mute=False) cutoff=cutoff, sd=42, nthreads=nthreads, mute=False)
end = perf_counter() end = perf_counter()
elapsed = end - start elapsed = end - start
print("Elapsed time: %e s" % elapsed) print("Elapsed time: %e s" % elapsed)
......
...@@ -12,7 +12,7 @@ class Pore: ...@@ -12,7 +12,7 @@ class Pore:
"""Class of a pore connected to other pores via throats.""" """Class of a pore connected to other pores via throats."""
id = 0 id = 0
def __init__(self, pos: List[float], r: float, label: str = LABELS[0], \ def __init__(self, pos: List[float], r: float, label: str = LABELS[0], \
throats: Set = None, index: int = 0, id: int = -1): throats: Set = None, id: int = -1, index: int = -1):
if id == -1: # no id given if id == -1: # no id given
self.id = Pore.id; Pore.id = Pore.id+1 self.id = Pore.id; Pore.id = Pore.id+1
else: # id provided else: # id provided
...@@ -38,7 +38,7 @@ class Throat: ...@@ -38,7 +38,7 @@ class Throat:
""" """
id = 0 id = 0
def __init__(self, pore1: Pore, pore2: Pore, r: float, \ def __init__(self, pore1: Pore, pore2: Pore, r: float, \
label: str = LABELS[0], id: int = -1): label: str = LABELS[0], id: int = -1, index: int = -1):
if id == -1: # no id given if id == -1: # no id given
self.id = Throat.id; Throat.id = Throat.id+1 self.id = Throat.id; Throat.id = Throat.id+1
else: # id provided else: # id provided
...@@ -108,10 +108,10 @@ class CellList: ...@@ -108,10 +108,10 @@ class CellList:
find_candidates() assumes pore is located in interior of domain (non-periodic) find_candidates() assumes pore is located in interior of domain (non-periodic)
""" """
def __init__(self, pores: List[Pore], domainSize: List[float], \ def __init__(self, pores: List[Pore], domainSize: List[float], \
cellSize: float, Lmax: float): cellSize: float, basenet: Network, nInterior: int):
from math import ceil from math import ceil
self.dim = len(domainSize) self.dim = len(domainSize)
self.Lmax = Lmax self.Lmax = basenet.Lmax
# Number of cells in each dimension # Number of cells in each dimension
self.nCells = [max(1, ceil(domainSize[i] / cellSize)) for i in range(self.dim)] self.nCells = [max(1, ceil(domainSize[i] / cellSize)) for i in range(self.dim)]
# Inverse of cell sizes in domain # Inverse of cell sizes in domain
...@@ -119,17 +119,41 @@ class CellList: ...@@ -119,17 +119,41 @@ class CellList:
# Total number of cells # Total number of cells
self.totalCells = prod(self.nCells) self.totalCells = prod(self.nCells)
# Pore positions # Pore positions
self.poresPos = [0.] * self.dim * len(pores) self.poresPos = [] # size: dim * #pores
# Throat lengths
self.throatL = [] # size: #throats (from non-periodic pores)
# Throat radii
self.throatR = [] # size: #throats (from non-periodic pores)
throatIdx = 0
# Offset in pore-throat table for given pore index
self.throatOffsets = [0] # size: #pores + 1 (non-periodic)
offset = 0
# Pore-throat table
self.poreThroatIndices = [set() for _ in range(nInterior)]
L = [ub-lb for lb,ub in zip(basenet.lb,basenet.ub)]
# Sort pores according to cell membership # Sort pores according to cell membership
self.poresSorted = [set() for _ in range(self.totalCells)] self.poresSorted = [set() for _ in range(self.totalCells)]
for pore in pores: for poreIdx, pore in enumerate(pores):
poreIdx = pore.index
cellIdx = self.pore_to_index(pore) cellIdx = self.pore_to_index(pore)
self.poresSorted[cellIdx].add(poreIdx) self.poresSorted[cellIdx].add(poreIdx)
lb = self.dim * poreIdx self.poresPos += pore.pos
ub = self.dim + lb
self.poresPos[lb:ub] = pore.pos if poreIdx < nInterior:
# Pre-fix sum
nthroats = len(pore.throats)
offset += nthroats
self.throatOffsets.append(offset)
for throat in pore.throats:
self.throatL.append(distance(*throat_ends(throat, L)))
self.throatR.append(throat.r)
self.poreThroatIndices[poreIdx].add(throatIdx)
throatIdx += 1
self.poreThroatTable = mp.RawArray('i', throatIdx)
# Helper for converting 3D index triple into 1D (flattened) index # Helper for converting 3D index triple into 1D (flattened) index
def flatten(self, i: int, j: int, k: int) -> int: def flatten(self, i: int, j: int, k: int) -> int:
...@@ -151,7 +175,34 @@ class CellList: ...@@ -151,7 +175,34 @@ class CellList:
def is_valid_index(self, idx: int) -> bool: def is_valid_index(self, idx: int) -> bool:
return 0 <= idx < self.totalCells return 0 <= idx < self.totalCells
def match_maker(self, pores: List[int], tid: int):
n = len(pores)
for poreIdx in pores:
if tid == 0:
print(f"progress {((poreIdx+1) / n)*100.:.1f}%", end="\r", flush=True)
porePos = self.fetch_pos(poreIdx)
neighborhood = self.nbor_indices(poreIdx)
candidates = {}
for nborCellIdx in neighborhood:
for nborIdx in self.poresSorted[nborCellIdx]:
if (nborIdx == poreIdx): continue
l = distance(porePos, self.fetch_pos(nborIdx))
if (l < self.Lmax):
candidates[nborIdx] = l
throats = self.fetch_throat_indices(poreIdx)
for idx, throatIdx in enumerate(throats):
lt = self.throatL[throatIdx]
matchIdx, matchLen = min(candidates.items(), key=lambda il: abs(il[1] - lt))
# Remove match from candidates
# If targetsize contains 1 need to remove periodic copies as well
del candidates[matchIdx]
self.set_throat(poreIdx, matchIdx, idx)
# Compute dictionary of all nbor candidates of given pore # Compute dictionary of all nbor candidates of given pore
def find_candidates(self, poreIdx: int) -> Dict[int, float]: def find_candidates(self, poreIdx: int) -> Dict[int, float]:
candidates = {} candidates = {}
...@@ -181,6 +232,21 @@ class CellList: ...@@ -181,6 +232,21 @@ class CellList:
return candidates return candidates
def set_throat(self, poreIdx: int, matchIdx: int, idx: int):
index = self.throatOffsets[poreIdx] + idx
# Set matchIdx
self.poreThroatTable[index] = matchIdx
def fetch_throats(self, poreIdx: int) -> List[int]:
lb = self.throatOffsets[poreIdx]
ub = self.throatOffsets[poreIdx + 1]
return self.poreThroatTable[lb:ub]
def fetch_throat_indices(self, poreIdx: int) -> List[int]:
lb = self.throatOffsets[poreIdx]
ub = self.throatOffsets[poreIdx + 1]
return range(lb, ub)
def fetch_pos(self, poreIdx: int) -> List[float]: def fetch_pos(self, poreIdx: int) -> List[float]:
lb = self.dim * poreIdx lb = self.dim * poreIdx
ub = self.dim + lb ub = self.dim + lb
...@@ -604,8 +670,8 @@ def cell_list_scaling(basenet: Network, targetsize: List[int], \ ...@@ -604,8 +670,8 @@ def cell_list_scaling(basenet: Network, targetsize: List[int], \
def generate_dendrogram(basenet: Network, targetsize: List[int], \ def generate_dendrogram(basenet: Network, targetsize: List[int], \
cutoff: float = float('inf'), sd: int = None, mute: bool = False) \ cutoff: float = float('inf'), sd: int = None, nthreads: int = mp.cpu_count(), \
-> Network: mute: bool = False) -> Network:
"""Generate a new spatially periodic network. """Generate a new spatially periodic network.
Based on an existing network basenet, generate a new network of size Based on an existing network basenet, generate a new network of size
...@@ -706,86 +772,140 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \ ...@@ -706,86 +772,140 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
# Flag for handling the case where original nbor pore and its periodic # Flag for handling the case where original nbor pore and its periodic
# copies are potential candidates of a given pore at the same time # copies are potential candidates of a given pore at the same time
# (necessary only if target size is 1 in at least one dimension) # (necessary only if target size is 1 in at least one dimension)
isPeriodicCheck = 1 in targetsize #isPeriodicCheck = 1 in targetsize
# Domain size including periodic buffer layers on all sides # Domain size including periodic buffer layers on all sides
trueDomainSize = [L[i] + 2 * basenet.Lmax for i in range(d)] trueDomainSize = [L[i] + 2 * basenet.Lmax for i in range(d)]
# Initialize cell-list; Place each pore in respective cell-set # Initialize cell-list; Place each pore in respective cell-set
cellList = CellList(pores, trueDomainSize, basenet.Lmax, basenet.Lmax) cellList = CellList(pores, trueDomainSize, basenet.Lmax, basenet, n)
# throats, connect pores remainder = n % nthreads
if (not mute): print("\b"*21 + "connecting") loads = [n // nthreads] * nthreads
throats = [] for i in range(remainder):
k = 0 # count of unrealised throats loads[i] += 1
nthreads = 4 from itertools import accumulate
# Spawn worker threads # Compute inclusive-scan
in_q = mp.SimpleQueue() displs = [0]
out_q = mp.SimpleQueue() displs += accumulate(loads)
workers = [] workers = []
for _ in range(nthreads): if (not mute): print("computing best matches")
worker = mp.Process(target=cellList.par_find_candidates, args=(in_q, out_q))
for tid in range(nthreads):
poreList = range(displs[tid], displs[tid+1])
worker = mp.Process(target=cellList.match_maker, args=(poreList, tid))
worker.start() worker.start()
workers.append(worker) workers.append(worker)
for worker in workers:
worker.join()
# throats, connect pores
if (not mute): print("connecting")
throats = []
k = 0 # count of unrealised throats
avg_throat_diff = 0. avg_throat_diff = 0.
count = 0 count = 0
next_best_count = 0 total = 0
n_already_taken = 0
for poreIdx, pore in enumerate(pores[:n]): for poreIdx, pore in enumerate(pores[:n]):
if (not mute): if (not mute):
print(f"progress {((poreIdx+1) / n)*100.:.1f}%", end="\r", flush=True) print(f"progress {((poreIdx+1) / n)*100.:.1f}%", end="\r", flush=True)
if (len(pore.throats) == 0): continue # no further connection needed # Set of throats to be realized
ttbr = cellList.poreThroatIndices[poreIdx]
if (len(ttbr) == 0): continue # Nothing left to do
neighborhood = cellList.nbor_indices(poreIdx) matches = cellList.fetch_throats(poreIdx)
for nborCellIdx in neighborhood: throatIndices = cellList.fetch_throat_indices(poreIdx)
in_q.put((poreIdx, cellList.poresSorted[nborCellIdx]))
# Compute throat lengths and radii # TODO: Remove throats from both pore.throats and nbor.throats
throatLens = [] for i, match in enumerate(matches):
throatR = [] throatIdx = throatIndices[i]
for throat in pore.throats:
throatLens.append(distance(*throat_ends(throat,
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])))
throatR.append(throat.r)
# (index, throatLen)
bestMatches = [(-1, basenet.Lmax)] * len(throatLens)
candidates = {}
for _ in range(len(neighborhood)):
cellCandidates = out_q.get()
# Compute best match for each throat
# (index, throatLen)
if (len(cellCandidates) != 0):
matches = [min(cellCandidates.items(), key=lambda il: abs(il[1] - lt)) \
for lt in throatLens]
# Update global best matches based on current (local) best matches
bestMatches = [min(m, b, key=lambda il: abs(il[1] - lt)) \
for m, b, lt in zip(matches, bestMatches, throatLens)]
# Add cell candidates to global candidates
candidates = {**candidates, **cellCandidates}
# establish throat connections between pore and best candidate
for i, match in enumerate(bestMatches):
nborIdx, lt = match if (throatIdx not in ttbr):
if (nborIdx == -1): continue # Throat already realized
k = k+1; continue # no candidates left for this throat
pore.throats.pop()
# Check if best matching nbor is still available r = cellList.throatR[throatIdx]
if nborIdx not in candidates: targetLen = cellList.throatL[throatIdx]
if (len(candidates) == 0): nbor = pores[match]
k = k+1; continue # no further candidates available lt = distance(pore.pos, nbor.pos)
# Find next best available candidate (expensive)
next_best_count += 1
nborIdx, lt = min(candidates.items(), key=lambda pl: abs(pl[1] - lt))
nbor = pores[nborIdx] nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore
if (nbor.label != LABELS[0]):
j, lbl = nbor.label.split(' ',1)
nbor = pores[int(j)]
else:
lbl = LABELS[0]
diff = abs(lt - throatLens[i]) nborTtbr = cellList.poreThroatIndices[nbor.index]
total += 1
if (len(nborTtbr) == 0):
n_already_taken += 1
continue
# connect pore to nbor
throats.append([pore, nbor, lbl, r])
# remove most similar throat of nbor
ranking = [(nborThroatIdx, abs(cellList.throatL[nborThroatIdx] - lt)) \
for nborThroatIdx in nborTtbr]
nborThroatIdx, _ = min(ranking, key=lambda il: il[1])
diff = abs(targetLen - lt)
avg_throat_diff += diff avg_throat_diff += diff
count += 1 count += 1
# Remove throat from neighbor
nborTtbr.remove(nborThroatIdx)
# Successfully connected this throat
ttbr.remove(throatIdx)
# remove fully connected nbor from search neighborhood
if (len(nborTtbr) == 0):
cellList.expel(nbor)
for cpore in copies[nbor]:
cellList.expel(cpore)
# remove fully connected pore from search neighborhood
if (len(ttbr) == 0):
cellList.expel(pore)
for cpore in copies[pore]:
cellList.expel(cpore)
# Finally, find next best candidates for left-out pores
if (not mute): print("finding next best candidates")
for poreIdx, pore in enumerate(pores[:n]):
if (not mute):
print(f"progress {((poreIdx+1) / n)*100.:.1f}%", end="\r", flush=True)
ttbr = cellList.poreThroatIndices[poreIdx]
if (len(ttbr) == 0):
continue # Nothing left to do
candidates = cellList.find_candidates(poreIdx)
for throatIdx in ttbr.copy():
targetLen = cellList.throatL[throatIdx]
if (len(candidates) == 0):
# no candidates left; give up
k += len(ttbr) # update num unrealized throats
break
nborIdx, lt = min(candidates.items(), key=lambda il: abs(il[1] - targetLen))
del candidates[nborIdx]
nbor = pores[nborIdx]
nborc = nbor # memorize potential periodic copy nborc = nbor # memorize potential periodic copy
# find original of buffer layer pore # find original of buffer layer pore
if (nbor.label != LABELS[0]): if (nbor.label != LABELS[0]):
...@@ -794,59 +914,51 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \ ...@@ -794,59 +914,51 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
else: else:
lbl = LABELS[0] lbl = LABELS[0]
# Remove nbor from candidates as well as all periodic copies
del candidates[nborc.index]
# Remove original non-periodic nbor as well as all its
# periodic copies. Only applies if targetsize[i] = 1 for some i
if isPeriodicCheck:
if nbor.index in candidates:
del candidates[nbor.index]
for cpore in copies[nbor]:
if cpore.index in candidates:
del candidates[cpore.index]
# connect pore to nbor # connect pore to nbor
throats.append([pore, nbor, lbl, throatR[i]]) throats.append([pore, nbor, lbl, r])
nborTtbr = cellList.poreThroatIndices[nbor.index]
# remove most similar throat of nbor # remove most similar throat of nbor
ranking = [(distance(*throat_ends(nthroat, ranking = [(nborThroatIdx, abs(cellList.throatL[nborThroatIdx] - lt)) \
[ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])), for nborThroatIdx in nborTtbr]
nthroat) for nthroat in nbor.throats] nborThroatIdx, _ = min(ranking, key=lambda il: il[1])
diff = abs(targetLen - lt)
avg_throat_diff += diff
count += 1
_, nthroat = min(ranking, key=lambda lth: abs(lth[0] - lt)) # Remove throat from neighbor
nbor.throats.remove(nthroat) nborTtbr.remove(nborThroatIdx)
# Successfully connected this throat
ttbr.remove(throatIdx)
# remove fully connected nbor from search neighborhood # remove fully connected nbor from search neighborhood
if (len(nbor.throats) == 0): if (len(nborTtbr) == 0):
cellList.expel(nbor) cellList.expel(nbor)
for cpore in copies[nbor]: for cpore in copies[nbor]:
cellList.expel(cpore) cellList.expel(cpore)
# remove fully connected pore from search neighborhood # remove fully connected pore from search neighborhood
cellList.expel(pore) if (len(ttbr) == 0):
cellList.expel(pore)
for cpore in copies[pore]: for cpore in copies[pore]:
cellList.expel(cpore) cellList.expel(cpore)
# Terminate worker threads
for _ in range(nthreads):
in_q.put(None)
for worker in workers:
worker.join()
percent = k / (k + len(throats)) * 100. percent = k / (k + len(throats)) * 100.
print("\b"*24 + "{0:d} throats in total, {1:d} unrealised ({2:.1f}%)".\ print("\b"*24 + "{0:d} throats in total, {1:d} unrealised ({2:.1f}%)".\
format(len(throats), k, percent)) format(len(throats), k, percent))
avg_throat_diff /= count avg_throat_diff /= count
print("Avg. throat length difference: %e" % avg_throat_diff) print("Avg. throat length difference: %e" % avg_throat_diff)
print("Relative to Lmax: %.1f%%" % (avg_throat_diff / basenet.Lmax * 100.)) print("Relative to Lmax: %.1f%%" % (avg_throat_diff / basenet.Lmax * 100.))
print("Percentage of finding next best candidate: %.1f%%" % (next_best_count / count * 100.)) print("Percentage where match was fully-connected: %.1f%%" % (n_already_taken / total * 100.))
# assemble and return network # assemble and return network
network = Network(lb=[0.0 for k in range(d)], network = Network(lb=[0.0 for k in range(d)],
ub=L, Lmax=basenet.Lmax, label='from_' + basenet.label) ub=L, Lmax=basenet.Lmax, label='from_' + basenet.label)
for pore in pores[:n]: network.add_pore(pore) for pore in pores[:n]:
pore.throats.clear() # Remove throats
network.add_pore(pore)
for throat in throats: network.connect_pores(pore1=throat[0], for throat in throats: network.connect_pores(pore1=throat[0],
pore2=throat[1], label=throat[2], r=throat[3]) pore2=throat[1], label=throat[2], r=throat[3])
return network return network
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment