#
# Utility classes to perform multiple model evaluations sequentially or in
# parallel.
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
# Some code in this file was adapted from Myokit (see http://myokit.org)
#
import gc
import os
import multiprocessing
import queue
import sys
import time
import traceback
import numpy as np
import threadpoolctl
def evaluate(f, x, parallel=False, args=None):
    """
    Evaluates the function ``f`` on every value present in ``x`` and returns
    a sequence of evaluations ``f(x[i])``.

    It is possible for the evaluation of ``f`` to involve the generation of
    random numbers (using numpy). In this case, the results from calling
    ``evaluate`` can be made reproducible by first seeding numpy's generator
    with a fixed number. However, a call with ``parallel=True`` will use a
    different (but consistent) sequence of random numbers than a call with
    ``parallel=False``.

    Parameters
    ----------
    f : callable
        The function to evaluate, called as ``f(x[i], *args)``.
    x
        A sequence of values to evaluate ``f`` with.
    parallel : boolean or int
        Run in parallel or not.
        If set to ``True``, the evaluations will happen in parallel using a
        number of worker processes equal to the detected cpu core count, or
        to ``len(x)`` if that is smaller (but always at least one worker).
        The number of workers can be set explicitly by setting ``parallel``
        to an integer greater than 0.
        Parallelisation can be disabled by setting ``parallel`` to ``0`` or
        ``False``.
    args : sequence
        Optional extra arguments to pass into ``f``.

    Returns
    -------
    A list containing the evaluation of ``f`` for every entry in ``x``, in
    the same order as ``x``.
    """
    if parallel is True:
        # Use all cores, but never start more workers than there are tasks
        workers = max(1, min(ParallelEvaluator.cpu_count(), len(x)))
    elif parallel >= 1:
        workers = int(parallel)
    else:
        # Parallelisation disabled: evaluate in the calling process
        return SequentialEvaluator(f, args=args).evaluate(x)
    return ParallelEvaluator(f, n_workers=workers, args=args).evaluate(x)
class Evaluator(object):
    """
    Abstract base class for classes that take a function (or callable object)
    ``f(x)`` and evaluate it for list of input values ``x``.

    This interface is shared by a parallel and a sequential implementation,
    allowing easy switching between parallel or sequential implementations of
    the same algorithm.

    It is possible for the evaluation of ``f`` to involve the generation of
    random numbers (using numpy). In this case, the results from calling
    ``evaluate`` can be made reproducible by first seeding numpy's generator
    with a fixed number. However, different ``Evaluator`` implementations may
    use a different random sequence. In other words, each Evaluator can be made
    to return consistent results, but the results returned by different
    Evaluators may vary.

    Subclasses implement :meth:`_evaluate`; :meth:`evaluate` validates its
    input and delegates to it.

    Parameters
    ----------
    function : callable
        A function or other callable object ``f`` that takes a value ``x`` and
        returns an evaluation ``f(x)``.
    args : sequence
        An optional sequence of extra arguments to ``f``. If ``args`` is
        specified, ``f`` will be called as ``f(x, *args)``.

    Raises
    ------
    ValueError
        If ``function`` is not callable, or if ``args`` is neither ``None``
        nor an object supporting ``len()``.
    """

    def __init__(self, function, args=None):
        if not callable(function):
            raise ValueError('The given function must be callable.')
        self._function = function

        # Anything with a length counts as a sequence of extra arguments
        if args is not None:
            try:
                len(args)
            except TypeError:
                raise ValueError(
                    'The argument `args` must be either None or a sequence.')
        self._args = () if args is None else args

    def evaluate(self, positions):
        """
        Evaluate the function for every value in the sequence ``positions``.

        Returns a list with the returned evaluations.

        Raises a ``ValueError`` if ``positions`` does not support ``len()``.
        """
        try:
            len(positions)
        except TypeError:
            raise ValueError(
                'The argument `positions` must be a sequence of input values'
                ' to the evaluator\'s function.')
        return self._evaluate(positions)

    def _evaluate(self, positions):
        """ See :meth:`evaluate()`. Must be implemented by subclasses. """
        raise NotImplementedError
