Source code for pints._mcmc._slice_doubling

# -*- coding: utf-8 -*-
#
# Slice Sampling with Doubling MCMC Method
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import pints
import numpy as np


[docs]class SliceDoublingMCMC(pints.SingleChainMCMC): r""" Implements Slice Sampling with Doubling, as described in [1]_. This is a univariate method, which is applied in a Slice-Sampling-within-Gibbs framework to allow MCMC sampling from multivariate models. Generates samples by sampling uniformly from the volume underneath the posterior (:math:`f`). It does so by introducing an auxiliary variable (:math:`y`) and by defining a Markov chain. If the distribution is univariate, sampling follows: 1. Calculate the pdf (:math:`f(x0)`) of the current sample (:math:`x0`). 2. Draw a real value (:math:`y`) uniformly from (0, f(x0)), defining a horizontal "slice": :math:`S = {x: y < f (x)}`. Note that :math:`x0` is always within S. 3. Find an interval (:math:`I = (L, R)`) around :math:`x0` that contains all, or much, of the slice. 4. Draw a new point (:math:`x1`) from the part of the slice within this interval. If the distribution is multivariate, we apply the univariate algorithm to each variable in turn, where the other variables are set at their current values. This implementation uses the "Doubling" method to estimate the interval :math:`I = (L, R)`, as described in [1] Fig. 4. pp.715 and consists of the following steps: 1. :math:`U \sim uniform(0, 1)` 2. :math:`L = x_0 - wU` 3. :math:`R = L + w` 4. :math:`K = p` 5. while :math:`K > 0` and :math:`{y < f(L) or y < f(R)}`: a. :math:`V \sim uniform(0, 1)` b. if :math:`V < 0.5`, then :math:`L = L - (R - L)` else, :math:`R = R + (R - L)` 6. :math:`K = K - 1` Intuitively, the interval ``I`` is estimated by expanding the initial interval by producing a sequence of intervals, each twice the size of the previous one, until an interval is found with both ends outside the slice, or until a pre-determined limit is reached. The parameters ``p`` (an integer, which determines the limit of slice size) and ``w`` (the estimate of typical slice width) are hyperparameters. To sample from the interval :math:`I = (L, R)`, such that the sample :math:`x` satisfies :math:`y < f(x)`, we use the "Shrinkage" procedure, which reduces the size of the interval after rejecting a trial point, as defined in [1] Fig. 5. pp.716. This algorithm consists of the following steps: 1. :math:`\bar{L} = L` and :math:`\bar{R} = R` 2. Repeat: a. :math:`U \sim uniform(0, 1)` b. :math:`x_1 = \bar{L} + U (\bar{R} - \bar{L})` c. if :math:`y < f(x_1)` and :math:`Accept(x_1)`, exit loop else: if :math:`x_1 < x_0`, then :math:`\bar{L} = x_1` else :math:`\bar{R} = x_1` Intuitively, we uniformly sample a trial point from the interval ``I``, and subsequently shrink the interval each time a trial point is rejected. The ``Accept(x_1)`` check is required to guarantee detailed balance. We shall refer to this check as the ``Acceptance Check``. Intuitively, it tests whether starting the doubling expansion at ``x_1`` leads to an earlier termination compared to starting it from the current state ``x_0``. The procedure works backward through the intervals that the doubling expansion would pass through to arrive at ``I`` when starting from ``x_1``, checking that none of them has both ends outside the slice. The algorithm is described in [1] Fig. 6. pp.717 and it consists of the following steps: 1. :math:`\hat{L} = L` and :math:`\hat{R} = R` and :math:`D = False` 2. while :math:`\hat{R} - \hat{L} > 1.1w`: a. M = :math:`(\hat{L} + \hat{R})/2` b. if {:math:`x_0 < M` and :math:`x_1 >= M`} or {:math:`x_0 >= M` and :math:` x_1 < M`}, then :math:`D = True` c. if :math:`x_1 < M`, then :math:`\hat{R} = M` else, :math:`\hat{L} = M` d. if :math:`D` and :math:`y >= f(\hat{L})` and :math:`y >= f(\hat{R})`, then reject proposal 3. If the proposal is not rejected in the previous loop, accept it The multiplication by ``1.1`` in the ``while`` condition in Step 2 guards against possible round-off errors. The variable ``D`` tracks whether the intervals that would be generated from ``x_1`` differ from those leading to ``x_0``: when they don't, time is saved by omitting the subsequent check. To avoid floating-point underflow, we implement the suggestion advanced in [1] pp.712. We use the log pdf of the un-normalised posterior (:math:`g(x) = log(f(x))`) instead of :math:`f(x)`. In doing so, we use an auxiliary variable :math:`z = log(y) = g(x0) - \epsilon`, where :math:`\epsilon \sim \text{exp}(1)` and define the slice as :math:`S = {x : z < g(x)}`. Extends :class:`SingleChainMCMC`. References ---------- .. [1] Neal, R.M., 2003. Slice sampling. The annals of statistics, 31(3), pp.705-767. https://doi.org/10.1214/aos/1056562461 """ def __init__(self, x0, sigma0=None): super(SliceDoublingMCMC, self).__init__(x0, sigma0) # Set initial state self._x0 = np.asarray(x0, dtype=float) self._running = False self._ready_for_tell = False self._current = None self._current_log_pdf = None self._current_log_y = None self._proposed = None self._proposed_pdf = None # Flag used to store the log_pdf of the proposed sample self._sent_proposal = False # Default initial interval width ``w`` used in the Doubling procedure # to expand the interval self._w = np.abs(self._x0) self._w[self._w == 0] = 1 self._w = 0.1 * self._w # Default integer ``p`` limiting the size of the interval to # ``(2^p) * w``. # Integer ``k``` is used to count the interval expansion steps self._p = 10 self._k = 0 # Flag to initialise the expansion of the interval ``I=(L,R)`` self._first_expansion = False # Flag indicating whether the interval expansion is concluded self._interval_found = False # Edges of the interval ``I=(L,R)`` self._l = None self._r = None # Parameter values at interval edges self._temp_l = None self._temp_r = None # Log_pdf of interval edges self._fx_l = None self._fx_r = None # Edges used for the ``Acceptance Check`` self._l_hat = None self._r_hat = None # Parameter values at ``Acceptance Check`` edges ``l_hat ,r_hat`` self._temp_l_hat = None self._temp_r_hat = None # Log_pdf of the ``Acceptance Check`` interval edge points # ``l_hat,r_hat`` self._fx_l_hat = None self._fx_r_hat = None # Variable used in the ``Acceptance Check`` procedure to track whether # the intervals that would be generated from the new trial point # differ from those leading to the current point. self._d = False # Flag to initialise the ``Acceptance Check`` self._init_check = False # Flag to control the ``Acceptance Check`` self._continue_check = False # Mid point between ``l_hat`` and ``r_hat`` used in the ``Acceptance # Check`` self._m = None # Index of parameter ``xi`` we are updating of the sample # ``x = (x1,...,xn)`` self._active_param_index = 0 # Flags used to calculate log_pdf of initial interval edges ``l,r`` self._init_left = False self._init_right = False # Flags used to calculate log_pdf of initial ``Acceptance Check`` # edges ``l_hat,r_hat`` self._init_left_hat = False self._init_right_hat = False
[docs] def ask(self): """ See :meth:`SingleChainMCMC.ask()`. """ # Check ask/tell pattern if self._ready_for_tell: raise RuntimeError('Ask() called when expecting call to tell().') # Initialise on first call if not self._running: self._running = True # Very first iteration if self._current is None: # Ask for the log pdf of x0 self._ready_for_tell = True return np.array(self._x0, copy=True) # Initialise the expansion of interval ``I=(l,r)`` if self._first_expansion: self._k = self._p # Set initial values for ``l,r`` self._u = np.random.uniform() self._l = (self._proposed[self._active_param_index] - self._w[self._active_param_index] * self._u) self._r = self._l + self._w[self._active_param_index] # Initialise arrays used for calculating the log_pdf of the # edges ``l,r`` self._temp_l = np.array(self._proposed, copy=True) self._temp_r = np.array(self._proposed, copy=True) self._temp_l[self._active_param_index] = self._l self._temp_r[self._active_param_index] = self._r self._first_expansion = False # Set flags to calculate log_pdf of ``l,r`` self._init_left = True self._init_right = True # Ask for log_pdf of initial edges ``l,r``` if self._init_left: self._ready_for_tell = True return np.array(self._temp_l, copy=True) if self._init_right: self._ready_for_tell = True return np.array(self._temp_r, copy=True) # Expand the interval ``I``` until edges ``l,r`` are outside the slice # or until expansion limit is reached if self._k > 0 and (self._current_log_y < self._fx_l or self._current_log_y < self._fx_r): self._k -= 1 # Use ``Doubling`` expansion procedure as described in # [1] Fig. 4. pp.715 self._v = np.random.uniform() self._ready_for_tell = True if self._v < .5: self._l = self._l - (self._r - self._l) self._temp_l[self._active_param_index] = self._l return np.array(self._temp_l, copy=True) else: self._r = self._r + (self._r - self._l) self._temp_r[self._active_param_index] = self._r return np.array(self._temp_r, copy=True) self._interval_found = True # After having proposed a new point, we initialise the # ``Acceptance Check`` if self._init_check: self._init_check = False # Initialise edges for the ``Acceptance Check`` self._l_hat = self._l self._r_hat = self._r self._temp_l_hat = np.array(self._temp_l, copy=True) self._temp_r_hat = np.array(self._temp_r, copy=True) # Set flags to calculate log_pdf of ``l_hat,r_hat`` self._init_left_hat = True self._init_right_hat = True # Ask for log_pdf of initial edges ``l_hat,r_hat`` if self._init_left_hat: self._ready_for_tell = True return np.array(self._temp_l_hat, copy=True) if self._init_right_hat: self._ready_for_tell = True return np.array(self._temp_r_hat, copy=True) # After having initialised the ``Acceptance Check`` procedure, we # continue with the checking loop if self._continue_check: # Work backward through the intervals that the doubling procedure # would pass through to arrive at the interval ``I`` when starting # from the new trial point. if ((self._r_hat - self._l_hat) > 1.1 * self._w[self._active_param_index]): # Calculate interval ``A=(l_hat, r_hat)`` mid point self._m = (self._l_hat + self._r_hat) / 2 # Boolean ``d`` tracks tracks whether the intervals that would # be generated from the new point differ from those leading to # the current point if ((self._current[self._active_param_index] < self._m and self._proposed[self._active_param_index] >= self._m) or (self._current[self._active_param_index] >= self._m and self._proposed[self._active_param_index] < self._m)): self._d = True self._ready_for_tell = True # Work backward through the doubling procedure starting from # the trial point if self._proposed[self._active_param_index] < self._m: self._r_hat = self._m self._temp_r_hat[self._active_param_index] = self._r_hat return np.array(self._temp_r_hat, copy=True) if self._proposed[self._active_param_index] >= self._m: self._l_hat = self._m self._temp_l_hat[self._active_param_index] = self._l_hat return np.array(self._temp_l_hat, copy=True) # Now that (r_hat - l_hat) <= 1.1*w, the ``Acceptance Check`` loop # is over and we accept the trial point. # Reset variables self._fx_r_hat = None self._fx_l_hat = None self._d = False self._continue_check = False # Send the accepted trial point self._ready_for_tell = True return np.array(self._proposed, copy=True) # Sample new parameter by sampling uniformly from the interval # ``I=(l,r)`` self._u = np.random.uniform() self._proposed[self._active_param_index] = (self._l + self._u * (self._r - self._l)) # Set ``Acceptance Check`` flags for the proposal point self._init_check = True self._continue_check = True # Set flag indicating we have created a new proposal. This is used to # store the value of the log_pdf of the proposed point in the tell() # method self._sent_proposal = True # Send new point for to check ``f(x_1) >= y`` self._ready_for_tell = True return np.array(self._proposed, copy=True)
[docs] def current_slice_height(self): """ Returns current height value used to define the current slice. """ return self._current_log_y
[docs] def expansion_steps(self): """ Returns integer used for limiting interval expansion. """ return self._p
[docs] def name(self): """ See :meth:`pints.MCMCSampler.name()`. """ return 'Slice Sampling - Doubling'
[docs] def n_hyper_parameters(self): """ See :meth:`TunableMethod.n_hyper_parameters()`. """ return 2
[docs] def set_expansion_steps(self, p): """ Set integer for limiting interval expansion. """ p = int(p) if p <= 0: raise ValueError( 'Integer must be greater than zero (to limit the interval size' ' to (2 ** integer) * width).') self._p = p
[docs] def set_hyper_parameters(self, x): """ The hyper-parameter vector is ``[width, expansion steps]``. See :meth:`TunableMethod.set_hyper_parameters()`. """ self.set_width(x[0]) self.set_expansion_steps(x[1])
[docs] def set_width(self, w): """ Sets the width for generating the interval. This can either be a single number or an array with the same number of elements as the number of variables to update. """ if np.isscalar(w): w = np.ones(self._n_parameters) * w else: w = pints.vector(w) if len(w) != self._n_parameters: raise ValueError( 'Width for interval expansion must a scalar or an array' ' of length n_parameters.') if np.any(w < 0): raise ValueError('Width for interval expansion must be positive.') self._w = w
[docs] def tell(self, fx): """ See :meth:`pints.SingleChainMCMC.tell()`. """ # Check ask/tell pattern if not self._ready_for_tell: raise RuntimeError('Tell called before proposal was set.') self._ready_for_tell = False # Ensure fx is a float fx = float(fx) # If this is the log_pdf of a new point, save the value and use it # to check ``f(x_1) >= y`` if self._sent_proposal: self._proposed_pdf = fx self._sent_proposal = False # Very first call if self._current is None: # Check first point is somewhere sensible if not np.isfinite(fx): raise ValueError( 'Initial point for MCMC must have finite logpdf.') # Set current sample, log pdf of current sample and initialise # proposed sample for next iteration self._current = np.array(self._x0, copy=True) self._current_log_pdf = fx self._proposed = np.array(self._current, copy=True) # Sample height of the slice log_y for the next iteration self._e = np.random.exponential(1) self._current_log_y = self._current_log_pdf - self._e # Set flag to true as we need to initialise the interval expansion # for the next iteration self._first_expansion = True # Return first point in chain, which is x0 return np.copy(self._current), self._current_log_pdf, True # While we expand the interval ``I=(l,r)``, we return None if not self._interval_found: # Set the log_pdf of the current interval ``I``` edges ``l,r`` if self._init_left: self._fx_l = fx self._init_left = False elif self._init_right: self._fx_r = fx self._init_right = False else: if self._v < .5: self._fx_l = fx else: self._fx_r = fx return None # Check ``f(x_1) >= y`` if self._current_log_y < self._proposed_pdf: # Start the ``Acceptance Check`` if self._continue_check: if self._init_left_hat: self._fx_l_hat = fx self._init_left_hat = False elif self._init_right_hat: self._fx_r_hat = fx self._init_right_hat = False elif not self._init_check: if self._proposed[self._active_param_index] < self._m: self._fx_r_hat = fx else: self._fx_l_hat = fx # If the condition is met, the point fails the # ``Acceptance Check`` and is rejected if (self._d and self._current_log_y >= self._fx_l_hat and self._current_log_y >= self._fx_r_hat): # Shrink the interval ``I=(l,r)`` if (self._proposed[self._active_param_index] < self._current[self._active_param_index]): self._l = self._proposed[self._active_param_index] self._temp_l[self._active_param_index] = self._l else: self._r = self._proposed[self._active_param_index] self._temp_r[self._active_param_index] = self._r # Reset variables self._continue_check = False self._d = False self._fx_r_hat = None self._fx_l_hat = None return None # If the rejection condition is not met, we continue the # ``Acceptance Check`` while (r_hat - l_hat ) > 1.1*w return None # We have updated successfully the parameter of the new sample! self._first_expansion = True self._interval_found = False # Reset active parameter index if self._active_param_index == len(self._proposed) - 1: self._active_param_index = 0 # The accepted sample becomes the new current sample self._current = np.array(self._proposed, copy=True) # The log_pdf of the accepted sample is used to construct the # new slice self._current_log_pdf = fx # Sample new log_y used to define the next slice self._e = np.random.exponential(1) self._current_log_y = self._current_log_pdf - self._e # Return the accepted sample return np.copy(self._current), self._current_log_pdf, True else: self._active_param_index += 1 return None # If the trial point is rejected in the ``Threshold Check``, shrink # the interval if (self._proposed[self._active_param_index] < self._current[self._active_param_index]): self._l = self._proposed[self._active_param_index] self._temp_l[self._active_param_index] = self._l else: self._r = self._proposed[self._active_param_index] self._temp_r[self._active_param_index] = self._r # If the point has been rejected, set flag for ``Acceptance Check`` # check to False so that we can sample a new point self._init_check = False self._continue_check = False return None
[docs] def width(self): """ Returns the width used for generating the interval. """ return np.copy(self._w)