Source code for taylorism.examples

"""
Basic examples of how to use the module.

A more advanced example of use can be found in ``epygram``'s epy_conv.py tool.
"""

import time

from footprints import proxy as fpx

from taylorism import Worker, run_as_server
from .util import SharedNumpyArray


class Sleeper(Worker):
    """
    Tutorial/debugging worker whose only job is to sleep for a given time.

    Overloading ``__init__()`` is possible but not required; this class
    relies solely on its footprint and on :meth:`_task`.
    """

    _footprint = {
        'info': "Sleeps.",
        'attr': {
            'sleeping_time': {
                'info': "Sleeping time in s.",
                # accepted durations: a few sub-second values, 0..9, then 10..60 by steps of 5
                'values': [0.001, 0.01, 0.1] + list(range(10)) + list(range(10, 65, 5)),
                'type': float,
            },
            'wakeup_sentence': {
                'info': "What to say after sleep.",
                'optional': True,
                'access': 'rwx',
                'default': 'Hello !',
            },
        },
    }

    def _task(self):
        """
        Sleep for ``sleeping_time`` seconds, then build the report.

        :return: a one-line string made of the wake-up sentence, the time
            slept and the value returned by ``self.binding()``.
        """
        time.sleep(self.sleeping_time)
        # the report is assembled only once the sleep is over
        report_parts = [
            self.wakeup_sentence,
            'Woke up after',
            str(self.sleeping_time),
            "s sleep on cpu",
            str(self.binding()),
        ]
        return ' '.join(report_parts)
class Logarithmer(Worker):
    """
    Replace one row of a shared array by its logarithm, element by element.

    ``numpy.log(array)`` on the whole array would obviously be much faster;
    the point here is only to illustrate several workers operating on the
    same shared numpy array.
    """

    _footprint = {
        'attr': {
            'row': {
                'info': "Index of the row of the array on which the Worker is supposed to work.",
                'type': int,
            },
            'array': {
                'info': "The shared-memory array on which to work on.",
                'type': SharedNumpyArray,
                'access': 'rwx',
            },
        },
    }

    def _task(self):
        """Overwrite, in place, every element of row ``self.row`` with its log."""
        import numpy

        shared = self.array
        row = self.row
        # walk the columns of this worker's row only
        for col in range(shared.shape[1]):
            shared[row, col] = numpy.log(shared[row, col])
class Summer(Worker):
    """
    Computes a sum, each Worker adding its own value to a SharedNumpyArray.

    This needs, and illustrates, process-safety: the read-modify-write on
    the shared cell is only protected when ``use_lock`` is true.
    """

    _footprint = dict(
        attr=dict(
            value=dict(
                info="Value to be added by the worker.",
                type=int),
            shared_sum=dict(
                info="The shared-memory array on which to sum.",
                type=SharedNumpyArray,
                access='rwx'),
            use_lock=dict(
                info="Whether to use the lock (thread-safe) or not (may lead to a wrong result !).",
                type=bool)
        )
    )

    def _task(self):
        """
        Add ``self.value`` to the first cell of ``self.shared_sum``.

        When ``use_lock`` is true, the addition is done between
        ``acquire()`` and ``release()`` calls on the shared array; the
        release is guaranteed even if the addition raises, so that a failing
        worker never leaves the lock held.
        """
        if not self.use_lock:
            # deliberately unprotected: may lead to a wrong total
            self.shared_sum[0] += self.value
            return

        # acquire the lock, to be sure no other process is accessing the data meanwhile
        self.shared_sum.acquire()
        try:
            self.shared_sum[0] += self.value
        finally:
            # always release the lock, whatever happened during the update
            self.shared_sum.release()
