from statsmodels.compat.python import lrange, lzip
from statsmodels.compat.numpy import recarray_select
import numpy as np
import numpy.lib.recfunctions as nprf
import pandas as pd
from pandas import DataFrame
from pandas.tseries import offsets
from pandas.tseries.frequencies import to_offset
from statsmodels.tools.validation import int_like, bool_like, string_like
from statsmodels.tools.sm_exceptions import ValueWarning
from statsmodels.tools.data import _is_using_pandas, _is_recarray
from statsmodels.tools.validation import array_like
def add_trend(x, trend="c", prepend=False, has_constant='skip'):
    """
    Add a trend and/or constant to an array.

    Parameters
    ----------
    x : array_like
        Original array of data.
    trend : str {'n', 'c', 't', 'ct', 'ctt'}
        The trend to add.

        * 'n' add no trend.
        * 'c' add constant only.
        * 't' add trend only.
        * 'ct' add constant and linear trend.
        * 'ctt' add constant and linear and quadratic trend.
    prepend : bool
        If True, prepends the new data to the columns of X.
    has_constant : str {'raise', 'add', 'skip'}
        Controls what happens when trend includes 'c' and a non-zero
        constant column already exists in x. 'raise' will raise an error.
        'add' will add a column of 1s anyway. 'skip' will not add the
        constant column; any linear or quadratic trend columns that were
        requested are still added. 'skip' is the default.

    Returns
    -------
    array_like
        The original data with the additional trend columns. If x is a
        recarray or pandas Series or DataFrame, then the trend column names
        are 'const', 'trend' and 'trend_squared'. When trend is 'n' a copy
        of x is returned.

    Raises
    ------
    ValueError
        If has_constant is 'raise', trend includes 'c' and x already
        contains a non-zero constant column.

    See Also
    --------
    statsmodels.tools.tools.add_constant
        Add a constant column to an array.

    Notes
    -----
    The added columns are ordered constant, linear trend, quadratic trend
    (whichever of them are requested). The linear trend takes the values
    1, 2, ..., nobs. There is currently no checking for an existing trend.
    """
    prepend = bool_like(prepend, 'prepend')
    trend = string_like(trend, 'trend', options=('n', 'c', 't', 'ct', 'ctt'))
    has_constant = string_like(has_constant, 'has_constant',
                               options=('raise', 'add', 'skip'))

    # TODO: could be generalized for trend of arbitrary order
    columns = ['const', 'trend', 'trend_squared']
    if trend == 'n':
        return x.copy()
    elif trend == "c":
        columns = columns[:1]
        trendorder = 0
    elif trend == "ct" or trend == "t":
        columns = columns[:2]
        if trend == "t":
            columns = columns[1:2]
        trendorder = 1
    elif trend == "ctt":
        trendorder = 2

    is_recarray = _is_recarray(x)
    # recarrays are routed through the pandas code path and converted back
    # to a record array at the end
    is_pandas = _is_using_pandas(x, None) or is_recarray
    if is_pandas:
        if is_recarray:
            # deprecated: remove recarray support after 0.12
            import warnings
            from statsmodels.tools.sm_exceptions import recarray_warning
            warnings.warn(recarray_warning, FutureWarning)

            descr = x.dtype.descr
            x = pd.DataFrame.from_records(x)
        elif isinstance(x, pd.Series):
            x = pd.DataFrame(x)
        else:
            x = x.copy()
    else:
        x = np.asanyarray(x)

    nobs = len(x)
    # vander returns decreasing powers; flip so the columns are ordered
    # constant, linear, quadratic
    trendarr = np.vander(np.arange(1, nobs + 1, dtype=np.float64),
                         trendorder + 1)
    trendarr = np.fliplr(trendarr)
    if trend == "t":
        # keep only the linear column (1d)
        trendarr = trendarr[:, 1]

    if "c" in trend:
        if is_pandas:
            # Mixed type protection: columns for which the range cannot be
            # computed (e.g. strings) are treated as non-constant
            def safe_is_const(s):
                try:
                    return np.ptp(s) == 0.0 and np.any(s != 0.0)
                except Exception:
                    return False
            col_const = x.apply(safe_is_const, axis=0)
        else:
            ptp0 = np.ptp(np.asanyarray(x), axis=0)
            col_is_const = ptp0 == 0
            # a column of zeros is not regarded as a constant
            col_const = col_is_const & (x[0] != 0)
        if np.any(col_const):
            if has_constant == 'raise':
                msg = "x contains a constant. Adding a constant with " \
                      "trend='{0}' is not allowed.".format(trend)
                raise ValueError(msg)
            elif has_constant == 'skip':
                # drop the constant, keep any higher order trend terms
                columns = columns[1:]
                trendarr = trendarr[:, 1:]

    # order == 1 keeps [trend, x]; order == -1 reverses to [x, trend]
    order = 1 if prepend else -1
    if is_pandas:
        trendarr = pd.DataFrame(trendarr, index=x.index, columns=columns)
        x = [trendarr, x]
        # axis must be passed by keyword (keyword-only in pandas >= 2.0)
        x = pd.concat(x[::order], axis=1)
    else:
        x = [trendarr, x]
        x = np.column_stack(x[::order])

    if is_recarray:
        x = x.to_records(index=False)
        new_descr = x.dtype.descr
        extra_col = len(new_descr) - len(descr)
        if prepend:
            descr = new_descr[:extra_col] + descr
        else:
            # slice from the front: new_descr[-extra_col:] would return the
            # whole list when no column was added (extra_col == 0)
            descr = descr + new_descr[len(descr):]

        x = x.astype(np.dtype(descr))

    return x
