Source code for pints._nested

#
# Sub-module containing nested samplers
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import pints
import numpy as np

try:
    from scipy.special import logsumexp
except ImportError:     # pragma: no cover
    # Older versions
    from scipy.misc import logsumexp


[docs] class NestedSampler(pints.TunableMethod): """ Abstract base class for nested samplers. Parameters ---------- log_prior : pints.LogPrior A logprior to draw proposal samples from. """ def __init__(self, log_prior): # Store logprior if not isinstance(log_prior, pints.LogPrior): raise ValueError('Given log_prior must extend pints.LogPrior') # prior accessed by subclasses to do prior sampling in ask() step self._log_prior = log_prior # Current value of the threshold log-likelihood value self._running_log_likelihood = -np.inf self._proposed = None # Initialise active point containers self._n_active_points = 400 self._n_parameters = self._log_prior.n_parameters() self._m_active = np.zeros((self._n_active_points, self._n_parameters + 1)) self._min_index = None self._accept_count = 0 self._n_evals = 0
[docs] def active_points(self): """ Returns the active points from nested sampling run. """ return self._m_active
[docs] def ask(self): """ Proposes new point at which to evaluate log-likelihood. """ raise NotImplementedError
def _initialise_active_points(self, m_initial, v_fx): """ Sets initial active points matrix. """ for i, fx in enumerate(v_fx): self._m_active[i, self._n_parameters] = fx self._m_active[:, :-1] = m_initial self._min_index = np.argmin(self._m_active[:, self._n_parameters]) self._set_running_log_likelihood( self._m_active[self._min_index, self._n_parameters])
[docs] def in_initial_phase(self): """ For methods that need an initial phase (see :meth:`needs_initial_phase()`), this method returns ``True`` if the method is currently configured to be in its initial phase. For other methods a ``NotImplementedError`` is returned. """ raise NotImplementedError
[docs] def min_index(self): """ Returns index of sample with lowest log-likelihood. """ return self._min_index
[docs] def n_active_points(self): """ Returns the number of active points that will be used in next run. """ return self._n_active_points
[docs] def n_hyper_parameters(self): """ See :meth:`TunableMethod.n_hyper_parameters()`. """ raise NotImplementedError
[docs] def name(self): """ Name of sampler """ raise NotImplementedError
[docs] def needs_sensitivities(self): """ Determines whether sampler uses sensitivities of the solution. """ return self._needs_sensitivities
[docs] def needs_initial_phase(self): """ Returns ``True`` if this method needs an initial phase, for example ellipsoidal nested sampling has a period of running rejection sampling before it starts to fit ellipsoids to points. """ return False
[docs] def running_log_likelihood(self): """ Returns current value of the threshold log-likelihood value. """ return self._running_log_likelihood
[docs] def set_n_active_points(self, active_points): """ Sets the number of active points for the next run. """ active_points = int(active_points) if active_points <= 5: raise ValueError('Number of active points must be greater than 5.') self._n_active_points = active_points self._m_active = np.zeros((self._n_active_points, self._n_parameters + 1))
[docs] def set_hyper_parameters(self, x): """ See :meth:`TunableMethod.set_hyper_parameters()`. """ raise NotImplementedError
[docs] def set_initial_phase(self, in_initial_phase): """ For methods that need an initial phase (see :meth:`needs_initial_phase()`), this method toggles the initial phase algorithm. For other methods a ``NotImplementedError`` is returned. """ raise NotImplementedError
def _set_running_log_likelihood(self, running_log_likelihood): """ Updates the current value of the threshold log-likelihood value. """ self._running_log_likelihood = running_log_likelihood
[docs] def tell(self, fx): """ If a single evaluation is provided as arguments, a single point is accepted and returned if its likelihood exceeds the current threshold; otherwise None is returned. If multiple evaluations are provided as arguments (for example, if running the algorithm in parallel), None is returned if no points have likelihood exceeding threshold; if a single point passes the threshold, it is returned; if multiple points pass, one is selected uniformly at random and returned and the others are stored for later use. In all cases, two objects are returned: the proposed point (which may be None) and an array of other points that also pass the threshold (which is empty for single evaluation mode but may be non-empty for multiple evaluation mode). """ # for serial evaluation just return point or None and an empty array if np.isscalar(fx): self._n_evals += 1 if np.isnan(fx) or fx < self._running_log_likelihood: return None, np.array([[]]) else: proposed = self._proposed fx_temp = fx winners = np.array([[]]) # if running in parallel, then fx will be a sequence else: a_len = len(fx) self._n_evals += a_len results = [] for i in range(a_len): if np.isnan(fx[i]) or fx[i] < self._running_log_likelihood: results.append(None) else: results.append(fx[i]) n_non_none = sum(x is not None for x in results) # if none pass threshold return None and an empty array if n_non_none == 0: return None, np.array([[]]) # if one passes then return it and an empty array elif n_non_none == 1: fx_temp = next(item for item in results if item is not None) index = results.index(fx_temp) proposed = self._proposed[index] winners = np.array([[]]) # if more than a single point passes select at random from multiple # non-nones and return it and an array of the other points whose # likelihood exceeds threshold else: fx_short = [i for i in results if i] idex = [results.index(i) for i in fx_short] proposed_short = [self._proposed[i] for i in idex] fx_temp = np.random.choice(fx_short) index_temp = results.index(fx_temp) proposed = self._proposed[index_temp] index1 = fx_short.index(fx_temp) del proposed_short[index1] fx_short.remove(fx_temp) winners = np.transpose( np.vstack([np.transpose(proposed_short), fx_short])) self._m_active[self._min_index, :] = np.concatenate( (proposed, np.array([fx_temp]))) self._min_index = np.argmin( self._m_active[:, self._n_parameters]) self._set_running_log_likelihood( np.min(self._m_active[:, self._n_parameters])) self._accept_count += 1 return proposed, winners
[docs] class NestedController(object): """ Uses nested sampling to sample from a posterior distribution. Parameters ---------- log_likelihood : pints.LogPDF A :class:`LogPDF` function that evaluates points in the parameter space. log_prior : pints.LogPrior A :class:`LogPrior` function on the same parameter space. References ---------- .. [1] "Nested Sampling for General Bayesian Computation", John Skilling, Bayesian Analysis 1:4 (2006). https://doi.org/10.1214/06-BA127 .. [2] "Multimodal nested sampling: an efficient and robust alternative to Markov chain Monte Carlo methods for astronomical data analyses" F. Feroz and M. P. Hobson, 2008, Mon. Not. R. Astron. Soc. """ def __init__(self, log_likelihood, log_prior, method=None): # Store log_likelihood and log_prior # if not isinstance(log_likelihood, pints.LogLikelihood): if not isinstance(log_likelihood, pints.LogPDF): raise ValueError( 'Given log_likelihood must extend pints.LogLikelihood') self._log_likelihood = log_likelihood # Store function if not isinstance(log_prior, pints.LogPrior): raise ValueError('Given log_prior must extend pints.LogPrior') self._log_prior = log_prior # Get dimension self._n_parameters = self._log_likelihood.n_parameters() if self._n_parameters != self._log_prior.n_parameters(): raise ValueError( 'Given log_likelihood and log_prior must have same number of' ' parameters.') # Logging self._log_to_screen = True self._log_filename = None self._log_csv = False # By default do serial evaluation self._parallel = False self._n_workers = 1 self.set_parallel() # Parameters common to all routines # Total number of iterations self._iterations = 1000 # Total number of posterior samples self._posterior_samples = 1000 # Convergence criterion in log-evidence self._marginal_log_likelihood_threshold = 0.5 # Initial marginal difference self._diff = np.inf # By default use ellipsoidal sampling if method is None: method = pints.NestedEllipsoidSampler else: try: ok = issubclass(method, pints.NestedSampler) except TypeError: # Not a class ok = False if not ok: raise ValueError( 'Given method must extend pints.NestedSampler.' ) self._sampler = method(log_prior=self._log_prior) # Check if sensitivities are required self._needs_sensitivities = self._sampler.needs_sensitivities() # Performance metrics self._time = None # :meth:`run` can only be called once self._has_run = False
[docs] def active_points(self): """ Returns the active points from nested sampling. """ return self._sampler.active_points()
def _diff_marginal_likelihood(self, i, d): """ Calculates difference in marginal likelihood between current and previous iterations. """ v_temp = np.concatenate(( self._v_log_Z[0:(i - 1)], [np.max(self._sampler._m_active[:, d])] )) w_temp = np.concatenate((self._w[0:(i - 1)], [self._X[i]])) self._diff = ( + logsumexp(self._v_log_Z[0:(i - 1)], b=self._w[0:(i - 1)]) - logsumexp(v_temp, b=w_temp) )
[docs] def effective_sample_size(self): r""" Calculates the effective sample size of posterior samples from a nested sampling run using the formula: .. math:: ESS = exp(-sum_{i=1}^{m} p_i log p_i), in other words, the information. Given by eqn. (39) in [1]_. """ self._log_vP = (self._m_samples_all[:, self._n_parameters] - self._log_Z + np.log(self._w)) return np.exp(-np.sum(self._vP * self._log_vP))
[docs] def inactive_points(self): """ Returns the inactive points from nested sampling. """ return self._m_inactive
def _initialise_callable(self): """ Initialises sensitivities if they are needed; otherwise, returns a callable log likelihood. """ f = self._log_likelihood if self._needs_sensitivities: f = f.evaluateS1 return f def _initialise_evaluator(self, f): """ Initialises parallel runners, if desired. """ # Create evaluator object if self._parallel: # Use at most n_workers workers n_workers = self._n_workers evaluator = pints.ParallelEvaluator( f, n_workers=n_workers) else: evaluator = pints.SequentialEvaluator(f) return evaluator def _initialise_logger(self): """ Initialises logger. """ # Start logging self._logging = self._log_to_screen or self._log_filename if self._logging: if self._log_to_screen: # Show current settings print('Running ' + self._sampler.name()) print('Number of active points: ' + str(self._n_active_points)) print('Total number of iterations: ' + str(self._iterations)) print('Total number of posterior samples: ' + str( self._posterior_samples)) # Set up logger self._logger = pints.Logger() if not self._log_to_screen: self._logger.set_stream(None) if self._log_filename: self._logger.set_filename( self._log_filename, csv=self._log_csv) # Add fields to log self._logger.add_counter('Iter.', max_value=self._iterations) self._logger.add_counter('Eval.', max_value=self._iterations * 10) # Note: removed units from time field, see # https://github.com/pints-team/pints/issues/1467 self._logger.add_time('Time') self._logger.add_float('Delta_log(z)') self._logger.add_float('Acceptance rate') def _initial_points(self): """ Generates initial active points. """ m_initial = self._log_prior.sample(self._n_active_points) v_fx = np.zeros(self._n_active_points) for i in range(0, self._n_active_points): # Calculate likelihood v_fx[i] = self._evaluator.evaluate([m_initial[i, :]])[0] self._sampler._n_evals += 1 # Show progress if self._logging and i >= self._next_message: # Log state self._logger.log(0, self._sampler._n_evals, self._timer.time(), self._diff, 1.0) # Choose next logging point if i > self._message_warm_up: self._next_message = self._message_interval * ( 1 + i // self._message_interval) self._next_message = 0 return v_fx, m_initial
[docs] def iterations(self): """ Returns the total number of iterations that will be performed in the next run. """ return self._iterations
[docs] def log_likelihood_vector(self): """ Returns vector of log likelihoods for each of the stacked ``[m_active, m_inactive]`` points. """ return self._m_samples_all[:, -1]
[docs] def marginal_log_likelihood(self): """ Calculates the marginal log likelihood of nested sampling run. """ # Include active particles in sample m_active = self._sampler.active_points() self._v_log_Z[self._iterations] = logsumexp(m_active[:, self._n_parameters]) self._w[self._iterations:] = float(self._X[self._iterations]) / float( self._sampler.n_active_points()) self._m_samples_all = np.vstack((self._m_inactive, m_active)) # Determine log evidence log_Z = logsumexp(self._v_log_Z, b=self._w[0:(self._iterations + 1)]) self._log_Z_called = True return log_Z
[docs] def marginal_log_likelihood_standard_deviation(self): """ Calculates standard deviation in marginal log likelihood as in [2]_. """ if not self._log_Z_called: self.marginal_log_likelihood() log_L_minus_Z = self._v_log_Z - self._log_Z log_Z_sd = logsumexp(log_L_minus_Z, b=self._w[0:(self._iterations + 1)] * log_L_minus_Z) log_Z_sd = np.sqrt(log_Z_sd / self._sampler.n_active_points()) return log_Z_sd
[docs] def marginal_log_likelihood_threshold(self): """ Returns threshold for determining convergence in estimate of marginal log likelihood which leads to early termination of the algorithm. """ return self._marginal_log_likelihood_threshold
[docs] def n_posterior_samples(self): """ Returns the number of posterior samples that will be returned (see :meth:`set_n_posterior_samples()`). """ return self._posterior_samples
[docs] def parallel(self): """ Returns the number of parallel worker processes this routine will be run on, or ``False`` if parallelisation is disabled. """ return self._n_workers if self._parallel else False
[docs] def posterior_samples(self): """ Returns posterior samples generated during run of nested sampling object. """ return self._m_posterior_samples
[docs] def prior_space(self): """ Returns a vector of X samples which approximates the proportion of prior space compressed. """ return self._X
[docs] def run(self): """ Runs the nested sampling routine and returns a tuple of the posterior samples and an estimate of the marginal likelihood. """ # Can only run once for each controller instance if self._has_run: raise RuntimeError("Controller is valid for single use only") self._has_run = True # Choose method to evaluate f = self._initialise_callable() # Set parallel self._evaluator = self._initialise_evaluator(f) # Set number of active points self._n_active_points = self._sampler.n_active_points() # Start timing self._timer = pints.Timer() # Set up progress reporting self._next_message = 0 self._message_warm_up = 0 self._message_interval = 20 self._initialise_logger() d = self._n_parameters v_fx, m_initial = self._initial_points() self._sampler._initialise_active_points(m_initial, v_fx) # store all inactive points, along with their respective # log-likelihoods (hence, d+1) self._m_inactive = np.zeros((self._iterations, d + 1)) # store weights self._w = np.zeros(self._n_active_points + self._iterations) # store X values (defined in [1]) self._X = np.zeros(self._iterations + 1) self._X[0] = 1 # log marginal likelihood holder self._v_log_Z = np.zeros(self._iterations + 1) # Run! self._X[0] = 1.0 self._i_message = 0 i_winners = 0 m_previous_winners = [] for i in range(0, self._iterations): i_iter_complete = 0 self._i = i a_min_index = self._sampler.min_index() self._X[i + 1] = np.exp(-(i + 1) / self._n_active_points) if i > 0: self._w[i] = 0.5 * (self._X[i - 1] - self._X[i + 1]) else: self._w[i] = self._X[i] - self._X[i + 1] self._v_log_Z[i] = self._sampler.running_log_likelihood() self._m_inactive[i, :] = self._sampler._m_active[a_min_index, :] # check whether previous winners exceed threshold if i_winners > 0: m_previous_winners = m_previous_winners[( m_previous_winners[:, self._n_parameters] > self._sampler.running_log_likelihood()), :] if m_previous_winners.shape[0] > 0: index = np.random.choice(m_previous_winners.shape[0], 1, replace=False) proposed = m_previous_winners[index, :self._n_parameters] fx_temp = m_previous_winners[index, self._n_parameters] m_previous_winners = np.delete(m_previous_winners, index, 0) self._sampler._m_active[self._sampler._min_index, :] = ( np.concatenate((proposed[0], fx_temp)) ) self._sampler._min_index = np.argmin( self._sampler._m_active[:, self._n_parameters]) self._sampler._set_running_log_likelihood( np.min(self._sampler._m_active[:, self._n_parameters]) ) self._sampler._accept_count += 1 i_iter_complete = 1 if i_iter_complete == 0: # Propose new samples proposed = self._sampler.ask(self._n_workers) # Evaluate their fit if self._n_workers > 1: log_likelihood = self._evaluator.evaluate(proposed) else: log_likelihood = self._evaluator.evaluate([proposed])[0] sample, winners = self._sampler.tell(log_likelihood) while sample is None: proposed = self._sampler.ask(self._n_workers) if self._n_workers > 1: log_likelihood = ( # pragma: no cover self._evaluator.evaluate(proposed)) else: log_likelihood = self._evaluator.evaluate( [proposed])[0] sample, winners = self._sampler.tell(log_likelihood) if winners.size > 0: if i_winners == 0: m_previous_winners = winners i_winners = 1 else: m_previous_winners = [m_previous_winners, winners] m_previous_winners = np.concatenate(m_previous_winners) # Check whether within convergence threshold if i > 2: self._diff_marginal_likelihood(i, d) if (np.abs(self._diff) < self._marginal_log_likelihood_threshold): if self._log_to_screen: print( # pragma: no cover 'Convergence obtained with Delta_z = ' + str(self._diff)) # shorten arrays according to current iteration self._iterations = i self._v_log_Z = self._v_log_Z[0:(self._iterations + 1)] self._w = self._w[0:( self._n_active_points + self._iterations)] self._X = self._X[0:(self._iterations + 1)] self._m_inactive = self._m_inactive[0:self._iterations, :] break # Show progress self._update_logger() # Calculate log_evidence and uncertainty self._log_Z = self.marginal_log_likelihood() self._log_Z_sd = self.marginal_log_likelihood_standard_deviation() # Draw samples from posterior n = self._posterior_samples self._m_posterior_samples = self.sample_from_posterior(n) # Stop timer self._time = self._timer.time() return self._m_posterior_samples
[docs] def sample_from_posterior(self, posterior_samples): """ Draws posterior samples based on nested sampling run using importance sampling. This function is automatically called in ``NestedController.run()`` but can also be called afterwards to obtain new posterior samples. """ if posterior_samples < 1: raise ValueError('Number of posterior samples must be positive.') # Calculate probabilities (can this be used to calculate effective # sample size as in importance sampling?) of each particle self._vP = np.exp(self._m_samples_all[:, self._n_parameters] - self._log_Z) * self._w # Draw posterior samples m_theta = self._m_samples_all[:, :-1] vIndex = np.random.choice( range(0, self._iterations + self._sampler.n_active_points()), size=posterior_samples, p=self._vP) m_posterior_samples = m_theta[vIndex, :] return m_posterior_samples
[docs] def set_iterations(self, iterations): """ Sets the total number of iterations to be performed in the next run. """ iterations = int(iterations) if iterations < 0: raise ValueError('Number of iterations cannot be negative.') self._iterations = iterations
[docs] def set_log_to_file(self, filename=None, csv=False): """ Enables logging to file when a filename is passed in, disables it if ``filename`` is ``False`` or ``None``. The argument ``csv`` can be set to ``True`` to write the file in comma separated value (CSV) format. By default, the file contents will be similar to the output on screen. """ if filename: self._log_filename = str(filename) self._log_csv = True if csv else False else: self._log_filename = None self._log_csv = False
[docs] def set_log_to_screen(self, enabled): """ Enables or disables logging to screen. """ self._log_to_screen = True if enabled else False
[docs] def set_marginal_log_likelihood_threshold(self, threshold): """ Sets threshold for determining convergence in estimate of marginal log likelihood which leads to early termination of the algorithm. """ if threshold <= 0: raise ValueError('Convergence threshold must be positive.') self._marginal_log_likelihood_threshold = threshold
[docs] def set_parallel(self, parallel=False): """ Enables/disables parallel evaluation. If ``parallel=True``, the method will run using a number of worker processes equal to the detected cpu core count. The number of workers can be set explicitly by setting ``parallel`` to an integer greater than 0. Parallelisation can be disabled by setting ``parallel`` to ``0`` or ``False``. """ if parallel is True: self._parallel = True self._n_workers = pints.ParallelEvaluator.cpu_count() elif parallel >= 1: self._parallel = True self._n_workers = int(parallel) else: self._parallel = False self._n_workers = 1
[docs] def set_n_posterior_samples(self, posterior_samples): """ Sets the number of posterior samples to generate from points proposed by the nested sampling algorithm. """ posterior_samples = int(posterior_samples) if posterior_samples < 1: raise ValueError( 'Number of posterior samples must be greater than zero.') self._posterior_samples = posterior_samples
[docs] def time(self): """ Returns the time needed for the last run, in seconds, or ``None`` if the controller hasn't run yet. """ return self._time
def _update_logger(self): """ Updates logger if necessary. """ # print(self._i_message) # print(self._next_message) if self._logging: self._i_message += 1 if self._i_message >= self._next_message: # Log state self._logger.log(self._i_message, self._sampler._n_evals, self._timer.time(), self._diff, float(self._sampler._accept_count / (self._sampler._n_evals - self._sampler._n_active_points))) # Choose next logging point if self._i_message > self._message_warm_up: self._next_message = self._message_interval * ( 1 + self._i_message // self._message_interval)