#
# Sub-module containing MCMC inference routines
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import os
import pints
import numpy as np
class MCMCSampler(pints.Loggable, pints.TunableMethod):
    """
    Abstract base class for (single or multi-chain) MCMC methods.

    All MCMC samplers implement the :class:`pints.Loggable` and
    :class:`pints.TunableMethod` interfaces.
    """

    def name(self):
        """
        Returns this method's full name.
        """
        raise NotImplementedError

    def in_initial_phase(self):
        """
        For methods that need an initial phase (see
        :meth:`needs_initial_phase()`), this method returns ``True`` if the
        method is currently configured to be in its initial phase. For other
        methods a ``NotImplementedError`` is returned.
        """
        raise NotImplementedError

    def needs_initial_phase(self):
        """
        Returns ``True`` if this method needs an initial phase, for example an
        adaptation-free period for adaptive covariance methods, or a warm-up
        phase for DREAM.
        """
        return False

    def needs_sensitivities(self):
        """
        Returns ``True`` if this method needs sensitivities to be passed in to
        ``tell`` along with the evaluated logpdf.
        """
        return False

    def set_initial_phase(self, in_initial_phase):
        """
        For methods that need an initial phase (see
        :meth:`needs_initial_phase()`), this method toggles the initial phase
        algorithm. For other methods a ``NotImplementedError`` is returned.
        """
        raise NotImplementedError
class SingleChainMCMC(MCMCSampler):
    """
    Abstract base class for MCMC methods that generate a single markov chain,
    via an ask-and-tell interface.

    Extends :class:`MCMCSampler`.

    Parameters
    ----------
    x0
        A starting point in the parameter space.
    sigma0
        An optional (initial) covariance matrix, i.e., a guess of the
        covariance of the distribution to estimate, around ``x0``.
    """

    def __init__(self, x0, sigma0=None):
        # Check initial position
        self._x0 = pints.vector(x0)

        # Get number of parameters
        self._n_parameters = len(self._x0)

        # Check initial standard deviation
        if sigma0 is None:
            # Get representative parameter value for each parameter
            self._sigma0 = np.abs(self._x0)
            self._sigma0[self._sigma0 == 0] = 1

            # Use to create diagonal matrix
            self._sigma0 = np.diag(0.01 * self._sigma0)
        else:
            self._sigma0 = np.array(sigma0, copy=True)
            # Note: np.product was removed in NumPy 2.0; np.prod is the
            # supported spelling.
            if np.prod(self._sigma0.shape) == self._n_parameters:
                # Convert from 1d array
                self._sigma0 = self._sigma0.reshape((self._n_parameters,))
                self._sigma0 = np.diag(self._sigma0)
            else:
                # Check if 2d matrix of correct size
                self._sigma0 = self._sigma0.reshape(
                    (self._n_parameters, self._n_parameters))

    def ask(self):
        """
        Returns a parameter vector to evaluate the LogPDF for.
        """
        raise NotImplementedError

    def tell(self, fx):
        """
        Performs an iteration of the MCMC algorithm, using the
        :class:`pints.LogPDF` evaluation ``fx`` of the point ``x`` specified
        by ``ask``.

        For methods that require sensitivities (see
        :meth:`MCMCSampler.needs_sensitivities`), ``fx`` should be a tuple
        ``(log_pdf, sensitivities)``, containing the values returned by
        :meth:`pints.LogPdf.evaluateS1()`.

        After a successful call, :meth:`tell()` returns a tuple
        ``(x, fx, accepted)``, where ``x`` contains the current position of
        the chain, ``fx`` contains the corresponding evaluation, and
        ``accepted`` is a boolean indicating whether the last evaluated sample
        was added to the chain.

        Some methods may require multiple ask-tell calls per iteration. These
        methods can return ``None`` to indicate an iteration is still in
        progress.
        """
        raise NotImplementedError

    def replace(self, current, current_log_pdf, proposed=None):
        """
        Replaces the internal current position, current LogPDF, and proposed
        point (if any) by the user-specified values.

        This method can only be used once the initial position and LogPDF
        have been set (so after at least 1 round of ask-and-tell).

        This is an optional method, and some samplers may not support it.
        """
        raise NotImplementedError