def add_lag(x, col=None, lags=1, drop=False, insert=True):
    """
    Returns an array with lags included given an array.

    Parameters
    ----------
    x : ndarray
        An array or NumPy ndarray subclass. Can be either a 1d or 2d array
        with observations in rows and variables in columns, or a structured
        array / recarray.
    col : 'string', int, or None
        If data is a structured array or a recarray, `col` can be a string
        that is the name of the column containing the variable. Or `col` can
        be an int of the zero-based column index. If it's a 1d array `col`
        can be None.
    lags : int
        The number of lags desired.
    drop : bool
        If True, the contemporaneous variable is dropped from the returned
        data. If False (default), it is kept.
    insert : bool or int
        If True, inserts the lagged values after `col`. If False, appends
        the data. If int inserts the lags at int.

    Returns
    -------
    array : ndarray
        Array with lags

    Raises
    ------
    IndexError
        If x is a structured array with more than one field, or is not 1d,
        and `col` is None.

    Examples
    --------
    >>> import statsmodels.api as sm
    >>> data = sm.datasets.macrodata.load(as_pandas=False)
    >>> data = data.data[['year','quarter','realgdp','cpi']]
    >>> data = sm.tsa.add_lag(data, 'realgdp', lags=2)

    Notes
    -----
    The first `lags` observations are trimmed, so that the length of the
    returned array is len(`X`) - lags. The lags are returned in increasing
    order, ie., t-1,t-2,...,t-lags
    """
    lags = int_like(lags, 'lags')
    drop = bool_like(drop, 'drop')

    if x.dtype.names:
        # structured array / recarray
        names = x.dtype.names
        if not col and np.squeeze(x).ndim > 1:
            raise IndexError("col is None and the input array is not 1d")
        elif len(names) == 1:
            col = names[0]
        elif col is None:
            # without this check the failure only surfaces later as an
            # unrelated TypeError when the lag names are built
            raise IndexError("col is None and the input array has more "
                             "than one field")
        if isinstance(col, (int, np.integer)):
            col = names[col]

        contemp = x[col]

        # make names for lags
        tmp_names = [col + '_L(%i)' % i for i in range(1, lags + 1)]
        ndlags = lagmat(contemp, maxlag=lags, trim='Both')

        # get index for return
        if insert is True:
            ins_idx = list(names).index(col) + 1
        elif insert is False:
            ins_idx = len(names) + 1
        else:  # insert is an int
            # NOTE(review): a negative insert is used as a plain slice index
            # here, which differs from the ndarray branch below
            if insert > len(names):
                import warnings
                warnings.warn("insert > number of variables, inserting at the"
                              " last position", ValueWarning)
                insert = len(names)
            ins_idx = insert

        first_names = list(names[:ins_idx])
        last_names = list(names[ins_idx:])

        if drop:
            if col in first_names:
                first_names.remove(col)
            else:
                last_names.remove(col)

        if first_names:  # only do this if x is not "empty"
            # Workaround to avoid NumPy FutureWarning
            _x = recarray_select(x, first_names)
            first_arr = nprf.append_fields(_x[lags:], tmp_names, ndlags.T,
                                           usemask=False)
        else:
            # nothing precedes the lags: build the array from the lags alone
            first_arr = np.zeros(len(x) - lags,
                                 dtype=lzip(tmp_names,
                                            (x[col].dtype,) * lags))
            for i, name in enumerate(tmp_names):
                first_arr[name] = ndlags[:, i]
        if last_names:
            return nprf.append_fields(
                first_arr, last_names,
                [x[name][lags:] for name in last_names], usemask=False)
        # lags for last variable
        return first_arr

    # we have an ndarray
    if x.ndim == 1:  # make 2d if 1d
        x = x[:, None]
    if col is None:
        col = 0

    # handle negative index
    if col < 0:
        col = x.shape[1] + col

    contemp = x[:, col]

    if insert is True:
        ins_idx = col + 1
    elif insert is False:
        ins_idx = x.shape[1]
    else:
        if insert < 0:  # handle negative index
            insert = x.shape[1] + insert + 1
        if insert > x.shape[1]:
            import warnings
            warnings.warn("insert > number of variables, inserting at the"
                          " last position", ValueWarning)
            insert = x.shape[1]
        ins_idx = insert

    ndlags = lagmat(contemp, lags, trim='Both')
    first_cols = lrange(ins_idx)
    last_cols = lrange(ins_idx, x.shape[1])
    if drop:
        if col in first_cols:
            first_cols.remove(col)
        else:
            last_cols.remove(col)
    return np.column_stack((x[lags:, first_cols], ndlags,
                            x[lags:, last_cols]))
