"""
Run x12/x13-arima specs in a subprocess from Python and carry the results
back into Python.
Notes
-----
Many of the functions are called x12. However, they are also intended to work
for x13. If this is not the case, it's a bug.
"""
from statsmodels.compat.pandas import deprecate_kwarg
import os
import subprocess
import tempfile
import re
from warnings import warn
import pandas as pd
from statsmodels.compat.python import iteritems
from statsmodels.tools.tools import Bunch
from statsmodels.tools.sm_exceptions import (X13NotFoundError,
IOWarning, X13Error,
X13Warning)
# Names exported by ``from <module> import *``.
__all__ = ["x13_arima_select_order", "x13_arima_analysis"]
# Executable file names that _find_x12 tries when looking for the binary;
# the X-13 names are listed before the X-12 names.
_binary_names = ('x13as.exe', 'x13as', 'x12a.exe', 'x12a')
class _freq_to_period:
    """
    Read-only, dict-like lookup from a frequency string to a seasonal period.

    Only the leading letter(s) of the key are inspected, so anchored
    aliases such as ``'Q-DEC'`` resolve the same way as ``'Q'``. Keys that
    match none of the known prefixes yield ``None``.
    """

    # (prefix, observations per year), checked in this order
    _prefixes = (('M', 12), ('Q', 4), ('W', 52))

    def __getitem__(self, key):
        for prefix, period in self._prefixes:
            if key.startswith(prefix):
                return period
        return None


# The class name is rebound to a single shared instance so that callers can
# simply write ``_freq_to_period[freqstr]``.
_freq_to_period = _freq_to_period()
# Reverse lookup: seasonal period -> frequency letter (monthly, quarterly).
_period_to_freq = {12: 'M', 4: 'Q'}
# Translates a True/False/None flag into the strings 'log'/'none'/'auto'.
# NOTE(review): presumably these are the values written for the X-13
# transform function -- confirm against the code that consumes this mapping.
_log_to_x12 = {True: 'log', False: 'none', None: 'auto'}
# Renders any truthy value as 'yes' and any falsy value as 'no'.
_bool_to_yes_no = lambda x: 'yes' if x else 'no' # noqa:E731
def _find_x12(x12path=None, prefer_x13=True):
    """
    Locate a runnable x13as or x12a executable.

    Parameters
    ----------
    x12path : str, optional
        Directory containing the binary, or the full path of the binary
        itself (a trailing binary name is stripped). If not given, the
        environment variables ``X13PATH`` and ``X12PATH`` are consulted;
        if neither is set, the bare binary names are tried so that they
        are resolved through the system PATH.
    prefer_x13 : bool
        If True, the x13 binary names and ``X13PATH`` are tried first and
        the x12 ones serve as a fallback. If False the order is reversed.

    Returns
    -------
    str or bool
        The path of the first binary that could be started, or False if
        none of the candidates could be executed.

    Notes
    -----
    Only ``OSError`` (binary missing or not executable) is treated as
    "not found". A binary that starts but exits with a non-zero status
    makes ``subprocess.check_call`` raise ``CalledProcessError``, which is
    propagated to the caller.
    """
    if x12path is not None and x12path.endswith(_binary_names):
        # remove binary from path if given
        x12path = os.path.dirname(x12path)

    # Work on local copies: rebinding the module-level tuple would flip the
    # search order for every later call.
    if prefer_x13:
        candidates = _binary_names
        env_vars = ("X13PATH", "X12PATH")
    else:  # search for x12 first
        candidates = _binary_names[::-1]
        env_vars = ("X12PATH", "X13PATH")

    if x12path is None:
        x12path = os.getenv(env_vars[0], "") or os.getenv(env_vars[1], "")

    for binary in candidates:
        x12 = os.path.join(x12path, binary)
        try:
            # The output is never read, so discard it instead of piping it;
            # an unread pipe can block the child once its buffer fills up.
            subprocess.check_call(x12, stdout=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL)
        except OSError:
            continue
        return x12
    return False
def _check_x12(x12path=None):
    """
    Return the path of a usable x12a/x13as binary or raise if none is found.

    Parameters
    ----------
    x12path : str, optional
        Passed through to ``_find_x12``.

    Returns
    -------
    str
        The truthy value returned by ``_find_x12``.

    Raises
    ------
    X13NotFoundError
        If ``_find_x12`` returns a falsy value.
    """
    located = _find_x12(x12path)
    if located:
        return located
    raise X13NotFoundError("x12a and x13as not found on path. Give the "
                           "path, put them on PATH, or set the "
                           "X12PATH or X13PATH environmental variable.")
