bsb.services package

Submodules

bsb.services.mpi module

class bsb.services.mpi.MPIModule(module)[source]

Bases: MockModule

Module provider of the MPI interface.

property COMM_WORLD
class bsb.services.mpi.MPIService(comm=None)[source]

Bases: object

Interface for an MPI communication context.

This class also emulates the MPI communication context when running on a single node (see the usage sketch after the method list below).

abort(errorcode=1)[source]
allgather(obj)[source]
barrier()[source]
bcast(obj, root=0)[source]
gather(obj, root=0)[source]
get_communicator()[source]
get_rank()[source]
get_size()[source]
window(buffer)[source]
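
A minimal usage sketch of the methods listed above. The payloads are illustrative, and it is assumed that constructing the service without a communicator falls back to the single-node emulation:

from bsb.services.mpi import MPIService

comm = MPIService()          # no communicator given: single-node emulation
rank = comm.get_rank()       # 0 on the main (or only) process
size = comm.get_size()       # number of processes, 1 when emulated

# Broadcast a value from the root rank to every process.
settings = comm.bcast({"seed": 42}, root=0)

# Gather one object per rank on the root process.
ranks = comm.gather(rank, root=0)
if rank == 0:
    print(f"{size} process(es), gathered ranks: {ranks}")

# Wait for all processes before continuing.
comm.barrier()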

bsb.services.mpilock module

class bsb.services.mpilock.Fence(access)[source]

Bases: object

collect()[source]
guard()[source]
share(obj)[source]
exception bsb.services.mpilock.FencedSignal[source]

Bases: Exception

class bsb.services.mpilock.MPILockModule(module)[source]

Bases: MockModule

sync(comm=None, master=0)[source]
class bsb.services.mpilock.MockedWindowController(comm=None, master=0)[source]

Bases: object

close()[source]
property closed
property master
property rank
read()[source]
single_write(handle=None, rank=None)[source]
write()[source]
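
A hedged sketch of how the window controller might be used; it assumes that read() and write() return context managers guarding shared access, and that the mocked controller simply grants access immediately when no MPI communicator is involved. The helper functions are hypothetical:

from bsb.services.mpilock import MockedWindowController

lock = MockedWindowController()      # or obtained via MPILockModule's sync(comm, master=0) under MPI

# Multiple ranks may read concurrently.
with lock.read():
    data = load_shared_resource()    # hypothetical helper

# Writes are assumed to be exclusive across ranks.
with lock.write():
    store_shared_resource(data)      # hypothetical helper

lock.close()
assert lock.closed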

bsb.services.pool module

Job pooling module.

Jobs derive from the base Job class, which can be put on the queue of a JobPool. In order to submit themselves to the pool, jobs serialize() themselves into a predefined set of variables:

job.serialize() -> (job_type, f, args, kwargs)

  • job_type should be a string that is a class name defined in this module
    (e.g. "PlacementJob").

  • f should be the function object that the job’s execute method should execute.

  • args and kwargs are the args to be passed to that f.

The Job.execute() handler can help interpret args and kwargs before running f. The execute handler has access to the scaffold on the MPI process, so it is best to serialize just the name of the relevant part of the configuration rather than trying to pickle complex objects. For example, the PlacementJob uses the first args element to store the PlacementStrategy name and then retrieves it from the scaffold:

@staticmethod
def execute(job_owner, f, args, kwargs):
    # Look up the PlacementStrategy by name instead of pickling the object.
    placement = job_owner.placement[args[0]]
    indicators = placement.get_indicators()
    return f(placement, *args[1:], indicators, **kwargs)

A job has a couple of display variables that can be set: _cname for the class name, _name for the job name and _c for the chunk. These are used to display what the workers are doing during parallel execution. This is an experimental API and subject to sudden change in the future.

class bsb.services.pool.ConnectivityJob(pool, strategy, pre_roi, post_roi, deps=None)[source]

Dispatches the execution of a chunk of a connectivity strategy through a JobPool.

static execute(job_owner, args, kwargs)[source]

Job handler.

class bsb.services.pool.FunctionJob(pool, f, args, kwargs, deps=None, cache_items=None, **context)[source]
static execute(job_owner, args, kwargs)[source]

Job handler.

class bsb.services.pool.Job(pool, submission_context: SubmissionContext, args, kwargs, deps=None, cache_items=None)[source]

Dispatches the execution of a function through a JobPool.

Parameters:

submission_context (SubmissionContext)

cancel(reason: str | None = None)[source]
Parameters:

reason (str | None)

change_status(status: JobStatus)[source]
Parameters:

status (JobStatus)

property context
property description
property error
abstractmethod static execute(job_owner, args, kwargs)[source]

Job handler.

property name
on_completion(cb)[source]
property result
run(timeout=None)[source]

Execute the job on the current process, in a thread, and return whether the job is still running.

serialize()[source]

Convert the job to a (de)serializable representation.

set_exception(e: Exception)[source]
Parameters:

e (Exception)

set_result(value)[source]
property status
property submitter
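
A small hedged sketch of the job-level API above; it assumes that job is a Job obtained from a JobPool (for example via queue(), see the JobPool sketch further down) and that the on_completion callback receives the job itself:

from bsb.services.pool import JobStatus

# `job` is assumed to come from JobPool.queue() (see the sketch below).
job.on_completion(lambda j: print("finished:", j.name))   # callback argument assumed to be the job

if job.status is JobStatus.PENDING:
    job.cancel("no longer needed")   # cancelling before execution is the assumed use

# After the pool has executed, the outcome is exposed as properties.
if job.status is JobStatus.SUCCESS:
    print(job.result)
elif job.error is not None:
    raise job.error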
exception bsb.services.pool.JobErroredError(message, error)[source]
class bsb.services.pool.JobPool(id, scaffold, fail_fast=False, workflow: Workflow = None)[source]
Parameters:

workflow (Workflow)

add_listener(listener, max_wait=None)[source]
add_notification(notification: PoolProgress)[source]
Parameters:

notification (PoolProgress)

change_status(status: PoolStatus)[source]
Parameters:

status (PoolStatus)

execute(return_results=False)[source]

Execute the jobs in the queue.

In serial execution, this runs all the jobs in the queue in first-in, first-out order. In parallel execution, this enqueues all jobs into the MPIPool, unless they have dependencies that need to complete first.

classmethod get_owner(id)[source]
get_required_cache_items()[source]

Returns the cache functions required by all the jobs in the queue.

Returns:

set of cache function identifiers

Return type:

set[int]

get_submissions_of(submitter)[source]
classmethod get_tmp_folder(id)[source]
is_main()[source]
property jobs: list[Job]
notify()[source]
property owner
property parallel
ping()[source]
queue(f, args=None, kwargs=None, deps=None, **context)[source]
queue_connectivity(strategy, pre_roi, post_roi, deps=None)[source]
queue_placement(strategy, chunk, deps=None)[source]
raise_unhandled()[source]
schedule(nodes, scheduler=None)[source]
property scheduling
property status
property workflow
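
A hedged usage sketch of the pool API; the pool id, the origin of scaffold, and the shape of the listener notifications are assumptions, while queue(), add_listener(), and execute() follow the signatures listed above:

from bsb.services.pool import JobPool

def add(a, b):                 # top-level function so workers can unpickle it
    return a + b

# `scaffold` is assumed to be an existing bsb Scaffold; the pool id is arbitrary.
pool = JobPool(0, scaffold, fail_fast=True)

# The listener is assumed to receive PoolProgress notifications.
pool.add_listener(lambda progress: print(progress.status), max_wait=10)

job = pool.queue(add, args=(1, 2))
results = pool.execute(return_results=True)   # FIFO in serial, MPIPool in parallel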
class bsb.services.pool.JobStatus(value)[source]
ABORTED = 'aborted'
CANCELLED = 'cancelled'
FAILED = 'failed'
PENDING = 'pending'
QUEUED = 'queued'
RUNNING = 'running'
SUCCESS = 'success'
class bsb.services.pool.PlacementJob(pool, strategy, chunk, deps=None)[source]

Dispatches the execution of a chunk of a placement strategy through a JobPool.

static execute(job_owner, args, kwargs)[source]

Job handler.

class bsb.services.pool.PoolJobAddedProgress(pool: JobPool, job: Job)[source]
Parameters:

pool (JobPool)

job (Job)
property job
class bsb.services.pool.PoolJobUpdateProgress(pool: JobPool, job: Job, old_status: JobStatus)[source]
Parameters:

pool (JobPool)

job (Job)

old_status (JobStatus)
property job
property old_status
property status
class bsb.services.pool.PoolProgress(pool: JobPool, reason: PoolProgressReason)[source]

Class used to report pool progression to listeners.

Parameters:

pool (JobPool)

reason (PoolProgressReason)
property jobs
property reason
property status
property workflow
class bsb.services.pool.PoolProgressReason(value)[source]
JOB_ADDED = 2
JOB_STATUS_CHANGE = 3
MAX_TIMEOUT_PING = 4
POOL_STATUS_CHANGE = 1
class bsb.services.pool.PoolStatus(value)[source]
CLOSING = 'closing'
EXECUTING = 'executing'
SCHEDULING = 'scheduling'
class bsb.services.pool.PoolStatusProgress(pool: JobPool, old_status: PoolStatus)[source]
Parameters:

pool (JobPool)

old_status (PoolStatus)
class bsb.services.pool.SubmissionContext(submitter, chunks=None, **kwargs)[source]

Context information on who submitted a certain job.

property chunks
property context
property name
property submitter
class bsb.services.pool.Workflow(phases: list[str])[source]
Parameters:

phases (list[str])

property finished
next_phase()[source]
property phase
property phases
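
A minimal sketch of the workflow bookkeeping above; the starting phase and the exact semantics of finished are assumptions based on the listed members:

from bsb.services.pool import Workflow

wf = Workflow(["placement", "connectivity"])
print(wf.phases)     # ['placement', 'connectivity']
print(wf.phase)      # assumed to start at 'placement'
wf.next_phase()
print(wf.phase)      # 'connectivity'
print(wf.finished)   # assumed True once the last phase has been completed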
exception bsb.services.pool.WorkflowError[source]
bsb.services.pool.dispatcher(pool_id, job_args)[source]

The dispatcher is the function that gets pickled on the main process and unpickled “here” on the worker. Through class variables on JobPool and the given pool_id, we can find the pool and scaffold object, and the job function to run.

Before running a job, the cache is checked for any stale cached items to free up.

bsb.services.pool.free_stale_pool_cache(scaffold, required_cache_items: set[int])[source]
Parameters:

required_cache_items (set[int])

bsb.services.pool.get_node_cache_items(node)[source]
bsb.services.pool.pool_cache(caching_function)[source]

Developer modules

class bsb.services._util.ErrorModule(message)[source]

Bases: object

class bsb.services._util.MockModule(module)[source]

Bases: object

class bsb.services._util._ExceptionT_co