Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

alkaline-ml / statsmodels   python

Repository URL to install this package:

Version: 0.11.1 

/ tsa / x13.py

"""
Run x12/x13-arima specs in a subprocess from Python and carry results back
into Python.

Notes
-----
Many of the functions are called x12. However, they are also intended to work
for x13. If this is not the case, it's a bug.
"""
from statsmodels.compat.pandas import deprecate_kwarg

import os
import subprocess
import tempfile
import re
from warnings import warn

import pandas as pd

from statsmodels.compat.python import iteritems
from statsmodels.tools.tools import Bunch
from statsmodels.tools.sm_exceptions import (X13NotFoundError,
                                             IOWarning, X13Error,
                                             X13Warning)

# Names exported by ``from <module> import *``.
__all__ = ["x13_arima_select_order", "x13_arima_analysis"]

# Executable file names tried by _find_x12, X-13 names first, then X-12.
_binary_names = ('x13as.exe', 'x13as', 'x12a.exe', 'x12a')


class _freq_to_period:
    def __getitem__(self, key):
        if key.startswith('M'):
            return 12
        elif key.startswith('Q'):
            return 4
        elif key.startswith('W'):
            return 52


_freq_to_period = _freq_to_period()

# Periods per year -> single-letter frequency code (monthly, quarterly).
_period_to_freq = {12: 'M', 4: 'Q'}
# Tri-state Python flag -> spec keyword; None maps to 'auto'.
# NOTE(review): the consumer of this table is outside the visible part of
# the file -- presumably the transform setting; confirm against the caller.
_log_to_x12 = {True: 'log', False: 'none', None: 'auto'}
# Truthy -> 'yes', falsy -> 'no'.
_bool_to_yes_no = lambda x: 'yes' if x else 'no'  # noqa:E731


def _find_x12(x12path=None, prefer_x13=True):
    """
    Locate a runnable x13as[.exe] or x12a[.exe] binary.

    Parameters
    ----------
    x12path : str, optional
        Directory that holds the binary, or the full path of the binary
        itself (the file name is then stripped). If not given, the
        environment variables X13PATH and X12PATH are consulted. If the
        resulting directory is empty, the bare binary names are tried,
        which relies on the binary being on the PATH.
    prefer_x13 : bool
        If True, X13PATH is consulted before X12PATH and the x13 binary
        names are tried first. If False, the order is reversed.

    Returns
    -------
    str or False
        The path of the first binary that could be started, or False if
        none could be started.

    Notes
    -----
    Only ``OSError`` (binary missing or not executable) is treated as "not
    found". A binary that starts but exits with a non-zero status raises
    ``subprocess.CalledProcessError``, as ``subprocess.check_call`` does.
    """
    if x12path is not None and x12path.endswith(_binary_names):
        # remove binary from path if given
        x12path = os.path.dirname(x12path)

    if not prefer_x13:  # search for x12 first
        # Use a local, reversed copy. Rebinding the module-level tuple would
        # flip the search order for every later call as well.
        binaries = _binary_names[::-1]
        if x12path is None:
            x12path = os.getenv("X12PATH", "")
        if not x12path:
            x12path = os.getenv("X13PATH", "")
    else:
        binaries = _binary_names
        if x12path is None:
            x12path = os.getenv("X13PATH", "") or os.getenv("X12PATH", "")

    for binary in binaries:
        x12 = os.path.join(x12path, binary)
        try:
            # The output is irrelevant here. DEVNULL is used instead of PIPE
            # because check_call never reads the pipes, so a chatty child
            # could block once the OS pipe buffer is full.
            subprocess.check_call(x12, stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL)
        except OSError:
            continue
        return x12

    return False


def _check_x12(x12path=None):
    """
    Return the path of a usable x12/x13 binary as located by ``_find_x12``.

    Raises
    ------
    X13NotFoundError
        If ``_find_x12`` reports that no binary could be found.
    """
    located = _find_x12(x12path)
    if located:
        return located
    raise X13NotFoundError("x12a and x13as not found on path. Give the "
                           "path, put them on PATH, or set the "
                           "X12PATH or X13PATH environmental variable.")


def _clean_order(order):
    """
    Takes something like (1 1 0)(0 1 1) and returns a arma order, sarma
    order tuple. Also accepts (1 1 0) and return arma order and (0, 0, 0)
    """
    order = re.findall(r"\([0-9 ]*?\)", order)

    def clean(x):
        return tuple(map(int, re.sub("[()]", "", x).split(" ")))

    if len(order) > 1:
        order, sorder = map(clean, order)
    else:
        order = clean(order[0])
        sorder = (0, 0, 0)

    return order, sorder


