#
# Metropolis-adjusted Langevin algorithm
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import pints
import numpy as np
import scipy.stats


class MALAMCMC(pints.SingleChainMCMC):
    r"""
    Metropolis-Adjusted Langevin Algorithm (MALA), an MCMC sampler as
    described in [1]_.

    This method involves simulating Langevin diffusion such that the solution
    to the time evolution equation (the Fokker-Planck PDE) is a stationary
    distribution that equals the target density (in Bayesian problems, the
    posterior distribution). The stochastic differential equation (SDE) given
    below ensures that if :math:`u(\theta, 0) = \pi(\theta)`, then
    :math:`\partial u / \partial t = 0`,

    .. math::
        \mathrm{d}\Theta_t =
            \frac{1}{2} \nabla \log \pi(\Theta_t) \mathrm{d}t + \mathrm{d}W_t

    where :math:`\pi(\theta)` is the target density and :math:`W` is a
    standard multivariate Wiener process.

    In general, the above SDE cannot be solved exactly, so the first-order
    Euler discretisation below is used instead,

    .. math::
        \theta^* = \theta_t + \frac{\epsilon^2}{2} \nabla \log \pi(\theta_t)
            + \epsilon z

    where :math:`z \sim \mathcal{N}(0, I)`, resulting in a proposal mean
    :math:`\mu(\theta_t) = \theta_t +
    \frac{\epsilon^2}{2} \nabla \log \pi(\theta_t)`.

    To correct for the first-order integration error introduced by the
    discretisation, a Metropolis-Hastings acceptance probability is
    calculated after each step,

    .. math::
        \alpha = \frac{\pi(\theta^*) q(\theta_t | \theta^*)}
                      {\pi(\theta_t) q(\theta^* | \theta_t)}

    where :math:`q(\theta_2 | \theta_1) =
    \mathcal{N}(\theta_2 | \mu(\theta_1), \epsilon^2 I)` and :math:`\theta^*`
    is accepted with probability :math:`\min(1, \alpha)`.

    Here we consider a slight variant of the above method, discussed in [1]_,
    which uses a preconditioning matrix :math:`M` to allow differing degrees
    of freedom in each dimension,

    .. math::
        \theta^* = \theta_t + \frac{\epsilon'^2}{2} \nabla \log \pi(\theta_t)
            + \epsilon' z

    leading to :math:`q(\theta_2 | \theta_1) =
    \mathcal{N}(\theta_2 | \mu(\theta_1), \epsilon'^2)`, where
    :math:`\epsilon' = \epsilon \sqrt{M}` is set from the initial value of
    ``sigma0``.

    Extends :class:`SingleChainMCMC`.

    References
    ----------
    .. [1] Girolami, M. and Calderhead, B., 2011. Riemann manifold Langevin
           and Hamiltonian Monte Carlo methods. Journal of the Royal
           Statistical Society: Series B (Statistical Methodology), 73(2),
           pp. 123-214.
           https://doi.org/10.1111/j.1467-9868.2010.00765.x
    """

    def __init__(self, x0, sigma0=None):
        super(MALAMCMC, self).__init__(x0, sigma0)

        # Set initial state
        self._running = False
        self._ready_for_tell = False

        # Current point and proposed point
        self._current = None
        self._current_log_pdf = None
        self._current_gradient = None
        self._proposed = None

        # Hyper parameters
        self._epsilon = None
        self._step_size = 0.2
        self._scale_vector = np.diag(self._sigma0)

        # Acceptance rate monitoring
        self._iterations = 0
        self._acceptance = 0

        # Forward and backward proposal means and log densities
        self._forward_mu = None
        self._backward_mu = None
        self._forward_q = None
        self._backward_q = None

        # Initialise step size
        self.set_epsilon()

    def acceptance_rate(self):
        """
        Returns the current (measured) acceptance rate.
        """
        return self._acceptance

    def ask(self):
        """ See :meth:`SingleChainMCMC.ask()`. """
        # Initialise on first call
        if not self._running:
            self._initialise()

        if self._ready_for_tell:
            raise RuntimeError('Ask() called when expecting call to tell().')

        # Propose new point
        if self._proposed is None:
            self._forward_mu = self._current + (self._epsilon**2 / 2.0) * (
                self._current_gradient)

            self._proposed = np.random.multivariate_normal(
                self._forward_mu,
                self._epsilon**2 * np.diag(np.ones(self._n_parameters)))

            self._forward_q = scipy.stats.multivariate_normal.logpdf(
                self._proposed,
                self._forward_mu,
                self._epsilon**2 * np.diag(np.ones(self._n_parameters)))

            # Set as read-only
            self._proposed.setflags(write=False)

        self._ready_for_tell = True

        # Return proposed point
        return self._proposed

    def epsilon(self):
        """
        Returns ``epsilon``, which is the effective step size used in
        proposals.
        """
        return self._epsilon

    def _initialise(self):
        """
        Initialises the routine before the first iteration.
        """
        if self._running:
            raise RuntimeError('Already initialised.')

        # Propose x0 as first point
        self._current = None
        self._current_log_pdf = None
        self._proposed = self._x0
        self._proposed.setflags(write=False)

        # Update sampler state
        self._running = True

    def _log_init(self, logger):
        """ See :meth:`Loggable._log_init()`. """
        logger.add_float('Accept.')

    def _log_write(self, logger):
        """ See :meth:`Loggable._log_write()`. """
        logger.log(self._acceptance)

    def name(self):
        """ See :meth:`pints.MCMCSampler.name()`. """
        return 'Metropolis-Adjusted Langevin Algorithm (MALA)'

    def needs_sensitivities(self):
        """ See :meth:`pints.MCMCSampler.needs_sensitivities()`. """
        return True

    def n_hyper_parameters(self):
        """ See :meth:`TunableMethod.n_hyper_parameters()`. """
        return 1

    def set_epsilon(self, epsilon=None):
        """
        Sets ``epsilon``, the effective step size used in proposals.

        If ``epsilon`` is not specified, ``epsilon = 0.2 * diag(sigma0)``
        will be used.
        """
        if epsilon is None:
            self._epsilon = self._step_size * self._scale_vector
        else:
            a = np.atleast_1d(epsilon)
            if not len(a) == self._n_parameters:
                raise ValueError(
                    'Dimensions of epsilon must be same as number of'
                    ' parameters.')
            for element in a:
                if element <= 0:
                    raise ValueError('Elements of epsilon must exceed 0.')
            self._epsilon = np.array(a, copy=True)

    def set_hyper_parameters(self, x):
        """
        The hyper-parameter vector is ``[epsilon]``. The effective step size
        (``epsilon``) is ``step_size * scale_vector``.

        See :meth:`TunableMethod.set_hyper_parameters()`.
        """
        self.set_epsilon(x[0])

    def tell(self, reply):
        """ See :meth:`pints.SingleChainMCMC.tell()`. """
        # Check if we had a proposal
        if not self._ready_for_tell:
            raise RuntimeError('Tell called before proposal was set.')
        self._ready_for_tell = False

        # Unpack reply
        fx, log_gradient = reply

        # Check reply, copy gradient
        fx = float(fx)
        log_gradient = pints.vector(log_gradient)
        assert log_gradient.shape == (self._n_parameters, )

        # First point?
        if self._current is None:
            if not np.isfinite(fx):
                raise ValueError(
                    'Initial point for MCMC must have finite log_pdf.')

            # Accept
            self._current = self._proposed
            self._current_log_pdf = fx
            self._current_gradient = log_gradient

            # Mark current state as read-only for safe returning (proposed
            # is already set)
            self._current_gradient.setflags(write=False)

            # Increase iteration count
            self._iterations += 1

            # Clear proposal
            self._proposed = None

            # Return first point for chain
            return (
                self._current,
                (self._current_log_pdf, self._current_gradient),
                True
            )

        # Calculate alpha
        proposed_gradient = log_gradient
        self._backward_mu = self._proposed + (
            0.5 * self._epsilon**2 * proposed_gradient)
        self._backward_q = scipy.stats.multivariate_normal.logpdf(
            self._current,
            self._backward_mu,
            self._epsilon**2 * np.diag(np.ones(self._n_parameters)))
        alpha = fx + self._backward_q - (
            self._current_log_pdf + self._forward_q)

        # Check if the proposed point can be accepted
        accepted = 0
        if np.isfinite(fx):
            u = np.log(np.random.uniform(0, 1))
            if alpha > u:
                accepted = 1
                self._current = self._proposed
                self._current_log_pdf = fx
                self._current_gradient = log_gradient
                self._current_gradient.setflags(write=False)

        # Clear proposal
        self._proposed = None

        # Update acceptance rate (only used for output!)
        self._acceptance = ((self._iterations * self._acceptance + accepted)
                            / (self._iterations + 1))

        # Increase iteration count
        self._iterations += 1

        # Return next point for chain
        return (
            self._current,
            (self._current_log_pdf, self._current_gradient),
            accepted > 0
        )
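

# A minimal usage sketch, shown here for illustration only and assuming that
# the toy target ``pints.toy.GaussianLogPDF`` is available: MALA needs
# sensitivities, so each call to ``tell()`` is given the log-pdf value and
# gradient returned by ``evaluateS1()``.
if __name__ == '__main__':
    import pints.toy

    # Two-dimensional Gaussian toy target (unit covariance)
    log_pdf = pints.toy.GaussianLogPDF(mean=[0, 0], sigma=[1, 1])

    # Create the sampler, with an explicit (assumed) initial scale
    x0 = np.array([2.0, 2.0])
    mcmc = MALAMCMC(x0, sigma0=[1.0, 1.0])

    # Run a short ask/tell loop and collect the chain
    samples = []
    for i in range(2000):
        x = mcmc.ask()
        fx, gradient = log_pdf.evaluateS1(x)
        sample, _, accepted = mcmc.tell((fx, gradient))
        samples.append(sample)

    print('Acceptance rate:', mcmc.acceptance_rate())
    print('Post-burn-in sample mean:', np.mean(samples[500:], axis=0))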