Source code for pints._mcmc._dram_ac

#
# DRAM AC MC method
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import pints
import numpy as np


class DramACMC(pints.AdaptiveCovarianceMC):
    """
    DRAM (Delayed Rejection Adaptive Covariance) MCMC, as described in [1]_.

    In this method, a rejection does not necessarily end an iteration.
    Instead, if a rejection occurs, another point is proposed, typically from
    a narrower (i.e. more conservative) proposal kernel than was used for the
    first proposal. The number of proposal kernels is fixed to 2.

    In each iteration, the following steps return the next state of the
    Markov chain (assuming the current state is ``theta_0``)::

        theta_1 ~ N(theta_0, lambda * scale_1 * sigma)
        alpha_1(theta_0, theta_1) = min(1, p(theta_1|X) / p(theta_0|X))
        u_1 ~ uniform(0, 1)
        if alpha_1(theta_0, theta_1) > u_1:
            return theta_1
        theta_2 ~ N(theta_0, lambda * scale_2 * sigma)
        alpha_2(theta_0, theta_1, theta_2) =
            min(1, p(theta_2|X) (1 - alpha_1(theta_2, theta_1)) /
                   (p(theta_0|X) (1 - alpha_1(theta_0, theta_1))))
        u_2 ~ uniform(0, 1)
        if alpha_2(theta_0, theta_1, theta_2) > u_2:
            return theta_2
        else:
            return theta_0

    At the end of each iteration, a 'base' proposal kernel is adapted::

        mu = (1 - gamma) mu + gamma theta
        sigma = (1 - gamma) sigma + gamma (theta - mu)(theta - mu)^t
        log_lambda = log_lambda + gamma (accepted - target_acceptance_rate)

    where ``gamma = adaptations^-eta``, ``theta`` is the current state of the
    Markov chain and ``accepted`` is a binary indicator for whether any of
    the series of proposals was accepted. The kernels for the two proposals
    are then adapted as ``[scale_1, scale_2] * sigma``, where the scale
    factors are set using ``set_sigma_scale``.

    A minimal usage sketch is given at the end of this module.

    *Extends:* :class:`AdaptiveCovarianceMC`

    References
    ----------
    .. [1] "DRAM: Efficient adaptive MCMC". H Haario, M Laine, A Mira,
           E Saksman (2006) Statistics and Computing.
           https://doi.org/10.1007/s11222-006-9438-0
    """
    def __init__(self, x0, sigma0=None):
        super(DramACMC, self).__init__(x0, sigma0)

        self._log_lambda = 0
        self._n_kernels = 2    # This is fixed!
        self._proposal_count = 0
        self._sigma_base = np.copy(self._sigma)
        self._Y_log_pdf = np.zeros(self._n_kernels)
        self.set_sigma_scale([1, 0.01])    # scales used in [1]_ for experiments

    def _adapt_sigma(self):
        """
        Updates the covariance matrices of the two kernels being used
        according to the adaptive Metropolis routine.
        """
        dsigm = np.reshape(self._current - self._mu, (self._n_parameters, 1))
        self._sigma_base = ((1 - self._gamma) * self._sigma_base +
                            self._gamma * np.dot(dsigm, dsigm.T))
        self._sigma = [self._sigma_scale[i] * self._sigma_base
                       for i in range(self._n_kernels)]

    def _alpha_1_log(self, fx, fy1):
        """
        Calculates probability of acceptance in stage 1 of DRAM (eq. 1 in
        [1]_).
        """
        alpha_log = fy1 - fx
        return min(0, alpha_log)

    def _alpha_2_log(self, fx, fy1, fy2):
        """
        Calculates probability of acceptance in stage 2 of DRAM (eq. 2 in
        [1]_).
        """
        # log(1 - alpha_1) terms, matching the class docstring and eq. 2
        alpha_log = (fy2 - fx
                     + np.log1p(-np.exp(self._alpha_1_log(fy2, fy1)))
                     - np.log1p(-np.exp(self._alpha_1_log(fx, fy1))))
        return min(0, alpha_log)

    def _generate_proposal(self):
        """ See :meth:`AdaptiveCovarianceMC._generate_proposal()`. """
        proposed = np.random.multivariate_normal(
            self._current,
            np.exp(self._log_lambda) * self._sigma[self._proposal_count])
        return proposed
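
    # Illustrative note (hypothetical numbers, not part of the original
    # source): suppose the current point has log-density fx = -2.0, the
    # rejected first-stage proposal has fy1 = -4.0 and the second-stage
    # proposal has fy2 = -2.5. Then
    #
    #   alpha_1_log(fx, fy1)  = min(0, -4.0 - (-2.0)) = -2.0
    #   alpha_1_log(fy2, fy1) = min(0, -4.0 - (-2.5)) = -1.5
    #   alpha_2_log(fx, fy1, fy2)
    #       = min(0, -2.5 - (-2.0)
    #                + log(1 - exp(-1.5)) - log(1 - exp(-2.0)))
    #       ~= -0.607
    #
    # so the second-stage point is accepted with probability exp(-0.607),
    # about 0.55: the density ratio is discounted by the chance that the
    # already-rejected first proposal would have been accepted from the new
    # point.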

    def name(self):
        """ See :meth:`pints.MCMCSampler.name()`. """
        return 'Delayed Rejection Adaptive Metropolis (Dram) MCMC'

    def n_hyper_parameters(self):
        """ See :meth:`TunableMethod.n_hyper_parameters()`. """
        return 3

    def _r_log(self):
        """
        Calculates the value of the logged acceptance ratio (eq. 3 in [1]_).
        """
        if self._proposal_count == 0:
            r_log = self._alpha_1_log(
                self._current_log_pdf, self._Y_log_pdf[0])
        else:
            r_log = self._alpha_2_log(
                self._current_log_pdf, self._Y_log_pdf[0],
                self._Y_log_pdf[1])
        return r_log

    def set_hyper_parameters(self, x):
        """
        The hyper-parameter vector is ``[eta, sigma_scale_1, sigma_scale_2]``.

        See :meth:`TunableMethod.set_hyper_parameters()`.
        """
        self.set_eta(x[0])
        self.set_sigma_scale(x[1:3])

    def set_sigma_scale(self, scales):
        """
        Sets the scale of the multipliers for the two proposal kernel
        covariance matrices. Must be of the form ``[scale_1, scale_2]``.
        """
        if len(scales) != 2:
            raise ValueError('Scales must be of length 2.')
        for scale in scales:
            if scale <= 0:
                raise ValueError('Scales must be positive.')
        self._sigma_scale = [scales[0], scales[1]]
        self._sigma = [self._sigma_scale[i] * self._sigma_base
                       for i in range(self._n_kernels)]

    def sigma_scale(self):
        """
        Returns the scale factors used to multiply a base covariance matrix,
        resulting in proposal matrices for each accept-reject step.
        """
        return self._sigma_scale

    def tell(self, fx):
        """
        If this is the first proposal of an iteration, accept with the
        ordinary Metropolis probability; for a later proposal, use the
        delayed-rejection probability determined by [1]_.
        """
        # Check if we had a proposal
        if self._proposed is None:
            raise RuntimeError('Tell called before proposal was set.')

        # Only the first proposal of an iteration starts a new iteration
        if self._proposal_count == 0:
            self._iterations += 1

        # Ensure fx is a float
        fx = float(fx)
        self._Y_log_pdf[self._proposal_count] = fx

        # First point?
        if self._current is None:
            if not np.isfinite(fx):
                raise ValueError(
                    'Initial point for MCMC must have finite logpdf.')

            # Accept the initial point
            self._current = self._proposed
            self._current_log_pdf = fx
            self._proposed = None

            # Return first point for chain
            return self._current, self._current_log_pdf, True

        # Check if the proposed point can be accepted
        accept = 0
        if np.isfinite(fx):
            r_log = self._r_log()
            u = np.log(np.random.uniform(0, 1))
            if u < r_log:
                accept = 1
                self._acceptance_count += 1
                self._current = self._proposed
                self._current_log_pdf = fx
                self._proposal_count = 0
                self._Y_log_pdf = np.zeros(self._n_kernels)
        self._proposed = None

        if accept == 0:
            if self._proposal_count < (self._n_kernels - 1):
                # First-stage rejection: ask for a second, narrower proposal
                # within the same iteration
                self._proposal_count += 1
                return None
            else:
                self._proposal_count = 0

        # Update mu, covariance matrix and log lambda
        self._acceptance_rate = self._acceptance_count / self._iterations
        self._gamma = self._adaptations**-self._eta
        self._adaptations += 1
        self._adapt_mu()
        self._adapt_sigma()
        self._log_lambda += (self._gamma *
                             (accept - self._target_acceptance))

        return self._current, self._current_log_pdf, accept != 0
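

# Minimal usage sketch (illustrative, with assumed names): it relies on the
# standard PINTS ask/tell interface for single-chain samplers and on the toy
# distribution ``pints.toy.GaussianLogPDF``; both are assumptions about the
# surrounding library rather than guarantees made by this module. A ``None``
# return from ``tell()`` marks a delayed-rejection stage: the iteration is
# not finished and a second, narrower proposal will be asked for next.
if __name__ == '__main__':
    import pints.toy

    # Toy target: a standard normal in two dimensions
    log_pdf = pints.toy.GaussianLogPDF([0, 0], [[1, 0], [0, 1]])

    # Sampler started away from the mode, with the scales used in [1]_
    sampler = DramACMC(np.array([2.0, 2.0]))
    sampler.set_sigma_scale([1, 0.01])

    samples = []
    while len(samples) < 2000:
        x = sampler.ask()
        result = sampler.tell(log_pdf(x))
        if result is not None:
            sample, sample_log_pdf, accepted = result
            samples.append(sample)

    # Should be close to [0, 0] for this toy target
    print('Sample mean:', np.mean(samples, axis=0))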