Source code for taylorism.schedulers

"""
Contains classes for Schedulers.

Among a set of instructions to be passed to a Worker, and according to its own
criteria, the Scheduler determines which ones can be launched simultaneously
right now, and which ones must be delayed.

A scheduler hence basically has one method:
launchable(pending_instructions, workers, report).

Its parameters (constant during execution) can be set in its constructor.
Other quantities, which vary during execution, must be available within
*workers* (work being done) and *report* (work done).

A set of basic schedulers is given.

Starting from version 1.0.7, schedulers should be created using the footprints
package::

    import footprints as fp
    # In order to create a NewMaxThreadsScheduler scheduler:
    mt_sched = fp.proxy.scheduler(limit='threads', max_threads=2)

Compatibility classes are still provided (see :class:`LaxistScheduler`,
:class:`MaxThreadsScheduler`, :class:`MaxMemoryScheduler` and
:class:`SingleOpenFileScheduler`) but they should not be used anymore.

Dependencies
------------

:mod:`footprints` (MF package)
"""

import footprints
from footprints import FootprintBase
from bronx.fancies import loggers
from bronx.system import cpus, memory
from bronx.syntax.decorators import secure_getattr

import multiprocessing

# Module-level logger (used to emit the deprecation warning of the
# compatibility classes at the end of this module).
logger = loggers.getLogger(__name__)

#: Upper bound of the ticket range handed out by
#: NewMaxMemoryScheduler._all_tickets (tickets are 0 .. MAX_NUMBER_PROCESSES - 1).
MAX_NUMBER_PROCESSES = 512


class BaseScheduler(FootprintBase):
    """Abstract base class for schedulers.

    Concrete schedulers implement :meth:`launchable`. They may also override
    :meth:`_all_tickets` (the pool of tickets that can be handed out) and
    :meth:`_workers_hooks` (the callbacks attached to launched instructions).
    """

    _abstract = True
    _collector = ('scheduler',)
    _footprint = dict(
        attr=dict(
            identity=dict(
                info="Scheduler identity.",
                optional=True,
                default='anonymous',
            ),
        )
    )

    def launchable(self, pending_instructions, workers, report):
        """
        Split *pending_instructions* into "launchable" and "not_yet_launchable"
        instructions according to the scheduler own rules.

        For that purpose and in a generic manner, the scheduler may need:

        - *pending_instructions*: todo
        - *workers*: being done
        - *report*: done.

        :raises NotImplementedError: always (this class is abstract).
        """
        # Adjacent string literals are used instead of a backslash continuation
        # inside the literal, which would embed the source indentation in the
        # message.
        raise NotImplementedError('launchable() method must be implemented in '
                                  'inheritant classes. (BaseScheduler is abstract).')

    def _all_tickets(self):
        """Return the set of tickets this scheduler may hand out.

        The base implementation only knows the ``None`` ticket.
        """
        return {None}

    def _workers_hooks(self):
        """Return a list of callbacks to be triggered before workers task processing."""
        return list()

    def _assign_tickets(self, workers, launchable):
        """Assign available tickets in **launchable** instructions.

        Tickets already held by the running *workers* (their
        ``scheduler_ticket`` attribute) are excluded. The remaining ones are
        handed out in sorted order. Once they are exhausted, instructions get
        the ``None`` ticket. Each instruction is updated in place with a
        ``scheduler_ticket`` and a fresh ``scheduler_hooks`` list.

        :return: the *launchable* object that was passed in.
        """
        assigned_tickets = {w.scheduler_ticket for w in workers.values()}
        # An iterator with a None default yields the free tickets in order and
        # then None forever, without the O(n) cost of repeated list.pop(0).
        free_tickets = iter(sorted(self._all_tickets() - assigned_tickets))
        for instructions in launchable:
            instructions.update(
                scheduler_ticket=next(free_tickets, None),
                scheduler_hooks=self._workers_hooks(),
            )
        return launchable
class NewLaxistScheduler(BaseScheduler):
    """Scheduler without any restriction: no sorting is done !"""

    _footprint = dict(
        attr=dict(
            nosort=dict(
                alias=('laxist', 'unsorted'),
                values=(True,),
                type=bool,
            ),
        )
    )

    def launchable(self, pending_instructions, workers, report):
        """Very crude strategy: every pending instruction is launchable.

        :return: a 2-tuple made of all the *pending_instructions* (with their
            tickets assigned) and an empty list of delayed instructions.
        """
        ticketed = self._assign_tickets(workers, pending_instructions)
        delayed = list()
        return ticketed, delayed