def _clean_order(order):
    """
    Split a model string into regular and seasonal ARMA order tuples.

    Takes something like ``"(1 1 0)(0 1 1)"`` and returns
    ``((1, 1, 0), (0, 1, 1))``. A string with a single group such as
    ``"(1 1 0)"`` returns that order together with a seasonal order of
    ``(0, 0, 0)``.
    """
    def to_ints(group):
        # drop the parentheses, then convert each space-separated token
        bare = re.sub("[()]", "", group)
        return tuple(int(token) for token in bare.split(" "))

    groups = re.findall(r"\([0-9 ]*?\)", order)
    if len(groups) > 1:
        arma, seasonal = map(to_ints, groups)
        return arma, seasonal
    return to_ints(groups[0]), (0, 0, 0)
def run_spec(x12path, specpath, outname=None, meta=False, datameta=False):
    """
    Start the x12/x13 binary on a spec file in a subprocess.

    Parameters
    ----------
    x12path : str
        Path of the executable to run.
    specpath : str
        Path handed to the program. With ``meta`` it is passed as the value
        of the ``-m`` flag, with ``datameta`` as the value of the ``-d``
        flag, otherwise as a plain positional argument.
    outname : str, optional
        If truthy, appended as a further positional argument.
    meta : bool
        Pass ``specpath`` with the ``-m`` flag.
    datameta : bool
        Pass ``specpath`` with the ``-d`` flag.

    Returns
    -------
    subprocess.Popen
        The running process; stderr is merged into its piped stdout.

    Raises
    ------
    ValueError
        If both ``meta`` and ``datameta`` are set.
    """
    if meta and datameta:
        raise ValueError("Cannot specify both meta and datameta.")

    args = [x12path]
    # The flag and its value must be separate list items: each item of a
    # Popen argument list reaches the program as exactly one argument, so
    # "-m <path>" in a single string would not be seen as flag plus path.
    if meta:
        args += ["-m", specpath]
    elif datameta:
        args += ["-d", specpath]
    else:
        args.append(specpath)

    if outname:
        args.append(outname)

    return subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
def _make_automdl_options(maxorder, maxdiff, diff):
    """
    Build the option lines for an automdl spec.

    Parameters
    ----------
    maxorder : sequence
        Its first two items are written as ``maxorder``.
    maxdiff : sequence or None
        If not None, its first two items are written as ``maxdiff`` and
        ``diff`` is ignored.
    diff : sequence
        Used only when ``maxdiff`` is None; its first two items are
        written as ``diff``.

    Returns
    -------
    str
        The option text, starting with a newline.
    """
    lines = ["\n",
             "maxorder = ({0} {1})\n".format(maxorder[0], maxorder[1])]
    if maxdiff is None:
        lines.append("diff = ({0} {1})\n".format(diff[0], diff[1]))
    else:  # maxdiff always takes precedence
        lines.append("maxdiff = ({0} {1})\n".format(maxdiff[0], maxdiff[1]))
    return "".join(lines)
def _make_var_names(exog):
    """
    Return the names of the user regressors as one space-separated string.

    Parameters
    ----------
    exog : pandas.Series or pandas.DataFrame
        An object with a ``name`` attribute (its name is used) or a
        ``columns`` attribute (the column labels are used).

    Returns
    -------
    str
        The names joined by single spaces. If the names are not strings
        (for example the default integer labels or an unnamed Series),
        generated names are used instead: ``"x1"`` for one-dimensional
        input, otherwise the names produced by statsmodels'
        ``_make_exog_names``.

    Raises
    ------
    ValueError
        If ``exog`` has neither a ``name`` nor a ``columns`` attribute.
    """
    if hasattr(exog, "name"):
        # Wrap in a list: joining a bare string would put a space between
        # each of its characters ("gdp" -> "g d p").
        var_names = [exog.name]
    elif hasattr(exog, "columns"):
        var_names = exog.columns
    else:
        raise ValueError("exog is not a Series or DataFrame or is unnamed.")

    try:
        return " ".join(var_names)
    except TypeError:  # cannot have names that are numbers, pandas default
        pass

    if exog.ndim == 1:
        return "x1"
    # imported only on the path that needs it
    from statsmodels.base.data import _make_exog_names
    return " ".join(_make_exog_names(exog))