def run_spec(x12path, specpath, outname=None, meta=False, datameta=False):
    """
    Start the x12/x13 binary on a spec file and return the running process.

    Parameters
    ----------
    x12path : str
        Path of the executable to start.
    specpath : str
        Path handed to the executable; preceded by ``-m`` when ``meta`` is
        True or by ``-d`` when ``datameta`` is True.
    outname : str, optional
        If given, appended as a further command-line argument.
    meta : bool
        Pass ``specpath`` with the ``-m`` flag.
    datameta : bool
        Pass ``specpath`` with the ``-d`` flag.

    Returns
    -------
    subprocess.Popen
        The started process, with stderr merged into its stdout pipe.

    Raises
    ------
    ValueError
        If both ``meta`` and ``datameta`` are set.
    """
    if meta and datameta:
        raise ValueError("Cannot specify both meta and datameta.")

    # The flag and the path must be separate argv entries. With a list and
    # no shell, "-m " + specpath reaches the program as one fused argument.
    if meta:
        args = [x12path, "-m", specpath]
    elif datameta:
        args = [x12path, "-d", specpath]
    else:
        args = [x12path, specpath]

    if outname:
        args += [outname]

    return subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)


def _make_automdl_options(maxorder, maxdiff, diff):
    options = "\n"
    options += "maxorder = ({0} {1})\n".format(maxorder[0], maxorder[1])
    if maxdiff is not None:  # maxdiff always takes precedence
        options += "maxdiff = ({0} {1})\n".format(maxdiff[0], maxdiff[1])
    else:
        options += "diff = ({0} {1})\n".format(diff[0], diff[1])
    return options


def _make_var_names(exog):
    if hasattr(exog, "name"):
        var_names = exog.name
    elif hasattr(exog, "columns"):
        var_names = exog.columns
    else:
        raise ValueError("exog is not a Series or DataFrame or is unnamed.")
    try:
        var_names = " ".join(var_names)
    except TypeError:  # cannot have names that are numbers, pandas default
        from statsmodels.base.data import _make_exog_names
        if exog.ndim == 1:
            var_names = "x1"
        else:
            var_names = " ".join(_make_exog_names(exog))
    return var_names


def _make_regression_options(trading, exog):
    if not trading and exog is None:  # start regression spec
        return ""

    reg_spec = "regression{\n"
    if trading:
        reg_spec += "    variables = (td)\n"
    if exog is not None:
        var_names = _make_var_names(exog)
        reg_spec += "    user = ({0})\n".format(var_names)
        reg_spec += "    data = ({0})\n".format("\n".join(map(str,
                                                exog.values.ravel().tolist())))

    reg_spec += "}\n"  # close out regression spec
    return reg_spec


def _make_forecast_options(forecast_periods):
    if forecast_periods is None:
        return ""
    forecast_spec = "forecast{\n"
    forecast_spec += "maxlead = ({0})\n}}\n".format(forecast_periods)
    return forecast_spec


def _check_errors(errors):
    """
    Raise or warn according to the message text reported by x12/x13.

    Everything up to and including the first ``spc:`` marker is discarded
    before the text is inspected. If the marker is absent, the whole text
    is inspected.

    Parameters
    ----------
    errors : str
        Message text to inspect.

    Raises
    ------
    X13Error
        If the remaining text contains ``ERROR``.

    Warns
    -----
    X13Warning
        If the remaining text contains ``WARNING`` (and no ``ERROR``).
    """
    marker = "spc:"
    pos = errors.find(marker)
    # find() returns -1 when the marker is missing. Slicing at -1 + 4 would
    # chop the first three characters and could turn "ERROR" into "OR".
    if pos != -1:
        errors = errors[pos + len(marker):]
    errors = errors.strip()

    if 'ERROR' in errors:
        raise X13Error(errors)
    elif 'WARNING' in errors:
        warn(errors, X13Warning)


def _convert_out_to_series(x, dates, name):
    """
    Convert x to a DataFrame where x is a string in the format given by
    x-13arima-seats output.
    """
    from io import StringIO
    from pandas import read_csv
    out = read_csv(StringIO(x), skiprows=2,
                   header=None, sep='\t', engine='python')
    return out.set_index(dates).rename(columns={1: name})[name]


def _open_and_read(fname):
    # opens a file, reads it, and make sure it's closed
    with open(fname, 'r') as fin:
        fout = fin.read()
    return fout


class Spec(object):
    """
    Base class for one named section of a spec file.

    Subclasses are expected to be called ``<Something>Spec``; the part
    before ``Spec`` is used as the section name.
    """

    @property
    def spec_name(self):
        """Class name with every occurrence of "Spec" removed."""
        cls_name = type(self).__name__
        return cls_name.replace("Spec", "")

    def create_spec(self, **kwargs):
        """
        Render the section text from ``spec_name`` and ``self.options``.

        ``set_options`` must have been called before, since it is what
        defines ``self.options``. Keyword arguments are accepted but unused.
        """
        template = ("{name} {{\n"
                    "        {options}\n"
                    "        }}\n"
                    "        ")
        return template.format(name=self.spec_name, options=self.options)

    def set_options(self, **kwargs):
        """
        Store each keyword on the instance and build ``self.options`` as
        one ``key=value`` line per keyword.
        """
        rendered = "".join("{0}={1}\n".format(key, value)
                           for key, value in kwargs.items())
        # Write straight into the instance dict, bypassing attribute hooks.
        vars(self).update(kwargs)
        self.options = rendered