class NewLimitedScheduler(BaseScheduler):
    """
    Abstract base class for schedulers that dequeue the pending list as long as
    some limit (selected through the *limit* footprint attribute) is not reached.
    """

    # NB: this used to read "True," (a one-element tuple) because of a stray
    # trailing comma.
    _abstract = True
    _footprint = dict(
        attr=dict(
            limit=dict(
                values=['threads', 'memory', 'mem', 'threads+memory'],
                remap=dict(mem='memory'),
            ),
        )
    )
#: Abstract footprint attribute for binding aware schedulers: it declares the
#: optional boolean *binded* attribute (``False`` by default).
_binded_fpattr = footprints.Footprint(
    info='Abstract binded attribute',
    attr=dict(
        binded=dict(
            type=bool,
            info="Binds the process to a single cpu.",
            default=False,
            optional=True,
        ),
    ),
)
def binding_setup(worker):
    """Bind a *worker* to its *scheduler_ticket* compute core.

    The target CPU is the entry of the socket-packed CPU list found at index
    ``worker.scheduler_ticket`` modulo the number of virtual cores.
    """
    info = cpus.LinuxCpusInfo()
    ordered_cpus = list(info.socketpacked_cpulist())
    # The modulo wraps tickets that exceed the number of virtual cores.
    slot = worker.scheduler_ticket % info.nvirtual_cores
    cpus.set_affinity(ordered_cpus[slot])
def BindingAwareScheduler(cls):
    """
    Class decorator that replaces the ``_workers_hooks`` method of *cls* with a
    wrapper that appends the binding's setup callback to the list of Worker's
    hooks whenever the instance's ``binded`` attribute is true.

    NB: The class' footprint should include a 'binded' attribute.

    :return: *cls* itself (modified in place).
    """
    # Keep a handle on the method being wrapped
    inherited_hooks = cls._workers_hooks

    def _hooks_with_binding(self):
        hooks = inherited_hooks(self)
        # Instances without a 'binded' attribute are treated as not binded
        if getattr(self, 'binded', False):
            hooks.append(binding_setup)
        return hooks

    cls._workers_hooks = _hooks_with_binding
    return cls
