Source code for pints._mcmc._dram_ac

#
# DRAM AC MC method
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import pints
import numpy as np
import scipy.stats as stats


class DramACMC(pints.AdaptiveCovarianceMC):
    """
    DRAM (Delayed Rejection Adaptive Covariance) MCMC, as described in [1]_.

    In this method, a rejection does not necessarily end an iteration.
    Instead, if a rejection occurs, another point is proposed, typically
    from a narrower (i.e. more conservative) proposal kernel than was used
    for the first proposal.

    In each iteration, the following steps return the next state of the
    Markov chain (assuming the current state is ``theta_0`` and that there
    are 2 proposal kernels)::

        theta_1 ~ N(theta_0, lambda * scale_1 * sigma)
        alpha_1(theta_0, theta_1) = min(1, p(theta_1|X) / p(theta_0|X))
        u_1 ~ uniform(0, 1)
        if alpha_1(theta_0, theta_1) > u_1:
            return theta_1
        theta_2 ~ N(theta_0, lambda * scale_2 * sigma)
        alpha_2(theta_0, theta_1, theta_2) =
            min(1, p(theta_2|X) (1 - alpha_1(theta_2, theta_1)) /
                   (p(theta_0|X) (1 - alpha_1(theta_0, theta_1))))
        u_2 ~ uniform(0, 1)
        if alpha_2(theta_0, theta_1, theta_2) > u_2:
            return theta_2
        else:
            return theta_0

    Our implementation also allows more than 2 proposal kernels to be used,
    in which case ``k`` accept-reject steps are taken. In each step ``i``,
    the probability that a proposal ``theta_i`` is accepted is::

        alpha_i(theta_0, theta_1, ..., theta_i) =
            min(1, p(theta_i|X) / p(theta_0|X) * n_i / d_i)

    where::

        n_i = (1 - alpha_1(theta_i, theta_i-1)) *
              (1 - alpha_2(theta_i, theta_i-1, theta_i-2)) * ...
              (1 - alpha_i-1(theta_i, theta_i-1, ..., theta_0))
        d_i = (1 - alpha_1(theta_0, theta_1)) *
              (1 - alpha_2(theta_0, theta_1, theta_2)) * ...
              (1 - alpha_i-1(theta_0, theta_1, ..., theta_i-1))

    If all ``k`` proposals have been rejected, the initial point ``theta_0``
    is returned.

    At the end of each iteration, a 'base' proposal kernel is adapted::

        mu = (1 - gamma) mu + gamma theta
        sigma = (1 - gamma) sigma + gamma (theta - mu)(theta - mu)^t
        log_lambda = log_lambda + gamma (accepted - target_acceptance_rate)

    where ``gamma = adaptations^-eta``, ``theta`` is the current state of
    the Markov chain and ``accepted`` is a binary indicator for whether any
    of the series of proposals was accepted. The kernels for all proposals
    are then adapted as ``[scale_1, scale_2, ..., scale_k] * sigma``, where
    the scale factors are set using ``set_sigma_scale``.

    *Extends:* :class:`pints.AdaptiveCovarianceMC`

    References
    ----------
    .. [1] "DRAM: Efficient adaptive MCMC". H Haario, M Laine, A Mira,
           E Saksman (2006) Statistics and Computing.
           https://doi.org/10.1007/s11222-006-9438-0
    """

    def __init__(self, x0, sigma0=None):
        super(DramACMC, self).__init__(x0, sigma0)

        self._adapt_kernel = True
        self._before_kernels_set = True
        self._log_lambda = 0
        self._n_kernels = 2
        self._proposal_count = 0
        self._sigma_base = np.copy(self._sigma)
        self._upper_scale = 1000
        self._Y = [None] * self._n_kernels
        self._Y_log_pdf = np.zeros(self._n_kernels)

    def _adapt_sigma(self):
        """
        Updates the covariance matrices of the various kernels being used
        according to the adaptive Metropolis routine.
        """
        dsigm = np.reshape(self._current - self._mu, (self._n_parameters, 1))
        self._sigma_base = ((1 - self._gamma) * self._sigma_base +
                            self._gamma * np.dot(dsigm, dsigm.T))
        self._sigma = [self._sigma_scale[i] * self._sigma_base
                       for i in range(self._n_kernels)]

    def _calculate_alpha_log(self, n, Y, log_Y):
        """
        Calculates the log of the alpha expression in eq. 3 of Haario et
        al. [1]_, which is needed to determine acceptance or rejection.
        """
        alpha_log = log_Y[n + 1] - log_Y[0]
        if n == 0:
            return min(0, alpha_log)
        Y_rev = Y[::-1]
        log_Y_rev = log_Y[::-1]
        for i in range(n):
            alpha_log += (
                stats.multivariate_normal.logpdf(
                    x=Y[n - i - 1],
                    mean=Y[n + 1],
                    cov=self._sigma[n],
                    allow_singular=True) -
                stats.multivariate_normal.logpdf(
                    x=Y[i],
                    mean=self._current,
                    cov=self._sigma[0],
                    allow_singular=True) +
                np.log(1 - np.exp(self._calculate_alpha_log(
                    i, Y_rev[0:(i + 2)], log_Y_rev[0:(i + 2)]))) -
                np.log(1 - np.exp(self._calculate_alpha_log(
                    i, Y[0:(i + 2)], log_Y[0:(i + 2)])))
            )
        return min(0, alpha_log)

    def _calculate_r_log(self, fx):
        """
        Calculates the value of the logged acceptance ratio (eq. 3 in
        [1]_).
        """
        c = self._proposal_count
        temp_Y = np.concatenate([[self._current], self._Y[0:(c + 1)]])
        temp_log_Y = np.concatenate(
            [[self._current_log_pdf], self._Y_log_pdf[0:(c + 1)]])
        self._r_log = self._calculate_alpha_log(c, temp_Y, temp_log_Y)

    def _generate_proposal(self):
        """ See :meth:`AdaptiveCovarianceMC._generate_proposal()`. """
        if self._before_kernels_set:
            self.set_sigma_scale()
            self._Y = [None] * self._n_kernels
            self._Y_log_pdf = np.zeros(self._n_kernels)
            self._before_kernels_set = False

        proposed = np.random.multivariate_normal(
            self._current,
            np.exp(self._log_lambda) * self._sigma[self._proposal_count])
        self._Y[self._proposal_count] = proposed
        return proposed
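
    # Worked illustration of the two-kernel accept/reject rule given in the
    # class docstring (the numbers are hypothetical): suppose the first
    # proposal has p(theta_1|X) / p(theta_0|X) = 0.5, so
    # alpha_1(theta_0, theta_1) = 0.5, and that theta_1 is rejected. If the
    # second-stage proposal has p(theta_2|X) / p(theta_0|X) = 0.8, then
    # alpha_1(theta_2, theta_1) = min(1, 0.5 / 0.8) = 0.625, and
    #
    #   alpha_2 = min(1, 0.8 * (1 - 0.625) / (1 - 0.5)) = 0.6,
    #
    # so theta_2 is accepted with probability 0.6.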

    def name(self):
        """ See :meth:`pints.MCMCSampler.name()`. """
        return 'Delayed Rejection Adaptive Metropolis (DRAM) MCMC'

    def n_kernels(self):
        """ Returns the number of proposal kernels. """
        return self._n_kernels

    def n_hyper_parameters(self):
        """ See :meth:`TunableMethod.n_hyper_parameters()`. """
        return 3

    def set_hyper_parameters(self, x):
        """
        The hyper-parameter vector is ``[eta, n_kernels, upper_scale]``.

        See :meth:`TunableMethod.set_hyper_parameters()`.
        """
        self.set_eta(x[0])
        self.set_n_kernels(x[1])
        self.set_upper_scale(x[2])

    def set_n_kernels(self, n_kernels):
        """ Sets the number of proposal kernels. """
        if n_kernels < 1:
            raise ValueError('Number of proposal kernels must be equal to '
                             'or greater than 1.')
        self._n_kernels = int(n_kernels)

    def set_upper_scale(self, upper_scale):
        """
        Sets the upper scale of the initial covariance matrix multipliers
        used for the kernels. The multipliers are spaced uniformly on the
        log10 scale between 1 and ``upper_scale`` and applied in decreasing
        order, so the proposal covariance matrices are
        ``[upper_scale, ..., 1] * sigma``.
        """
        if upper_scale < 1:
            raise ValueError('Upper scale must be equal to or greater '
                             'than 1.')
        self._upper_scale = upper_scale

    def set_sigma_scale(self):
        """
        Sets the scale factors used to multiply the base covariance matrix
        for each of the kernels: the factors are spaced uniformly on the
        log10 scale between 1 and ``upper_scale`` and applied in decreasing
        order, so the proposal covariance matrices are
        ``[upper_scale, ..., 1] * sigma``.
        """
        a_min = np.log10(1)
        a_max = np.log10(self._upper_scale)
        self._sigma_scale = 10**np.linspace(a_min, a_max, self._n_kernels)
        self._sigma_scale = self._sigma_scale[::-1]
        self._sigma = [self._sigma_scale[i] * self._sigma_base
                       for i in range(self._n_kernels)]
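
    # Illustration (hypothetical values): with upper_scale = 100 and
    # n_kernels = 3, the multipliers are 10 ** linspace(0, 2, 3) reversed,
    # i.e. [100, 10, 1], so the first proposal kernel is the widest and
    # each delayed-rejection retry uses a progressively narrower kernel.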

    def sigma_scale(self):
        """
        Returns the scale factors used to multiply the base covariance
        matrix, giving the proposal matrix for each accept-reject step.
        """
        return self._sigma_scale

    def tell(self, fx):
        """
        If this is the first proposal, accepts with the ordinary Metropolis
        probability; for later proposals, uses the delayed-rejection
        probability determined by [1]_.
        """
        # Check if we had a proposal
        if self._proposed is None:
            raise RuntimeError('Tell called before proposal was set.')

        # Ensure fx is a float
        fx = float(fx)
        self._Y_log_pdf[self._proposal_count] = fx

        # First point?
        if self._current is None:
            if not np.isfinite(fx):
                raise ValueError(
                    'Initial point for MCMC must have finite logpdf.')

            # Accept
            self._current = self._proposed
            self._current_log_pdf = fx

            # Increase iteration count
            self._iterations += 1

            # Clear proposal
            self._proposed = None

            # Return first point for chain
            return self._current, self._current_log_pdf, True

        # Check if the proposed point can be accepted
        accepted = 0
        if np.isfinite(fx):
            self._calculate_r_log(fx)
            u = np.log(np.random.uniform(0, 1))
            if u < self._r_log:
                accepted = 1
                self._current = self._proposed
                self._current_log_pdf = fx
        self._proposed = None

        if accepted == 0:
            # Rejected: if another kernel remains, propose again from the
            # next (narrower) kernel instead of ending the iteration
            if self._n_kernels > 1 and (
                    self._proposal_count < (self._n_kernels - 1)):
                self._proposal_count += 1
                return None

        # Iteration ends: reset to the first (widest) kernel
        self._proposal_count = 0

        self._gamma = (self._adaptations**-self._eta)
        self._adaptations += 1

        # Update mu, covariance matrix and log lambda
        self._adapt_mu()
        self._adapt_sigma()
        self._log_lambda += (self._gamma *
                             (accepted - self._target_acceptance))

        return self._current, self._current_log_pdf, accepted != 0

    def upper_scale(self):
        """
        Returns the upper scale limit (see
        :meth:`pints.DramACMC.set_upper_scale()`).
        """
        return self._upper_scale
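

# A minimal usage sketch, assuming the standard PINTS interfaces
# (pints.MCMCController and a pints.toy target); the target, starting
# point and iteration count below are arbitrary illustrative choices.
if __name__ == '__main__':
    import pints.toy

    # Toy 2-dimensional Gaussian target
    log_pdf = pints.toy.GaussianLogPDF(mean=[0, 0], sigma=[1, 1])

    # One chain, starting from a random point
    x0 = [np.random.uniform(-3, 3, size=2)]

    # Run DRAM through the usual controller interface
    mcmc = pints.MCMCController(log_pdf, 1, x0, method=DramACMC)
    mcmc.set_max_iterations(2000)
    mcmc.set_log_to_screen(False)
    chains = mcmc.run()

    # chains has shape (n_chains, n_iterations, n_parameters)
    print(chains[0].mean(axis=0))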