def detrend(x, order=1, axis=0):
    """
    Remove a polynomial trend of a given order from an array.

    Parameters
    ----------
    x : array_like, 1d or 2d
        Data to detrend. For 2d input every row (axis=1) or column (axis=0)
        is detrended separately: the polynomial order is shared, the fitted
        coefficients are not.
    order : int
        Polynomial order of the trend. 0 removes the mean, 1 a linear
        trend, 2 a quadratic trend.
    axis : int
        0 if observations run along rows, 1 if they run along columns.

    Returns
    -------
    ndarray
        Residuals from regressing the data on the polynomial trend, with
        the same orientation as the input.

    Raises
    ------
    NotImplementedError
        If x has more than two dimensions.
    """
    order = int_like(order, 'order')
    axis = int_like(axis, 'axis')

    if x.ndim > 2:
        raise NotImplementedError('x.ndim > 2 is not implemented until it is needed')

    # work with observations along the first axis; undo before returning
    flipped = x.ndim == 2 and int(axis) == 1
    if flipped:
        x = x.T

    nobs = x.shape[0]
    if order == 0:
        # Special case demean
        fitted = x.mean(axis=0)
    else:
        # least squares fit on the powers of 0, 1, ..., nobs - 1
        design = np.vander(np.arange(float(nobs)), N=order + 1)
        coefs = np.linalg.pinv(design).dot(x)
        fitted = np.dot(design, coefs)
    resid = x - fitted

    return resid.T if flipped else resid
def lagmat(x, maxlag, trim='forward', original='ex', use_pandas=False):
"""
Create 2d array of lags.
Parameters
----------
x : array_like
Data; if 2d, observation in rows and variables in columns.
maxlag : int
All lags from zero to maxlag are included.
trim : {'forward', 'backward', 'both', 'none', None}
The trimming method to use.
* 'forward' : trim invalid observations in front.
* 'backward' : trim invalid initial observations.
* 'both' : trim invalid observations on both sides.
Loading ...