@BindingAwareScheduler
class NewMaxThreadsScheduler(NewLimitedScheduler):
    """
    A basic scheduler that dequeue the pending list as long as a maximum number
    of simultaneous tasks (*max_threads*) is not reached.

    A *max_threads* of 0 is remapped to half of the CPU count reported by
    :func:`multiprocessing.cpu_count` (at least 1).
    """

    _footprint = [
        _binded_fpattr,
        dict(
            attr=dict(
                limit=dict(
                    values=['threads', 'processes'],
                    remap=dict(processes='threads'),
                ),
                max_threads=dict(
                    alias=('maxpc', 'maxthreads'),
                    # Floor division keeps the default an int (it is used in
                    # range() and as a slice bound). max() avoids ending up
                    # with no thread at all on a single-CPU host.
                    remap={0: max(1, multiprocessing.cpu_count() // 2)},
                    type=int,
                ),
            )
        )
    ]

    def _all_tickets(self):
        """The actual range of available tickets is limited by a maximum number of threads."""
        return set(range(0, self.max_threads))

    def launchable(self, pending_instructions, workers, report):
        """Limit strategy: only max_threads processes could run simultaneously.

        :return: a 2-tuple (launchable, not_yet_launchable). The first element
            holds the leading pending instructions that fit in the free
            threads (with their tickets assigned), the second the remainder.
        """
        # Never negative, even if more workers than max_threads are running
        available_threads = max(self.max_threads - len(workers), 0)
        launchable = pending_instructions[:available_threads]
        not_yet_launchable = pending_instructions[available_threads:]
        launchable = self._assign_tickets(workers, launchable)
        return launchable, not_yet_launchable
@BindingAwareScheduler
class NewMaxMemoryScheduler(NewLimitedScheduler):
    """
    A basic scheduler that dequeue the pending list as long as a critical memory
    level (according to 'memory' element of workers instructions (in MB) and
    total system memory) is not reached.
    """

    _footprint = [
        _binded_fpattr,
        dict(
            attr=dict(
                limit=dict(
                    values=['memory', 'mem'],
                    remap=dict(mem='memory'),
                ),
                max_memory=dict(
                    info="Amount of usable memory (in MiB)",
                    optional=True,
                    type=float,
                    access='rwx',
                ),
                memory_per_task=dict(
                    info=("If a worker do not provide any information on memory, " +
                          "request at least *memory_per_task* MiB of memory."),
                    optional=True,
                    default=2048.,
                    type=float,
                ),
                memory_max_percentage=dict(
                    info=("Max memory level as a fraction of the total " +
                          "system memory (used only if max_memory is not provided)."),
                    optional=True,
                    default=0.75,
                    type=float,
                ),
            )
        )
    ]

    def __init__(self, *args, **kw):
        """Setup the maximum available memory.

        When *max_memory* is not provided, it is set to *memory_max_percentage*
        times the total system RAM (in MiB).

        :raises OSError: if the total system memory cannot be determined.
        """
        super().__init__(*args, **kw)
        if self.max_memory is None:
            # memory tools are all but generic, they might fail !
            try:
                system_mem = memory.LinuxMemInfo().system_RAM('MiB')
            except OSError as err:
                # Keep the original error attached as the cause
                raise OSError("Unable to determine the total system's memory size.") from err
            self.max_memory = self.memory_max_percentage * system_mem

    def _all_tickets(self):
        """The range of available tickets is bounded by MAX_NUMBER_PROCESSES."""
        return set(range(0, MAX_NUMBER_PROCESSES))

    def launchable(self, pending_instructions, workers, report):
        """Limit strategy: only processes that fit in a given amount of memory could run.

        The memory in use is the sum of the running workers' ``memory``
        (or *memory_per_task* when it is falsy). Pending instructions are
        examined in order and each one is accepted if the memory in use stays
        strictly below *max_memory*.

        :return: a 2-tuple (launchable, not_yet_launchable).
        """
        used_memory = sum(w.memory or self.memory_per_task for w in workers.values())
        launchable = list()
        not_yet_launchable = list()
        for instructions in pending_instructions:
            actual_memory = instructions.get('memory', self.memory_per_task)
            if used_memory + actual_memory < self.max_memory:
                launchable.append(instructions)
                used_memory += actual_memory
            else:
                not_yet_launchable.append(instructions)
        launchable = self._assign_tickets(workers, launchable)
        return launchable, not_yet_launchable
class LongerFirstScheduler(NewMaxMemoryScheduler):
    """
    A scheduler based on the NewMaxMemory Scheduler.

    It aims at launching as soon as possible the workers that are expected to
    have the longest run-time (whilst there is enough available memory).

    Workers needs to have 2 attributes:

    * expected_time representing the expected run time
    * memory representing the expected memory consumed

    This scheduler is typically used with Bateur workers, in vortex's
    src/common/algo/odbtools.py (for parallel BATOR run).
    """

    _footprint = dict(
        attr=dict(
            limit=dict(
                values=['threads+memory'],
            ),
            max_threads=dict(
                alias=('maxpc', 'maxthreads'),
                # Floor division keeps the default an int (it is used in
                # range()). max() avoids ending up with no thread at all on a
                # single-CPU host.
                remap={0: max(1, multiprocessing.cpu_count() // 2)},
                type=int,
            ),
        )
    )

    def _all_tickets(self):
        """The actual range of available tickets is limited by a maximum number of threads."""
        return set(range(0, self.max_threads))

    def launchable(self, pending_instructions, workers, report):
        """Limit strategy: longest tasks first, within both the memory and threads limits.

        NB: *pending_instructions* is sorted **in place** by decreasing
        ``expected_time`` (instructions without it count as 0).

        An instruction is launchable if its memory request (``memory`` entry,
        or *memory_per_task* by default) fits in the remaining memory and a
        thread is still available.

        :return: a 2-tuple (launchable, not_yet_launchable).
        """
        available_threads = self.max_threads - len(workers)
        used_memory = sum(w.memory or self.memory_per_task for w in workers.values())
        available_memory = self.max_memory - used_memory
        launchable = []
        not_yet_launchable = []
        # sort by decreasing time
        pending_instructions.sort(key=lambda tup: tup.get('expected_time', 0), reverse=True)
        for instructions in pending_instructions:
            actual_memory = instructions.get('memory', self.memory_per_task)
            if (actual_memory <= available_memory) and (available_threads > 0):
                launchable.append(instructions)
                available_memory -= actual_memory
                available_threads -= 1
            else:
                not_yet_launchable.append(instructions)
        launchable = self._assign_tickets(workers, launchable)
        return launchable, not_yet_launchable
class NewSingleOpenFileScheduler(NewMaxThreadsScheduler):
    """
    Ensure that files will not be open 2 times simultaneously by 2 workers.
    And with a maximum threads number.
    """

    _footprint = dict(
        attr=dict(
            singlefile=dict(
                values=(True,),
                type=bool,
            ),
        )
    )

    def launchable(self, pending_instructions, workers, report):
        """Limit strategy: deal with what looks like a file locking.

        A pending instruction is delayed if its ``fileA`` or ``fileB`` entry
        matches the ``fileA``/``fileB`` attribute of any running worker. The
        remaining candidates are then filtered by the max-threads strategy of
        the parent class.

        :return: a 2-tuple (launchable, not_yet_launchable).
        """
        launchable = []
        not_yet_launchable = []
        open_files = ([w.fileA for w in workers.values()] +
                      [w.fileB for w in workers.values()])
        # check files are not already open by other worker
        for pi in pending_instructions:
            if pi['fileA'] in open_files or pi['fileB'] in open_files:
                not_yet_launchable.append(pi)
            else:
                launchable.append(pi)
        # and finally sort with regards to MaxThreads. The parent method already
        # assigns the tickets, so there is no need to call _assign_tickets again.
        (launchable, nyl) = super().launchable(launchable, workers, report)
        not_yet_launchable.extend(nyl)
        return launchable, not_yet_launchable
# ------------------------------------------------------------------------------
# The following classes are kept for backward compatibility. From now on, one
# should refrain from using them.

class _AbstractOldSchedulerProxy:
    """Abstract class of the deprecated scheduler objects.

    An instance creates a ``_TARGET_CLASS`` object (which subclasses must
    define) and forwards to it any attribute lookup that fails on the proxy.
    """

    _TARGET_CLASS = None

    def __init__(self, *kargs, **kwargs):
        """Log a deprecation warning and create the target scheduler.

        :raises RuntimeError: if ``_TARGET_CLASS`` is left undefined.
        """
        target_cls = self._TARGET_CLASS
        if target_cls is None:
            raise RuntimeError('_TARGET_CLASS needs to be defined')
        logger.warning('The %s class is deprecated. '
                       'Instead, use the footprint mechanism to create schedulers.',
                       type(self).__name__)
        self.__target_scheduler = target_cls(*kargs, **kwargs)
        super().__init__()

    @secure_getattr
    def __getattr__(self, name):
        # Only called when the regular attribute lookup fails
        return getattr(self.__target_scheduler, name)
class LaxistScheduler(_AbstractOldSchedulerProxy):
    """Deprecated class: should not be used from now and on.

    It is a proxy to a :class:`NewLaxistScheduler` object.
    """

    _TARGET_CLASS = NewLaxistScheduler

    def __init__(self):
        """Create the target scheduler with ``nosort=True``."""
        settings = dict(nosort=True)
        super().__init__(**settings)
class MaxThreadsScheduler(_AbstractOldSchedulerProxy):
    """Deprecated class: should not be used from now and on.

    It is a proxy to a :class:`NewMaxThreadsScheduler` object.
    """

    _TARGET_CLASS = NewMaxThreadsScheduler

    def __init__(self, max_threads=0):
        """Create the target scheduler with ``limit='threads'`` and *max_threads*."""
        settings = dict(limit='threads', max_threads=max_threads)
        super().__init__(**settings)
class MaxMemoryScheduler(_AbstractOldSchedulerProxy):
    """Deprecated class: should not be used from now and on.

    It is a proxy to a :class:`NewMaxMemoryScheduler` object.
    """

    _TARGET_CLASS = NewMaxMemoryScheduler

    def __init__(self, max_memory_percentage=0.75, total_system_memory=None):
        """
        :param float max_memory_percentage: Max memory level as a fraction of
                                            the total system memory
        :param float total_system_memory: Memory available on this system (in MiB)
        """
        # Without an explicit system memory size, max_memory is passed as None
        max_memory = (None if total_system_memory is None
                      else total_system_memory * max_memory_percentage)
        super().__init__(limit='memory',
                         max_memory=max_memory,
                         memory_max_percentage=max_memory_percentage)
class SingleOpenFileScheduler(_AbstractOldSchedulerProxy):
    """Deprecated class: should not be used from now and on.

    It is a proxy to a :class:`NewSingleOpenFileScheduler` object.
    """

    _TARGET_CLASS = NewSingleOpenFileScheduler

    def __init__(self, max_threads=0):
        """Create the target scheduler with ``limit='threads'``, *max_threads* and ``singlefile=True``."""
        settings = dict(limit='threads', max_threads=max_threads, singlefile=True)
        super().__init__(**settings)