class MultiChainMCMC(MCMCSampler):
    """
    Abstract base class for MCMC methods that generate multiple markov
    chains, via an ask-and-tell interface.

    Extends :class:`MCMCSampler`.

    Parameters
    ----------
    chains : int
        The number of MCMC chains to generate.
    x0
        A sequence of starting points. Can be a list of lists, a
        2-dimensional array, or any other structure such that ``x0[i]`` is
        the starting point for chain ``i``.
    sigma0
        An optional initial covariance matrix, i.e., a guess of the
        covariance in ``logpdf`` around the points in ``x0`` (the same
        ``sigma0`` is used for each point in ``x0``).
        Can be specified as a ``(d, d)`` matrix (where ``d`` is the dimension
        of the parameter space) or as a ``(d, )`` vector, in which case
        ``diag(sigma0)`` will be used.
    """

    def __init__(self, chains, x0, sigma0=None):
        # Check number of chains
        self._n_chains = int(chains)
        if self._n_chains < 1:
            raise ValueError('Number of chains must be at least 1.')

        # Check initial position(s): compare against the validated integer
        # chain count.
        if len(x0) != self._n_chains:
            raise ValueError(
                'Number of initial positions must be equal to number of'
                ' chains.')
        self._n_parameters = len(x0[0])
        if not all([len(x) == self._n_parameters for x in x0[1:]]):
            raise ValueError('All initial points must have same dimension.')
        self._x0 = np.array([pints.vector(x) for x in x0])
        self._x0.setflags(write=False)

        # Check initial standard deviation
        if sigma0 is None:
            # Get representative parameter value for each parameter
            self._sigma0 = np.max(np.abs(self._x0), axis=0)
            self._sigma0[self._sigma0 == 0] = 1

            # Use to create diagonal matrix
            self._sigma0 = np.diag(0.01 * self._sigma0)
        else:
            self._sigma0 = np.array(sigma0, copy=True)
            # Note: np.product was removed in NumPy 2.0; np.prod is the
            # supported spelling.
            if np.prod(self._sigma0.shape) == self._n_parameters:
                # Convert from 1d array
                self._sigma0 = self._sigma0.reshape((self._n_parameters,))
                self._sigma0 = np.diag(self._sigma0)
            else:
                # Check if 2d matrix of correct size
                self._sigma0 = self._sigma0.reshape(
                    (self._n_parameters, self._n_parameters))

    def ask(self):
        """
        Returns a sequence of parameter vectors to evaluate a LogPDF for.
        """
        raise NotImplementedError

    def current_log_pdfs(self):
        """
        Returns the log pdf values of the current points (i.e. of the most
        recent points returned by :meth:`tell()`).
        """
        raise NotImplementedError

    def tell(self, fxs):
        """
        Performs an iteration of the MCMC algorithm, using the
        :class:`pints.LogPDF` evaluations ``fxs`` of the points ``xs``
        specified by ``ask``.

        For methods that require sensitivities (see
        :meth:`MCMCSampler.needs_sensitivities`), each entry in ``fxs``
        should be a tuple ``(log_pdf, sensitivities)``, containing the values
        returned by :meth:`pints.LogPdf.evaluateS1()`.

        After a successful call, :meth:`tell()` returns a tuple
        ``(xs, fxs, accepted)``, where ``xs`` contains the current positions
        of the chains, ``fxs`` contains the corresponding evaluations, and
        ``accepted`` is an array of booleans indicating whether the last
        evaluated sample was added to the chain.

        Some methods may require multiple ask-tell calls per iteration. These
        methods can return ``None`` to indicate an iteration is still in
        progress.
        """
        raise NotImplementedError
