[docs]classSimulatorAdapter(abc.ABC):def__init__(self,comm=None):""" :param comm: The mpi4py MPI communicator to use. Only nodes in the communicator will participate in the simulation. The first node will idle as the main node. """self.simdata:dict[Simulation,SimulationData]=dict()self.comm=MPIService(comm)self._controllers=[]self._duration=Noneself.current_checkpoint=0
[docs]defsimulate(self,*simulations,post_prepare=None):""" Simulate the given simulations. :param simulations: One or a list of simulation configurations to simulate. :type simulations: ~bsb.simulation.simulation.Simulation :param post_prepare: Optional callable to run after the simulations' preparation. :return: List of simulation results for each simulation run. :rtype: list[~bsb.simulation.results.SimulationResult] """withExitStack()ascontext:forsimulationinsimulations:context.enter_context(simulation.scaffold.storage.read_only())alldata=[]ifoptions.sim_console_progress:listener=FixedStepProgressController(simulations,self,options.sim_console_progress)self._controllers.append(listener)forsimulationinsimulations:data=self.prepare(simulation)alldata.append(data)forhookinsimulation.post_prepare:hook(self,simulation,data)ifpost_prepare:post_prepare(self,simulations,alldata)results=self.run(*simulations)returnself.collect(results)
[docs]@abc.abstractmethoddefprepare(self,simulation):""" Reset the simulation backend and prepare for the given simulation. :param simulation: The simulation configuration to prepare. :type simulation: ~bsb.simulation.simulation.Simulation :return: Prepared simulation data. :rtype: SimulationData """pass
[docs]@abc.abstractmethoddefrun(self,*simulations):""" Fire up the prepared adapter. :param simulations: One or a list of simulation configurations to simulate. :type simulations: ~bsb.simulation.simulation.Simulation :return: List of simulation results. :rtype: list[~bsb.simulation.results.SimulationResult] """pass
[docs]defget_next_checkpoint(self):whileself.current_checkpoint<self._duration:checkpoints=[controller.get_next_checkpoint()forcontrollerinself._controllers]# Filter out invalid "regressive" checkpoints,# and default to the end of the simulationchkp_noregressive=[checkpointforcheckpointincheckpointsifcheckpoint>self.current_checkpoint]self.current_checkpoint=min(chkp_noregressive,default=self._duration)participants=[self._controllers[i]fori,checkpointinenumerate(checkpoints)ifcheckpoint==self.current_checkpoint]yield(self.current_checkpoint,participants)
[docs]defcollect(self,results):""" Collect the output the simulations that completed. :return: Collected simulation results. :rtype: list[~bsb.simulation.results.SimulationResult] """forresultinresults:result.flush()returnresults
[docs]defload_controllers(self,simulation):forcomponentinsimulation.get_components():ifhasattr(component,"get_next_checkpoint"):ifhasattr(component,"run_checkpoint"):self._controllers.append(component)else:raiseAttributeMissingError(f"Device {component.name} is configured to be a controller "f"but run_checkpoint is not defined")