class SeriesSpec(Spec):
    """
    The ``Series`` section of a spec file.

    Parameters
    ----------
    data
        Stored and written as the ``data`` option.
    name : str
        Series name; truncated to 64 characters and wrapped in double
        quotes.
    appendbcst : bool
        Written as ``yes``/``no``.
    appendfcst : bool
        Written as ``yes``/``no``.
    period
        Written as the ``period`` option.
    start
        Written as the ``start`` option.
    title : str
        Truncated to 79 characters and wrapped in double quotes.
    comptype, compwt, decimals, modelspan, precision, to_print, to_save, \
span, series_type
        Accepted but not forwarded to the options by this constructor.

    Notes
    -----
    Rarely used arguments, likewise accepted but not forwarded:

    divpower
    missingcode
    missingval
    """
    def __init__(self, data, name='Unnamed Series', appendbcst=False,
                 appendfcst=False,
                 comptype=None, compwt=1, decimals=0, modelspan=(),
                 period=12, precision=0, to_print=[], to_save=[], span=(),
                 start=(1, 1), title='', series_type=None, divpower=None,
                 missingcode=-99999, missingval=1000000000):

        # flags are written as the strings 'yes' / 'no'
        backcast_flag = _bool_to_yes_no(appendbcst)
        forecast_flag = _bool_to_yes_no(appendfcst)

        quoted_name = '"{0}"'.format(name[:64])  # trim to 64 characters
        quoted_title = '"{0}"'.format(title[:79])  # trim to 79 characters

        self.set_options(data=data,
                         appendbcst=backcast_flag,
                         appendfcst=forecast_flag,
                         period=period,
                         start=start,
                         title=quoted_title,
                         name=quoted_name)


def pandas_to_series_spec(x):
    """
    Build a ``SeriesSpec`` from a pandas Series or one-column DataFrame.

    Parameters
    ----------
    x : pandas.Series or pandas.DataFrame
        Data with a date-like index. A DataFrame must have exactly one
        column. The frequency is taken from ``x.index.freqstr`` and, if
        that is unavailable, inferred with ``pandas.infer_freq``.

    Returns
    -------
    SeriesSpec
        Spec holding the values, the period (12 or 4), the start as
        ``"<year>.<month or quarter>"`` and the series name, which is also
        used as the title.

    Raises
    ------
    ValueError
        If ``x`` has more than one column, if no frequency can be
        determined, or if the data is neither monthly nor quarterly.
    """
    if hasattr(x, 'columns'):  # convert to series
        if len(x.columns) > 1:
            raise ValueError("Does not handle DataFrame with more than one "
                             "column")
        x = x[x.columns[0]]

    data = "({0})".format("\n".join(map(str, x.values.tolist())))

    # get periodicity
    try:
        period = _freq_to_period[x.index.freqstr]
    except (AttributeError, ValueError):
        from pandas.tseries.api import infer_freq
        inferred = infer_freq(x.index)
        if inferred is None:
            # Without this check the lookup below fails with an opaque
            # AttributeError on None.
            raise ValueError("The frequency of the index could not be "
                             "determined. Please use an index with a "
                             "monthly or quarterly frequency.")
        period = _freq_to_period[inferred]

    # get start / first date
    start_date = x.index[0]
    if period == 12:
        year, stperiod = start_date.year, start_date.month
    elif period == 4:
        year, stperiod = start_date.year, start_date.quarter
    else:  # pragma: no cover
        raise ValueError("Only monthly and quarterly periods are supported."
                         " Please report or send a pull request if you want "
                         "this extended.")

    # Give it a title. SeriesSpec slices the name, so a non-string label
    # (e.g. an integer column name) must be converted to text first.
    name = str(getattr(x, 'name', None) or "Unnamed Series")
    series_spec = SeriesSpec(data=data, name=name, period=period,
                             title=name, start="{0}.{1}".format(year,
                                                                stperiod))
    return series_spec


@deprecate_kwarg('forecast_years', 'forecast_periods')
def x13_arima_analysis(endog, maxorder=(2, 1), maxdiff=(2, 1), diff=None,
                       exog=None, log=None, outlier=True, trading=False,
                       forecast_periods=None, retspec=False,
                       speconly=False, start=None, freq=None,
                       print_stdout=False, x12path=None, prefer_x13=True):
    """
    Perform x13-arima analysis for monthly or quarterly data.

    Parameters
    ----------
    endog : array_like, pandas.Series
        The series to model. It is best to use a pandas object with a
        DatetimeIndex or PeriodIndex. However, you can pass an array-like
        object. If your object does not have a dates index then ``start`` and
        ``freq`` are not optional.
    maxorder : tuple
        The maximum order of the regular and seasonal ARMA polynomials to
        examine during the model identification. The order for the regular
        polynomial must be greater than zero and no larger than 4. The
        order for the seasonal polynomial may be 1 or 2.
    maxdiff : tuple
        The maximum orders for regular and seasonal differencing in the
        automatic differencing procedure. Acceptable inputs for regular
        differencing are 1 and 2. The maximum order for seasonal differencing
        is 1. If ``diff`` is specified then ``maxdiff`` should be None.
Loading ...