Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
hmmlearn / base.py
Size: Mime:
import logging
import string
import sys
from collections import deque

import numpy as np
from scipy import linalg, special
from sklearn.base import BaseEstimator
from sklearn.utils.validation import (
    check_array, check_is_fitted, check_random_state)

from . import _hmmc, _kl_divergence as _kl, _utils
from .utils import normalize, log_normalize


_log = logging.getLogger(__name__)
#: Supported decoder algorithms.
DECODER_ALGORITHMS = frozenset(("viterbi", "map"))


class ConvergenceMonitor:
    """
    Monitor and report convergence to :data:`sys.stderr`.

    Attributes
    ----------
    history : deque
        The log probability of the data for the last two training
        iterations. If the values are not strictly increasing, the
        model did not converge.
    iter : int
        Number of iterations performed while training the model.

    Examples
    --------
    Use custom convergence criteria by subclassing ``ConvergenceMonitor``
    and redefining the ``converged`` method. The resulting subclass can
    be used by creating an instance and pointing a model's ``monitor_``
    attribute to it prior to fitting.

    >>> from hmmlearn.base import ConvergenceMonitor
    >>> from hmmlearn import hmm
    >>>
    >>> class ThresholdMonitor(ConvergenceMonitor):
    ...     @property
    ...     def converged(self):
    ...         return (self.iter == self.n_iter or
    ...                 self.history[-1] >= self.tol)
    >>>
    >>> model = hmm.GaussianHMM(n_components=2, tol=5, verbose=True)
    >>> model.monitor_ = ThresholdMonitor(model.monitor_.tol,
    ...                                   model.monitor_.n_iter,
    ...                                   model.monitor_.verbose)
    """

    _template = "{iter:>10d} {log_prob:>16.8f} {delta:>+16.8f}"

    def __init__(self, tol, n_iter, verbose):
        """
        Parameters
        ----------
        tol : double
            Convergence threshold.  EM has converged either if the maximum
            number of iterations is reached or the log probability improvement
            between the two consecutive iterations is less than threshold.
        n_iter : int
            Maximum number of iterations to perform.
        verbose : bool
            Whether per-iteration convergence reports are printed.
        """
        self.tol = tol
        self.n_iter = n_iter
        self.verbose = verbose
        self.history = deque()
        self.iter = 0

    def __repr__(self):
        class_name = self.__class__.__name__
        params = sorted(dict(vars(self), history=list(self.history)).items())
        return ("{}(\n".format(class_name)
                + "".join(map("    {}={},\n".format, *zip(*params)))
                + ")")

    def _reset(self):
        """Reset the monitor's state."""
        self.iter = 0
        self.history.clear()

    def report(self, log_prob):
        """
        Report convergence to :data:`sys.stderr`.

        The output consists of three columns: iteration number, log
        probability of the data at the current iteration and convergence
        rate.  At the first iteration convergence rate is unknown and
        is thus denoted by NaN.

        Parameters
        ----------
        log_prob : float
            The log probability of the data as computed by EM algorithm
            in the current iteration.
        """
        if self.verbose:
            delta = log_prob - self.history[-1] if self.history else np.nan
            message = self._template.format(
                iter=self.iter + 1, log_prob=log_prob, delta=delta)
            print(message, file=sys.stderr)

        # Allow for some wiggleroom based on precision.
        precision = np.finfo(float).eps ** (1/2)
        if self.history and (log_prob - self.history[-1]) < -precision:
            delta = log_prob - self.history[-1]
            _log.warning(f"Model is not converging.  Current: {log_prob}"
                         f" is not greater than {self.history[-1]}."
                         f" Delta is {delta}")
        self.history.append(log_prob)
        self.iter += 1

    @property
    def converged(self):
        """Whether the EM algorithm converged."""
        # XXX we might want to check that ``log_prob`` is non-decreasing.
        return (self.iter == self.n_iter or
                (len(self.history) >= 2 and
                 self.history[-1] - self.history[-2] < self.tol))


