#
# Improved Rprop local optimiser.
#
# This file is part of PINTS (https://github.com/pints-team/pints/) which is
# released under the BSD 3-clause license. See accompanying LICENSE.md for
# copyright notice and full license details.
#
import pints

import numpy as np


class IRPropMin(pints.Optimiser):
    """
    iRprop- algorithm, as described in Figure 3 of [1]_.

    The name "iRprop-" was introduced by [1]_, and is a variation on the
    "Resilient backpropagation (Rprop)" optimiser introduced in [2]_.

    This is a local optimiser that requires gradient information, although
    it uses only the direction (sign) of the gradient in each dimension and
    ignores the magnitude. Instead, it maintains a separate step size for
    each dimension which grows when the sign of the gradient stays the same
    and shrinks when it changes.

    Pseudo-code is given below. Here ``p_j[i]`` denotes the j-th parameter
    at iteration ``i``, and ``df_j[i]`` is the corresponding derivative of
    the objective function (so both are scalars)::

        if df_j[i] * df_j[i - 1] > 0:
            step_size_j[i] = 1.2 * step_size_j[i - 1]
        elif df_j[i] * df_j[i - 1] < 0:
            step_size_j[i] = 0.5 * step_size_j[i - 1]
            df_j[i] = 0
        step_size_j[i] = min(max(step_size_j[i], min_step_size),
                             max_step_size)
        p_j[i] = p_j[i] - sign(df_j[i]) * step_size_j[i]

    The line ``df_j[i] = 0`` has two effects:

    1. It sets the update at this iteration to zero (using
       ``sign(df_j[i]) * step_size_j[i] = 0 * step_size_j[i]``).
    2. It ensures that the next iteration is performed (since
       ``df_j[i] * df_j[i - 1] == 0`` so neither if-statement holds).

    In this implementation, the initial ``step_size`` is set to ``sigma0``,
    the default minimum step size is set as ``1e-3 * min(sigma0)``, and no
    default maximum step size is set. Minimum and maximum step sizes can be
    changed with :meth:`set_min_step_size` and :meth:`set_max_step_size` or
    through the hyper-parameter interface.

    If boundaries are provided, an extra step is added at the end of the
    algorithm that reduces the step size where boundary constraints are not
    met. For :class:`RectangularBoundaries` this works on a per-dimension
    basis::

        while p_j[i] < lower or p_j[i] >= upper:
            step_size_j[i] *= 0.5
            p_j[i] = p_j[i] - sign(df_j[i]) * step_size_j[i]

    For general boundaries a more complex heuristic is used: first, the
    step size in all dimensions is reduced until the boundary constraints
    are met. Next, this reduction is undone for each dimension in turn: if
    the constraint is still met without the reduction, the step size in
    that dimension is left unchanged.
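
    In pseudo-code, this heuristic reads as follows (a condensed sketch of
    the steps taken in :meth:`tell` below; here ``check(p)`` stands for the
    boundary check and ``current`` for the current position)::

        s = copy(step_size)
        while not check(current - sign(df) * s):
            s = 0.5 * s                    # shrink all dimensions at once
        for j in all dimensions:
            reduced = s[j]
            s[j] = step_size[j]            # undo the reduction in dim j
            if not check(current - sign(df) * s):
                s[j] = reduced             # still infeasible, so redo it
        step_size = s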

    The numbers 0.5 and 1.2 shown in the (main and boundary) pseudo-code
    are technically hyper-parameters, but are fixed in this implementation.

    References
    ----------
    .. [1] Empirical Evaluation of the Improved Rprop Learning Algorithms.
           Igel and Hüsken, 2003, Neurocomputing.
           https://doi.org/10.1016/S0925-2312(01)00700-7

    .. [2] A direct adaptive method for faster backpropagation learning:
           the RPROP algorithm. Riedmiller and Braun, 1993.
           https://doi.org/10.1109/ICNN.1993.298623
    """

    def __init__(self, x0, sigma0=0.1, boundaries=None):
        super().__init__(x0, sigma0, boundaries)

        # Set optimiser state
        self._running = False
        self._ready_for_tell = False

        # Best solution found
        self._x_best = self._x0
        self._f_best = np.inf

        # Minimum and maximum step sizes
        self._step_min = 1e-3 * np.min(self._sigma0)
        self._step_max = None

        # Current point, score, and gradient
        self._current = self._x0
        self._current_f = np.inf
        self._current_df = None

        # Proposed next point (read-only, so can be passed to user)
        self._proposed = self._x0
        self._proposed.setflags(write=False)

        # Step size adaptations
        # TODO: Could be hyperparameters, but almost nobody varies these?
        self._eta_min = 0.5     # 0 < eta_min < 1
        self._eta_max = 1.2     # 1 < eta_max

        # Current step sizes
        self._step_size = np.array(self._sigma0)

        # Rectangular boundaries
        self._lower = self._upper = None
        if isinstance(self._boundaries, pints.RectangularBoundaries):
            self._lower = self._boundaries.lower()
            self._upper = self._boundaries.upper()

        # Reduced step sizes due to boundary violations
        self._breaches = []

    def ask(self):
        """ See :meth:`Optimiser.ask()`. """

        # First call
        if not self._running:
            if (self._step_min is not None and self._step_max is not None
                    and self._step_min >= self._step_max):
                raise Exception(
                    'Max step size must be larger than min step size'
                    ' (current settings: min_step_size = '
                    + str(self._step_min) + ', max_step_size = '
                    + str(self._step_max) + ').')
            self._running = True

        # Ready for tell now
        self._ready_for_tell = True

        # Return proposed points (just the one)
        return [self._proposed]

    def f_best(self):
        """ See :meth:`Optimiser.f_best()`. """
        return self._f_best

    def f_guessed(self):
        """ See :meth:`Optimiser.f_guessed()`. """
        return self._current_f

    def _log_init(self, logger):
        """ See :meth:`Loggable._log_init()`. """
        logger.add_float('Min. step')
        logger.add_float('Max. step')
        logger.add_string('Bound corr.', 11)

    def _log_write(self, logger):
        """ See :meth:`Loggable._log_write()`. """
        logger.log(np.min(self._step_size))
        logger.log(np.max(self._step_size))
        logger.log(','.join([str(x) for x in self._breaches]))

    def max_step_size(self):
        """ Returns the maximum step size (or ``None`` if not set). """
        return self._step_max

    def min_step_size(self):
        """ Returns the minimum step size (or ``None`` if not set). """
        return self._step_min

    def name(self):
        """ See :meth:`Optimiser.name()`. """
        return 'iRprop-'

    def needs_sensitivities(self):
        """ See :meth:`Optimiser.needs_sensitivities()`. """
        return True

    def n_hyper_parameters(self):
        """ See :meth:`pints.TunableMethod.n_hyper_parameters()`. """
        return 2

    def running(self):
        """ See :meth:`Optimiser.running()`. """
        return self._running

    def set_hyper_parameters(self, x):
        """
        See :meth:`pints.TunableMethod.set_hyper_parameters()`.

        The hyper-parameter vector is ``[min_step_size, max_step_size]``.
        """
        self.set_min_step_size(x[0])
        self.set_max_step_size(x[1])

    def set_max_step_size(self, step_size):
        """
        Sets the maximum step size (use ``None`` to let step sizes grow
        indefinitely).
        """
        self._step_max = None if step_size is None else float(step_size)

    def set_min_step_size(self, step_size):
        """
        Sets the minimum step size (use ``None`` to let step sizes shrink
        indefinitely).
        """
        self._step_min = None if step_size is None else float(step_size)

    def tell(self, reply):
        """ See :meth:`Optimiser.tell()`. """

        # Check ask-tell pattern
        if not self._ready_for_tell:
            raise Exception('ask() not called before tell()')
        self._ready_for_tell = False

        # Unpack reply
        fx, dfx = reply[0]

        # First iteration
        if self._current_df is None:
            self._current = self._proposed
            self._current_f = fx
            self._current_df = dfx

        # Get product of new and previous gradient
        dprod = dfx * self._current_df

        # Adapt step sizes
        self._step_size[dprod > 0] *= self._eta_max
        self._step_size[dprod < 0] *= self._eta_min

        # Bound step sizes
        if self._step_min is not None:
            self._step_size = np.maximum(self._step_size, self._step_min)
        if self._step_max is not None:
            self._step_size = np.minimum(self._step_size, self._step_max)

        # Remove "weight backtracking"
        # This step ensures that, for each i where dprod < 0:
        # 1. p[i] will remain unchanged this iteration (sign(0) == 0)
        # 2. p[i] will change in the next iteration (dprod == 0), using
        #    the step size set in the current iteration
        dfx[dprod < 0] = 0

        # Update current position
        self._current = self._proposed
        self._current_f = fx
        self._current_df = dfx

        # Take step in direction indicated by current gradient
        p = self._current - self._step_size * np.sign(dfx)

        # Allow boundaries to reduce step size
        if self._lower is not None:
            # Rectangular boundaries: reduce individual step sizes until OK
            out = np.logical_or(p < self._lower, p >= self._upper)
            self._breaches = np.flatnonzero(out)
            sign = np.sign(dfx)
            while np.any(out):
                self._step_size[out] *= self._eta_min
                p = self._current - self._step_size * sign
                out = np.logical_or(p < self._lower, p >= self._upper)

        elif self._boundaries is not None and not self._boundaries.check(p):
            # General boundaries: reduce all step sizes until OK
            s = np.copy(self._step_size)
            sign = np.sign(dfx)
            while not self._boundaries.check(p):
                s *= self._eta_min
                p = self._current - s * sign

            # Attempt restoring one-by-one
            self._breaches = []
            for i, s_big in enumerate(self._step_size):
                small = s[i]
                s[i] = s_big
                if not self._boundaries.check(self._current - s * sign):
                    s[i] = small
                    self._breaches.append(i)
            self._step_size = s
            p = self._current - s * sign

            # An alternative method would be to reduce each dimension's
            # step size one at a time, and check if that restores the
            # boundaries. However, if that doesn't work we then need to
            # test all tuples of dimensions, then triples, etc. The method
            # above looks clumsy, but avoids these combinatorics.

        elif self._breaches:
            # No boundary breaches: empty the list (if needed)
            self._breaches = []

        # Store proposed as read-only, so that it can be passed to user
        self._proposed = p
        self._proposed.setflags(write=False)

        # Update x_best and f_best
        if self._f_best > fx:
            self._f_best = fx
            self._x_best = self._current

    def x_best(self):
        """ See :meth:`Optimiser.x_best()`. """
        return self._x_best

    def x_guessed(self):
        """ See :meth:`Optimiser.x_guessed()`. """
        return self._current
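

if __name__ == '__main__':
    # Minimal stand-alone sketch (illustration only, not part of PINTS)
    # showing the ask-and-tell loop this optimiser expects, on a
    # hypothetical toy objective f(x) = sum(x ** 2) with analytic gradient
    # df(x) = 2 * x.
    opt = IRPropMin(x0=np.array([3.0, -2.0]), sigma0=0.1)
    for _ in range(100):
        xs = opt.ask()
        # Each reply is an (objective, gradient) tuple, matching the
        # ``fx, dfx = reply[0]`` unpacking in tell() above
        opt.tell([(np.sum(x ** 2), 2 * x) for x in xs])
    print('x_best:', opt.x_best())
    print('f_best:', opt.f_best())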