# tia/analysis/perf.py
"""
Inspired by the R package PerformanceAnalytics
"""
from collections import OrderedDict

import pandas as pd
import numpy as np

from tia.analysis.util import per_series


PER_YEAR_MAP = {
    "BA": 1.0,
    "BAS": 1.0,
    "A": 1.0,
    "AS": 1.0,
    "BQ": 4.0,
    "BQS": 4.0,
    "Q": 4.0,
    "QS": 4.0,
    "D": 365.0,
    "B": 252.0,
    "BMS": 12.0,
    "BM": 12.0,
    "MS": 12.0,
    "M": 12.0,
    "W": 52.0,
}


def guess_freq(index):
    # admittedly weak heuristic; this should eventually be replaced
    if isinstance(index, (pd.Series, pd.DataFrame)):
        index = index.index

    if hasattr(index, "freqstr") and index.freqstr:
        return index.freqstr[0]
    elif len(index) < 3:
        raise Exception("cannot guess frequency with less than 3 items")
    else:
        lb = min(7, len(index))
        idx_zip = lambda: list(zip(index[-lb:-1], index[-(lb - 1) :]))

        diff = min([t2 - t1 for t1, t2 in idx_zip()])
        if diff.days <= 1:
            if 5 in index.dayofweek or 6 in index.dayofweek:
                return "D"
            else:
                return "B"
        elif diff.days == 7:
            return "W"
        else:
            # include the year so a Dec -> Jan step still counts as one month
            diff = min(
                [(t2.year - t1.year) * 12 + (t2.month - t1.month) for t1, t2 in idx_zip()]
            )
            if diff == 1:
                return "M"
            diff = min([t2.year - t1.year for t1, t2 in idx_zip()])
            if diff == 1:
                return "A"

            strs = ",".join([i.strftime("%Y-%m-%d") for i in index[-lb:]])
            raise Exception(
                "unable to determine frequency, last %s dates %s" % (lb, strs)
            )
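
# Example usage (illustrative; the index below is hypothetical):
#   idx = pd.date_range("2020-01-06", periods=5, freq="B")
#   guess_freq(idx)  # -> "B", taken from the index's freqstr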


def periodicity(freq_or_frame):
    """
    resolve the number of periods per year
    """
    if hasattr(freq_or_frame, "rule_code"):
        rc = freq_or_frame.rule_code
        rc = rc.split("-")[0]
        factor = PER_YEAR_MAP.get(rc, None)
        if factor is not None:
            return factor / abs(freq_or_frame.n)
        else:
            raise Exception(
                "Failed to determine periodicity. No factor mapping for %s"
                % freq_or_frame
            )
    elif isinstance(freq_or_frame, str):
        factor = PER_YEAR_MAP.get(freq_or_frame, None)
        if factor is not None:
            return factor
        else:
            raise Exception(
                "Failed to determine periodicity. No factor mapping for %s"
                % freq_or_frame
            )
    elif isinstance(freq_or_frame, (pd.Series, pd.DataFrame)):
        freq = freq_or_frame.index.freq
        if not freq:
            freq = pd.infer_freq(freq_or_frame.index)
            if freq:
                return periodicity(freq)
            else:
                # Attempt to guess it from the dates themselves
                import warnings

                freq = guess_freq(freq_or_frame.index)
                warnings.warn("frequency not set. guessed it to be %s" % freq)
                return periodicity(freq)
        else:
            return periodicity(freq)
    else:
        raise ValueError("periodicity expects DataFrame, Series, or rule_code property")


periods_in_year = periodicity
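
# Example usage (illustrative):
#   periodicity("M")  # -> 12.0 periods per year
#   periodicity("B")  # -> 252.0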


def _resolve_periods_in_year(scale, frame):
    """Convert the scale to an annualzation factor.  If scale is None then attempt to resolve from frame. If scale is a scalar then
    use it. If scale is a string then use it to lookup the annual factor
    """
    if scale is None:
        return periodicity(frame)
    elif isinstance(scale, str):
        return periodicity(scale)
    elif np.isscalar(scale):
        return scale
    else:
        raise ValueError("scale must be None, scalar, or string, not %s" % type(scale))


def excess_returns(returns, bm=0):
    """
    Return the excess amount of returns above the given benchmark bm
    """
    return returns - bm


def returns(
    prices, method="simple", periods=1, fill_method="pad", limit=None, freq=None
):
    """
    compute the returns for the specified prices.
    method: [simple,compound,log], compound is log
    """
    if method not in ("simple", "compound", "log"):
        raise ValueError("Invalid method type. Valid values are ('simple', 'compound')")

    if method == "simple":
        return prices.pct_change(
            periods=periods, fill_method=fill_method, limit=limit, freq=freq
        )
    else:
        if freq is not None:
            raise NotImplementedError("TODO: implement this logic if needed")

        if isinstance(prices, pd.Series):
            if fill_method is None:
                data = prices
            else:
                data = prices.fillna(method=fill_method, limit=limit)

            data = np.log(data / data.shift(periods=periods))
            mask = pd.isnull(prices.values)
            np.putmask(data.values, mask, np.nan)
            return data
        else:
            return pd.DataFrame(
                {
                    name: returns(col, method, periods, fill_method, limit, freq)
                    for name, col in prices.items()
                },
                columns=prices.columns,
                index=prices.index,
            )
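
# Example usage (illustrative; prices are hypothetical):
#   px = pd.Series([100.0, 101.0, 99.0])
#   returns(px)                # simple returns via pct_change
#   returns(px, method="log")  # log returns: ln(p_t / p_{t-1})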


def returns_cumulative(returns, geometric=True, expanding=False):
    """return the cumulative return

    Parameters
    ----------
    returns : DataFrame or Series
    geometric : bool, default is True
                If True, geometrically link returns
    expanding : bool default is False
                If True, return expanding series/frame of returns
                If False, return the final value(s)
    """
    if expanding:
        if geometric:
            return (1.0 + returns).cumprod() - 1.0
        else:
            return returns.cumsum()
    else:
        if geometric:
            return (1.0 + returns).prod() - 1.0
        else:
            return returns.sum()
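
# Example usage (illustrative): two periods of +10% compound to 21%, not 20%.
#   rets = pd.Series([0.10, 0.10])
#   returns_cumulative(rets)                   # -> 0.21 (geometric)
#   returns_cumulative(rets, geometric=False)  # -> 0.20 (arithmetic)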


def rolling_returns_cumulative(returns, window, min_periods=1, geometric=True):
    """return the rolling cumulative returns

    Parameters
    ----------
    returns : DataFrame or Series
    window : number of observations
    min_periods : minimum number of observations in a window
    geometric : link the returns geometrically
    """
    if geometric:
        rc = lambda x: (1.0 + x[np.isfinite(x)]).prod() - 1.0
    else:
        rc = lambda x: (x[np.isfinite(x)]).sum()

    return returns.rolling(window, min_periods=min_periods).apply(rc, raw=True)


def returns_annualized(returns, geometric=True, scale=None, expanding=False):
    """return the annualized cumulative returns

    Parameters
    ----------
    returns : DataFrame or Series
    geometric : link the returns geometrically
    scale: None or scalar or string (ie 12 for months in year),
           If None, attempt to resolve from returns
           If scalar, then use this as the annualization factor
           If string, then pass this to periodicity function to resolve annualization factor
    expanding: bool, default is False
                If True, return expanding series/frames.
                If False, return final result.
    """
    scale = _resolve_periods_in_year(scale, returns)
    if expanding:
        if geometric:
            n = returns.expanding().count()
            return ((1.0 + returns).cumprod() ** (scale / n)) - 1.0
        else:
            return returns.expanding().mean() * scale
    else:
        if geometric:
            n = returns.count()
            return ((1.0 + returns).prod() ** (scale / n)) - 1.0
        else:
            return returns.mean() * scale
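
# Example usage (illustrative): twelve monthly returns of 1% annualize
# geometrically to (1.01 ** 12) - 1, roughly 12.68%, with scale=12.
#   rets = pd.Series([0.01] * 12)
#   returns_annualized(rets, scale=12)  # -> ~0.1268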


def drawdowns(returns, geometric=True):
    """
    compute the drawdown series for the period return series
    return: periodic return Series or DataFrame
    """
    wealth = 1.0 + returns_cumulative(returns, geometric=geometric, expanding=True)
    values = wealth.values
    if values.ndim == 2:
        ncols = values.shape[-1]
        values = np.vstack(([1.0] * ncols, values))
        maxwealth = np.maximum.accumulate(values, axis=0)[1:]
        dds = wealth / maxwealth - 1.0
        dds[dds > 0] = 0  # Can happen if first returns are positive
        return dds
    elif values.ndim == 1:
        values = np.hstack(([1.0], values))
        maxwealth = np.maximum.accumulate(values)[1:]
        dds = wealth / maxwealth - 1.0
        dds[dds > 0] = 0  # Can happen if first returns are positive
        return dds
    else:
        raise ValueError("unable to process array with %s dimensions" % values.ndim)


def max_drawdown(returns=None, geometric=True, dd=None, inc_date=False):
    """
    compute the max draw down.
    returns: period return Series or DataFrame
    dd: drawdown Series or DataFrame (mutually exclusive with returns)
    """
    if (returns is None and dd is None) or (returns is not None and dd is not None):
        raise ValueError("returns and drawdowns are mutually exclusive")

    if returns is not None:
        dd = drawdowns(returns, geometric=geometric)

    if isinstance(dd, pd.DataFrame):
        vals = [max_drawdown(dd=dd[c], inc_date=inc_date) for c in dd.columns]
        cols = ["maxxdd"] + (inc_date and ["maxdd_dt"] or [])
        res = pd.DataFrame(vals, columns=cols, index=dd.columns)
        return res if inc_date else res.maxdd
    else:
        mddidx = dd.idxmin()
        # if mddidx == dd.index[0]:
        # # no maxdd
        #    return 0 if not inc_date else (0, None)
        # else:
        sub = dd[:mddidx]
        start = sub[::-1].idxmax()
        mdd = dd[mddidx]
        # return start, mddidx, mdd
        return mdd if not inc_date else (mdd, mddidx)
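
# Example usage (illustrative; dates are hypothetical):
#   rets = pd.Series([0.05, -0.10, 0.02, -0.03],
#                    index=pd.date_range("2020-01-01", periods=4))
#   max_drawdown(rets)              # worst peak-to-trough return
#   max_drawdown(rets, inc_date=1)  # -> (maxdd, date of the trough)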


@per_series(result_is_frame=1)
def drawdown_info(returns, geometric=True):
    """Return a DataFrame containing information about ALL the drawdowns for the rets. The frame
    contains the columns:
    'dd start': drawdown start date
    'dd end': drawdown end date
    'maxdd': maximium drawdown
    'maxdd dt': maximum drawdown
    'days': duration of drawdown
    """
    dd = drawdowns(returns, geometric=geometric).to_frame()
    last = dd.index[-1]
    dd.columns = ["vals"]
    dd["nonzero"] = (dd.vals != 0).astype(int)
    dd["gid"] = (dd.nonzero.shift(1) != dd.nonzero).astype(int).cumsum()
    idxname = dd.index.name or "index"
    ixs = (
        dd.reset_index()
        .groupby(["nonzero", "gid"])[idxname]
        .apply(lambda x: np.array(x))
    )
    rows = []
    if 1 in ixs:
        for ix in ixs[1]:
            sub = dd.loc[ix]
            # need t+1 since the drawdown actually ends on the 0 value
            end = dd.index[
                dd.index.get_loc(sub.index[-1]) + (last != sub.index[-1] and 1 or 0)
            ]
            rows.append([sub.index[0], end, sub.vals.min(), sub.vals.idxmin()])
    f = pd.DataFrame.from_records(
        rows, columns=["dd start", "dd end", "maxdd", "maxdd dt"]
    )
    f["days"] = (f["dd end"] - f["dd start"]).astype("timedelta64[D]")
    return f
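
# Example usage (illustrative): one row per distinct drawdown episode.
#   rets = pd.Series([0.02, -0.05, 0.01, 0.07],
#                    index=pd.date_range("2020-01-01", periods=4))
#   drawdown_info(rets)  # columns: dd start, dd end, maxdd, maxdd dt, days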


def std_annualized(returns, scale=None, expanding=0):
    scale = _resolve_periods_in_year(scale, returns)
    if expanding:
        return np.sqrt(scale) * returns.expanding().std()
    else:
        return np.sqrt(scale) * returns.std()


def sharpe(returns, rfr=0, expanding=0):
    """
    returns: periodic return string
    rfr: risk free rate
    expanding: bool
    """
    if expanding:
        excess = excess_returns(returns, rfr)
        return pd.expanding_mean(excess) / pd.expanding_std(returns)
    else:
        return excess_returns(returns, rfr).mean() / returns.std()
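
# Example usage (illustrative): mean excess return over return volatility,
# both at the series' native (non-annualized) periodicity.
#   rets = pd.Series([0.01, -0.02, 0.015, 0.005])
#   sharpe(rets, rfr=0.001)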


def sharpe_annualized(returns, rfr_ann=0, scale=None, expanding=False, geometric=False):
    scale = _resolve_periods_in_year(scale, returns)
    stdann = std_annualized(returns, scale=scale, expanding=expanding)
    retsann = returns_annualized(
        returns, scale=scale, expanding=expanding, geometric=geometric
    )
    return (retsann - rfr_ann) / stdann


def downside_deviation(rets, mar=0, expanding=0, full=0, ann=0):
    """Compute the downside deviation for the specifed return series
    :param rets: periodic return series
    :param mar: minimum acceptable rate of return (MAR)
    :param full: If True, use the lenght of full series. If False, use only values below MAR
    :param expanding:
    :param ann: True if result should be annualized
    """
    below = rets[rets < mar]
    if expanding:
        n = rets.expanding().count()[below.index] if full else below.expanding().count()
        dd = np.sqrt(((below - mar) ** 2).cumsum() / n)
        if ann:
            dd *= np.sqrt(periods_in_year(rets))
        return dd.reindex(rets.index).ffill()
    else:
        n = rets.count() if full else below.count()
        dd = np.sqrt(((below - mar) ** 2).sum() / n)
        if ann:
            dd *= np.sqrt(periods_in_year(rets))
        return dd
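
# Example usage (illustrative): only returns below the MAR contribute to the
# deviation; full=1 divides by the full sample size instead.
#   rets = pd.Series([0.02, -0.01, 0.03, -0.02])
#   downside_deviation(rets, mar=0.0)          # n = count of returns below MAR
#   downside_deviation(rets, mar=0.0, full=1)  # n = full series length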


def sortino_ratio(rets, rfr_ann=0, mar=0, full=0, expanding=0):
    """Compute the sortino ratio as (Ann Rets - Risk Free Rate) / Downside Deviation Ann

    :param rets: period return series
    :param rfr_ann: annualized risk free rate
    :param mar: minimum acceptable rate of return (MAR)
    :param full: If True, use the length of the full series. If False, use only values below MAR
    :param expanding:
    :return:
    """
    annrets = returns_annualized(rets, expanding=expanding) - rfr_ann
    return annrets / downside_deviation(
        rets, mar=mar, expanding=expanding, full=full, ann=1
    )


def information_ratio(rets, bm_rets, scale=None, expanding=False):
    """Information ratio, a common measure of manager efficiency, evaluates excess returns over a benchmark
    versus tracking error.

    :param rets: period returns
    :param bm_rets: periodic benchmark returns (not annualized)
    :param scale: None or the scale to be used for annualization
    :param expanding:
    :return:
    """
    scale = _resolve_periods_in_year(scale, rets)
    rets_ann = returns_annualized(rets, scale=scale, expanding=expanding)
    bm_rets_ann = returns_annualized(bm_rets, scale=scale, expanding=expanding)
    tracking_error_ann = std_annualized(
        (rets - bm_rets), scale=scale, expanding=expanding
    )
    return (rets_ann - bm_rets_ann) / tracking_error_ann
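
# Example usage (illustrative; both series are hypothetical monthly returns):
#   rets = pd.Series([0.01, 0.02, -0.01])
#   bm = pd.Series([0.005, 0.015, 0.0])
#   information_ratio(rets, bm, scale=12)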


def upside_potential_ratio(rets, mar=0, full=0, expanding=0):
    if isinstance(rets, pd.Series):
        above = rets[rets > mar]
        excess = above - mar
        if expanding:
            n = rets.expanding().count() if full else above.expanding().count()
            upside = excess.cumsum() / n
            downside = downside_deviation(rets, mar=mar, full=full, expanding=1)
            return (upside / downside).reindex(rets.index).ffill()
        else:
            n = rets.count() if full else above.count()
            upside = excess.sum() / n
            downside = downside_deviation(rets, mar=mar, full=full)
            return upside / downside
    else:
        vals = {
            c: upside_potential_ratio(rets[c], mar=mar, full=full, expanding=expanding)
            for c in rets.columns
        }
        if expanding:
            return pd.DataFrame(vals, columns=rets.columns)
        else:
            return pd.Series(vals)


@per_series()
def rolling_percentileofscore(series, window, min_periods=None):
    """Computue the score percentile for the specified window."""
    import scipy.stats as stats

    def _percentile(arr):
        score = arr[-1]
        vals = arr[:-1]
        return stats.percentileofscore(vals, score)

    notnull = series.dropna()
    min_periods = min_periods or window
    if notnull.empty:
        return pd.Series(np.nan, index=series.index)
    else:
        return (
            notnull.rolling(window, min_periods=min_periods)
            .apply(_percentile, raw=True)
            .reindex(series.index)
        )


@per_series()
def expanding_percentileofscore(series, min_periods=None):
    import scipy.stats as stats

    def _percentile(arr):
        score = arr[-1]
        vals = arr[:-1]
        return stats.percentileofscore(vals, score)

    notnull = series.dropna()
    if notnull.empty:
        return pd.Series(np.nan, index=series.index)
    else:
        return (
            notnull.expanding(min_periods=min_periods or 1)
            .apply(_percentile, raw=True)
            .reindex(series.index)
        )


def hurst_exponent(px, lags=list(range(2, 100))):
    """
    describe the prices
    http://www.quantstart.com/articles/Basics-of-Statistical-Mean-Reversion-Testing
    H = the hurts exponent
    H < 0.5 - mean reverting
    H = 0.5 - geometric brownian motion (aka random)
    H > 0.5 - trending series
    """
    ts = px.reset_index(drop=True).dropna()
    # Calculate the array of the variances of the lagged differences
    tau = [np.sqrt(ts.diff(lag).std()) for lag in lags]
    # Use a linear fit to estimate the Hurst Exponent
    poly = np.polyfit(np.log(lags), np.log(tau), 1)
    # Return the Hurst exponent from the polyfit output
    return poly[0] * 2.0
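
# Example usage (illustrative): a pure random walk should score roughly 0.5.
#   np.random.seed(0)
#   walk = pd.Series(np.cumsum(np.random.randn(1000)) + 1000.0)
#   hurst_exponent(walk)  # -> approximately 0.5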


def summarize_returns(
    period_rets, rollup="M", prefix=1, ret_method="compound", yearly=1, ltd=1
):
    # TODO - should be able to handle DateTimeIndex
    if not isinstance(period_rets.index, pd.PeriodIndex):
        raise Exception("expected periodic return series")

    def _analyze(rets):
        rets = rets.dropna()
        if rollup != rets.index.freqstr:
            if ret_method == "simple":
                resampled = rets.resample(rollup, "sum")
            elif ret_method == "compound":
                resampled = (1.0 + rets).resample(rollup, "prod") - 1.0
            else:
                raise Exception("ret_method must be on of [simple, compound]")
        else:
            resampled = rets

        rfreq = rollup.lower().replace("b", "d")
        pfreq = period_rets.index.freqstr.lower().replace("b", "d")
        fmap = {"d": "daily", "m": "monthly", "w": "weekly", "q": "quarterly"}
        rfreq = fmap.get(rfreq, rfreq)
        pfreq = fmap.get(pfreq, pfreq)

        pds_per_yr = periodicity(resampled)
        d = OrderedDict()
        d["{0}_ret_avg".format(rfreq)] = ret_avg = resampled.mean()
        d["{0}_ret_cum".format(rfreq)] = returns_cumulative(resampled)
        d["{0}_stdev".format(rfreq)] = resampled.std()
        d["{0}_stdev_ann".format(rfreq)] = std_ann = std_annualized(resampled)
        d["{0}_sharpe_ann".format(rfreq)] = pds_per_yr * ret_avg / std_ann
        d["{0}_sortino".format(rfreq)] = sortino_ratio(resampled, full=0)
        mdd = max_drawdown(rets, inc_date=1)
        d["{0}_maxdd".format(pfreq)] = mdd[0]
        d["{0}_maxdd_dt".format(pfreq)] = mdd[1]
        return d

    if isinstance(period_rets, pd.DataFrame):
        # Multi-indexed data frame needed to show multiple rows per year
        arr = []
        for c in period_rets:
            res = summarize_returns(
                period_rets[c], rollup=rollup, yearly=yearly, ltd=ltd
            )
            names = list(res.index.names)
            names.append(period_rets.index.name)
            res.index = pd.MultiIndex.from_arrays(
                [res.index, [c] * len(res.index)], names=names
            )
            arr.append(res)
        return pd.concat(arr).sort_index()
    else:
        arrs = []
        if yearly:
            arrs.append(
                period_rets.groupby(lambda k: k.year)
                .apply(lambda tmp: _analyze(tmp.sort_index()))
                .unstack(1)
            )
        if ltd:
            arrs.append(pd.DataFrame(_analyze(period_rets), index=["LTD"]))
        summary = pd.concat(arrs)
        summary.index.name = "period"
        return summary
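
# Example usage (illustrative): per-year rows plus an LTD row for a monthly
# periodic return series.
#   idx = pd.period_range("2019-01", periods=24, freq="M")
#   rets = pd.Series(np.random.randn(24) / 100.0, index=idx)
#   summarize_returns(rets, rollup="M")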