class _AbstractHMM(BaseEstimator):
    """
    Base class for Hidden Markov Models learned via Expectation-Maximization
    and Variational Bayes.
    """

    def __init__(self, n_components, algorithm, random_state, n_iter,
                 tol, verbose, params, init_params, implementation):
        """
        Parameters
        ----------
        n_components : int
            Number of states in the model.
        algorithm : {"viterbi", "map"}, optional
            Decoder algorithm.
        random_state: RandomState or an int seed, optional
            A random number generator instance.
        n_iter : int, optional
            Maximum number of iterations to perform.
        tol : float, optional
            Convergence threshold. EM will stop if the gain in log-likelihood
            is below this value.
        verbose : bool, optional
            Whether per-iteration convergence reports are printed to
            :data:`sys.stderr`.  Convergence can also be diagnosed using the
            :attr:`monitor_` attribute.
        params, init_params : string, optional
            The parameters that get updated during (``params``) or initialized
            before (``init_params``) the training.  Can contain any combination
            of 's' for startprob, 't' for transmat, and other characters for
            subclass-specific emission parameters.  Defaults to all parameters.
        implementation: string, optional
            Determines if the forward-backward algorithm is implemented with
            logarithms ("log"), or using scaling ("scaling").  The default is
            to use logarithms for backwards compatability.  However, the
            scaling implementation is generally faster.
        """

        self.n_components = n_components
        self.params = params
        self.init_params = init_params
        self.algorithm = algorithm
        self.n_iter = n_iter
        self.tol = tol
        self.verbose = verbose
        self.implementation = implementation
        self.random_state = random_state

    def score_samples(self, X, lengths=None):
        """
        Compute the log probability under the model and compute posteriors.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.
        lengths : array-like of integers, shape (n_sequences, ), optional
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.

        Returns
        -------
        log_prob : float
            Log likelihood of ``X``.
        posteriors : array, shape (n_samples, n_components)
            State-membership probabilities for each sample in ``X``.

        See Also
        --------
        score : Compute the log probability under the model.
        decode : Find most likely state sequence corresponding to ``X``.
        """
        return self._score(X, lengths, compute_posteriors=True)

    def score(self, X, lengths=None):
        """
        Compute the log probability under the model.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.
        lengths : array-like of integers, shape (n_sequences, ), optional
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.

        Returns
        -------
        log_prob : float
            Log likelihood of ``X``.

        See Also
        --------
        score_samples : Compute the log probability under the model and
            posteriors.
        decode : Find most likely state sequence corresponding to ``X``.
        """
        return self._score(X, lengths, compute_posteriors=False)[0]

    def _score(self, X, lengths=None, *, compute_posteriors):
        """
        Helper for `score` and `score_samples`.

        Compute the log probability under the model, as well as posteriors if
        *compute_posteriors* is True (otherwise, an empty array is returned
        for the latter).
        """
        check_is_fitted(self, "startprob_")
        self._check()

        X = check_array(X)
        impl = {
            "scaling": self._score_scaling,
            "log": self._score_log,
        }[self.implementation]
        return impl(
            X=X, lengths=lengths, compute_posteriors=compute_posteriors)

    def _score_log(self, X, lengths=None, *, compute_posteriors):
        """
        Compute the log probability under the model, as well as posteriors if
        *compute_posteriors* is True (otherwise, an empty array is returned
        for the latter).
        """
        log_prob = 0
        sub_posteriors = [np.empty((0, self.n_components))]
        for sub_X in _utils.split_X_lengths(X, lengths):
            log_frameprob = self._compute_log_likelihood(sub_X)
            log_probij, fwdlattice = _hmmc.forward_log(
                self.startprob_, self.transmat_, log_frameprob)
            log_prob += log_probij
            if compute_posteriors:
                bwdlattice = _hmmc.backward_log(
                    self.startprob_, self.transmat_, log_frameprob)
                sub_posteriors.append(
                    self._compute_posteriors_log(fwdlattice, bwdlattice))
        return log_prob, np.concatenate(sub_posteriors)

    def _score_scaling(self, X, lengths=None, *, compute_posteriors):
        log_prob = 0
        sub_posteriors = [np.empty((0, self.n_components))]
        for sub_X in _utils.split_X_lengths(X, lengths):
            frameprob = self._compute_likelihood(sub_X)
            log_probij, fwdlattice, scaling_factors = _hmmc.forward_scaling(
                self.startprob_, self.transmat_, frameprob)
            log_prob += log_probij
            if compute_posteriors:
                bwdlattice = _hmmc.backward_scaling(
                    self.startprob_, self.transmat_,
                    frameprob, scaling_factors)
                sub_posteriors.append(
                    self._compute_posteriors_scaling(fwdlattice, bwdlattice))

        return log_prob, np.concatenate(sub_posteriors)

    def _decode_viterbi(self, X):
        log_frameprob = self._compute_log_likelihood(X)
        return _hmmc.viterbi(self.startprob_, self.transmat_, log_frameprob)

    def _decode_map(self, X):
        _, posteriors = self.score_samples(X)
        log_prob = np.max(posteriors, axis=1).sum()
        state_sequence = np.argmax(posteriors, axis=1)
        return log_prob, state_sequence

    def decode(self, X, lengths=None, algorithm=None):
        """
        Find most likely state sequence corresponding to ``X``.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.
        lengths : array-like of integers, shape (n_sequences, ), optional
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.
        algorithm : string
            Decoder algorithm. Must be one of "viterbi" or "map".
            If not given, :attr:`decoder` is used.

        Returns
        -------
        log_prob : float
            Log probability of the produced state sequence.
        state_sequence : array, shape (n_samples, )
            Labels for each sample from ``X`` obtained via a given
            decoder ``algorithm``.

        See Also
        --------
        score_samples : Compute the log probability under the model and
            posteriors.
        score : Compute the log probability under the model.
        """
        check_is_fitted(self, "startprob_")
        self._check()

        algorithm = algorithm or self.algorithm
        if algorithm not in DECODER_ALGORITHMS:
            raise ValueError(f"Unknown decoder {algorithm!r}")

        decoder = {
            "viterbi": self._decode_viterbi,
            "map": self._decode_map
        }[algorithm]

        X = check_array(X)
        log_prob = 0
        sub_state_sequences = []
        for sub_X in _utils.split_X_lengths(X, lengths):
            # XXX decoder works on a single sample at a time!
            sub_log_prob, sub_state_sequence = decoder(sub_X)
            log_prob += sub_log_prob
            sub_state_sequences.append(sub_state_sequence)

        return log_prob, np.concatenate(sub_state_sequences)

    def predict(self, X, lengths=None):
        """
        Find most likely state sequence corresponding to ``X``.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.
        lengths : array-like of integers, shape (n_sequences, ), optional
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.

        Returns
        -------
        state_sequence : array, shape (n_samples, )
            Labels for each sample from ``X``.
        """
        _, state_sequence = self.decode(X, lengths)
        return state_sequence

    def predict_proba(self, X, lengths=None):
        """
        Compute the posterior probability for each state in the model.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.
        lengths : array-like of integers, shape (n_sequences, ), optional
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.

        Returns
        -------
        posteriors : array, shape (n_samples, n_components)
            State-membership probabilities for each sample from ``X``.
        """
        _, posteriors = self.score_samples(X, lengths)
        return posteriors

    def sample(self, n_samples=1, random_state=None, currstate=None):
        """
        Generate random samples from the model.

        Parameters
        ----------
        n_samples : int
            Number of samples to generate.
        random_state : RandomState or an int seed
            A random number generator instance. If ``None``, the object's
            ``random_state`` is used.
        currstate : int
            Current state, as the initial state of the samples.

        Returns
        -------
        X : array, shape (n_samples, n_features)
            Feature matrix.
        state_sequence : array, shape (n_samples, )
            State sequence produced by the model.

        Examples
        --------
        ::

            # generate samples continuously
            _, Z = model.sample(n_samples=10)
            X, Z = model.sample(n_samples=10, currstate=Z[-1])
        """
        check_is_fitted(self, "startprob_")
        self._check()

        if random_state is None:
            random_state = self.random_state
        random_state = check_random_state(random_state)

        transmat_cdf = np.cumsum(self.transmat_, axis=1)

        if currstate is None:
            startprob_cdf = np.cumsum(self.startprob_)
            currstate = (startprob_cdf > random_state.rand()).argmax()

        state_sequence = [currstate]
        X = [self._generate_sample_from_state(
            currstate, random_state=random_state)]

        for t in range(n_samples - 1):
            currstate = (
                (transmat_cdf[currstate] > random_state.rand()).argmax())
            state_sequence.append(currstate)
            X.append(self._generate_sample_from_state(
                currstate, random_state=random_state))

        return np.atleast_2d(X), np.array(state_sequence, dtype=int)

    def fit(self, X, lengths=None):
        """
        Estimate model parameters.

        An initialization step is performed before entering the
        EM algorithm. If you want to avoid this step for a subset of
        the parameters, pass proper ``init_params`` keyword argument
        to estimator's constructor.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.
        lengths : array-like of integers, shape (n_sequences, )
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.

        Returns
        -------
        self : object
            Returns self.
        """
        X = check_array(X)

        if lengths is None:
            lengths = np.asarray([X.shape[0]])

        self._init(X, lengths)
        self._check()
        self.monitor_._reset()

        for iter in range(self.n_iter):
            stats, curr_logprob = self._do_estep(X, lengths)

            # Compute lower bound before updating model parameters
            lower_bound = self._compute_lower_bound(curr_logprob)

            # XXX must be before convergence check, because otherwise
            #     there won't be any updates for the case ``n_iter=1``.
            self._do_mstep(stats)
            self.monitor_.report(lower_bound)
            if self.monitor_.converged:
                break

            if (self.transmat_.sum(axis=1) == 0).any():
                _log.warning("Some rows of transmat_ have zero sum because no "
                             "transition from the state was ever observed.")
        return self

    def _fit_scaling(self, X):
        raise NotImplementedError("Must be overridden in subclass")

    def _fit_log(self, X):
        raise NotImplementedError("Must be overridden in subclass")

    def _compute_posteriors_scaling(self, fwdlattice, bwdlattice):
        posteriors = fwdlattice * bwdlattice
        normalize(posteriors, axis=1)
        return posteriors

    def _compute_posteriors_log(self, fwdlattice, bwdlattice):
        # gamma is guaranteed to be correctly normalized by log_prob at
        # all frames, unless we do approximate inference using pruning.
        # So, we will normalize each frame explicitly in case we
        # pruned too aggressively.
        log_gamma = fwdlattice + bwdlattice
        log_normalize(log_gamma, axis=1)
        with np.errstate(under="ignore"):
            return np.exp(log_gamma)

    def _needs_init(self, code, name):
        if code in self.init_params:
            if hasattr(self, name):
                _log.warning(
                    "Even though the %r attribute is set, it will be "
                    "overwritten during initialization because 'init_params' "
                    "contains %r", name, code)
            return True
        if not hasattr(self, name):
            return True
        return False

    def _check_and_set_n_features(self, X):
        _, n_features = X.shape
        if hasattr(self, "n_features"):
            if self.n_features != n_features:
                raise ValueError(
                    f"Unexpected number of dimensions, got {n_features} but "
                    f"expected {self.n_features}")
        else:
            self.n_features = n_features

    def _get_n_fit_scalars_per_param(self):
        """
        Return a mapping of fittable parameter names (as in ``self.params``)
        to the number of corresponding scalar parameters that will actually be
        fitted.

        This is used to detect whether the user did not pass enough data points
        for a non-degenerate fit.
        """
        raise NotImplementedError("Must be overridden in subclass")

    def _check_sum_1(self, name):
        """Check that an array describes one or more distributions."""
        s = getattr(self, name).sum(axis=-1)
        if not np.allclose(s, 1):
            raise ValueError(
                f"{name} must sum to 1 (got {s:.4f})" if s.ndim == 0 else
                f"{name} rows must sum to 1 (got {s})" if s.ndim == 1 else
                "Expected 1D or 2D array")

    def _check(self):
        """
        Validate model parameters prior to fitting.

        Raises
        ------
        ValueError
            If any of the parameters are invalid, e.g. if :attr:`startprob_`
            don't sum to 1.
        """
        raise NotImplementedError("Must be overridden in subclass")

    def _compute_likelihood(self, X):
        """
        Compute per-component probability under the model.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.

        Returns
        -------
        log_prob : array, shape (n_samples, n_components)
            Log probability of each sample in ``X`` for each of the
            model states.
        """
        if (self._compute_log_likelihood  # prevent recursion
                != __class__._compute_log_likelihood.__get__(self)):
            # Probabilities equal to zero do occur, and exp(-LARGE) = 0 is OK.
            with np.errstate(under="ignore"):
                return np.exp(self._compute_log_likelihood(X))
        else:
            raise NotImplementedError("Must be overridden in subclass")

    def _compute_log_likelihood(self, X):
        """
        Compute per-component emission log probability under the model.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.

        Returns
        -------
        log_prob : array, shape (n_samples, n_components)
            Emission log probability of each sample in ``X`` for each of the
            model states, i.e., ``log(p(X|state))``.
        """
        if (self._compute_likelihood  # prevent recursion
                != __class__._compute_likelihood.__get__(self)):
            # Probabilities equal to zero do occur, and log(0) = -inf is OK.
            likelihood = self._compute_likelihood(X)
            with np.errstate(divide="ignore"):
                return np.log(likelihood)
        else:
            raise NotImplementedError("Must be overridden in subclass")

    def _generate_sample_from_state(self, state, random_state):
        """
        Generate a random sample from a given component.

        Parameters
        ----------
        state : int
            Index of the component to condition on.
        random_state: RandomState
            A random number generator instance.  (`sample` is the only caller
            for this method and already normalizes *random_state*.)

        Returns
        -------
        X : array, shape (n_features, )
            A random sample from the emission distribution corresponding
            to a given component.
        """
        return ()

    def _initialize_sufficient_statistics(self):
        """
        Initialize sufficient statistics required for M-step.

        The method is *pure*, meaning that it doesn't change the state of
        the instance.  For extensibility computed statistics are stored
        in a dictionary.

        Returns
        -------
        nobs : int
            Number of samples in the data.
        start : array, shape (n_components, )
            An array where the i-th element corresponds to the posterior
            probability of the first sample being generated by the i-th state.
        trans : array, shape (n_components, n_components)
            An array where the (i, j)-th element corresponds to the posterior
            probability of transitioning between the i-th to j-th states.
        """
        stats = {'nobs': 0,
                 'start': np.zeros(self.n_components),
                 'trans': np.zeros((self.n_components, self.n_components))}
        return stats

    def _accumulate_sufficient_statistics(
            self, stats, X, lattice, posteriors, fwdlattice, bwdlattice):
        """
        Update sufficient statistics from a given sample.

        Parameters
        ----------
        stats : dict
            Sufficient statistics as returned by
            :meth:`~.BaseHMM._initialize_sufficient_statistics`.

        X : array, shape (n_samples, n_features)
            Sample sequence.

        lattice : array, shape (n_samples, n_components)
            Probabilities OR Log Probabilities of each sample
            under each of the model states.  Depends on the choice
            of implementation of the Forward-Backward algorithm

        posteriors : array, shape (n_samples, n_components)
            Posterior probabilities of each sample being generated by each
            of the model states.

        fwdlattice, bwdlattice : array, shape (n_samples, n_components)
            forward and backward probabilities.
        """

        impl = {
            "scaling": self._accumulate_sufficient_statistics_scaling,
            "log": self._accumulate_sufficient_statistics_log,
        }[self.implementation]

        return impl(stats=stats, X=X, lattice=lattice, posteriors=posteriors,
                    fwdlattice=fwdlattice, bwdlattice=bwdlattice)

    def _accumulate_sufficient_statistics_scaling(
            self, stats, X, lattice, posteriors, fwdlattice, bwdlattice):
        """
        Implementation of `_accumulate_sufficient_statistics`
        for ``implementation = "log"``.
        """
        stats['nobs'] += 1
        if 's' in self.params:
            stats['start'] += posteriors[0]
        if 't' in self.params:
            n_samples, n_components = lattice.shape
            # when the sample is of length 1, it contains no transitions
            # so there is no reason to update our trans. matrix estimate
            if n_samples <= 1:
                return
            xi_sum = _hmmc.compute_scaling_xi_sum(
                fwdlattice, self.transmat_, bwdlattice, lattice)
            stats['trans'] += xi_sum

    def _accumulate_sufficient_statistics_log(
            self, stats, X, lattice, posteriors, fwdlattice, bwdlattice):
        """
        Implementation of `_accumulate_sufficient_statistics`
        for ``implementation = "log"``.
        """
        stats['nobs'] += 1
        if 's' in self.params:
            stats['start'] += posteriors[0]
        if 't' in self.params:
            n_samples, n_components = lattice.shape
            # when the sample is of length 1, it contains no transitions
            # so there is no reason to update our trans. matrix estimate
            if n_samples <= 1:
                return
            log_xi_sum = _hmmc.compute_log_xi_sum(
                fwdlattice, self.transmat_, bwdlattice, lattice)
            with np.errstate(under="ignore"):
                stats['trans'] += np.exp(log_xi_sum)

    def _do_mstep(self, stats):
        """
        Perform the M-step of EM algorithm.

        Parameters
        ----------
        stats : dict
            Sufficient statistics updated from all available samples.
        """

    def _do_estep(self, X, lengths):
        impl = {
            "scaling": self._fit_scaling,
            "log": self._fit_log,
        }[self.implementation]

        stats = self._initialize_sufficient_statistics()
        self._estep_begin()
        curr_logprob = 0
        for sub_X in _utils.split_X_lengths(X, lengths):
            lattice, logprob, posteriors, fwdlattice, bwdlattice = impl(sub_X)
            # Derived HMM classes will implement the following method to
            # update their probability distributions, so keep
            # a single call to this method for simplicity.
            self._accumulate_sufficient_statistics(
                stats, sub_X, lattice, posteriors, fwdlattice,
                bwdlattice)
            curr_logprob += logprob
        return stats, curr_logprob

    def _estep_begin(self):
        pass

    def _compute_lower_bound(self, curr_logprob):
        raise NotImplementedError("Must be overridden in subclass")


