Source code for mpi_list.dfm

# schedule regrouping
from .segment import even_spread, cumsum, segments
# gather / repartition sends
from .gather import gather_partitions, send_items
# reduce
from .reducer import Reducer, CommReducer
# prefix scan
from .pscan import psched

[docs]class DFM: """Distributed Free Monoid = A list of something. All method calls are parallel, and must be called by every rank. Attributes: C: Reference to the Context object E: List of local elements. """ def __init__(self, C, E): self.C = C self.E = E
[docs] def len(self): """Number of elements in DFM (returned to all ranks) Returns: int : total size """ return self.C.comm.allreduce(len(self.E))
[docs] def map(self, f): """Map over elements. Args: f: function of type = elem -> new elem Returns: new DFM """ return DFM(self.C, [f(e) for e in self.E])
[docs] def filter(self, f): """Filter, removing some elements. Args: f: function of type = elem -> bool Returns: new DFM """ return DFM(self.C, [e for e in self.E if f(e)])
[docs] def flatMap(self, f): # applyM """Map over elements and concatenate all results. Args: f: function of type = elem -> [new elem] Returns: new DFM """ ans = [] for e in self.E: ans.extend( f(e) ) return DFM(self.C, ans)
[docs] def reduce(self, f, x0, distribute=True): """Reduce the dataset to a value. Apply an associative, pairwise reduction to the dataset. The result is sent to all ranks if distribute=True otherwise, it is present only on the root rank (rank 0). Yhe function f `may` operate in-place, storing its result on the left, for example:: def f(a,b): a[0] = b[0] return a This is indicated in the function's type with `*elem`. Each rank calls `x0 = f(x0, e)` on all its elements, then does a fan-in reduction on x0. You can technically implement a different function for each phase by detecting whether `e` has the type of x0 or the type of an element. In fact, this is the only way to get around the restriction that `type(x0) == type(e)`. Note: `x0` may be updated in-place. Even if you do this, `x0` will contain an undefined value on return. x0 must be initialized to a value representing the starting value for a single MPI rank. Args: f: a function of type = *elem, elem -> *elem It is permissable to modify the left argument in-place and return it. x0: the "zero" value of the first argument distribute: Distribute the answer from rank 0 to all ranks? Returns: elem """ R = Reducer(f, x0) for e in self.E: R(e) x0 = CommReducer(self.C,R)() if distribute: x0 = self.C.comm.bcast(x0) return x0
[docs] def scan(self, f): """Perform a parallel prefix-scan on the dataset. Args: f: an associative, pairwise function of type = elem, elem -> elem Returns: DFM containing [e0, f(e0,e1), f(e0,f(e1,e2)), ...] """ rank = self.C.rank procs = self.C.procs # compute local prefix-sum pre = [] if len(self.E) > 0: pre = [self.E[0]] for i in range(1, len(self.E)): pre.append(f(pre[i-1], self.E[i])) if procs == 1: return DFM(self.C, pre) last = [] if len(pre) > 0: last = [ pre[-1] ] # send last val. to rank+1 nbr if rank % 2 == 0: # even ranks send first if rank != procs-1: self.C.comm.send(last, dest=rank+1, tag=10) if rank == 0: recv = [] else: recv = self.C.comm.recv(source=rank-1, tag=11) else: # odd ranks recv first recv = self.C.comm.recv(source=rank-1, tag=10) if rank != procs-1: self.C.comm.send(last, dest=rank+1, tag=11) last = recv # ranks 1, ..., procs-1 participate in prefix scan if rank > 0: vrank = rank-1 # virtual rank numbering sch = psched(procs-1) for i,sl in enumerate(sch): off = sl.step//2 # sending rank? if vrank >= sl.start \ and vrank < sl.stop \ and (vrank - sl.start)%sl.step == 0: self.C.comm.send(last, dest=rank+off, tag=i) # receiving rank? elif vrank >= sl.start+off \ and (vrank - sl.start-off)%sl.step == 0: u = self.C.comm.recv(source=rank-off, tag=i) if len(last) == 0: last = u elif len(u) != 0: last = [ f(u[0], last[0]) ] # else u == [] and last remains unchanged # distribute incoming prefix scan (if non-empty) if len(last) > 0: for i in range(len(pre)): pre[i] = f(last[0], pre[i]) return DFM(self.C, pre)
[docs] def collect(self, root=0): """Collect all the elements to the root rank. This is equivalent to reduce(extend, [], distribute=False), but uses MPI_Gather. Note: Even though non-root ranks receive None, they *still* have call this function. Check the dataset size before calling this, since this may cause a memory error. Args: root: The rank receiving the collected results. if root is None, then all ranks receive the result. Returns: List of elems if rank == root, None otherwise. """ if root is None: lE = self.C.comm.allgather(self.E) else: lE = self.C.comm.gather(self.E, root=root) if self.C.rank != root: return None ans = [] for x in lE: ans.extend(x) return ans
[docs] def nodeMap(self, f): """map over the MPI ranks. The input function, f, is called once on every rank with arguments: * rank : int * E : list containing all local elements Note: f must return a list Args: f: function of type = int, [elem] -> [new elems] Returns: DFM """ ans = f(self.C.rank, self.E) assert isinstance(ans, list), f"nodeMap: f must return a list (got {type(f)})" return DFM(self.C, ans)
[docs] def head(self, n=10): """Distribute the first n elements to all ranks (useful for interactive debugging) Returns: first n values, [elem] """ # create dfm with length of each rank ans = [] root = 0 while len(ans) < n: if root >= self.C.procs: break data = None if root == self.C.rank: data = self.E[:min(len(self.E), n - len(ans))] ans.extend( self.C.comm.bcast(data, root=root) ) root += 1 return ans
[docs] def repartition(self, llen, split, concat, N): """Repartition into N "equally distributed" items. Each element, `e`, is assumed to represent a collection of llen(e) items. The function `split` should split `e` up into blocks spanning the specified index ranges. Repartition will then communicate these blocks as needed so that each output partition has a list of its component blocks. The function ``concat`` will do the final join of all blocks composing each output element. Args: llen: function of type = e -> int returning the internal length of each element split: function of type = elem, [(i0,i1)] -> [e'] creating intermediate blocks to communicate concat: function of type = [e'] -> new elem building the final output elements Returns: DFM of new elems """ def run_split(e, idx): v = split(e, idx) assert isinstance(v, list), "Error: invalid split return value" assert len(v) == len(idx), "Error: invalid split return value" return v # cumulative sum of all lengths plen = self.map(llen).scan(lambda a,b: a+b).E # gather this directly, since the segments tell us the ranks # owning each slice of segments slen = self.C.comm.allgather( plen ) srank = [] # src rank for each segment ssum = [0] # global index starts for elements on ea. rank start_local = 0 for i,o in enumerate(slen): # loop over ranks if i < self.C.rank: start_local += len(o) srank.extend( [i]*len(o) ) ssum.extend( o ) #print(f"local for {self.C.rank} = {start_local}") # target elem-lens on return (heavy elems at end) tgt = list(reversed(even_spread(ssum[-1], N))) # rank of output elems (extra elems at start) orank = [] for i,n in enumerate(even_spread(N,self.C.procs)): orank.extend( [i]*n ) seg = segments(ssum, cumsum(tgt)) local = [] sched = [] cur = None # use these to accumulate indices loc = [] # to split current src elem, `cur` for i,s in enumerate(seg): ri = srank[s.src] ro = orank[s.dst] if ri == self.C.rank: if cur is None: cur = s.src elif cur != s.src: local.extend(run_split(self.E[cur-start_local], loc)) loc = [] cur = s.src loc.append( (s.s0,s.s1) ) sched.append((i, ri, ro, s.dst)) elif ro == self.C.rank: sched.append((i, ri, ro, s.dst)) if len(loc) > 0: local.extend(run_split(self.E[cur-start_local], loc)) newE = send_items(self.C, local, sched) return DFM(self.C, [concat(e) for e in newE])
[docs] def group(self, f, concat, N): """Group elements into `N` partitions. Each element, `e`, is assumed to represent a collection of things. The function, `f`, returns a dictionary mapping the new element number to an intermediate collection, `e'`. For example, if `e` is a list of names, the function def f(e, out): for name in e: i = ord(name[0].upper())-ord('A') if i not in out: out[i] = [] out[i].append(name) would break it up into groups by first-letter. Group will then communicate these new values as needed so that each output element has a list of its component blocks. The function ``concat`` will do the final join of all blocks composing each output element. Args: f: function of type = e, *{i: [e']} -> () modifying the dictionary to append all elements belonging to each key Note: this requires 0 <= i < N concat: function of type = [e'] -> new elem building the final output elements N: the number of output elements Returns: DFM of new elems """ dP = {} for e in self.E: f(e, dP) ans = gather_partitions(self.C, dP, N) del dP return DFM(self.C, [concat(a) for a in ans])
[docs]class Context: """Global context The context is a convenient place to store MPI attributes. Attributes: rank: rank of the current process (0, 1, ..., procs-1) procs: number of MPI ranks comm: MPI.COMM_WORLD MPI: mpi4py's MPI module """ def __init__(self): from mpi4py import MPI self.comm = MPI.COMM_WORLD self.rank = self.comm.Get_rank() self.procs = self.comm.Get_size() self.MPI = MPI
[docs] def iterates(self, n, robin=False): """Create a DFM from a sequence of numbers. Args: n: The number of iterates robin: If True, assignments are done round-robin, so rank 0 will have 0, procs, 2*procs, ... rank 1 will have 1, procs+1, 2*procs+1, ... Returns: DFM holding numbers 0, 1, ..., n-1 """ if robin: # round-robin is simpler, but destroys ordering return DFM(self, list(range(self.rank, n, self.procs))) blk = n // self.procs extra = n % self.procs elapsed = min(self.rank, extra) # extra elements prior to rank extra1 = self.rank < extra # do I have an extra element? i0 = blk*self.rank + elapsed return DFM(self, list(range(i0, i0+blk+extra1)))