Source code for pints._mcmc._emcee_hammer

#
# Emcee hammer MCMC
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import pints
import numpy as np


[docs] class EmceeHammerMCMC(pints.MultiChainMCMC): """ Uses the differential evolution algorithm "emcee: the MCMC hammer", described in Algorithm 2 in [1]_. For ``k`` in ``1:N``: - Draw a walker ``X_j`` at random from the "complementary ensemble" (the group of chains not including ``k``) without replacement. - Sample ``z ~ g(z)``, (see below). - Set ``Y = X_j(t) + z[X_k(t) - X_j(t)]``. - Set ``q = z^{d - 1} p(Y) / p(X_k(t))``. - Sample ``r ~ U(0, 1)``. - If ``r <= q``, set ``X_k(t + 1)`` equal to ``Y``, if not use ``X_k(t)``. Here, ``N`` is the number of chains (or walkers), ``d`` is the dimensionality of the space, and ``g(z)`` is proportional to ``1 / sqrt(z)`` if ``z`` is in ``[1 / a, a]`` or to 0, otherwise (where ``a`` is a parameter with default value ``2``). References ---------- .. [1] "emcee: The MCMC Hammer", Daniel Foreman-Mackey, David W. Hogg, Dustin Lang, Jonathan Goodman, 2013, arXiv, https://arxiv.org/pdf/1202.3665.pdf """ def __init__(self, chains, x0, sigma0=None): super(EmceeHammerMCMC, self).__init__(chains, x0, sigma0) # Need at least 3 chains if self._n_chains < 3: raise ValueError('Need at least 3 chains.') # Set initial state self._running = False # Current samples and log_pdfs self._current = None self._current_log_pdfs = None # Proposal: # - n_chains points on the initial step # - only a single point after #TODO: Update this class to algorithm 3 self._proposed = None # Scale parameter (see docstring above) self._a = None self.set_scale(2)
[docs] def ask(self): """ See :meth:`pints.MultiChainMCMC.ask()`. """ # Initialise on first call (will set first proposal) if not self._running: self._initialise() # Propose new points if self._proposed is None: # Cycle through chains if len(self._remaining) == 0: self._remaining = np.arange(self._n_chains) self._k = self._remaining[0] self._remaining = np.delete(self._remaining, 0) j = self._random_select_other_index(self._k) x_j = self._current[j] x_k = self._current[self._k] self._z = self._sample_z(self._a) self._proposed = x_j + self._z * (x_k - x_j) # Ensure proposed is array containing a single sample #TODO Switch to algorithm 3 self._proposed = np.array([self._proposed]) self._proposed.setflags(write=False) # Return proposed points return self._proposed
def _random_select_other_index(self, current_index): """ Selects an index uniformly at random from all chains excluding the index of the current chain. """ free_chains = list(range(self._n_chains)) free_chains.remove(current_index) other_index = np.random.choice(free_chains) return other_index def _sample_z(self, a): """ Samples ``z~g(z)`` where ``g(z)`` is proportional to ``1 / sqrt(z)`` if ``z`` is in ``[1 / a, a]``; otherwise 0. It does this by using inverse transform sampling. """ r = np.random.rand() return ((1 + r * (a - 1))**2) / a def _initialise(self): """ Initialises the routine before the first iteration. """ if self._running: raise RuntimeError('Already initialised.') # Propose x0 as first points # Note proposal is multiple points this time! self._current = None self._current_log_pdfs = None self._proposed = self._x0 self._proposed.setflags(write=False) # Number of chains left to update in this cycle self._remaining = np.arange(self._n_chains) # Update sampler state self._running = True
[docs] def name(self): """ See :meth:`pints.MCMCSampler.name()`. """ return 'Emcee Hammer MCMC'
[docs] def tell(self, fx): """ See :meth:`pints.MultiChainMCMC.tell()`. """ # Check if we had a proposal if self._proposed is None: raise RuntimeError('Tell called before proposal was set.') # Ensure fx is a numpy array proposed_log_pdf = np.array(fx, copy=True) # First points? if self._current is None: # Note that proposed and proposed_log_pdf are for all chains on # this iteration if not np.all(np.isfinite(proposed_log_pdf)): raise ValueError( 'Initial points for MCMC must have finite logpdf.') # Accept self._current = self._proposed # already read-only self._current_log_pdfs = proposed_log_pdf self._current_log_pdfs.setflags(write=False) # Clear proposal self._proposed = None # Return first samples for chains accepted = np.array([True] * self._n_chains) return self._current, self._current_log_pdfs, accepted # Perform iteration, updating the selected chain # Note that proposed/proposed_log_pdf are length 1 here accepted = np.array([False] * self._n_chains) r_log = np.log(np.random.rand()) log_q = ( (self._n_parameters - 1) * np.log(self._z) + proposed_log_pdf[0] - self._current_log_pdfs[self._k]) if log_q >= r_log: next = np.copy(self._current) next_log_pdfs = np.copy(self._current_log_pdfs) next[self._k] = self._proposed next_log_pdfs[self._k] = proposed_log_pdf self._current.setflags(write=False) self._current_log_pdfs.setflags(write=False) self._current = next self._current_log_pdfs = next_log_pdfs accepted[self._k] = True # Clear proposal self._proposed = None # Return samples to add to chains return self._current, self._current_log_pdfs, accepted
[docs] def scale(self): """ Returns the scale coefficient ``a`` used in updating the position of the chains. """ return self._a
[docs] def set_scale(self, scale): """ Sets the scale coefficient ``a`` used in updating the position of the chains. """ scale = float(scale) if scale <= 1: raise ValueError('The scale parameter must exceed 1.') self._a = scale
[docs] def n_hyper_parameters(self): """ See :meth:`TunableMethod.n_hyper_parameters()`. """ return 1
[docs] def set_hyper_parameters(self, x): """ The hyper-parameter vector is ``[scale]``. See :meth:`TunableMethod.set_hyper_parameters()`. """ self.set_scale(x[0])