class BaseHMM(_AbstractHMM):
    """
    Base class for Hidden Markov Models learned from Expectation-Maximization.

    This class allows for easy evaluation of, sampling from, and maximum a
    posteriori estimation of the parameters of a HMM.

    Attributes
    ----------
    monitor_ : ConvergenceMonitor
        Monitor object used to check the convergence of EM.
    startprob_ : array, shape (n_components, )
        Initial state occupation distribution.
    transmat_ : array, shape (n_components, n_components)
        Matrix of transition probabilities between states.

    Notes
    -----
    Normally, one should use a subclass of `.BaseHMM`, with its specialization
    towards a given emission model.  In rare cases, the base class can also be
    useful in itself, if one simply wants to generate a sequence of states
    using `.BaseHMM.sample`.  In that case, the feature matrix will have zero
    features.
    """

    def __init__(self, n_components=1,
                 startprob_prior=1.0, transmat_prior=1.0,
                 algorithm="viterbi", random_state=None,
                 n_iter=10, tol=1e-2, verbose=False,
                 params=string.ascii_letters,
                 init_params=string.ascii_letters,
                 implementation="log"):
        """
        Parameters
        ----------
        n_components : int
            Number of states in the model.
        startprob_prior : array, shape (n_components, ), optional
            Parameters of the Dirichlet prior distribution for
            :attr:`startprob_`.
        transmat_prior : array, shape (n_components, n_components), optional
            Parameters of the Dirichlet prior distribution for each row
            of the transition probabilities :attr:`transmat_`.
        algorithm : {"viterbi", "map"}, optional
            Decoder algorithm.
        random_state: RandomState or an int seed, optional
            A random number generator instance.
        n_iter : int, optional
            Maximum number of iterations to perform.
        tol : float, optional
            Convergence threshold. EM will stop if the gain in log-likelihood
            is below this value.
        verbose : bool, optional
            Whether per-iteration convergence reports are printed to
            :data:`sys.stderr`.  Convergence can also be diagnosed using the
            :attr:`monitor_` attribute.
        params, init_params : string, optional
            The parameters that get updated during (``params``) or initialized
            before (``init_params``) the training.  Can contain any combination
            of 's' for startprob, 't' for transmat, and other characters for
            subclass-specific emission parameters.  Defaults to all parameters.
        implementation: string, optional
            Determines if the forward-backward algorithm is implemented with
            logarithms ("log"), or using scaling ("scaling").  The default is
            to use logarithms for backwards compatability.  However, the
            scaling implementation is generally faster.
        """
        super().__init__(
            n_components=n_components, algorithm=algorithm,
            random_state=random_state, n_iter=n_iter, tol=tol,
            verbose=verbose, params=params, init_params=init_params,
            implementation=implementation)
        self.startprob_prior = startprob_prior
        self.transmat_prior = transmat_prior
        self.monitor_ = ConvergenceMonitor(self.tol, self.n_iter, self.verbose)

    def get_stationary_distribution(self):
        """Compute the stationary distribution of states."""
        # The stationary distribution is proportional to the left-eigenvector
        # associated with the largest eigenvalue (i.e., 1) of the transition
        # matrix.
        check_is_fitted(self, "transmat_")
        eigvals, eigvecs = linalg.eig(self.transmat_.T)
        eigvec = np.real_if_close(eigvecs[:, np.argmax(eigvals)])
        return eigvec / eigvec.sum()

    def _fit_scaling(self, X):
        frameprob = self._compute_likelihood(X)
        log_prob, fwdlattice, scaling_factors = _hmmc.forward_scaling(
            self.startprob_, self.transmat_, frameprob)
        bwdlattice = _hmmc.backward_scaling(
            self.startprob_, self.transmat_, frameprob, scaling_factors)
        posteriors = self._compute_posteriors_scaling(fwdlattice, bwdlattice)
        return frameprob, log_prob, posteriors, fwdlattice, bwdlattice

    def _fit_log(self, X):
        log_frameprob = self._compute_log_likelihood(X)
        log_prob, fwdlattice = _hmmc.forward_log(
            self.startprob_, self.transmat_, log_frameprob)
        bwdlattice = _hmmc.backward_log(
            self.startprob_, self.transmat_, log_frameprob)
        posteriors = self._compute_posteriors_log(fwdlattice, bwdlattice)
        return log_frameprob, log_prob, posteriors, fwdlattice, bwdlattice

    def _do_mstep(self, stats):
        """
        Perform the M-step of EM algorithm.

        Parameters
        ----------
        stats : dict
            Sufficient statistics updated from all available samples.
        """
        # If a prior is < 1, `prior - 1 + starts['start']` can be negative.  In
        # that case maximization of (n1+e1) log p1 + ... + (ns+es) log ps under
        # the conditions sum(p) = 1 and all(p >= 0) show that the negative
        # terms can just be set to zero.
        # The ``np.where`` calls guard against updating forbidden states
        # or transitions in e.g. a left-right HMM.
        if 's' in self.params:
            startprob_ = np.maximum(self.startprob_prior - 1 + stats['start'],
                                    0)
            self.startprob_ = np.where(self.startprob_ == 0, 0, startprob_)
            normalize(self.startprob_)
        if 't' in self.params:
            transmat_ = np.maximum(self.transmat_prior - 1 + stats['trans'], 0)
            self.transmat_ = np.where(self.transmat_ == 0, 0, transmat_)
            normalize(self.transmat_, axis=1)

    def _compute_lower_bound(self, curr_logprob):
        return curr_logprob

    def _init(self, X, lengths=None):
        """
        Initialize model parameters prior to fitting.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.
        """
        self._check_and_set_n_features(X)
        init = 1. / self.n_components
        random_state = check_random_state(self.random_state)
        if self._needs_init("s", "startprob_"):
            self.startprob_ = random_state.dirichlet(
                np.full(self.n_components, init))
        if self._needs_init("t", "transmat_"):
            self.transmat_ = random_state.dirichlet(
                np.full(self.n_components, init), size=self.n_components)
        n_fit_scalars_per_param = self._get_n_fit_scalars_per_param()
        if n_fit_scalars_per_param is not None:
            n_fit_scalars = sum(
                n_fit_scalars_per_param[p] for p in self.params)
            if X.size < n_fit_scalars:
                _log.warning(
                    "Fitting a model with %d free scalar parameters with only "
                    "%d data points will result in a degenerate solution.",
                    n_fit_scalars, X.size)

    def _check_sum_1(self, name):
        """Check that an array describes one or more distributions."""
        s = getattr(self, name).sum(axis=-1)
        if not np.allclose(s, 1):
            raise ValueError(
                f"{name} must sum to 1 (got {s:.4f})" if s.ndim == 0 else
                f"{name} rows must sum to 1 (got {s})" if s.ndim == 1 else
                "Expected 1D or 2D array")

    def _check(self):
        """
        Validate model parameters prior to fitting.

        Raises
        ------
        ValueError
            If any of the parameters are invalid, e.g. if :attr:`startprob_`
            don't sum to 1.
        """
        self.startprob_ = np.asarray(self.startprob_)
        if len(self.startprob_) != self.n_components:
            raise ValueError("startprob_ must have length n_components")
        self._check_sum_1("startprob_")

        self.transmat_ = np.asarray(self.transmat_)
        if self.transmat_.shape != (self.n_components, self.n_components):
            raise ValueError(
                "transmat_ must have shape (n_components, n_components)")
        self._check_sum_1("transmat_")

    def aic(self, X, lengths=None):
        """
        Akaike information criterion for the current model on the input X.

        AIC = -2*logLike + 2 * num_free_params

        https://en.wikipedia.org/wiki/Akaike_information_criterion

        Parameters
        ----------
        X : array of shape (n_samples, n_dimensions)
            The input samples.
        lengths : array-like of integers, shape (n_sequences, )
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.

        Returns
        -------
        aic : float
            The lower the better.
        """
        n_params = sum(self._get_n_fit_scalars_per_param().values())
        return -2 * self.score(X, lengths=lengths) + 2 * n_params

    def bic(self, X, lengths=None):
        """
        Bayesian information criterion for the current model on the input X.

        BIC = -2*logLike + num_free_params * log(num_of_data)

        https://en.wikipedia.org/wiki/Bayesian_information_criterion

        Parameters
        ----------
        X : array of shape (n_samples, n_dimensions)
            The input samples.
        lengths : array-like of integers, shape (n_sequences, )
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.

        Returns
        -------
        bic : float
            The lower the better.
        """
        n_params = sum(self._get_n_fit_scalars_per_param().values())
        return -2 * self.score(X, lengths=lengths) + n_params * np.log(len(X))