def _make_regression_options(trading, exog):
    """
    Build the text of the regression spec.

    Parameters
    ----------
    trading : bool
        If truthy, a ``variables = (td)`` line is included.
    exog : pandas.Series, pandas.DataFrame or None
        User regressors. If given, their names (from ``_make_var_names``)
        are written as ``user`` and their values, flattened with
        ``ravel`` and one per line, as ``data``.

    Returns
    -------
    str
        The complete ``regression{...}`` block, or an empty string when
        there is neither a trading effect nor an exog.
    """
    if not trading and exog is None:
        return ""

    pieces = ["regression{\n"]  # start regression spec
    if trading:
        pieces.append(" variables = (td)\n")
    if exog is not None:
        names = _make_var_names(exog)
        pieces.append(" user = ({0})\n".format(names))
        flat_values = exog.values.ravel().tolist()
        values_text = "\n".join(str(value) for value in flat_values)
        pieces.append(" data = ({0})\n".format(values_text))
    pieces.append("}\n")  # close out regression spec
    return "".join(pieces)
def _make_forecast_options(forecast_periods):
    """
    Build the text of the forecast spec.

    Parameters
    ----------
    forecast_periods : int or None
        Value written as ``maxlead``. ``None`` means no forecast spec.

    Returns
    -------
    str
        The ``forecast{...}`` block, or an empty string for ``None``.
    """
    if forecast_periods is not None:
        header = "forecast{\n"
        body = "maxlead = ({0})\n}}\n".format(forecast_periods)
        return header + body
    return ""
def _check_errors(errors):
    """
    Raise or warn according to the content of the program's error output.

    Everything up to and including the first ``"spc:"`` is discarded
    before the remaining text is inspected. If the marker is not present
    the whole text is inspected.

    Parameters
    ----------
    errors : str
        Text of the error output.

    Raises
    ------
    X13Error
        If the inspected text contains ``'ERROR'``.

    Warns
    -----
    X13Warning
        If the inspected text contains ``'WARNING'`` but not ``'ERROR'``.
    """
    marker = "spc:"
    position = errors.find(marker)
    # find() returns -1 when the marker is missing; slicing from
    # ``-1 + len(marker)`` would silently drop the first three characters.
    if position != -1:
        errors = errors[position + len(marker):]
    errors = errors.strip()

    if 'ERROR' in errors:
        raise X13Error(errors)
    if 'WARNING' in errors:
        warn(errors, X13Warning)
def _convert_out_to_series(x, dates, name):
    """
    Convert x to a Series where x is a string in the format given by
    x-13arima-seats output.

    The first two lines of ``x`` are skipped, the rest is read as
    tab-separated columns without a header, and the second column is
    returned with ``dates`` as its index and ``name`` as its name.
    """
    from io import StringIO
    from pandas import read_csv

    buffer = StringIO(x)
    table = read_csv(buffer, skiprows=2, header=None, sep='\t',
                     engine='python')
    table = table.set_index(dates)
    table = table.rename(columns={1: name})
    return table[name]
def _open_and_read(fname):
    """Return the whole content of the text file ``fname``.

    The file is opened in read mode and is closed again before returning.
    """
    with open(fname, 'r') as handle:
        return handle.read()
class Spec(object):
    """
    Base class for one named spec block of an input file.

    Subclasses are expected to be named ``<Something>Spec``; the part
    before ``Spec`` becomes the name written in front of the braces.
    """

    @property
    def spec_name(self):
        """Class name with every occurrence of ``"Spec"`` removed."""
        return type(self).__name__.replace("Spec", "")

    def create_spec(self, **kwargs):
        """
        Return the spec block as text: the name, then the options in braces.

        Reads ``self.options``, so ``set_options`` must have been called.
        Keyword arguments are accepted but not used.
        """
        template = "{name} {{\n{options}\n}}\n"
        return template.format(name=self.spec_name, options=self.options)

    def set_options(self, **kwargs):
        """
        Store the given options on the instance.

        Each keyword becomes an instance attribute, and ``self.options`` is
        set to the ``key=value`` lines, one per keyword.
        """
        lines = []
        for key, value in iteritems(kwargs):
            lines.append("{0}={1}\n".format(key, value))
            self.__dict__[key] = value
        self.options = "".join(lines)
