Commit fbe0f09e authored by sfritschi

producer consumer strategy testing

parent d5caabbb
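The commit message refers to a producer-consumer strategy built on multiprocessing queues. For orientation, a minimal sketch of that pattern is shown below; the names (worker, in_q, out_q) are illustrative and not code from this commit.

import multiprocessing as mp

def worker(in_q, out_q):
    # Consumer: process items until a sentinel (None) arrives
    while True:
        item = in_q.get()
        if item is None:
            break
        out_q.put(item * item)

if __name__ == '__main__':
    in_q, out_q = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(in_q, out_q)) for _ in range(4)]
    for w in workers:
        w.start()
    for i in range(10):          # producer: distribute work
        in_q.put(i)
    for _ in workers:            # one sentinel per consumer
        in_q.put(None)
    results = [out_q.get() for _ in range(10)]
    for w in workers:
        w.join()
    print(sorted(results))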
import multiprocessing as mp
from random import seed
seed(42)
from random import randint
from itertools import accumulate
import sys

def par_min(x, nthreads, tid, start, end, shmem):
    # Worker: find the minimum of the slice x[start:end] and store it
    # in this worker's slot of the shared array
    min_ = sys.maxsize
    for val in x[start:end]:
        if val < min_:
            min_ = val
    shmem[tid] = min_

def ser_min(x):
    # Serial reference implementation
    min_ = sys.maxsize
    for val in x:
        if val < min_:
            min_ = val
    return min_

def main():
    n = 10000000
    x = [randint(0, n) for _ in range(n)]
    result = ser_min(x)
    print("Result: %d" % result)
    nthreads = 4
    # Split the n elements as evenly as possible among the workers
    load = n // nthreads
    rem = n % nthreads
    loads = [load] * nthreads
    for i in range(rem):
        loads[i] += 1
    # Shared (unsynchronized) array holding one partial minimum per worker
    shmem = mp.RawArray('i', nthreads)
    # Prefix sums of the chunk sizes give the slice boundaries
    offsets = [0]
    offsets += accumulate(loads)
    del loads
    jobs = []
    for tid in range(nthreads):
        start = offsets[tid]
        end = offsets[tid + 1]
        proc = mp.Process(target=par_min, args=(x, nthreads, tid, start, end, shmem))
        proc.start()
        jobs.append(proc)
    for j in jobs:
        j.join()
    # Final reduction over the per-worker minima
    result_par = ser_min(shmem)
    print("Result parallel: %d" % result_par)

if __name__ == '__main__':
    main()
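For comparison, the same chunked minimum could also be written with a process pool. This is only a sketch; chunk_min, nproc and the chunking scheme are illustrative and not part of the commit.

import multiprocessing as mp
from random import randint

def chunk_min(chunk):
    # Partial reduction over one chunk
    return min(chunk)

if __name__ == '__main__':
    n = 1000000
    x = [randint(0, n) for _ in range(n)]
    nproc = 4
    size = (n + nproc - 1) // nproc
    chunks = [x[i:i + size] for i in range(0, n, size)]
    with mp.Pool(processes=nproc) as pool:
        partial = pool.map(chunk_min, chunks)  # one partial minimum per chunk
    print(min(partial))                        # final reduction in the parent process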
import multiprocessing as mp

class Dummy:
    def __init__(self, n):
        self.vals = [i for i in range(n)]

    def square(self, i, offset, values):
        return self.vals[i]**2 + offset + max(values)

def f(x):
    return x**2

def main():
    dummy = Dummy(10)
    offset = 1
    values = [-1, 0, 3, 1]
    x = {1, 2, 3, 4, 5, 6}
    with mp.Pool(processes=4) as p:
        # starmap unpacks each argument tuple into dummy.square(i, offset, values)
        res = p.starmap(func=dummy.square, iterable=[(i, offset, values) for i in range(6)])
        # NOTE: this overwrites the starmap result; only the map result is printed
        res = p.map(func=f, iterable=x)
    print(res)

if __name__ == '__main__':
......
import time
+from time import perf_counter
import random
+from itertools import islice
import multiprocessing as mp

class Dummy:
    def __init__(self, n):
        self.values = [i for i in range(n)]

def producer(dummys, queue):
    random.seed(0)
    # Put dummys in queue
    for dummy in dummys:
        # Do work
        time.sleep(random.randint(0, 5))
        queue.put(dummy)

-def consumer(tid, queue):
-    random.seed(tid)
+def consumer(in_queue, out_queue):
    # Infinite loop
    while True:
-        # Do work
-        time.sleep(random.randint(0, 5))
-        # Get dummy object
-        dummy = queue.get()
+        values = in_queue.get()
-        # Debug
-        if (tid == 1):
-            print(dummy.values)
-        # Modify dummy
-        dummy.values.pop()
+        out_queue.put(sum(values))

def main():
-    queue = mp.Queue()
+    in_queue = mp.Queue()
+    out_queue = mp.Queue()
-    n = 10
-    dummys = [Dummy(4) for _ in range(n)]
-    prod = mp.Process(target=producer, args=(dummys, queue))
-    prod.start()
+    n = 100000
+    n_batches = 10
+    batch_size = n // n_batches
+    x = set([i for i in range(n)])
+    start = perf_counter()
+    serial = sum(x)
+    end = perf_counter()
+    elapsed_serial = end - start
+    print(serial)
+    print("Elapsed: %e s" % elapsed_serial)
-    nconsumer = 3
    consumers = []
-    for tid in range(1, nconsumer + 1):
-        p = mp.Process(target=consumer, args=(tid, queue), daemon=True)
+    nthreads = 4
+    start = perf_counter()
+    for tid in range(nthreads):
+        p = mp.Process(target=consumer, args=(in_queue, out_queue), daemon=True)
        p.start()
        consumers.append(p)
-    prod.join()
-    prod.close()
+    for _ in range(n_batches):
+        batch = islice(x, batch_size)
+        in_queue.put(batch)
+    sum_ = 0
+    for _ in range(n_batches):
+        val = out_queue.get()
+        sum_ += val
+    end = perf_counter()
+    elapsed_par = end - start
+    print(sum_)
+    print("Elapsed par: %e s" % elapsed_par)
+    print("Speedup: %e" % (elapsed_serial / elapsed_par))
-    # Verify results
-    for dummy in dummys:
-        assert(len(dummy.values) == 3)
+    assert(serial == sum_)

if __name__ == '__main__':
    main()
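One caveat with the batched version above: itertools.islice objects are not picklable, so in_queue.put(batch) would fail in the queue's feeder thread, and islice(x, batch_size) restarts from the beginning of x on every call, so every batch would contain the same elements. A possible fix (an assumption, not part of the commit; it is the illustrative name) is to advance a single iterator and send plain lists:

    it = iter(x)
    for _ in range(n_batches):
        batch = list(islice(it, batch_size))  # lists pickle cleanly; the iterator advances across batches
        in_queue.put(batch)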
@@ -3,7 +3,7 @@
# Daniel W. Meyer
# Institute of Fluid Dynamics, ETH Zurich
# January 2019
import multiprocessing as mp
from typing import List, Dict, Set, Tuple # for type hints in argument lists
LABELS = ('', 'in', 'out', 'cut') # don't change the order
@@ -12,7 +12,7 @@ class Pore:
    """Class of a pore connected to other pores via throats."""
    id = 0
    def __init__(self, pos: List[float], r: float, label: str = LABELS[0], \
-                 throats: Set = None, index: int = -1, id: int = -1):
+                 throats: Set = None, index: int = 0, id: int = -1):
        if id == -1: # no id given
            self.id = Pore.id; Pore.id = Pore.id+1
        else: # id provided
@@ -118,20 +118,28 @@ class CellList:
        self.invCellSizes = [self.nCells[i] / domainSize[i] for i in range(self.dim)]
        # Total number of cells
        self.totalCells = prod(self.nCells)
+        # Pore positions
+        self.poresPos = [0.] * self.dim * len(pores)
        # Sort pores according to cell membership
        self.poresSorted = [set() for _ in range(self.totalCells)]
        for pore in pores:
+            poreIdx = pore.index
            cellIdx = self.pore_to_index(pore)
-            self.poresSorted[cellIdx].add(pore)
+            self.poresSorted[cellIdx].add(poreIdx)
+            lb = self.dim * poreIdx
+            ub = self.dim + lb
+            self.poresPos[lb:ub] = pore.pos
    # Helper for converting 3D index triple into 1D (flattened) index
    def flatten(self, i: int, j: int, k: int) -> int:
        return i + self.nCells[0] * (j + self.nCells[1] * k)
-    def pore_to_triplet(self, pore: Pore) -> List[int]:
+    def pore_to_triplet(self, poreIdx: int) -> List[int]:
        from math import floor
        # Shift pore position to be positive first (buffer layer)
-        return [floor( (pore.pos[i] + self.Lmax) * self.invCellSizes[i]) \
+        porePos = self.fetch_pos(poreIdx)
+        return [floor( (porePos[i] + self.Lmax) * self.invCellSizes[i]) \
                for i in range(self.dim)]
    # Compute index of given pore in cell list based on position
    def pore_to_index(self, pore: Pore) -> int:
@@ -145,41 +153,76 @@ class CellList:
        return 0 <= idx < self.totalCells
    # Compute dictionary of all nbor candidates of given pore
-    def find_candidates(self, pore: Pore) -> Dict[Pore, float]:
+    def find_candidates(self, poreIdx: int) -> Dict[int, float]:
        candidates = {}
-        cellIdxTrip = self.pore_to_triplet(pore)
-        # Check all possible neighbor cells (27 in 3D)
+        porePos = self.fetch_pos(poreIdx)
+        cellIdxTrip = self.pore_to_triplet(poreIdx)
+        nearestCellIdx = self.flatten(*cellIdxTrip)
+        for nborIdx in self.poresSorted[nearestCellIdx]:
+            if (nborIdx == poreIdx): continue
+            l = distance(self.fetch_pos(nborIdx), porePos)
+            if (l < self.Lmax):
+                candidates[nborIdx] = l
        # TODO: Speed up finding of suitable candidates (e.g. multiprocessing)
-        for i in range(-1, 2):
-            for j in range(-1, 2):
-                for k in range(-1, 2):
-                    nborIdx = self.flatten(cellIdxTrip[0] + i, \
-                                           cellIdxTrip[1] + j, \
-                                           cellIdxTrip[2] + k)
-                    # Check if valid cell index
-                    #assert(self.is_valid_index(nborIdx))
-                    # Check all pores within current neighbor-cell
-                    for nbor in self.poresSorted[nborIdx]:
-                        if (nbor == pore): continue
-                        l = distance(nbor.pos, pore.pos)
-                        if (l < self.Lmax):
-                            candidates[nbor] = l
+        for n in range(self.dim):
+            for i in [-1, 1]:
+                cellIdx = self.flatten(cellIdxTrip[0] + i * (n == 0), \
+                                       cellIdxTrip[1] + i * (n == 1), \
+                                       cellIdxTrip[2] + i * (n == 2))
+                # Check if valid cell index
+                #assert(self.is_valid_index(nborIdx))
+                # Check all pores within current neighbor-cell
+                for nborIdx in self.poresSorted[cellIdx]:
+                    l = distance(self.fetch_pos(nborIdx), porePos)
+                    if (l < self.Lmax):
+                        candidates[nborIdx] = l
        return candidates
    def find_best_candidate_in_cell(self, cellIdx: int, porePos: List[float],
                                    poreIdx: int, throatLen: float) -> Tuple[int, float]:
        best_candidate_index = -1
        best_length = self.Lmax
        for nbor in self.poresSorted[cellIdx]:
            if (nbor.index == poreIdx): continue
            l = distance(nbor.pos, porePos)
            diff = abs(l - throatLen)
            if (l < self.Lmax and diff < abs(best_length - throatLen)):
                best_candidate_index = nbor.index
                best_length = l
        return (best_candidate_index, best_length)
+    def fetch_pos(self, poreIdx: int) -> List[float]:
+        lb = self.dim * poreIdx
+        ub = self.dim + lb
+        return self.poresPos[lb:ub]
+    def par_find_candidates(self, in_q: mp.Queue, out_q: mp.Queue):
+        while True:
+            # Retrieve payload from in-queue
+            payload = in_q.get()
+            conpores = payload[0]
+            poreIdx = payload[1]
+            nborCellIdx = payload[2]
+            lt = payload[3]
+            bestIdx = -1
+            bestLen = self.Lmax
+            bestRank = self.Lmax
+            for nborIdx in self.poresSorted[nborCellIdx]:
+                if (nborIdx in conpores or nborIdx == poreIdx): continue
+                l = distance(self.fetch_pos(poreIdx), self.fetch_pos(nborIdx))
+                rank = abs(lt - l)
+                if (l < self.Lmax and rank < bestRank):
+                    bestIdx = nborIdx
+                    bestLen = l
+                    bestRank = rank
+            # Put result in out-queue
+            out_q.put((bestIdx, bestLen, bestRank))
    # Only consider subset of all neighboring cells (closest 7)
    def nbor_indices(self, pore: Pore) -> List[int]:
        cellIdx = self.pore_to_triplet(pore)
        # i, j, k; i +- 1, j, k; i, j +- 1, k; i, j, k +- 1;
        neighborhood = [self.flatten(*cellIdx)] # Cell containing the pore
        neighborhood += [self.flatten(cellIdx[0] + i * (n == 0), \
                                      cellIdx[1] + i * (n == 1), \
                                      cellIdx[2] + i * (n == 2))
                         for n in range(self.dim) for i in [-1, 1]]
        return neighborhood
+    def nbor_cell_counts(self, neighborhood: List[int]) -> List[int]:
+        return [len(self.poresSorted[i]) for i in neighborhood]
    def cell_stats(self):
        cellCounts = [len(self.poresSorted[i]) for i in range(self.totalCells)]
        n = sum(cellCounts)
@@ -198,8 +241,8 @@ class CellList:
    # Remove pore from sorted pores
    def expel(self, pore: Pore):
-        poreIdx = self.pore_to_index(pore)
-        self.poresSorted[poreIdx].remove(pore)
+        poreCellIdx = self.pore_to_index(pore)
+        self.poresSorted[poreCellIdx].remove(pore.index)
def distance(p1: List[float], p2: List[float]):
    """Distance between points p1 and p2."""
@@ -577,7 +620,8 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
    Periodic throats are marked with a periodicity label, e.g., '1 0 -1' in 3d.
    If cutoff == float('nan'), pores are uniformly distributed.
    """
-    from random import seed, random, randint, shuffle
+    import multiprocessing as mp
+    from random import seed, random, randint
    from itertools import product
    seed(sd)
    d = len(targetsize) # number of spatial dimensions
@@ -644,10 +688,8 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
    for k, pore in enumerate(basepores):
        pos = [c-lb + si*(ub-lb) \
               for c,si,lb,ub in zip(centroids[k],s,basenet.lb,basenet.ub)]
-        pores.append(Pore(pos=pos, r=pore.r, label=LABELS[0],
-                          throats=pore.throats.copy()))
-    # Finally, shuffle pores (random order)
-    shuffle(pores)
+        pores.insert(randint(0,len(pores)), Pore(pos=pos,
+                     r=pore.r, label=LABELS[0], throats=pore.throats.copy()))
    print("\b"*21 + "left {0:d} of {1:d} clusters (incl. {2:d} pores) untouched".\
          format(sum([int(not j) for j in touched]), len(touched), len(basepores)))
@@ -668,14 +710,24 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
    trueDomainSize = [L[i] + 2 * basenet.Lmax for i in range(d)]
    # Initialize cell-list; Place each pore in respective cell-set
    cellList = CellList(pores, trueDomainSize, basenet.Lmax, basenet.Lmax)
    # throats, connect pores
    if (not mute): print("\b"*21 + "connecting")
    throats = []
    k = 0 # count of unrealised throats
    # DEBUG
-    #max_throat_diff = 0.
+    """
+    nthreads = 4
+    # Spawn daemonic worker threads
+    in_q = mp.Queue()
+    out_q = mp.Queue()
+    for _ in range(nthreads):
+        worker = mp.Process(target=cellList.par_find_candidates, args=(in_q, out_q))
+        worker.daemon = True
+        worker.start()
+    """
+    max_throat_diff = 0.
    for i, pore in enumerate(pores[:n]):
        # (re)run triangulation if maximal number of neighbors exceeds
        # threshold given by 2x init. maximum (large nnbrs -> expensive search)
@@ -686,29 +738,63 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
        # find candidates in neighborhood and respective throat lengths
        # TODO: Search neighboring cells in parallel
-        candidates = cellList.find_candidates(pore)
+        candidates = cellList.find_candidates(pore.index)
+        #neighborhood = cellList.nbor_indices(pore)
        # establish throat connections between pore and best candidate
        # TODO: Perform this for all throats simultaneously
        # (nbor, throat, actual length)
+        # Set of neighbor indices already connected to
+        #conpores = set()
        while (len(pore.throats) > 0):
-            """
-            if (not mute):
-                print("\b"*12 + ", throat {0:3d}".format(len(pore.throats)),
-                      end="", flush=True)
-            """
+            #if (not mute):
+            #    print("\b"*12 + ", throat {0:3d}".format(len(pore.throats)),
+            #          end="", flush=True)
            throat = pore.throats.pop()
            lt = distance(*throat_ends(throat,
                          [ub-lb for lb,ub in zip(basenet.lb,basenet.ub)])) # target len.
+            """
+            # Distribute cell indices of neighbor
+            for nborCellIdx in neighborhood:
+                in_q.put((conpores, pore.index, nborCellIdx, lt))
+            # Wait for results to arrive and compute best candidate
+            bestIdx = -1
+            bestLen = basenet.Lmax
+            bestRank = basenet.Lmax
+            for _ in range(len(neighborhood)):
+                results = out_q.get()
+                nborIdx = results[0]
+                nborLen = results[1]
+                nborRank = results[2]
+                if (nborRank < bestRank):
+                    bestIdx = nborIdx
+                    bestLen = nborLen
+                    bestRank = nborRank
+            print("%d: %e" % (bestIdx, bestRank))
            # find best candidate
-            if (len(candidates) == 0):
+            if (bestIdx == -1):
                k = k+1; continue # no candidates left, give up
-            nbor, lt = min(candidates.items(), key=lambda pl: abs(pl[1] - lt))
+            nbor = pores[bestIdx]
+            if (len(nbor.throats) == 0):
+                k = k+1; continue
+            lt = bestLen
+            # Update connected pores
+            conpores.add(bestIdx)
+            """
+            if (len(candidates) == 0):
+                k = k+1; continue # no candidates left
+            nborIdx, ltc = min(candidates.items(), key=lambda pl: abs(pl[1] - lt))
+            nbor = pores[nborIdx]
+            diff = abs(lt - ltc)
+            if (max_throat_diff < diff):
+                max_throat_diff = diff
            """
+            lt = ltc
            # Remove nbor from candidates
-            del candidates[nbor]
+            del candidates[nborIdx]
            nborc = nbor # memorize potential periodic copy
            # find original of buffer layer pore
            if (nbor.label != LABELS[0]):
@@ -716,6 +802,7 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
                nbor = pores[int(j)]
            else:
                lbl = LABELS[0]
            # connect pore to nbor
            throats.append([pore, nbor, lbl, throat.r])
            # remove most similar throat of nbor
@@ -727,18 +814,19 @@ def generate_dendrogram(basenet: Network, targetsize: List[int], \
            # remove fully connected nbor from search neighborhood
            if (len(nbor.throats) == 0):
                cellList.expel(nbor)
                for cpore in copies[nbor]:
                    cellList.expel(cpore)
        # remove fully connected pore from search neighborhood
        cellList.expel(pore)
        for cpore in copies[pore]:
-            cellList.expel(cpore)
-    print("\b"*24 + "{0:d} throats in total, {1:d} unrealised".\
-          format(len(throats), k))
-    #print("Max. throat length difference: %e" % max_throat_diff)
+            cellList.expel(cpore)
+    percent = k / (k + len(throats)) * 100.
+    print("\b"*24 + "{0:d} throats in total, {1:d} unrealised ({2:.1f}%)".\
+          format(len(throats), k, percent))
+    print("Max. throat length difference: %e" % max_throat_diff)
    # assemble and return network
    network = Network(lb=[0.0 for k in range(d)],
......