_BaseHMM = BaseHMM  # Backcompat name, will be deprecated in the future.


class VariationalBaseHMM(_AbstractHMM):

    def __init__(self, n_components=1,
                 startprob_prior=None, transmat_prior=None,
                 algorithm="viterbi", random_state=None,
                 n_iter=100, tol=1e-6, verbose=False,
                 params="ste", init_params="ste",
                 implementation="log"):
        super().__init__(
            n_components=n_components, algorithm=algorithm,
            random_state=random_state, n_iter=n_iter, tol=tol,
            verbose=verbose, params=params, init_params=init_params,
            implementation=implementation)

        self.startprob_prior = startprob_prior
        self.transmat_prior = transmat_prior
        self.monitor_ = ConvergenceMonitor(
            self.tol, self.n_iter, self.verbose)

    def _init(self, X, lengths=None):
        """
        Initialize model parameters prior to fitting.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Feature matrix of individual samples.
        lengths : array-like of integers, shape (n_sequences, )
            Lengths of the individual sequences in ``X``. The sum of
            these should be ``n_samples``.
        """
        self._check_and_set_n_features(X)
        nc = self.n_components
        uniform_prior = 1 / nc
        random_state = check_random_state(self.random_state)
        if (self._needs_init("s", "startprob_posterior_")
                or self._needs_init("s", "startprob_prior_")):
            if self.startprob_prior is None:
                startprob_init = uniform_prior
            else:
                startprob_init = self.startprob_prior

            self.startprob_prior_ = np.full(nc, startprob_init)
            self.startprob_posterior_ = random_state.dirichlet(
                np.full(nc, uniform_prior)) * len(lengths)

        if (self._needs_init("t", "transmat_posterior_")
                or self._needs_init("t", "transmat_prior_")):
            if self.transmat_prior is None:
                transmat_init = uniform_prior
            else:
                transmat_init = self.transmat_prior
            self.transmat_prior_ = np.full(
                (nc, nc), transmat_init)
            self.transmat_posterior_ = random_state.dirichlet(
                np.full(nc, uniform_prior), size=nc)
            self.transmat_posterior_ *= sum(lengths) / nc

        n_fit_scalars_per_param = self._get_n_fit_scalars_per_param()
        if n_fit_scalars_per_param is not None:
            n_fit_scalars = sum(
                n_fit_scalars_per_param[p] for p in self.params)
            if X.size < n_fit_scalars:
                _log.warning(
                    "Fitting a model with %d free scalar parameters with only "
                    "%d data points will result in a degenerate solution.",
                    n_fit_scalars, X.size)

    # For Variational Inference, we compute the forward/backward algorithm
    # using subnormalized probabilities.
    def _fit_scaling(self, X):
        frameprob = self._compute_subnorm_likelihood(X)
        logprob, fwdlattice, scaling_factors = _hmmc.forward_scaling(
            self.startprob_subnorm_, self.transmat_subnorm_, frameprob)

        bwdlattice = _hmmc.backward_scaling(
            self.startprob_subnorm_, self.transmat_subnorm_,
            frameprob, scaling_factors)
        posteriors = self._compute_posteriors_scaling(fwdlattice, bwdlattice)
        return frameprob, logprob, posteriors, fwdlattice, bwdlattice

    def _fit_log(self, X):
        framelogprob = self._compute_subnorm_log_likelihood(X)
        logprob, fwdlattice = _hmmc.forward_log(
            self.startprob_subnorm_, self.transmat_subnorm_, framelogprob)
        bwdlattice = _hmmc.backward_log(
            self.startprob_subnorm_, self.transmat_subnorm_, framelogprob)
        posteriors = self._compute_posteriors_log(fwdlattice, bwdlattice)
        return framelogprob, logprob, posteriors, fwdlattice, bwdlattice

    def _check(self):
        """
        Validate model parameters prior to fitting.

        Raises
        ------
        ValueError
            If any of the parameters are invalid, e.g. if :attr:`startprob_`
            don't sum to 1.
        """
        nc = self.n_components

        self.startprob_prior_ = np.asarray(self.startprob_prior_)
        if len(self.startprob_prior_) != nc:
            raise ValueError("startprob_prior_ must have length n_components")
        self.startprob_posterior_ = np.asarray(self.startprob_posterior_)
        if len(self.startprob_posterior_) != nc:
            raise ValueError("startprob_posterior_ must have length "
                             "n_components")

        self.transmat_prior_ = np.asarray(self.transmat_prior_)
        if self.transmat_prior_.shape != (nc, nc):
            raise ValueError("transmat_prior_ must have shape "
                             "(n_components, n_components)")
        self.transmat_posterior_ = np.asarray(self.transmat_posterior_)
        if self.transmat_posterior_.shape != (nc, nc):
            raise ValueError("transmat_posterior_ must have shape "
                             "(n_components, n_components)")

    def _compute_subnorm_likelihood(self, X):
        if (self._compute_subnorm_log_likelihood !=  # prevent recursion
                __class__._compute_subnorm_log_likelihood.__get__(self)):
            return np.exp(self._compute_subnorm_log_likelihood(X))
        else:
            raise NotImplementedError("Must be overridden in subclass")

    def _compute_subnorm_log_likelihood(self, X):
        if (self._compute_subnorm_likelihood !=  # prevent recursion
                __class__._compute_subnorm_likelihood.__get__(self)):
            return np.log(self._compute_subnorm_likelihood(X))
        else:
            raise NotImplementedError("Must be overridden in subclass")

    def _accumulate_sufficient_statistics_scaling(
            self, stats, X, lattice, posteriors, fwdlattice, bwdlattice):
        """
        Implementation of `_accumulate_sufficient_statistics`
        for ``implementation = "log"``.
        """
        stats['nobs'] += 1
        if 's' in self.params:
            stats['start'] += posteriors[0]
        if 't' in self.params:
            n_samples, n_components = lattice.shape
            # when the sample is of length 1, it contains no transitions
            # so there is no reason to update our trans. matrix estimate
            if n_samples <= 1:
                return

            xi_sum = _hmmc.compute_scaling_xi_sum(fwdlattice,
                                                  self.transmat_subnorm_,
                                                  bwdlattice, lattice)
            stats['trans'] += xi_sum

    def _accumulate_sufficient_statistics_log(
            self, stats, X, lattice, posteriors, fwdlattice, bwdlattice):
        """
        Implementation of `_accumulate_sufficient_statistics`
        for ``implementation = "log"``.
        """
        stats['nobs'] += 1
        if 's' in self.params:
            stats['start'] += posteriors[0]
        if 't' in self.params:
            n_samples, n_components = lattice.shape
            # when the sample is of length 1, it contains no transitions
            # so there is no reason to update our trans. matrix estimate
            if n_samples <= 1:
                return

            log_xi_sum = _hmmc.compute_log_xi_sum(
                fwdlattice, self.transmat_subnorm_, bwdlattice,
                lattice)
            with np.errstate(under="ignore"):
                stats['trans'] += np.exp(log_xi_sum)

    def _estep_begin(self):
        """
        Update the subnormalized model parameters.  Called at the beginning of
        each iteration of fit()
        """
        startprob_log_subnorm = (
            special.digamma(self.startprob_posterior_)
            - special.digamma(self.startprob_posterior_.sum()))
        self.startprob_subnorm_ = np.exp(startprob_log_subnorm)

        transmat_log_subnorm = (
            special.digamma(self.transmat_posterior_)
            - special.digamma(self.transmat_posterior_.sum(axis=1)[:, None]))
        self.transmat_subnorm_ = np.exp(transmat_log_subnorm)

    def _do_mstep(self, stats):
        """
        Perform the M-step of EM algorithm.

        Parameters
        ----------
        stats : dict
            Sufficient statistics updated from all available samples.
        """
        if 's' in self.params:
            self.startprob_posterior_ = self.startprob_prior_ + stats['start']
            # For compatability in _AbstractHMM
            self.startprob_ = (self.startprob_posterior_
                               / self.startprob_posterior_.sum())
        if 't' in self.params:
            self.transmat_posterior_ = self.transmat_prior_ + stats['trans']
            # For compatability in _AbstractHMM
            self.transmat_ = (self.transmat_posterior_
                              / self.transmat_posterior_.sum(axis=1)[:, None])

    def _compute_lower_bound(self, curr_logprob):
        """
        Compute the Variational Lower Bound of the model as currently
        configured.

        Following the pattern elsewhere, derived implementations should call
        this method to get the contribution of the current log_prob,
        transmat, and startprob towards the lower bound

        Parameters
        ----------
        curr_logprob : float
                       The current log probability of the data as computed at
                       the subnormalized model parameters.

        Returns
        -------
        lower_bound: float
                     Returns the computed lower bound contribution of the
                     log_prob, startprob, and transmat.
        """
        # Get the contribution from the state transitions,
        # initial probabilities, and the likelihood of the sequences
        startprob_lower_bound = -_kl.kl_dirichlet(
            self.startprob_posterior_, self.startprob_prior_)
        transmat_lower_bound = 0
        for i in range(self.n_components):
            transmat_lower_bound -= _kl.kl_dirichlet(
                self.transmat_posterior_[i], self.transmat_prior_[i])
        return startprob_lower_bound + transmat_lower_bound + curr_logprob