#
# Relativistic MCMC method
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import numpy as np
import pints
from scipy.interpolate import interp1d
from scipy.optimize import root
import warnings
[docs]
class RelativisticMCMC(pints.SingleChainMCMC):
r"""
Implements Relativistic Monte Carlo as described in [1]_.
Uses a physical analogy of a particle moving across a landscape under
Hamiltonian dynamics to aid efficient exploration of parameter space.
Introduces an auxilary variable -- the momentum (``p_i``) of a particle
moving in dimension ``i`` of negative log posterior space -- which
supplements the position (``q_i``) of the particle in parameter space. The
particle's motion is dictated by solutions to Hamilton's equations,
.. math::
dq_i/dt &= \partial H/\partial p_i\\
dp_i/dt &= - \partial H/\partial q_i.
The Hamiltonian is given by,
.. math::
H(q,p) &= U(q) + KE(p)\\
&= -\text{log}(p(q|X)p(q)) +
mc^2 (\Sigma_{i=1}^{d} p_i^2 / (m^2 c^2) + 1)^{0.5}
where ``d`` is the dimensionality of model, ``m`` is the scalar 'mass'
given to each particle (chosen to be 1 as default) and ``c`` is the
speed of light (chosen to be 10 by default).
To numerically integrate Hamilton's equations, it is essential to use a
sympletic discretisation routine, of which the most typical approach is
the leapfrog method,
.. math::
p_i(t + \epsilon/2) &= p_i(t) - (\epsilon/2) d U(q_i(t))/dq_i\\
q_i(t + \epsilon) &= q_i(t) +
\epsilon M^{-1}(p_i(t + \epsilon/2)) p_i(t + \epsilon/2)\\
p_i(t + \epsilon) &= p_i(t + \epsilon/2) -
(\epsilon/2) d U(q_i(t + \epsilon))/dq_i
where relativistic mass (a scalar) is,
.. math::
M(p) = m (\Sigma_{i=1}^{d} p_i^2 / (m^2 c^2) + 1)^{0.5}
In particular, the algorithm we implement follows eqs. in section 2.1 of
[1]_.
Extends :class:`SingleChainMCMC`.
References
----------
.. [1] "Relativistic Monte Carlo". Xiaoyu Lu, Valerio Perrone,
Leonard Hasenclever, Yee Whye Teh, Sebastian J. Vollmer,
2017, Proceedings of Machine Learning Research.
"""
def __init__(self, x0, sigma0=None):
super(RelativisticMCMC, self).__init__(x0, sigma0)
# Set initial state
self._running = False
self._ready_for_tell = False
# Current point in the Markov chain
self._current = None # Aka current_q in the chapter
self._current_energy = None # Aka U(current_q) = -log_pdf
self._current_gradient = None
self._current_momentum = None # Aka current_p
# Current point in the leapfrog iterations
self._momentum = None # Aka p in the chapter
self._position = None # Aka q in the chapter
self._gradient = None # Aka grad_U(q) in the chapter
# Iterations, acceptance monitoring, and leapfrog iterations
self._mcmc_iteration = 0
self._mcmc_acceptance = 0
self._frog_iteration = 0
# Default number of leapfrog iterations
self._n_frog_iterations = 20
# Default integration step size for leapfrog algorithm
self._epsilon = 0.1
self._step_size = None
self._mass = 1
self._c = 10
self.set_leapfrog_step_size(np.diag(self._sigma0))
# Divergence checking
# Create a vector of divergent iterations
self._divergent = np.asarray([], dtype='int')
# Default threshold for Hamiltonian divergences
# (currently set to match Stan)
self._hamiltonian_threshold = 10**3
# Maximum number of points in integration grid for momentum
self._max_integration_size = 1e8
[docs]
def ask(self):
""" See :meth:`SingleChainMCMC.ask()`. """
# Check ask/tell pattern
if self._ready_for_tell:
raise RuntimeError('Ask() called when expecting call to tell().')
# Initialise on first call
if not self._running:
self._running = True
self._mc2 = self._mass * self._c**2
self._m2c2 = self._mass**2 * self._c**2
self._calculate_momentum_distribution()
# Notes:
# Ask is responsible for updating the position, which is the point
# returned to the user
# Tell is then responsible for updating the momentum, which uses the
# gradient at this new point
# The MCMC step happens in tell, and does not require any new
# information (it uses the log_pdf and gradient of the final point
# in the leapfrog run).
# Very first iteration
if self._current is None:
# Ask for the pdf and gradient of x0
self._ready_for_tell = True
return np.array(self._x0, copy=True)
# First iteration of a run of leapfrog iterations
if self._frog_iteration == 0:
# Sample random momentum for current point
self._current_momentum = self._sample_momentum()
# First leapfrog position is the current sample in the chain
self._position = np.array(self._current, copy=True)
self._gradient = np.array(self._current_gradient, copy=True)
self._momentum = np.array(self._current_momentum, copy=True)
# Perform a half-step before starting iteration 0 below
self._momentum -= self._scaled_epsilon * self._gradient * 0.5
# Perform a leapfrog step for the position
squared = np.sum(np.array(self._momentum)**2)
relativistic_mass = self._mass * np.sqrt(squared / self._m2c2 + 1)
self._position += (
self._scaled_epsilon * self._momentum / relativistic_mass)
# Ask for the pdf and gradient of the current leapfrog position
# Using this, the leapfrog step for the momentum is performed in tell()
self._ready_for_tell = True
return np.array(self._position, copy=True)
def _calculate_momentum_distribution(self):
"""Calculate an approximation to the CDF of momentum magnitude.
The purpose of this method is to calculate :member:_inv_cdf, a function
giving a numerical approximation to the inverse cumulative distribution
function of the magnitude of the momentum vector. This function can be
used to perform inverse transform sampling to generate samples of the
momentum.
This method should be called before any sampling is performed. As
implemented, it is called upon the first call to self.ask.
Numerical integration of the momentum density is performed using the
trapezoidal rule.
"""
# Calculate the value of p (momentum) corresponding to the maximum of
# the pdf
def logpdf_deriv(p):
# logpdf_deriv(p) = 0 is the simplified form of d/dp(logpdf(p)) = 0
d = np.sqrt(self._mass * (self._n_parameters - 1)) \
* (p**2 + self._m2c2) ** 0.25 / np.sqrt(self._c * self._mass) \
- p
return d
p_max = root(logpdf_deriv, self._mass).x[0]
# The initial integration grid goes from 0 to twice p_max, with 1000
# grid points
max_value = 2 * p_max
spacing = max_value / 1000
integration_accepted = False
while not integration_accepted:
# Evaluate the logpdf on a grid
integration_grid = np.arange(
min(1e-6, 0.5 * spacing), max_value, spacing)
logpdf_values = self._momentum_logpdf(integration_grid)
# Integrate using the trapezoidal rule
# Note that the step size (and any other constants) can be ignored
# here, as the CDF will be normalized after this calculation
cdf = np.logaddexp.accumulate(
np.logaddexp(logpdf_values[1:], logpdf_values[:-1]))
# Normalize the CDF
cdf = np.exp(cdf - cdf[-1])
# Adapt the integration grid if the result is inaccurate
if np.diff(cdf)[-1] > 0.001 * max(np.diff(cdf)):
# cdf is still increasing at the end
max_value *= 2
spacing *= 2
elif max(np.diff(cdf)) > 1e-3:
# cdf is changing too much, so a finer grid is required
spacing /= 2
else:
integration_accepted = True
if max_value / spacing > self._max_integration_size:
warnings.warn('Failed to approximate momentum distribution '
'for given mass and speed of light. Samples of '
'momentum may be inaccurate.')
integration_accepted = True
# Do a reverse interpolation to approximate inverse cdf
inv_cdf = interp1d([0.0] + list(cdf), integration_grid)
# Save the inverse cdf so that it can be used for sampling
self._inv_cdf = inv_cdf
[docs]
def divergent_iterations(self):
"""
Returns the iteration number of any divergent iterations
"""
return self._divergent
[docs]
def epsilon(self):
"""
Returns epsilon used in leapfrog algorithm
"""
return self._epsilon
[docs]
def hamiltonian_threshold(self):
"""
Returns threshold difference in Hamiltonian value from one iteration to
next which determines whether an iteration is divergent.
"""
return self._hamiltonian_threshold
[docs]
def leapfrog_steps(self):
"""
Returns the number of leapfrog steps to carry out for each iteration.
"""
return self._n_frog_iterations
[docs]
def leapfrog_step_size(self):
"""
Returns the step size for the leapfrog algorithm.
"""
return self._step_size
def _log_init(self, logger):
""" See :meth:`Loggable._log_init()`. """
logger.add_float('Accept.')
def _log_write(self, logger):
""" See :meth:`Loggable._log_write()`. """
logger.log(self._mcmc_acceptance)
def _kinetic_energy(self, momentum):
"""
Kinetic energy of relativistic particle, which is defined in [1]_.
"""
squared = np.sum(np.array(momentum)**2)
return self._mc2 * (squared / self._m2c2 + 1)**0.5
[docs]
def mass(self):
""" Returns ``mass`` which is the rest mass of particle. """
return self._mass
def _momentum_logpdf(self, u):
r"""Evalute the unnormalized logpdf of the magnitude of momentum.
The probability density of the momentum vector :math:`p` is stated in
[1]_ as :math:`f(p) \propto e^{-K(p)}`, where
.. math::
K(p) = m c^2 \left( \frac{p^T p}{m^2 c^2} + 1 \right)^{1/2}
Note that this is the distribution for the vector
:math:`p = (p_1, p_2, \dots, p_n)`, where n is the dimension of the
parameter space. However, this distribution has no dependence on the
direction of the vector p. Thus, the distribution for the magnitude
(:math:`||p||`) of the momentum vector can be obtained directly, but we
must include the Jacobian term for the transformation to hyperspherical
coordinates, such that
.. math::
f(||p||) \propto e^{-K(p)} ||p||^{n-1}
The logarithm of this unnormalized density is returned by this method.
Parameters
----------
u : float
Where to evaluate the unnormalized log pdf of the magnitude of
momentum.
"""
return -self._mc2 * np.sqrt(u ** 2 / self._m2c2 + 1) \
+ np.log(u ** (self._n_parameters - 1))
[docs]
def n_hyper_parameters(self):
""" See :meth:`TunableMethod.n_hyper_parameters()`. """
return 4
[docs]
def name(self):
""" See :meth:`pints.MCMCSampler.name()`. """
return 'Relativistic MCMC'
[docs]
def needs_sensitivities(self):
""" See :meth:`pints.MCMCSampler.needs_sensitivities()`. """
return True
def _sample_momentum(self):
"""Draw a sample of the momentum vector.
This method first samples a random direction for the momentum. Then, it
samples the magnitude of momentum using inverse transform sampling. It
assumes that the inverse cumulative distribution function for the
magnitude of momentum has already been calculated by the
:meth:`_calculate_momentum_distribution` method.
"""
# Sample a direction for the momentum vector, which is uniform
dir = np.random.randn(self._n_parameters)
dir /= np.linalg.norm(dir)
# Sample a magnitude for the momentum vector
u = np.random.random()
p = self._inv_cdf(u)
return p * dir
[docs]
def scaled_epsilon(self):
"""
Returns scaled epsilon used in leapfrog algorithm
"""
return self._scaled_epsilon
def _set_scaled_epsilon(self):
"""
Rescales epsilon along the dimensions of step_size
"""
self._scaled_epsilon = np.zeros(self._n_parameters)
for i in range(self._n_parameters):
self._scaled_epsilon[i] = self._epsilon * self._step_size[i]
[docs]
def set_epsilon(self, epsilon):
"""
Sets epsilon for the leapfrog algorithm
"""
epsilon = float(epsilon)
if epsilon <= 0:
raise ValueError('epsilon must be positive for leapfrog algorithm')
self._epsilon = epsilon
self._set_scaled_epsilon()
[docs]
def set_hamiltonian_threshold(self, hamiltonian_threshold):
"""
Sets threshold difference in Hamiltonian value from one iteration to
next which determines whether an iteration is divergent.
"""
if hamiltonian_threshold < 0:
raise ValueError('Threshold for divergent iterations must be ' +
'non-negative.')
self._hamiltonian_threshold = hamiltonian_threshold
[docs]
def set_hyper_parameters(self, x):
"""
The hyper-parameter vector is ``[leapfrog_steps, leapfrog_step_size,
mass, c]``.
See :meth:`TunableMethod.set_hyper_parameters()`.
"""
self.set_leapfrog_steps(x[0])
self.set_leapfrog_step_size(x[1])
self.set_mass(x[2])
self.set_speed_of_light(x[3])
[docs]
def set_leapfrog_steps(self, steps):
"""
Sets the number of leapfrog steps to carry out for each iteration.
"""
steps = int(steps)
if steps < 1:
raise ValueError('Number of steps must exceed 0.')
self._n_frog_iterations = steps
[docs]
def set_leapfrog_step_size(self, step_size):
"""
Sets the step size for the leapfrog algorithm.
"""
a = np.atleast_1d(step_size)
if len(a[a < 0]) > 0:
raise ValueError(
'Step size for leapfrog algorithm must' +
'be greater than zero.'
)
if len(a) == 1:
step_size = np.repeat(step_size, self._n_parameters)
elif not len(step_size) == self._n_parameters:
raise ValueError(
'Step size should either be of length 1 or equal to the' +
'number of parameters'
)
self._step_size = step_size
self._set_scaled_epsilon()
[docs]
def set_mass(self, mass):
""" Sets scalar mass. """
if isinstance(mass, list):
raise ValueError('Mass must be scalar.')
if mass <= 0:
raise ValueError('Mass must be positive.')
self._mass = mass
[docs]
def set_speed_of_light(self, c):
""" Sets `speed of light`. """
if c <= 0:
raise ValueError('Speed of light must be positive.')
self._c = c
[docs]
def speed_of_light(self):
""" Returns `speed of light`. """
return self._c
[docs]
def tell(self, reply):
""" See :meth:`pints.SingleChainMCMC.tell()`. """
if not self._ready_for_tell:
raise RuntimeError('Tell called before proposal was set.')
self._ready_for_tell = False
# Unpack reply
energy, gradient = reply
# Check reply, copy gradient
energy = float(energy)
gradient = pints.vector(gradient)
assert gradient.shape == (self._n_parameters, )
# Energy = -log_pdf, so flip both signs!
energy = -energy
gradient = -gradient
# Very first call
if self._current is None:
# Check first point is somewhere sensible
if not np.isfinite(energy):
raise ValueError(
'Initial point for MCMC must have finite logpdf.')
# Set current sample, energy, and gradient
self._current = self._x0
self._current_energy = energy
self._current_gradient = gradient
# Increase iteration count
self._mcmc_iteration += 1
# Mark current as read-only, so it can be safely returned.
# Gradient won't be returned (only -gradient, so no need to copy).
self._current.setflags(write=False)
# Return first point in chain
return (
self._current,
(-self._current_energy, -self._current_gradient),
True
)
# Set gradient of current leapfrog position
self._gradient = gradient
# Update the leapfrog iteration count
self._frog_iteration += 1
# Not the last iteration? Then perform a leapfrog step and return
if self._frog_iteration < self._n_frog_iterations:
self._momentum -= self._scaled_epsilon * self._gradient
# Return None to indicate there is no new sample for the chain
return None
# Final leapfrog iteration: only do half a step
self._momentum -= self._scaled_epsilon * self._gradient * 0.5
# Before starting accept/reject procedure, check if the leapfrog
# procedure has led to a finite momentum and logpdf. If not, reject.
accept = 0
if np.isfinite(energy) and np.all(np.isfinite(self._momentum)):
# Evaluate potential and kinetic energies at start and end of
# leapfrog trajectory
current_U = self._current_energy
current_K = self._kinetic_energy(self._current_momentum)
proposed_U = energy
proposed_K = self._kinetic_energy(self._momentum)
# Check for divergent iterations by testing whether the
# Hamiltonian difference is above a threshold
div = proposed_U + proposed_K - (self._current_energy + current_K)
if np.abs(div) > self._hamiltonian_threshold: # pragma: no cover
self._divergent = np.append(
self._divergent, self._mcmc_iteration)
self._momentum = self._position = self._gradient = None
self._frog_iteration = 0
# Update MCMC iteration count
self._mcmc_iteration += 1
# Update acceptance rate (only used for output!)
self._mcmc_acceptance = (
(self._mcmc_iteration * self._mcmc_acceptance + accept) /
(self._mcmc_iteration + 1))
return (
self._current,
(-self._current_energy, -self._current_gradient),
False
)
# Accept/reject
else:
r = np.exp(current_U - proposed_U + current_K - proposed_K)
if np.random.uniform(0, 1) < r:
accept = 1
self._current = self._position
self._current_energy = energy
self._current_gradient = gradient
# Mark current as read-only, so it can be safely returned.
# Gradient won't be returned (only -gradient, so no need
# to copy).
self._current.setflags(write=False)
# Reset leapfrog mechanism
self._momentum = self._position = self._gradient = None
self._frog_iteration = 0
# Update MCMC iteration count
self._mcmc_iteration += 1
# Update acceptance rate (only used for output!)
self._mcmc_acceptance = (
(self._mcmc_iteration * self._mcmc_acceptance + accept) /
(self._mcmc_iteration + 1))
# Return current position as next sample in the chain
return (
self._current,
(-self._current_energy, -self._current_gradient),
accept != 0
)