class SeriesSpec(Spec):
    """
    The ``series`` spec, describing the data to be analysed.

    Only ``data``, ``appendbcst``, ``appendfcst``, ``period``, ``start``,
    ``title`` and ``name`` are currently written to the spec. The other
    parameters are accepted but ignored.

    Parameters
    ----------
    data
        Value written as the ``data`` option.
    name : str
        Series name. Converted to a string, trimmed to 64 characters and
        written in double quotes.
    appendbcst : bool
        Written as ``yes`` or ``no``.
    appendfcst : bool
        Written as ``yes`` or ``no``.
    comptype
    compwt
    decimals
    modelspan
    period
        Value written as the ``period`` option.
    precision
    to_print
    to_save
    span
    start
        Value written as the ``start`` option.
    title : str
        Series title. Converted to a string, trimmed to 79 characters and
        written in double quotes.
    series_type

    Notes
    -----
    Rarely used arguments

    divpower
    missingcode
    missingval
    saveprecision
    trimzero
    """
    def __init__(self, data, name='Unnamed Series', appendbcst=False,
                 appendfcst=False,
                 comptype=None, compwt=1, decimals=0, modelspan=(),
                 period=12, precision=0, to_print=(), to_save=(), span=(),
                 start=(1, 1), title='', series_type=None, divpower=None,
                 missingcode=-99999, missingval=1000000000):
        appendbcst = _bool_to_yes_no(appendbcst)
        appendfcst = _bool_to_yes_no(appendfcst)

        # str() first: labels taken from pandas objects may be numbers or
        # other non-string objects, which cannot be sliced.
        series_name = "\"{0}\"".format(str(name)[:64])  # trim to 64 characters
        title = "\"{0}\"".format(str(title)[:79])  # trim to 79 characters
        self.set_options(data=data, appendbcst=appendbcst,
                         appendfcst=appendfcst, period=period, start=start,
                         title=title, name=series_name,
                         )
def pandas_to_series_spec(x):
    """
    Build a ``SeriesSpec`` from a pandas Series or one-column DataFrame.

    Parameters
    ----------
    x : pandas.Series or pandas.DataFrame
        The data. A DataFrame must have a single column. The index must
        carry, or allow inferring, a monthly or quarterly frequency.

    Returns
    -------
    SeriesSpec
        Spec holding the values, the seasonal period, the start given as
        ``"<year>.<month or quarter>"`` of the first index entry, and the
        series name (``"Unnamed Series"`` if there is none) as both name
        and title.

    Raises
    ------
    ValueError
        If ``x`` has more than one column, if the frequency of the index
        cannot be determined, or if it is not monthly or quarterly.
    """
    if hasattr(x, 'columns'):  # convert to series
        if len(x.columns) > 1:
            raise ValueError("Does not handle DataFrame with more than one "
                             "column")
        x = x[x.columns[0]]

    data = "({0})".format("\n".join(map(str, x.values.tolist())))

    # get periodicity
    try:
        period = _freq_to_period[x.index.freqstr]
    except (AttributeError, ValueError):
        # no usable freqstr on the index: try to infer it from the dates
        from pandas.tseries.api import infer_freq
        freq = infer_freq(x.index)
        if freq is None:
            # without this check the lookup below fails with an opaque
            # AttributeError on None
            raise ValueError("Could not determine the frequency of the "
                             "index. Only monthly and quarterly periods "
                             "are supported.")
        period = _freq_to_period[freq]

    # get start / first data
    start_date = x.index[0]
    if period == 12:
        year, stperiod = start_date.year, start_date.month
    elif period == 4:
        year, stperiod = start_date.year, start_date.quarter
    else:  # pragma: no cover
        raise ValueError("Only monthly and quarterly periods are supported."
                         " Please report or send a pull request if you want "
                         "this extended.")

    # give it a title
    name = getattr(x, 'name', None) or 'Unnamed Series'

    series_spec = SeriesSpec(data=data, name=name, period=period,
                             title=name, start="{0}.{1}".format(year,
                                                                stperiod))
    return series_spec
@deprecate_kwarg('forecast_years', 'forecast_periods')
def x13_arima_analysis(endog, maxorder=(2, 1), maxdiff=(2, 1), diff=None,
exog=None, log=None, outlier=True, trading=False,
forecast_periods=None, retspec=False,
speconly=False, start=None, freq=None,
print_stdout=False, x12path=None, prefer_x13=True):
"""
Perform x13-arima analysis for monthly or quarterly data.
Parameters
----------
endog : array_like, pandas.Series
The series to model. It is best to use a pandas object with a
DatetimeIndex or PeriodIndex. However, you can pass an array-like
object. If your object does not have a dates index then ``start`` and
``freq`` are not optional.
maxorder : tuple
The maximum order of the regular and seasonal ARMA polynomials to
examine during the model identification. The order for the regular
polynomial must be greater than zero and no larger than 4. The
order for the seasonal polynomial may be 1 or 2.
maxdiff : tuple
The maximum orders for regular and seasonal differencing in the
automatic differencing procedure. Acceptable inputs for regular
differencing are 1 and 2. The maximum order for seasonal differencing
is 1. If ``diff`` is specified then ``maxdiff`` should be None.
Loading ...