class ParallelEvaluator(Evaluator):
    """
    Evaluates a single-valued function object for any set of input values
    given, using all available cores.

    Shares an interface with the :class:`SequentialEvaluator`, allowing
    parallelism to be switched on and off with minimal hassle. Parallelism
    takes a little time to be set up, so as a general rule of thumb it's only
    useful if the total run-time is at least ten seconds (anno 2015).

    By default, the number of processes ("workers") used to evaluate the
    function is set equal to the number of CPU cores reported by python's
    ``multiprocessing`` module. To override the number of workers used, set
    ``n_workers`` to some integer greater than ``0``.

    There are two important caveats for using multiprocessing to evaluate
    functions:

      1. Processes don't share memory. This means the function to be
         evaluated will be duplicated (via pickling) for each process (see
         `Avoid shared state <http://docs.python.org/2/library/\
multiprocessing.html#all-platforms>`_ for details).
      2. On windows systems your code should be within an
         ``if __name__ == '__main__':`` block (see `Windows
         <https://docs.python.org/2/library/multiprocessing.html#windows>`_
         for details).

    The evaluator will keep its subprocesses alive and running until it is
    tidied up by garbage collection.

    Note that while this class uses multiprocessing, it is not thread/process
    safe itself: It should not be used by more than a single thread/process at
    a time.

    Extends :class:`Evaluator`.

    Parameters
    ----------
    function
        The function to evaluate
    n_workers
        The number of worker processes to use. If left at the default value
        ``n_workers=None`` the number of workers will equal the number of CPU
        cores in the machine this is run on. In many cases this will provide
        good performance.
    max_tasks_per_worker
        Python garbage collection does not seem to be optimized for
        multi-process function evaluation. In many cases, some time can be
        saved by refreshing the worker processes after every
        ``max_tasks_per_worker`` evaluations. This number can be tweaked for
        best performance on a given task / system.
    n_numpy_threads
        Numpy and other scientific libraries may make use of threading in C or
        C++ based BLAS libraries, which can interfere with PINTS
        multiprocessing and cause slower execution. To prevent this, the number
        of threads to use will be limited to 1 by default, using the
        ``threadpoolctl`` module. To use the current numpy default instead, set
        ``n_numpy_threads`` to ``None``, to use the BLAS/OpenMP etc. defaults,
        set ``n_numpy_threads`` to ``0``, or to use a specific number of
        threads pass in any integer greater than 1.
    args
        An optional sequence of extra arguments to ``f``. If ``args`` is
        specified, ``f`` will be called as ``f(x, *args)``.

    Raises
    ------
    ValueError
        If ``n_workers`` is less than 1, or ``max_tasks_per_worker`` is less
        than 1.
    """

    def __init__(
            self, function,
            n_workers=None,
            max_tasks_per_worker=500,
            n_numpy_threads=1,
            args=None):
        super(ParallelEvaluator, self).__init__(function, args)

        # Determine number of workers
        if n_workers is None:
            self._n_workers = ParallelEvaluator.cpu_count()
        else:
            self._n_workers = int(n_workers)
            if self._n_workers < 1:
                raise ValueError(
                    'Number of workers must be an integer greater than 0 or'
                    ' `None` to use the default value.')

        # Create empty set of workers
        self._workers = []

        # Maximum tasks per worker (for some reason, this saves memory)
        self._max_tasks = int(max_tasks_per_worker)
        if self._max_tasks < 1:
            raise ValueError(
                'Maximum tasks per worker should be at least 1 (but probably'
                ' much greater).')

        # Maximum number of numpy threads to use. See the _Worker class.
        self._n_numpy_threads = n_numpy_threads

        # Queue with tasks
        # Each task is stored as a tuple (id, seed, argument)
        self._tasks = multiprocessing.Queue()

        # Queue with results
        # Each result is stored as a tuple (id, evaluation)
        self._results = multiprocessing.Queue()

        # Queue used to add an exception object and context to
        self._errors = multiprocessing.Queue()

        # Flag set if an error is encountered
        self._error = multiprocessing.Event()

    def __del__(self):
        # Cancel everything. Errors are ignored on purpose: __del__ may run
        # on a partially initialised object, or during interpreter shutdown.
        try:
            self._stop()
        except Exception:
            pass

    def _clean(self):
        """
        Cleans up any dead workers & return the number of workers tidied up.
        """
        cleaned = 0
        # Iterate backwards so deleting entries doesn't shift unvisited ones
        for k in range(len(self._workers) - 1, -1, -1):
            w = self._workers[k]
            if w.exitcode is not None:  # pragma: no cover
                w.join()
                cleaned += 1
                del(self._workers[k], w)
        if cleaned:  # pragma: no cover
            gc.collect()
        return cleaned

    @staticmethod
    def cpu_count():
        """
        Uses the multiprocessing module to guess the number of available cores.

        For machines with simultaneous multithreading ("hyperthreading") this
        will return the number of virtual cores.
        """
        return max(1, multiprocessing.cpu_count())

    def _populate(self):
        """
        Populates (but usually repopulates) the worker pool.
        """
        for _ in range(self._n_workers - len(self._workers)):
            w = _Worker(
                self._function,
                self._args,
                self._tasks,
                self._results,
                self._max_tasks,
                self._n_numpy_threads,
                self._errors,
                self._error,
            )
            self._workers.append(w)
            w.start()

    def _evaluate(self, positions):
        """
        Evaluate all tasks in parallel.

        All tasks are placed on the task queue at once. Each worker handles
        at most ``self._max_tasks`` tasks before exiting, after which it is
        replaced by a fresh worker. Results are returned in the order of
        ``positions``.
        """
        n = len(positions)

        # Nothing to evaluate: don't spawn a worker pool for no work
        if n == 0:
            return []

        # Note: it would be sensible to check here that the task and result
        # queues are empty, but Queue.empty() was found to block on Windows.

        # Clean up any dead workers
        self._clean()

        # Ensure worker pool is populated
        self._populate()

        # Generate seeds for numpy random number generators.
        # This ensures that:
        #  1. Each process has a randomly selected random number generator
        #     state, instead of inheriting the state from the calling process.
        #  2. If the calling process has a seeded number generator, the random
        #     sequences within each task will be reproducible. Note that we
        #     cannot achieve this by seeding the worker processes once, as the
        #     allocation of tasks to workers is not deterministic.
        # The upper bound (exclusive) is the widest range that still fits in
        # a signed 32 bit int, which Windows insists on even in Python 3.9. A
        # narrow range (e.g. 2**16) makes duplicate seeds, and hence identical
        # random streams in different tasks, likely for large batches.
        seeds = np.random.randint(0, 2**31 - 1, n)

        # Start
        try:
            # Enqueue all tasks (non-blocking)
            for k, x in enumerate(positions):
                self._tasks.put((k, seeds[k], x))

            # Collect results (blocking)
            m = 0
            results = [0] * n
            while m < n and not self._error.is_set():
                time.sleep(0.001)  # This is really necessary

                # Retrieve all results
                try:
                    while True:
                        i, f = self._results.get(block=False)
                        results[i] = f
                        m += 1
                except queue.Empty:
                    pass

                # Clean dead workers
                if self._clean():  # pragma: no cover
                    # Repopulate
                    self._populate()

        except (IOError, EOFError):  # pragma: no cover
            # IOErrors can originate from the queues as a result of issues in
            # the subprocesses. Check if the error flag is set. If it is, let
            # the subprocess exception handling deal with it. If it isn't,
            # handle it here.
            if not self._error.is_set():
                self._stop()
                raise
            # TODO: Maybe this should be something like while(error is not set)
            # wait for it to be set, then let the subprocess handle it...

        except (Exception, SystemExit, KeyboardInterrupt):  # pragma: no cover
            # All other exceptions, including Ctrl-C and user triggered exits
            # should (1) cause all child processes to stop and (2) bubble up to
            # the caller.
            self._stop()
            raise

        # Error in worker threads
        if self._error.is_set():
            errors = self._stop()
            # Raise exception
            if errors:
                pid, trace = errors[0]
                raise Exception(
                    'Exception in subprocess:\n' + trace
                    + '\nException in subprocess')
            else:
                # Don't think this is reachable!
                raise Exception(
                    'Unknown exception in subprocess.')  # pragma: no cover

        # Return results
        return results

    def _stop(self):
        """
        Forcibly halts the workers, drains and replaces all queues and the
        error event, and returns the list of ``(pid, traceback)`` tuples
        that were found on the error queue.
        """
        time.sleep(0.1)

        # Terminate workers
        for w in self._workers:
            if w.exitcode is None:
                w.terminate()
        for w in self._workers:
            if w.is_alive():
                w.join()
        self._workers = []

        # Clear queues
        def clear(q):
            items = []
            try:
                while True:
                    items.append(q.get(timeout=0.1))
            except (queue.Empty, IOError, EOFError):
                pass
            return items

        clear(self._tasks)
        clear(self._results)
        errors = clear(self._errors)

        # Create new queues & error event
        self._tasks = multiprocessing.Queue()
        self._results = multiprocessing.Queue()
        self._errors = multiprocessing.Queue()
        self._error = multiprocessing.Event()

        # Free memory
        gc.collect()

        # Return errors
        return errors