class MatrixProducter(Worker):
    """
    Computes a matrix product C = A x B by hand, but parallel by blocks.

    Three SharedNumpyArray are used (A, B and C), each Worker being
    responsible for the ``[i1:i2, j1:j2]`` block of the resulting C matrix,
    both bounds being inclusive. Again, numpy matrix products would
    probably be more efficient...
    """

    _footprint = dict(
        attr=dict(
            A=dict(
                info="The A shared-memory array matrix.",
                type=SharedNumpyArray,
                access='rwx'),
            B=dict(
                info="The B shared-memory array matrix.",
                type=SharedNumpyArray,
                access='rwx'),
            C=dict(
                info="The C shared-memory array matrix.",
                type=SharedNumpyArray,
                access='rwx'),
            i1=dict(
                info="The first index i of the the Worker is responsible for.",
                type=int),
            i2=dict(
                info="The last index i of the the Worker is responsible for.",
                type=int),
            j1=dict(
                info="The first index j of the the Worker is responsible for.",
                type=int),
            j2=dict(
                # this is the upper (inclusive) bound in j, not the first index
                info="The last index j of the the Worker is responsible for.",
                type=int),
        )
    )

    def _task(self):
        """
        Fill the block of C this Worker is responsible for.

        Each ``C[i, j]`` with ``i1 <= i <= i2`` and ``j1 <= j <= j2`` is set
        to the sum over k of ``A[i, k] * B[k, j]``.
        """
        # length of the contraction dimension (number of columns of A)
        inner_size = self.A.shape[1]
        for i in range(self.i1, self.i2 + 1):
            for j in range(self.j1, self.j2 + 1):
                self.C[i, j] = sum(self.A[i, k] * self.B[k, j]
                                   for k in range(inner_size))
def sleepers_generic_program(verbose=True, scheduler=None):
    """
    Generic example: how to run and control the Boss.

    A first batch of instructions is submitted through ``run_as_server``,
    an intermediate report is printed after 5 seconds, one more instruction
    is then appended, and finally each entry of the ``'workers_report'``
    item of the final report is printed.

    :param verbose: passed on to ``run_as_server``
    :param scheduler: passed on to ``run_as_server``
    """
    shared_settings = {'wakeup_sentence': 'Hello Dolly !'}
    # presumably one worker per sleeping time -- depends on run_as_server
    first_batch = {'sleeping_time': [4, 9, 2, 1]}
    boss = run_as_server(
        common_instructions=shared_settings,
        individual_instructions=first_batch,
        scheduler=scheduler,
        verbose=verbose,
    )

    # give the first batch some time before asking for a report
    time.sleep(5)
    print('Intermediate report:', boss.get_report())

    # additional instructions can be sent while the Boss is running
    extra_batch = {'sleeping_time': [3]}
    boss.set_instructions({}, individual_instructions=extra_batch)

    boss.wait_till_finished()
    final_report = boss.get_report()
    for worker_report in final_report['workers_report']:
        print(worker_report)
def sleepers_example_laxist(verbose=True):
    """
    Example: assuming no selection of strategy for scheduling.

    :param verbose: passed on to ``sleepers_generic_program``
    """
    laxist_scheduler = fpx.scheduler(nosort=True)
    sleepers_generic_program(verbose=verbose, scheduler=laxist_scheduler)
def sleepers_example_threads(verbose=True):
    """
    Example: scheduling is driven by the number of threads (3 at most here).

    :param verbose: passed on to ``sleepers_generic_program``
    """
    threads_scheduler = fpx.scheduler(limit='threads', max_threads=3)
    sleepers_generic_program(verbose=verbose, scheduler=threads_scheduler)
def sleepers_example_bindedthreads(verbose=True):
    """
    Example: scheduling is driven by the number of threads (3 at most here),
    the scheduler being additionally requested with ``binded=True``.

    :param verbose: passed on to ``sleepers_generic_program``
    """
    bound_threads_scheduler = fpx.scheduler(limit='threads', max_threads=3, binded=True)
    sleepers_generic_program(verbose=verbose, scheduler=bound_threads_scheduler)
def sleepers_example_memory(verbose=True):
    """
    Example: scheduling is driven by memory consumption.

    The scheduler is requested with ``memory_per_task=1.8``.

    :param verbose: passed on to ``sleepers_generic_program``
    """
    memory_scheduler = fpx.scheduler(limit='memory', memory_per_task=1.8)
    sleepers_generic_program(verbose=verbose, scheduler=memory_scheduler)
def sleepers_example_bindedmemory(verbose=True):
    """
    Example: scheduling is driven by memory consumption, the scheduler being
    additionally requested with ``binded=True``.

    :param verbose: passed on to ``sleepers_generic_program``
    """
    bound_memory_scheduler = fpx.scheduler(limit='memory', binded=True)
    sleepers_generic_program(verbose=verbose, scheduler=bound_memory_scheduler)
