Source code for pints._mcmc._monomial_gamma_hamiltonian

#
# Monomial gamma HMC MCMC method
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import pints
import numpy as np

from scipy import integrate
from scipy import interpolate


class MonomialGammaHamiltonianMCMC(pints.SingleChainMCMC):
    r"""
    Implements Monomial Gamma HMC as described in [1]_ - a generalisation of
    HMC as described in [2]_ - involving a non-physical kinetic energy term.

    Uses a physical analogy of a particle moving across a landscape under
    Hamiltonian dynamics to aid efficient exploration of parameter space.
    Introduces an auxiliary variable -- the momentum (``p_i``) of a particle
    moving in dimension ``i`` of negative log posterior space -- which
    supplements the position (``q_i``) of the particle in parameter space.
    The particle's motion is dictated by solutions to Hamilton's equations,

    .. math::
        dq_i/dt &= \partial H/\partial p_i,\\
        dp_i/dt &= -\partial H/\partial q_i.

    The Hamiltonian is given by,

    .. math::
        H(q, p) = U(q) + K(p)
                = -\log(p(q|X) p(q)) + \sum_{i=1}^{d} \left(
                    -g(p_i) + (2 / c) \log(1 + \exp(c g(p_i)))\right),

    where ``d`` is the dimensionality of the model, ``U`` is the potential
    energy and ``K`` is the kinetic energy term. Note that the kinetic energy
    used here is the 'soft' version described in [1]_, where,

    .. math::
        g(p_i) = (1 / m_i) \text{sign}(p_i) |p_i|^{1 / a}.

    To numerically integrate Hamilton's equations, it is essential to use a
    symplectic discretisation routine, of which the most typical approach is
    the leapfrog method,

    .. math::
        p_i(t + \epsilon/2) &= p_i(t) -
            (\epsilon/2) dU(q_i)/dq_i\\
        q_i(t + \epsilon) &= q_i(t) +
            \epsilon \, dK(p_i(t + \epsilon/2))/dp_i\\
        p_i(t + \epsilon) &= p_i(t + \epsilon/2) -
            (\epsilon/2) dU(q_i(t + \epsilon))/dq_i.

    The derivative of the soft kinetic energy term is given by,

    .. math::
        dK(p_i)/dp_i = |p_i|^{-1 + 1/a} \,
            \text{tanh}\left(c \, \text{sign}(p_i) |p_i|^{1/a}
            / (2 m_i)\right) / (a m_i).

    In particular, the algorithm we implement follows eqs. (4.14)-(4.16) in
    [2]_, since we allow a different epsilon for each dimension.

    Extends :class:`SingleChainMCMC`.

    References
    ----------
    .. [1] Towards unifying Hamiltonian Monte Carlo and slice sampling.
           Yizhe Zhang, Xiangyu Wang, Changyou Chen, Ricardo Henao, Kai Fan,
           Lawrence Carin. Advances in Neural Information Processing Systems
           (NIPS), 2016.

    .. [2] MCMC using Hamiltonian dynamics.
           Radford M. Neal, Chapter 5 of the Handbook of Markov Chain Monte
           Carlo, edited by Steve Brooks, Andrew Gelman, Galin Jones, and
           Xiao-Li Meng.
    """
    def __init__(self, x0, sigma0=None):
        super(MonomialGammaHamiltonianMCMC, self).__init__(x0, sigma0)

        # Set initial state
        self._running = False
        self._ready_for_tell = False

        # Current point in the Markov chain
        self._current = None            # Aka current_q in the chapter
        self._current_energy = None     # Aka U(current_q) = -log_pdf
        self._current_gradient = None
        self._current_momentum = None   # Aka current_p

        # Current point in the leapfrog iterations
        self._momentum = None       # Aka p in the chapter
        self._position = None       # Aka q in the chapter
        self._gradient = None       # Aka grad_U(q) in the chapter

        # Iterations, acceptance monitoring, and leapfrog iterations
        self._mcmc_iteration = 0
        self._mcmc_acceptance = 0
        self._frog_iteration = 0

        # Default number of leapfrog iterations
        self._n_frog_iterations = 20

        # Default integration step size for leapfrog algorithm
        self._epsilon = 0.1
        self._step_size = None
        self.set_leapfrog_step_size(np.diag(self._sigma0))

        # Divergence checking
        # Create a vector of divergent iterations
        self._divergent = np.asarray([], dtype='int')

        # Default threshold for Hamiltonian divergences
        # (currently set to match Stan)
        self._hamiltonian_threshold = 10**3

        # Estimate quantities needed for sampling from the ke distribution
        self._a = 1.0
        self._c = 0.2
        self._m = 1.0

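    # Illustrative sketch: one leapfrog step under the soft kinetic energy,
    # written with hypothetical helpers ``U_grad`` (gradient of the
    # potential, obtained via the ask/tell cycle below) and ``K_deriv``
    # (playing the role of ``_K_deriv`` below):
    #
    #     p -= 0.5 * epsilon * U_grad(q)   # half-step in momentum
    #     q += epsilon * K_deriv(p)        # full step in position
    #     p -= 0.5 * epsilon * U_grad(q)   # half-step in momentum
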
    def a(self):
        """ Returns ``a`` in kinetic energy function. """
        return self._a

    def ask(self):
        """ See :meth:`SingleChainMCMC.ask()`. """
        # Check ask/tell pattern
        if self._ready_for_tell:
            raise RuntimeError('Ask() called when expecting call to tell().')

        # Initialise on first call
        if not self._running:
            self._running = True
            self._initialise_ke()

        # Notes:
        #  Ask is responsible for updating the position, which is the point
        #  returned to the user.
        #  Tell is then responsible for updating the momentum, which uses the
        #  gradient at this new point.
        #  The MCMC step happens in tell, and does not require any new
        #  information (it uses the log_pdf and gradient of the final point
        #  in the leapfrog run).

        # Very first iteration
        if self._current is None:

            # Ask for the pdf and gradient of x0
            self._ready_for_tell = True
            return np.array(self._x0, copy=True)

        # First iteration of a run of leapfrog iterations
        if self._frog_iteration == 0:

            # Sample random momentum for the current point from the soft
            # kinetic energy distribution
            self._current_momentum = self._sample_momentum()

            # First leapfrog position is the current sample in the chain
            self._position = np.array(self._current, copy=True)
            self._gradient = np.array(self._current_gradient, copy=True)
            self._momentum = np.array(self._current_momentum, copy=True)

            # Perform a half-step before starting iteration 0 below
            self._momentum -= self._scaled_epsilon * self._gradient * 0.5

        # Perform a leapfrog step for the position
        self._position += self._scaled_epsilon * (
            self._K_deriv(self._momentum, self._a, self._c, self._m))

        # Ask for the pdf and gradient of the current leapfrog position.
        # Using this, the leapfrog step for the momentum is performed in
        # tell()
        self._ready_for_tell = True
        return np.array(self._position, copy=True)

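    # Sketch of the ask/tell cycle that drives the leapfrog loop above
    # (illustrative only; in practice pints.MCMCController runs this loop):
    #
    #     x = sampler.ask()                  # next position to evaluate
    #     fx, dfx = log_pdf.evaluateS1(x)    # logpdf and its gradient
    #     sample = sampler.tell((fx, dfx))   # None until a trajectory ends
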
    def c(self):
        """ Returns ``c`` in kinetic energy function. """
        return self._c

    def _cdf(self, p, a, c, m, z):
        """ Auxiliary kinetic energy cumulative density defined in [1]_. """
        return integrate.quad(
            lambda p1: self._pdf(p1, a, c, m, z), -np.inf, p)[0]

    def divergent_iterations(self):
        """ Returns the iteration number of any divergent iterations. """
        return self._divergent

    def epsilon(self):
        """ Returns epsilon used in leapfrog algorithm. """
        return self._epsilon

    def _g(self, p, a, m):
        """ Helper function used to define soft kinetic energy in [1]_. """
        return (1 / m) * np.sign(p) * np.abs(p)**(1 / a)

    def hamiltonian_threshold(self):
        """
        Returns the threshold difference in Hamiltonian value from one
        iteration to the next which determines whether an iteration is
        divergent.
        """
        return self._hamiltonian_threshold

    def _initialise_ke(self):
        """
        Initialises functions needed for sampling from the soft kinetic
        energy function defined in [1]_.
        """
        z = integrate.quad(
            lambda p: np.exp(-self._K_indiv(p, self._a, self._c, self._m)),
            -np.inf, np.inf)[0]
        self._f = self._inverse_cdf_calculator(self._a, self._c, self._m, z)

    def _inverse_cdf_calculator(self, a, c, m, z, pmax=100):
        """ Estimates the empirical inverse cdf by interpolation. """
        p = np.linspace(-pmax, pmax, 1000)
        lcdf = [self._cdf(p[i], a, c, m, z) for i in range(1000)]
        f = interpolate.interp1d(lcdf, p, fill_value='extrapolate')
        return f

    def _K(self, v_p, a, c, m):
        """ Soft kinetic energy function defined in [1]_. """
        return np.sum([self._K_indiv(p, a, c, m) for p in v_p])

    def _K_deriv(self, v_p, a, c, m):
        """ Derivative of soft kinetic energy function defined in [1]_. """
        return np.array([self._K_deriv_indiv(p, a, c, m) for p in v_p])

    def _K_deriv_indiv(self, p, a, c, m):
        """
        Derivative of soft kinetic energy function defined in [1]_ for an
        individual momentum component.
        """
        abs_p = np.abs(p)
        sign_p = np.sign(p)
        tanh = np.tanh(0.5 * c * abs_p**(1.0 / a) * sign_p / m)
        return abs_p**(-2 + 1.0 / a) * p * sign_p * tanh / (a * m)

    def _K_indiv(self, p, a, c, m):
        """
        Soft kinetic energy function defined in [1]_ for an individual
        momentum component.
        """
        return -self._g(p, a, m) + (2.0 / c) * np.log(
            1.0 + np.exp(c * self._g(p, a, m)))

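    # How the helpers above fit together (illustrative note): ``_cdf``
    # integrates the normalised density ``_pdf`` to give the kinetic energy
    # CDF, ``_inverse_cdf_calculator`` tabulates that CDF on a grid and
    # interpolates it 'backwards' (CDF value -> momentum), and sampling then
    # reduces to inverse-transform sampling:
    #
    #     u = np.random.uniform()   # u ~ Uniform(0, 1)
    #     p = self._f([u])[0]       # p distributed according to _pdf
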
    def leapfrog_steps(self):
        """
        Returns the number of leapfrog steps to carry out for each iteration.
        """
        return self._n_frog_iterations

    def leapfrog_step_size(self):
        """ Returns the step size for the leapfrog algorithm. """
        return self._step_size

    def _log_init(self, logger):
        """ See :meth:`Loggable._log_init()`. """
        logger.add_float('Accept.')

    def _log_write(self, logger):
        """ See :meth:`Loggable._log_write()`. """
        logger.log(self._mcmc_acceptance)

    def mass(self):
        """ Returns mass ``m`` in kinetic energy function. """
        return self._m

    def name(self):
        """ See :meth:`pints.MCMCSampler.name()`. """
        return 'Monomial-Gamma Hamiltonian Monte Carlo'

    def needs_sensitivities(self):
        """ See :meth:`pints.MCMCSampler.needs_sensitivities()`. """
        return True

    def n_hyper_parameters(self):
        """ See :meth:`TunableMethod.n_hyper_parameters()`. """
        return 5

    def _pdf(self, p, a, c, m, z):
        """ Auxiliary kinetic energy probability density defined in [1]_. """
        return (1.0 / z) * np.exp(-self._K_indiv(p, a, c, m))

    def _sample_momentum(self):
        """ Samples from the soft kinetic energy pdf defined in [1]_. """
        us = np.random.uniform(size=self._n_parameters)
        return np.array([self._f([u])[0] for u in us])

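    # Minimal standalone demonstration (a sketch using only the numpy and
    # scipy imports above) of the same inverse-transform construction used
    # by _sample_momentum, here with a = 1, c = 0.2, m = 1:
    #
    #     def K(p, a=1.0, c=0.2, m=1.0):
    #         g = np.sign(p) * np.abs(p)**(1 / a) / m
    #         return -g + (2 / c) * np.log(1 + np.exp(c * g))
    #
    #     z = integrate.quad(lambda p: np.exp(-K(p)), -np.inf, np.inf)[0]
    #     grid = np.linspace(-100, 100, 1000)
    #     cdf = [integrate.quad(lambda q: np.exp(-K(q)) / z, -np.inf, x)[0]
    #            for x in grid]
    #     inv = interpolate.interp1d(cdf, grid, fill_value='extrapolate')
    #     samples = inv(np.random.uniform(size=1000))
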
    def scaled_epsilon(self):
        """ Returns scaled epsilon used in leapfrog algorithm. """
        return self._scaled_epsilon

    def set_a(self, a):
        """ Sets ``a`` in kinetic energy function. """
        if a <= 0:
            raise ValueError('a must be positive')
        self._a = a

    def set_c(self, c):
        """ Sets ``c`` in kinetic energy function. """
        if c <= 0:
            raise ValueError('c must be positive')
        self._c = c

    def set_epsilon(self, epsilon):
        """ Sets epsilon for the leapfrog algorithm. """
        epsilon = float(epsilon)
        if epsilon <= 0:
            raise ValueError('epsilon must be positive for leapfrog '
                             'algorithm')
        self._epsilon = epsilon
        self._set_scaled_epsilon()

    def set_hamiltonian_threshold(self, hamiltonian_threshold):
        """
        Sets the threshold difference in Hamiltonian value from one iteration
        to the next which determines whether an iteration is divergent.
        """
        if hamiltonian_threshold < 0:
            raise ValueError('Threshold for divergent iterations must be '
                             'non-negative.')
        self._hamiltonian_threshold = hamiltonian_threshold

    def set_hyper_parameters(self, x):
        """
        The hyper-parameter vector is
        ``[leapfrog_steps, leapfrog_step_size, a, c, mass]``.

        See :meth:`TunableMethod.set_hyper_parameters()`.
        """
        self.set_leapfrog_steps(x[0])
        self.set_leapfrog_step_size(x[1])
        self.set_a(x[2])
        self.set_c(x[3])
        self.set_mass(x[4])

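    # Example (illustrative values): the call below is equivalent to
    # invoking the five individual setters in turn:
    #
    #     mcmc.set_hyper_parameters([20, 0.2, 1.0, 0.2, 1.0])
    #     # i.e. 20 leapfrog steps, step size 0.2, a = 1, c = 0.2, mass = 1
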
    def set_leapfrog_steps(self, steps):
        """
        Sets the number of leapfrog steps to carry out for each iteration.
        """
        steps = int(steps)
        if steps < 1:
            raise ValueError('Number of steps must exceed 0.')
        self._n_frog_iterations = steps

    def set_leapfrog_step_size(self, step_size):
        """ Sets the step size for the leapfrog algorithm. """
        a = np.atleast_1d(step_size)
        if len(a[a < 0]) > 0:
            raise ValueError(
                'Step size for leapfrog algorithm must be greater than '
                'zero.')
        if len(a) == 1:
            step_size = np.repeat(step_size, self._n_parameters)
        elif len(step_size) != self._n_parameters:
            raise ValueError(
                'Step size should either be of length 1 or equal to the '
                'number of parameters.')
        self._step_size = step_size
        self._set_scaled_epsilon()

    def set_mass(self, m):
        """ Sets mass ``m`` in kinetic energy function. """
        if m <= 0:
            raise ValueError('Mass must be positive')
        self._m = m

    def _set_scaled_epsilon(self):
        """ Rescales epsilon along the dimensions of step_size. """
        self._scaled_epsilon = np.zeros(self._n_parameters)
        for i in range(self._n_parameters):
            self._scaled_epsilon[i] = self._epsilon * self._step_size[i]

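    # Note: the loop above is equivalent to the vectorised expression
    # ``self._scaled_epsilon = self._epsilon * np.asarray(self._step_size)``;
    # the elementwise form spells out the per-dimension rescaling.
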
    def tell(self, reply):
        """ See :meth:`pints.SingleChainMCMC.tell()`. """
        if not self._ready_for_tell:
            raise RuntimeError('Tell called before proposal was set.')
        self._ready_for_tell = False

        # Unpack reply
        energy, gradient = reply

        # Check reply, copy gradient
        energy = float(energy)
        gradient = pints.vector(gradient)
        assert gradient.shape == (self._n_parameters, )

        # Energy = -log_pdf, so flip both signs!
        energy = -energy
        gradient = -gradient

        # Very first call
        if self._current is None:

            # Check first point is somewhere sensible
            if not np.isfinite(energy):
                raise ValueError(
                    'Initial point for MCMC must have finite logpdf.')

            # Set current sample, energy, and gradient
            self._current = self._x0
            self._current_energy = energy
            self._current_gradient = gradient

            # Increase iteration count
            self._mcmc_iteration += 1

            # Mark current as read-only, so it can be safely returned.
            # (The gradient won't be returned, only -gradient, so no need.)
            self._current.setflags(write=False)

            # Return first point in chain
            return (
                self._current,
                (-self._current_energy, -self._current_gradient),
                True
            )

        # Set gradient of current leapfrog position
        self._gradient = gradient

        # Update the leapfrog iteration count
        self._frog_iteration += 1

        # Not the last iteration? Then perform a leapfrog step and return
        if self._frog_iteration < self._n_frog_iterations:
            self._momentum -= self._scaled_epsilon * self._gradient

            # Return None to indicate there is no new sample for the chain
            return None

        # Final leapfrog iteration: only do half a step
        self._momentum -= self._scaled_epsilon * self._gradient * 0.5

        # Before starting accept/reject procedure, check if the leapfrog
        # procedure has led to a finite momentum and logpdf. If not, reject.
        accept = 0
        if np.isfinite(energy) and np.all(np.isfinite(self._momentum)):

            # Evaluate potential and kinetic energies at start and end of
            # leapfrog trajectory
            current_U = self._current_energy
            current_K = self._K(self._current_momentum, self._a, self._c,
                                self._m)
            proposed_U = energy
            proposed_K = self._K(self._momentum, self._a, self._c, self._m)

            # Check for divergent iterations by testing whether the
            # Hamiltonian difference is above a threshold
            div = proposed_U + proposed_K - (self._current_energy + current_K)
            if np.abs(div) > self._hamiltonian_threshold:  # pragma: no cover
                self._divergent = np.append(
                    self._divergent, self._mcmc_iteration)
                self._momentum = self._position = self._gradient = None
                self._frog_iteration = 0

                # Update MCMC iteration count
                self._mcmc_iteration += 1

                # Update acceptance rate (only used for output!)
                self._mcmc_acceptance = (
                    (self._mcmc_iteration * self._mcmc_acceptance + accept) /
                    (self._mcmc_iteration + 1))
                return (
                    self._current,
                    (-self._current_energy, -self._current_gradient),
                    False
                )

            # Accept/reject
            else:
                r = np.exp(current_U - proposed_U + current_K - proposed_K)
                if np.random.uniform(0, 1) < r:
                    accept = 1
                    self._current = self._position
                    self._current_energy = energy
                    self._current_gradient = gradient

                    # Mark current as read-only, so it can be safely
                    # returned. (The gradient won't be returned, only
                    # -gradient, so no need.)
                    self._current.setflags(write=False)

        # Reset leapfrog mechanism
        self._momentum = self._position = self._gradient = None
        self._frog_iteration = 0

        # Update MCMC iteration count
        self._mcmc_iteration += 1

        # Update acceptance rate (only used for output!)
        self._mcmc_acceptance = (
            (self._mcmc_iteration * self._mcmc_acceptance + accept) /
            (self._mcmc_iteration + 1))

        # Return current position as next sample in the chain
        return (
            self._current,
            (-self._current_energy, -self._current_gradient),
            accept != 0
        )
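
# A minimal usage sketch: running this sampler via pints.MCMCController on a
# toy Gaussian target. GaussianLogPDF supplies the sensitivities
# (evaluateS1) that this gradient-based method needs; the guard below keeps
# the example from running on import.
if __name__ == '__main__':
    import pints.toy

    log_pdf = pints.toy.GaussianLogPDF([0, 0], [[1, 0], [0, 1]])
    controller = pints.MCMCController(
        log_pdf, 1, [[0.1, 0.1]], method=MonomialGammaHamiltonianMCMC)
    controller.set_max_iterations(200)
    chains = controller.run()
    print(chains.shape)  # (chains, iterations, parameters) = (1, 200, 2)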