class MCMCController(object):
    """
    Samples from a :class:`pints.LogPDF` using a Markov Chain Monte Carlo
    (MCMC) method.

    The method to use (either a :class:`SingleChainMCMC` class or a
    :class:`MultiChainMCMC` class) is specified at runtime. For example::

        mcmc = pints.MCMCController(
            log_pdf, 3, x0, method=pints.HaarioBardenetACMC)

    Properties related to the number of iterations, parallelisation, and
    logging can be set directly on the ``MCMCController`` object, e.g.::

        mcmc.set_max_iterations(1000)

    Sampler specific properties must be set on the internal samplers
    themselves, e.g.::

        for sampler in mcmc.samplers():
            sampler.set_target_acceptance_rate(0.2)

    Finally, to run an MCMC routine, call::

        chains = mcmc.run()

    By default, an MCMCController run will write regular progress updates to
    screen. This can be disabled using :meth:`set_log_to_screen()`. To write
    a similar progress log to a file, use :meth:`set_log_to_file()`. To store
    the chains and/or evaluations generated by :meth:`run()` to a file, use
    :meth:`set_chain_filename()` and :meth:`set_log_pdf_filename()`.

    Parameters
    ----------
    log_pdf : pints.LogPDF
        A :class:`LogPDF` function that evaluates points in the parameter
        space.
    chains : int
        The number of MCMC chains to generate.
    x0
        A sequence of starting points. Can be a list of lists, a
        2-dimensional array, or any other structure such that ``x0[i]`` is
        the starting point for chain ``i``.
    sigma0
        An optional initial covariance matrix, i.e., a guess of the
        covariance in ``logpdf`` around the points in ``x0`` (the same
        ``sigma0`` is used for each point in ``x0``).
        Can be specified as a ``(d, d)`` matrix (where ``d`` is the dimension
        of the parameter space) or as a ``(d, )`` vector, in which case
        ``diag(sigma0)`` will be used.
    transformation : pints.Transformation
        An optional :class:`pints.Transformation` to allow the sampler to
        work in a transformed parameter space. If used, points shown or
        returned to the user will first be detransformed back to the original
        space.
    method : class
        The class of :class:`MCMCSampler` to use. If no method is specified,
        :class:`HaarioBardenetACMC` is used.
    """

    def __init__(
            self, log_pdf, chains, x0, sigma0=None, transformation=None,
            method=None):

        # Check function
        if not isinstance(log_pdf, pints.LogPDF):
            raise ValueError('Given function must extend pints.LogPDF')

        # Apply a transformation (if given). From this point onward the MCMC
        # sampler will see only the transformed search space and will know
        # nothing about the model parameter space.
        if transformation is not None:
            # Convert log pdf
            log_pdf = transformation.convert_log_pdf(log_pdf)

            # Convert initial positions
            x0 = [transformation.to_search(x) for x in x0]

            # Convert sigma0, if provided
            if sigma0 is not None:
                sigma0 = np.asarray(sigma0)
                n_parameters = log_pdf.n_parameters()

                # Make sure sigma0 is a (covariance) matrix
                # Note: np.product was removed in NumPy 2.0; np.prod is the
                # supported spelling.
                if np.prod(sigma0.shape) == n_parameters:
                    # Convert from 1d array
                    sigma0 = sigma0.reshape((n_parameters,))
                    sigma0 = np.diag(sigma0)
                elif sigma0.shape != (n_parameters, n_parameters):
                    # Check if 2d matrix of correct size
                    raise ValueError(
                        'sigma0 must be either a (d, d) matrix or a (d, ) '
                        'vector, where d is the number of parameters.')
                sigma0 = transformation.convert_covariance_matrix(
                    sigma0, x0[0])

        # Store transformation for later detransformation: if using a
        # transformation, any parameters logged to the filesystem or printed
        # to screen should be detransformed first!
        self._transformation = transformation

        # Store function
        self._log_pdf = log_pdf

        # Get number of parameters
        self._n_parameters = self._log_pdf.n_parameters()

        # Check number of chains
        self._n_chains = int(chains)
        if self._n_chains < 1:
            raise ValueError('Number of chains must be at least 1.')

        # Check initial position(s): Most checking is done by samplers!
        if len(x0) != chains:
            raise ValueError(
                'Number of initial positions must be equal to number of'
                ' chains.')
        if not all([len(x) == self._n_parameters for x in x0]):
            raise ValueError(
                'All initial positions must have the same dimension as the'
                ' given LogPDF.')

        # Don't check initial standard deviation: done by samplers!

        # Set default method
        if method is None:
            method = pints.HaarioBardenetACMC
        else:
            try:
                ok = issubclass(method, pints.MCMCSampler)
            except TypeError:   # Not a class
                ok = False
            if not ok:
                raise ValueError('Given method must extend pints.MCMCSampler.')

        # Using single chain samplers?
        self._single_chain = issubclass(method, pints.SingleChainMCMC)

        # Create sampler(s)
        if self._single_chain:
            # Using n individual samplers (Note that it is possible to have
            # _single_chain=True and _n_samplers=1)
            self._n_samplers = self._n_chains
            self._samplers = [method(x, sigma0) for x in x0]
        else:
            # Using a single sampler that samples multiple chains
            self._n_samplers = 1
            self._samplers = [method(self._n_chains, x0, sigma0)]

        # Check if sensitivities are required
        self._needs_sensitivities = self._samplers[0].needs_sensitivities()

        # Initial phase (needed for e.g. adaptive covariance)
        self._initial_phase_iterations = None
        self._needs_initial_phase = self._samplers[0].needs_initial_phase()
        if self._needs_initial_phase:
            self.set_initial_phase_iterations()

        # Logging
        self._log_to_screen = True
        self._log_filename = None
        self._log_csv = False
        self.set_log_interval()

        # Storing chains and evaluations in memory
        self._chains_in_memory = True
        self._evaluations_in_memory = False
        self._samples = None
        self._evaluations = None
        self._n_evaluations = None
        self._time = None

        # Writing chains and evaluations to disk
        self._chain_files = None
        self._evaluation_files = None

        # Parallelisation
        self._parallel = False
        self._n_workers = 1
        self.set_parallel()

        # :meth:`run` can only be called once
        self._has_run = False

        #
        # Stopping criteria
        #

        # Maximum iterations
        self._max_iterations = None
        self.set_max_iterations()

        # TODO: Add more stopping criteria

    def chains(self):
        """
        Returns the chains generated by :meth:`run()`.

        The returned array has shape ``(n_chains, n_iterations,
        n_parameters)``.

        If the controller has not run yet, or if chain storage to memory is
        disabled, this method will return ``None``.
        """
        # Note: Not copying this, for efficiency. At this point we're done
        # with the chains, so nothing will go wrong if the user messes the
        # array up.
        return self._samples

    def initial_phase_iterations(self):
        """
        For methods that require an initial phase (e.g. an adaptation-free
        phase for the adaptive covariance MCMC method), this returns the
        number of iterations that the initial phase will take.

        For methods that do not require an initial phase, a
        ``NotImplementedError`` is raised.
        """
        return self._initial_phase_iterations

    def log_pdfs(self):
        """
        Returns the :class:`LogPDF` evaluations generated by :meth:`run()`.

        If a :class:`LogPosterior` was used, the returned array will have
        shape ``(n_chains, n_iterations, 3)``, and for each sample the
        LogPDF, LogLikelihood, and LogPrior will be stored.

        For all other cases, only the full LogPDF evaluations are returned,
        in an array of shape ``(n_chains, n_iterations)``.

        If the controller has not run yet, or if storage of evaluations to
        memory is disabled (default), this method will return ``None``.
        """
        # Note: Not copying this, for efficiency. At this point we're done
        # with the chains, so nothing will go wrong if the user messes the
        # array up.
        return self._evaluations

    def max_iterations(self):
        """
        Returns the maximum iterations if this stopping criterion is set, or
        ``None`` if it is not. See :meth:`set_max_iterations()`.
        """
        return self._max_iterations

    def method_needs_initial_phase(self):
        """
        Returns true if this sampler has been created with a method that has
        an initial phase (see :meth:`MCMCSampler.needs_initial_phase()`.)
        """
        return self._samplers[0].needs_initial_phase()

    def n_evaluations(self):
        """
        Returns the number of evaluations performed during the last run, or
        ``None`` if the controller hasn't run yet.
        """
        return self._n_evaluations

    def parallel(self):
        """
        Returns the number of parallel worker processes this routine will be
        run on, or ``False`` if parallelisation is disabled.
        """
        return self._n_workers if self._parallel else False

    def run(self):
        """
        Runs the MCMC sampler(s) and returns the result.

        By default, this method returns an array of shape ``(n_chains,
        n_iterations, n_parameters)``.
        If storing chains to memory has been disabled with
        :meth:`set_chain_storage`, then ``None`` is returned instead.
        """
        # Can only run once for each controller instance
        if self._has_run:
            raise RuntimeError("Controller is valid for single use only")
        self._has_run = True

        # Check stopping criteria
        has_stopping_criterion = False
        has_stopping_criterion |= (self._max_iterations is not None)
        if not has_stopping_criterion:
            raise ValueError('At least one stopping criterion must be set.')

        # Iteration and evaluation counting
        iteration = 0
        self._n_evaluations = 0

        # Choose method to evaluate
        f = self._log_pdf
        if self._needs_sensitivities:
            f = f.evaluateS1

        # Create evaluator object
        if self._parallel:
            # Use at most n_workers workers
            n_workers = min(self._n_workers, self._n_chains)
            evaluator = pints.ParallelEvaluator(f, n_workers=n_workers)
        else:
            evaluator = pints.SequentialEvaluator(f)

        # Initial phase
        if self._needs_initial_phase:
            for sampler in self._samplers:
                sampler.set_initial_phase(True)

        # Storing evaluations to memory or disk
        prior = None
        store_evaluations = \
            self._evaluations_in_memory or self._evaluation_files
        if store_evaluations:
            # Bayesian inference on a log-posterior? Then separate out the
            # prior so we can calculate the loglikelihood
            if isinstance(self._log_pdf, pints.LogPosterior):
                prior = self._log_pdf.log_prior()

            # Store last accepted logpdf, per chain
            current_logpdf = np.zeros(self._n_chains)
            current_prior = np.zeros(self._n_chains)

        # Write chains to disk
        chain_loggers = []
        if self._chain_files:
            for filename in self._chain_files:
                cl = pints.Logger()
                cl.set_stream(None)
                cl.set_filename(filename, True)
                for k in range(self._n_parameters):
                    cl.add_float('p' + str(k))
                chain_loggers.append(cl)

        # Write evaluations to disk
        eval_loggers = []
        if self._evaluation_files:
            for filename in self._evaluation_files:
                cl = pints.Logger()
                cl.set_stream(None)
                cl.set_filename(filename, True)
                if prior:
                    # Logposterior in first column, to be consistent with the
                    # non-bayesian case
                    cl.add_float('logposterior')
                    cl.add_float('loglikelihood')
                    cl.add_float('logprior')
                else:
                    cl.add_float('logpdf')
                eval_loggers.append(cl)

        # Set up progress reporting
        next_message = 0

        # Start logging
        logging = self._log_to_screen or self._log_filename
        if logging:
            if self._log_to_screen:
                print('Using ' + str(self._samplers[0].name()))
                print('Generating ' + str(self._n_chains) + ' chains.')
                if self._parallel:
                    print('Running in parallel with ' + str(n_workers) +
                          ' worker processes.')
                else:
                    print('Running in sequential mode.')
                if self._chain_files:
                    print(
                        'Writing chains to ' + self._chain_files[0] + ' etc.')
                if self._evaluation_files:
                    print(
                        'Writing evaluations to ' + self._evaluation_files[0]
                        + ' etc.')

            # Set up logger
            logger = pints.Logger()
            if not self._log_to_screen:
                logger.set_stream(None)
            if self._log_filename:
                logger.set_filename(self._log_filename, csv=self._log_csv)

            # Add fields to log
            max_iter_guess = max(self._max_iterations or 0, 10000)
            max_eval_guess = max_iter_guess * self._n_chains
            logger.add_counter('Iter.', max_value=max_iter_guess)
            logger.add_counter('Eval.', max_value=max_eval_guess)
            for sampler in self._samplers:
                sampler._log_init(logger)
            logger.add_time('Time m:s')

        # Pre-allocate arrays for chain storage
        # Note: we store the inverse transformed (to model space) parameters
        # only if transformation is provided.
        if self._chains_in_memory:
            # Store full chains
            samples = np.zeros(
                (self._n_chains, self._max_iterations, self._n_parameters))
        else:
            # Store only the current iteration
            samples = np.zeros((self._n_chains, self._n_parameters))

        # Pre-allocate arrays for evaluation storage
        if self._evaluations_in_memory:
            if prior:
                # Store posterior, likelihood, prior
                evaluations = np.zeros(
                    (self._n_chains, self._max_iterations, 3))
            else:
                # Store pdf
                evaluations = np.zeros(
                    (self._n_chains, self._max_iterations))

        # Some samplers need intermediate steps, where None is returned
        # instead of a sample. But samplers can run asynchronously, so that
        # one returns None while another returns a sample. To deal with this,
        # we maintain a list of 'active' samplers that have not reached
        # `max_iterations` yet, and we store the number of samples that we
        # have in each chain.
        if self._single_chain:
            active = list(range(self._n_chains))
            n_samples = [0] * self._n_chains

        # Start sampling
        timer = pints.Timer()
        running = True
        while running:
            # Initial phase
            # Note: self._initial_phase_iterations is None when no initial
            # phase is needed
            if iteration == self._initial_phase_iterations:
                for sampler in self._samplers:
                    sampler.set_initial_phase(False)
                if self._log_to_screen:
                    print('Initial phase completed.')

            # Get points
            if self._single_chain:
                xs = [self._samplers[i].ask() for i in active]
            else:
                xs = self._samplers[0].ask()

            # Calculate logpdfs
            fxs = evaluator.evaluate(xs)

            # Update evaluation count
            self._n_evaluations += len(fxs)

            # Update chains
            if self._single_chain:
                # Single chain

                # Check and update the individual chains
                fxs_iterator = iter(fxs)
                for i in list(active):  # new list: active may be modified
                    reply = self._samplers[i].tell(next(fxs_iterator))
                    if reply is not None:
                        # Unpack reply into position, evaluation, and status
                        y, fy, accepted = reply

                        # Inverse transform to model space if transformation
                        # is provided
                        if self._transformation:
                            y_store = self._transformation.to_model(y)
                        else:
                            y_store = y

                        # Store sample in memory
                        if self._chains_in_memory:
                            samples[i][n_samples[i]] = y_store
                        else:
                            samples[i] = y_store

                        # Update current evaluations
                        if store_evaluations:
                            # If accepted, update log_pdf and prior for
                            # logging
                            if accepted:
                                current_logpdf[i] = fy
                                if prior is not None:
                                    current_prior[i] = prior(y)

                            # Calculate evaluations to log
                            e = current_logpdf[i]
                            if prior is not None:
                                e = [e,
                                     current_logpdf[i] - current_prior[i],
                                     current_prior[i]]

                            # Store evaluations in memory
                            if self._evaluations_in_memory:
                                evaluations[i][n_samples[i]] = e

                            # Write evaluations to disk
                            if self._evaluation_files:
                                if prior is None:
                                    eval_loggers[i].log(e)
                                else:
                                    eval_loggers[i].log(*e)

                        # Stop adding samples if maximum number reached
                        n_samples[i] += 1
                        if n_samples[i] == self._max_iterations:
                            active.remove(i)

                # This is an intermediate step until the slowest sampler has
                # produced a new sample since the last `iteration`.
                intermediate_step = min(n_samples) <= iteration

            else:
                # Multi-chain methods

                # Get all chains samples at once
                reply = self._samplers[0].tell(fxs)
                intermediate_step = reply is None

                if not intermediate_step:
                    # Unpack reply into positions, evaluations, and status
                    ys, fys, accepted = reply

                    # Inverse transform to model space if transformation is
                    # provided
                    if self._transformation:
                        ys_store = np.zeros(ys.shape)
                        for i, y in enumerate(ys):
                            ys_store[i] = self._transformation.to_model(y)
                    else:
                        ys_store = ys

                    # Store samples in memory
                    if self._chains_in_memory:
                        samples[:, iteration] = ys_store
                    else:
                        samples = ys_store

                    # Update current evaluations
                    if store_evaluations:
                        es = []
                        for i, y in enumerate(ys):
                            # Check if accepted, if so, update log_pdf and
                            # prior to be logged
                            if accepted[i]:
                                current_logpdf[i] = fys[i]
                                if prior is not None:
                                    current_prior[i] = prior(ys[i])

                            # Calculate evaluations to log
                            e = current_logpdf[i]
                            if prior is not None:
                                e = [e,
                                     current_logpdf[i] - current_prior[i],
                                     current_prior[i]]
                            es.append(e)

                        # Write evaluations to memory
                        if self._evaluations_in_memory:
                            for i, e in enumerate(es):
                                evaluations[i, iteration] = e

                        # Write evaluations to disk
                        if self._evaluation_files:
                            if prior is None:
                                for i, eval_logger in enumerate(eval_loggers):
                                    eval_logger.log(es[i])
                            else:
                                for i, eval_logger in enumerate(eval_loggers):
                                    eval_logger.log(*es[i])

            # If no new samples were added, then no MCMC iteration was
            # performed, and so the iteration count shouldn't be updated,
            # logging shouldn't be triggered, and stopping criteria shouldn't
            # be checked
            if intermediate_step:
                continue

            # Write samples to disk
            if self._chains_in_memory:
                for i, chain_logger in enumerate(chain_loggers):
                    chain_logger.log(*samples[i][iteration])
            else:
                for i, chain_logger in enumerate(chain_loggers):
                    chain_logger.log(*samples[i])

            # Show progress
            if logging and iteration >= next_message:
                # Log state
                logger.log(iteration, self._n_evaluations)
                for sampler in self._samplers:
                    sampler._log_write(logger)
                logger.log(timer.time())

                # Choose next logging point
                if iteration < self._message_warm_up:
                    next_message = iteration + 1
                else:
                    next_message = self._message_interval * (
                        1 + iteration // self._message_interval)

            # Update iteration count
            iteration += 1

            # Check requested number of samples
            if (self._max_iterations is not None and
                    iteration >= self._max_iterations):
                running = False
                halt_message = ('Halting: Maximum number of iterations ('
                                + str(iteration) + ') reached.')

        # Finished running
        self._time = timer.time()

        # Log final state and show halt message
        if logging:
            logger.log(iteration, self._n_evaluations)
            for sampler in self._samplers:
                sampler._log_write(logger)
            logger.log(self._time)
            if self._log_to_screen:
                print(halt_message)

        # Store evaluations in memory
        if self._evaluations_in_memory:
            self._evaluations = evaluations

        if self._chains_in_memory:
            # Store generated chains in memory
            self._samples = samples

        # Return generated chains
        return samples if self._chains_in_memory else None

    def sampler(self):
        """
        Returns the underlying :class:`MultiChainMCMC` object, or raises an
        error if :class:`SingleChainMCMC` objects are being used.

        See also: :meth:`samplers()`.
        """
        if self._single_chain:
            raise RuntimeError(
                'The method MCMCController.sampler() is only supported when a'
                ' MultiChainMCMC is selected. Please use'
                ' MCMCController.samplers() instead, to obtain a list of all'
                ' internal SingleChainMCMC instances.')
        return self._samplers[0]

    def samplers(self):
        """
        Returns a list containing the underlying sampler objects.

        If a :class:`SingleChainMCMC` method was selected, this will be a
        list containing as many :class:`SingleChainMCMC` objects as the
        number of chains. If a :class:`MultiChainMCMC` method was selected,
        this will be a list containing a single :class:`MultiChainMCMC`
        instance.
        """
        return self._samplers

    def set_chain_filename(self, chain_file):
        """
        Write chains to disk as they are generated.

        If a ``chain_file`` is specified, a CSV file will be created for each
        chain, to which samples will be written as they are accepted. To
        disable logging of chains, set ``chain_file=None``.

        Filenames for each chain file will be derived from ``chain_file``,
        e.g. if ``chain_file='chain.csv'`` and there are 2 chains, then the
        files ``chain_0.csv`` and ``chain_1.csv`` will be created. Each CSV
        file will start with a header (e.g. ``"p0","p1","p2",...``) and
        contain a sample on each subsequent line.
        """
        d = self._n_chains
        self._chain_files = None
        if chain_file:
            b, e = os.path.splitext(str(chain_file))
            self._chain_files = [b + '_' + str(i) + e for i in range(d)]

    def set_chain_storage(self, store_in_memory=True):
        """
        Store chains in memory as they are generated.

        By default, all generated chains are stored in memory as they are
        generated, and returned by :meth:`run()`. This method allows this
        behaviour to be disabled, which can be useful for very large chains
        which are already stored to disk (see :meth:`set_chain_filename()`).
        """
        self._chains_in_memory = bool(store_in_memory)

    def set_initial_phase_iterations(self, iterations=200):
        """
        For methods that require an initial phase (e.g. an adaptation-free
        phase for the adaptive covariance MCMC method), this sets the number
        of iterations that the initial phase will take.

        For methods that do not require an initial phase, a
        ``NotImplementedError`` is raised.
        """
        if not self._needs_initial_phase:
            raise NotImplementedError

        # Check input
        iterations = int(iterations)
        if iterations < 0:
            raise ValueError(
                'Number of initial-phase iterations cannot be negative.')
        self._initial_phase_iterations = iterations

    def set_log_interval(self, iters=20, warm_up=3):
        """
        Changes the frequency with which messages are logged.

        Parameters
        ----------
        iters : int
            A log message will be shown every ``iters`` iterations.
        warm_up : int
            A log message will be shown every iteration, for the first
            ``warm_up`` iterations.
        """
        iters = int(iters)
        if iters < 1:
            raise ValueError('Interval must be greater than zero.')
        warm_up = max(0, int(warm_up))

        self._message_interval = iters
        self._message_warm_up = warm_up

    def set_log_pdf_filename(self, log_pdf_file):
        """
        Write :class:`LogPDF` evaluations to disk as they are generated.

        If an ``evaluation_file`` is specified, a CSV file will be created
        for each chain, to which :class:`LogPDF` evaluations will be written
        for every accepted sample. To disable this feature, set
        ``evaluation_file=None``. If the ``LogPDF`` being evaluated is a
        :class:`LogPosterior`, the individual likelihood and prior will also
        be stored.

        Filenames for each evaluation file will be derived from
        ``evaluation_file``, e.g. if ``evaluation_file='evals.csv'`` and
        there are 2 chains, then the files ``evals_0.csv`` and
        ``evals_1.csv`` will be created. Each CSV file will start with a
        header (e.g. ``"logposterior","loglikelihood","logprior"``) and
        contain the evaluations for i-th accepted sample on the i-th
        subsequent line.
        """
        d = self._n_chains
        self._evaluation_files = None
        if log_pdf_file:
            b, e = os.path.splitext(str(log_pdf_file))
            self._evaluation_files = [
                b + '_' + str(i) + e for i in range(d)]

    def set_log_pdf_storage(self, store_in_memory=False):
        """
        Store :class:`LogPDF` evaluations in memory as they are generated.

        By default, evaluations of the :class:`LogPDF` are not stored. This
        method can be used to enable storage of the evaluations for the
        accepted samples.

        After running, evaluations can be obtained using
        :meth:`evaluations()`.
        """
        self._evaluations_in_memory = bool(store_in_memory)

    def set_log_to_file(self, filename=None, csv=False):
        """
        Enables progress logging to file when a filename is passed in,
        disables it if ``filename`` is ``False`` or ``None``.

        The argument ``csv`` can be set to ``True`` to write the file in
        comma separated value (CSV) format. By default, the file contents
        will be similar to the output on screen.
        """
        if filename:
            self._log_filename = str(filename)
            self._log_csv = True if csv else False
        else:
            self._log_filename = None
            self._log_csv = False

    def set_log_to_screen(self, enabled):
        """
        Enables or disables progress logging to screen.
        """
        self._log_to_screen = True if enabled else False

    def set_max_iterations(self, iterations=10000):
        """
        Adds a stopping criterion, allowing the routine to halt after the
        given number of ``iterations``.

        This criterion is enabled by default. To disable it, use
        ``set_max_iterations(None)``.
        """
        if iterations is not None:
            iterations = int(iterations)
            if iterations < 0:
                raise ValueError(
                    'Maximum number of iterations cannot be negative.')
        self._max_iterations = iterations

    def set_parallel(self, parallel=False):
        """
        Enables/disables parallel evaluation.

        If ``parallel=True``, the method will run using a number of worker
        processes equal to the detected cpu core count. The number of workers
        can be set explicitly by setting ``parallel`` to an integer greater
        than 0.
        Parallelisation can be disabled by setting ``parallel`` to ``0`` or
        ``False``.
        """
        if parallel is True:
            self._parallel = True
            self._n_workers = pints.ParallelEvaluator.cpu_count()
        elif parallel >= 1:
            self._parallel = True
            self._n_workers = int(parallel)
        else:
            self._parallel = False
            self._n_workers = 1

    def time(self):
        """
        Returns the time needed for the last run, in seconds, or ``None`` if
        the controller hasn't run yet.
        """
        return self._time
class MCMCSampling(MCMCController):
    """ Deprecated alias for :class:`MCMCController`. """

    def __init__(self, log_pdf, chains, x0, sigma0=None, transformation=None,
                 method=None):
        # Deprecated on 2019-02-06
        import warnings
        warnings.warn(
            'The class `pints.MCMCSampling` is deprecated.'
            ' Please use `pints.MCMCController` instead.')
        super(MCMCSampling, self).__init__(log_pdf, chains, x0, sigma0,
                                           transformation, method=method)
def mcmc_sample(log_pdf, chains, x0, sigma0=None, transformation=None,
                method=None):
    """
    Sample from a :class:`pints.LogPDF` using a Markov Chain Monte Carlo
    (MCMC) method.

    Parameters
    ----------
    log_pdf : pints.LogPDF
        A :class:`LogPDF` function that evaluates points in the parameter
        space.
    chains : int
        The number of MCMC chains to generate.
    x0
        A sequence of starting points. Can be a list of lists, a
        2-dimensional array, or any other structure such that ``x0[i]`` is
        the starting point for chain ``i``.
    sigma0
        An optional initial covariance matrix, i.e., a guess of the
        covariance in ``logpdf`` around the points in ``x0`` (the same
        ``sigma0`` is used for each point in ``x0``).
        Can be specified as a ``(d, d)`` matrix (where ``d`` is the dimension
        of the parameter space) or as a ``(d, )`` vector, in which case
        ``diag(sigma0)`` will be used.
    transformation : pints.Transformation
        An optional :class:`pints.Transformation` to allow the sampler to
        work in a transformed parameter space. If used, points shown or
        returned to the user will first be detransformed back to the original
        space.
    method : class
        The class of :class:`MCMCSampler` to use. If no method is specified,
        :class:`HaarioBardenetACMC` is used.
    """
    return MCMCController(      # pragma: no cover
        log_pdf, chains, x0, sigma0, transformation, method=method).run()