def logarithmer_example(verbose=True):
    """
    Example: how to use a numpy array, shared among workers.

    A small shared array is filled so that element ``[i, j]`` holds
    ``i + j``; it is printed, processed with one set of instructions per
    row, and printed again once the Boss has finished.

    :param verbose: passed on to ``run_as_server``
    """
    import numpy

    # sample initialization of the SharedNumpyArray
    nrows = 2
    ncols = 3
    a = SharedNumpyArray(numpy.ones((nrows, ncols)))
    n_i, n_j = a.shape[0], a.shape[1]
    for row in range(n_i):
        for col in range(n_j):
            # NOTE: element [0, 0] is therefore 0
            a[row, col] = row + col
    print(a[...])

    # run workers, each one dedicated to a row
    row_indexes = list(range(nrows))
    boss = run_as_server(
        common_instructions={},
        individual_instructions={'row': row_indexes},
        scheduler=fpx.scheduler(limit='threads', max_threads=nrows),
        verbose=verbose,
        sharedmemory_common_instructions={'array': a},
    )
    boss.wait_till_finished()
    print(a[...])
def summer_example(verbose=True, use_lock=True):
    """
    Example: how to use a process-safe numpy array, shared among workers.

    The values ``0 .. n-1`` (n = 10) are distributed as individual
    instructions, together with a one-element shared integer array; at the
    end, the content of that array is printed next to ``sum(range(n))``.

    :param verbose: passed on to ``run_as_server``
    :param use_lock: forwarded to the workers as a common instruction
    """
    import numpy

    # sample initialization of the SharedNumpyArray
    n = 10
    s = SharedNumpyArray(numpy.zeros((1,), dtype=int))

    # maximize the number of simultaneous threads to test the importance of the lock
    wide_scheduler = fpx.scheduler(limit='threads', max_threads=n)

    # run workers, each one dedicated to a value
    boss = run_as_server(
        common_instructions={'use_lock': use_lock},
        individual_instructions={'value': list(range(n))},
        scheduler=wide_scheduler,
        verbose=verbose,
        sharedmemory_common_instructions={'shared_sum': s},
    )
    boss.wait_till_finished()

    obtained = str(s[0])
    expected = str(sum(range(n)))
    print(obtained + '==' + expected)
def matrixproduct_example(A_shape=(6, 7), B_shape=(7, 8), iblocks=2, jblocks=4, verbose=True):
    """
    Example: how to use several numpy array shared among workers.

    Two random matrices A and B are generated, and the product C = A x B is
    cut into ``iblocks * jblocks`` rectangular blocks, each one being
    described by inclusive bounds ``(i1, i2, j1, j2)`` sent as individual
    instructions. The elapsed time and the resulting C matrix are printed.

    :param A_shape: size of the A matrix
    :param B_shape: size of the B matrix
    :param iblocks: number of blocks in direction i of result matrix;
                    must divide A_shape[0]
    :param jblocks: number of blocks in direction j of result matrix;
                    must divide B_shape[1]
    :param verbose: passed on to ``run_as_server``
    """
    import numpy

    assert A_shape[1] == B_shape[0]
    assert A_shape[0] % iblocks == 0 and B_shape[1] % jblocks == 0
    iblocksize = A_shape[0] // iblocks
    jblocksize = B_shape[1] // jblocks

    # sample initialization of the SharedNumpyArray
    A = SharedNumpyArray(numpy.random.random(A_shape))
    B = SharedNumpyArray(numpy.random.random(B_shape))
    C = SharedNumpyArray(numpy.zeros((A.shape[0], B.shape[1])))

    # build blocks indices, other cuts could be possible of course...
    # The upper bounds are inclusive: each block must stop just before the
    # start of the next one, otherwise neighbouring blocks share a row/column
    # that is then computed twice and written by two workers.
    last_i = C.shape[0] - 1
    last_j = C.shape[1] - 1
    i1 = list(range(0, C.shape[0], iblocksize))
    i2 = [min(start + iblocksize - 1, last_i) for start in i1]
    j1 = list(range(0, C.shape[1], jblocksize))
    j2 = [min(start + jblocksize - 1, last_j) for start in j1]

    t0 = time.time()

    # constitute each workers instructions: one entry per (i-block, j-block) pair
    indexes = dict(i1=[], i2=[], j1=[], j2=[])
    for i_first, i_last in zip(i1, i2):
        for j_first, j_last in zip(j1, j2):
            indexes['i1'].append(i_first)
            indexes['i2'].append(i_last)
            indexes['j1'].append(j_first)
            indexes['j2'].append(j_last)

    # run workers, each one dedicated to a block
    boss = run_as_server(common_instructions=dict(),
                         individual_instructions=indexes,
                         scheduler=fpx.scheduler(limit='threads', max_threads=4),
                         verbose=verbose,
                         sharedmemory_common_instructions=dict(A=A, B=B, C=C))
    boss.wait_till_finished()
    print('Exec in:', time.time() - t0)
    print(C[:, :])