class SequentialEvaluator(Evaluator):
    """
    Evaluates a function (or callable object) for a list of input values, and
    returns a list containing the calculated function evaluations.

    Runs sequentially, in the calling process, but shares an interface with
    the :class:`ParallelEvaluator`, allowing parallelism to be switched
    on/off.

    Extends :class:`Evaluator`.

    Parameters
    ----------
    function : callable
        The function to evaluate.
    args : sequence
        An optional sequence containing extra arguments to ``f``. If ``args``
        is specified, ``f`` will be called as ``f(x, *args)``.
    """

    def __init__(self, function, args=None):
        super(SequentialEvaluator, self).__init__(function, args)

    def _evaluate(self, positions):
        # One call per position, in order; results keep the input order
        return [self._function(x, *self._args) for x in positions]
#
# Note: For Windows multiprocessing to work, the _Worker can never be a nested
# class!
#
class _Worker(multiprocessing.Process):
"""
Worker class for use with :class:`ParallelEvaluator`.
Evaluates a single-valued function for every point in a ``tasks`` queue
and places the results on a ``results`` queue.
Keeps running until it's given the string "stop" as a task.
Extends ``multiprocessing.Process``.
Parameters
----------
function : callable
The function to optimize.
args : sequence
A (possibly empty) tuple containing extra input arguments to the
objective function.
tasks
The queue to read tasks from. Tasks are stored as tuples
``(i, s, x)`` where ``i`` is a task id, ``s`` is a seed for numpy's
random number generator, and ``x`` is the argument to evaluate.
results
The queue to store results in. Results are stored as
tuples ``(i, p, r)`` where ``i`` is the task id, ``p`` is
the position evaluated (which can be updated by the
refinement method!) and ``r`` is the result at ``p``.
max_tasks : int
The maximum number of tasks to perform before dying.
max_threads : int
The number of numpy BLAS (or other threadpoolctl controlled) threads to
allow. Use ``None`` to leave current settings unchanged, use ``0`` to
let the C-library's use their own defaults, use ``1`` to disable this
type of threading (recommended), or use any larger integer to set a
specific number.
errors
A queue to store exceptions on
error
This flag will be set by the worker whenever it encounters an
error.
"""
def __init__(
self, function, args, tasks, results, max_tasks, max_threads,
errors, error):
super(_Worker, self).__init__()
self.daemon = True
self._function = function
self._args = args
self._tasks = tasks
self._results = results
self._max_tasks = max_tasks
self._max_threads = \
None if max_threads is None else max(0, int(max_threads))
self._errors = errors
self._error = error
def run(self):
# Worker processes should never write to stdout or stderr.
# This can lead to unsafe situations if they have been redirected to
# a GUI task such as writing to the IDE console.
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
try:
with threadpoolctl.threadpool_limits(self._max_threads):
for k in range(self._max_tasks):
i, seed, x = self._tasks.get()
np.random.seed(seed)
f = self._function(x, *self._args)
self._results.put((i, f))
# Check for errors in other workers
if self._error.is_set():
return
except (Exception, KeyboardInterrupt, SystemExit):
self._errors.put((self.pid, traceback.format_exc()))
self._error.set()