Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tia / analysis / model / ret.py
Size: Mime:
from collections import OrderedDict

import pandas as pd
import numpy as np

from tia.util.decorator import lazy_property
from tia.analysis.model.interface import TxnPlColumns as TPL
from tia.analysis.perf import (
    drawdown_info,
    drawdowns,
    guess_freq,
    downside_deviation,
    periodicity,
)
from tia.analysis.plots import plot_return_on_dollar
from tia.util.mplot import AxesFormat
from tia.util.fmt import PercentFormatter, new_percent_formatter, new_float_formatter


__all__ = [
    "RoiiRetCalculator",
    "AumRetCalculator",
    "FixedAumRetCalculator",
    "CumulativeRets",
    "Performance",
]


def return_on_initial_capital(capital, period_pl, leverage=None):
    """Return the daily return series based on the capital"""
    if capital <= 0:
        raise ValueError("cost must be a positive number not %s" % capital)
    leverage = leverage or 1.0
    eod = capital + (leverage * period_pl.cumsum())
    ltd_rets = (eod / capital) - 1.0
    dly_rets = ltd_rets
    dly_rets.iloc[1:] = (1.0 + ltd_rets).pct_change().iloc[1:]
    return dly_rets


class RetCalculator(object):
    def compute(self, txns):
        raise NotImplementedError()


class RoiiRetCalculator(RetCalculator):
    def __init__(self, leverage=None):
        """
        :param leverage: {None, scalar, Series}, number to scale the position returns
        :return:
        """
        get_lev = None
        if leverage is None:
            pass
        elif np.isscalar(leverage):
            if leverage <= 0:
                raise ValueError(
                    "leverage must be a positive non-zero number, not %s" % leverage
                )
            else:
                get_lev = lambda ts: leverage
        elif isinstance(leverage, pd.Series):
            get_lev = lambda ts: leverage.asof(ts)
        else:
            raise ValueError(
                "leverage must be {None, positive scalar, Datetime/Period indexed Series} not %s"
                % type(leverage)
            )

        self.leverage = leverage
        self.get_lev = get_lev

    def compute(self, txns):
        txnpl = txns.pl.txn_frame
        txnrets = pd.Series(0, index=txnpl.index, name="ret")
        get_lev = self.get_lev
        for pid, pframe in txnpl[[TPL.OPEN_VAL, TPL.PID, TPL.PL, TPL.DT]].groupby(
            TPL.PID
        ):
            if pid != 0:
                cost = abs(pframe[TPL.OPEN_VAL].iloc[0])
                ppl = pframe[TPL.PL]
                lev = None if get_lev is None else get_lev(pframe[TPL.DT].iloc[0])
                ret = return_on_initial_capital(cost, ppl, lev)
                txnrets[ppl.index] = ret

        txnrets.index = txnpl[TPL.DT]
        crets = CumulativeRets(txnrets)
        return Performance(crets)


class FixedAumRetCalculator(RetCalculator):
    def __init__(self, aum, reset_freq="M"):
        self.aum = aum
        self.reset_freq = reset_freq
        # capture what cash flows would be needed on reset date to reset the aum
        self.external_cash_flows = None

    def compute(self, txns):
        ltd = txns.pl.ltd_txn
        grouper = pd.TimeGrouper(self.reset_freq)
        period_rets = pd.Series(np.nan, index=ltd.index)
        aum = self.aum
        at = 0
        cf = OrderedDict()
        for key, grp in ltd.groupby(grouper):
            if grp.empty:
                continue
            eod = aum + grp
            sod = eod.shift(1)
            sod.iloc[0] = aum
            period_rets.iloc[at : at + len(grp.index)] = eod / sod - 1.0
            at += len(grp.index)
            # get aum back to fixed amount
            cf[key] = eod.iloc[-1] - aum
        self.external_cash_flows = pd.Series(cf)
        crets = CumulativeRets(period_rets)
        return Performance(crets)


class AumRetCalculator(RetCalculator):
    def __init__(self, starting_aum, freq="M"):
        self.starting_aum = starting_aum
        self.freq = freq
        self.txn_aum = None

    def compute(self, txns):
        ltd = txns.pl.ltd_txn
        grouper = pd.TimeGrouper(self.freq)
        period_rets = pd.Series(np.nan, index=ltd.index)
        self.txn_aum = txn_aum = pd.Series(np.nan, index=ltd.index)
        sop = self.starting_aum
        at = 0
        for key, grp in ltd.groupby(grouper):
            if grp.empty:
                continue
            eod = sop + grp
            sod = eod.shift(1)
            sod.iloc[0] = sop
            period_rets.iloc[at : at + len(grp.index)] = eod / sod - 1.0
            txn_aum.iloc[at : at + len(grp.index)] = sod
            at += len(grp.index)
            sop = eod.iloc[-1]
        crets = CumulativeRets(period_rets)
        return Performance(crets)


class CumulativeRets(object):
    def __init__(self, rets=None, ltd_rets=None):
        if rets is None and ltd_rets is None:
            raise ValueError("rets or ltd_rets must be specified")

        if rets is None:
            if ltd_rets.empty:
                rets = ltd_rets
            else:
                rets = (1.0 + ltd_rets).pct_change()
                rets.iloc[0] = ltd_rets.iloc[0]

        if ltd_rets is None:
            if rets.empty:
                ltd_rets = rets
            else:
                ltd_rets = (1.0 + rets).cumprod() - 1.0

        self.rets = rets
        self.ltd_rets = ltd_rets

    pds_per_year = property(lambda self: periodicity(self.rets))

    def asfreq(self, freq):
        other_pds_per_year = periodicity(freq)
        if self.pds_per_year < other_pds_per_year:
            msg = "Cannot downsample returns. Cannot convert from %s periods/year to %s"
            raise ValueError(msg % (self.pds_per_year, other_pds_per_year))

        if freq == "B":
            rets = (1.0 + self.rets).groupby(self.rets.index.date).apply(
                lambda s: s.prod()
            ) - 1.0
            # If you do not do this, it will be an object index
            rets.index = pd.DatetimeIndex([i for i in rets.index])
            return CumulativeRets(rets)
        else:
            rets = (1.0 + self.rets).resample(freq, how="prod") - 1.0
            return CumulativeRets(rets)

    # -----------------------------------------------------------
    # Resampled data
    dly = lazy_property(lambda self: self.asfreq("B"), "dly")
    weekly = lazy_property(lambda self: self.asfreq("W"), "weekly")
    monthly = lazy_property(lambda self: self.asfreq("M"), "monthly")
    quarterly = lazy_property(lambda self: self.asfreq("Q"), "quarterly")
    annual = lazy_property(lambda self: self.asfreq("A"), "annual")

    # -----------------------------------------------------------
    # Basic Metrics
    @lazy_property
    def ltd_rets_ann(self):
        return (1.0 + self.ltd_rets) ** (
            self.pds_per_year / pd.expanding_count(self.rets)
        ) - 1.0

    cnt = property(lambda self: self.rets.notnull().astype(int).sum())
    mean = lazy_property(lambda self: self.rets.mean(), "avg")
    mean_ann = lazy_property(lambda self: self.mean * self.pds_per_year, "avg_ann")
    ltd = lazy_property(lambda self: self.ltd_rets.iloc[-1], name="ltd")
    ltd_ann = lazy_property(lambda self: self.ltd_rets_ann.iloc[-1], name="ltd_ann")
    std = lazy_property(lambda self: self.rets.std(), "std")
    std_ann = lazy_property(
        lambda self: self.std * np.sqrt(self.pds_per_year), "std_ann"
    )
    drawdown_info = lazy_property(
        lambda self: drawdown_info(self.rets), "drawdown_info"
    )
    drawdowns = lazy_property(lambda self: drawdowns(self.rets), "drawdowns")
    maxdd = lazy_property(lambda self: self.drawdown_info["maxdd"].min(), "maxdd")
    dd_avg = lazy_property(lambda self: self.drawdown_info["maxdd"].mean(), "dd_avg")
    kurtosis = lazy_property(lambda self: self.rets.kurtosis(), "kurtosis")
    skew = lazy_property(lambda self: self.rets.skew(), "skew")

    sharpe_ann = lazy_property(
        lambda self: np.divide(self.ltd_ann, self.std_ann), "sharpe_ann"
    )
    downside_deviation = lazy_property(
        lambda self: downside_deviation(self.rets, mar=0, full=0, ann=1),
        "downside_deviation",
    )
    sortino = lazy_property(
        lambda self: self.ltd_ann / self.downside_deviation, "sortino"
    )

    @lazy_property
    def maxdd_dt(self):
        ddinfo = self.drawdown_info
        if ddinfo.empty:
            return None
        else:
            return self.drawdown_info["maxdd dt"].ix[
                self.drawdown_info["maxdd"].idxmin()
            ]

    # -----------------------------------------------------------
    # Expanding metrics
    expanding_mean = property(
        lambda self: pd.expanding_mean(self.rets), "expanding_avg"
    )
    expanding_mean_ann = property(
        lambda self: self.expanding_mean * self.pds_per_year, "expanding_avg_ann"
    )
    expanding_std = lazy_property(
        lambda self: pd.expanding_std(self.rets), "expanding_std"
    )
    expanding_std_ann = lazy_property(
        lambda self: self.expanding_std * np.sqrt(self.pds_per_year),
        "expanding_std_ann",
    )
    expanding_sharpe_ann = property(
        lambda self: np.divide(self.ltd_rets_ann, self.expanding_std_ann)
    )

    # -----------------------------------------------------------
    # Rolling metrics
    rolling_mean = property(lambda self: pd.rolling_mean(self.rets), "rolling_avg")
    rolling_mean_ann = property(
        lambda self: self.rolling_mean * self.pds_per_year, "rolling_avg_ann"
    )

    def rolling_ltd_rets(self, n):
        return pd.rolling_apply(self.rets, n, lambda s: (1.0 + s).prod() - 1.0)

    def rolling_ltd_rets_ann(self, n):
        tot = self.rolling_ltd_rets(n)
        return tot ** (self.pds_per_year / n)

    def rolling_std(self, n):
        return pd.rolling_std(self.rets, n)

    def rolling_std_ann(self, n):
        return self.rolling_std(n) * np.sqrt(self.pds_per_year)

    def rolling_sharpe_ann(self, n):
        return self.rolling_ltd_rets_ann(n) / self.rolling_std_ann(n)

    def iter_by_year(self):
        """Split the return objects by year and iterate"""
        for key, grp in self.rets.groupby(lambda x: x.year):
            yield key, CumulativeRets(rets=grp)

    def truncate(self, before=None, after=None):
        rets = self.rets.truncate(before=before, after=after)
        return CumulativeRets(rets=rets)

    @lazy_property
    def summary(self):
        d = OrderedDict()
        d["ltd"] = self.ltd
        d["ltd ann"] = self.ltd_ann
        d["mean"] = self.mean
        d["mean ann"] = self.mean_ann
        d["std"] = self.std
        d["std ann"] = self.std_ann
        d["sharpe ann"] = self.sharpe_ann
        d["sortino"] = self.sortino
        d["maxdd"] = self.maxdd
        d["maxdd dt"] = self.maxdd_dt
        d["dd avg"] = self.dd_avg
        d["cnt"] = self.cnt
        return pd.Series(d, name=self.rets.index.freq or guess_freq(self.rets.index))

    def _repr_html_(self):
        from tia.util.fmt import new_dynamic_formatter

        fmt = new_dynamic_formatter(
            method="row", precision=2, pcts=1, trunc_dot_zeros=1, parens=1
        )
        df = self.summary.to_frame()
        return fmt(df)._repr_html_()

    def get_alpha_beta(self, bm_rets):
        if isinstance(bm_rets, pd.Series):
            bm = CumulativeRets(bm_rets)
        elif isinstance(bm_rets, CumulativeRets):
            bm = bm_rets
        else:
            raise ValueError(
                "bm_rets must be series or CumulativeRetPerformace not %s"
                % (type(bm_rets))
            )

        bm_freq = guess_freq(bm_rets)
        if self.pds_per_year != bm.pds_per_year:
            tgt = {
                "B": "dly",
                "W": "weekly",
                "M": "monthly",
                "Q": "quarterly",
                "A": "annual",
            }.get(bm_freq, None)
            if tgt is None:
                raise ValueError(
                    "No mapping for handling benchmark with frequency: %s" % bm_freq
                )
            tmp = getattr(self, tgt)
            y = tmp.rets
            y_ann = tmp.ltd_ann
        else:
            y = self.rets
            y_ann = self.ltd_ann

        x = bm.rets.truncate(y.index[0], y.index[-1])
        x_ann = bm.ltd_ann

        model = pd.ols(x=x, y=y)
        beta = model.beta[0]
        alpha = y_ann - beta * x_ann
        return pd.Series({"alpha": alpha, "beta": beta}, name=bm_freq)

    def plot_ltd(
        self, ax=None, style="k", label="ltd", show_dd=1, title=True, legend=1
    ):
        ltd = self.ltd_rets
        ax = ltd.plot(ax=ax, style=style, label=label)
        if show_dd:
            dd = self.drawdowns
            dd.plot(style="r", label="drawdowns", alpha=0.5, ax=ax)
            ax.fill_between(dd.index, 0, dd.values, facecolor="red", alpha=0.25)
            fmt = PercentFormatter

            AxesFormat().Y.percent().X.label("").apply(ax)
            legend and ax.legend(loc="upper left", prop={"size": 12})

            # show the actualy date and value
            mdt, mdd = self.maxdd_dt, self.maxdd
            bbox_props = dict(boxstyle="round", fc="w", ec="0.5", alpha=0.25)
            try:
                dtstr = "{0}".format(mdt.to_period())
            except:
                # assume daily
                dtstr = "{0}".format(hasattr(mdt, "date") and mdt.date() or mdt)
            ax.text(
                mdt,
                dd[mdt],
                "{1} \n {0}".format(fmt(mdd), dtstr).strip(),
                ha="center",
                va="top",
                size=8,
                bbox=bbox_props,
            )

        if title is True:
            pf = new_percent_formatter(1, parens=False, trunc_dot_zeros=True)
            ff = new_float_formatter(precision=1, parens=False, trunc_dot_zeros=True)
            total = pf(self.ltd_ann)
            vol = pf(self.std_ann)
            sh = ff(self.sharpe_ann)
            mdd = pf(self.maxdd)
            title = (
                "ret$\mathregular{_{ann}}$ %s     vol$\mathregular{_{ann}}$ %s     sharpe %s     maxdd %s"
                % (total, vol, sh, mdd)
            )

        title and ax.set_title(title, fontdict=dict(fontsize=10, fontweight="bold"))
        return ax

    def plot_ret_on_dollar(
        self,
        title=None,
        show_maxdd=1,
        figsize=None,
        ax=None,
        append=0,
        label=None,
        **plot_args
    ):
        plot_return_on_dollar(
            self.rets,
            title=title,
            show_maxdd=show_maxdd,
            figsize=figsize,
            ax=ax,
            append=append,
            label=label,
            **plot_args
        )

    def plot_hist(self, ax=None, **histplot_kwargs):
        pf = new_percent_formatter(precision=1, parens=False, trunc_dot_zeros=1)
        ff = new_float_formatter(precision=1, parens=False, trunc_dot_zeros=1)

        ax = self.rets.hist(ax=ax, **histplot_kwargs)
        AxesFormat().X.percent(1).apply(ax)
        m, s, sk, ku = pf(self.mean), pf(self.std), ff(self.skew), ff(self.kurtosis)
        txt = (
            "$\mathregular{\mu}$=%s   $\mathregular{\sigma}$=%s   skew=%s   kurt=%s"
            % (m, s, sk, ku)
        )
        bbox = dict(facecolor="white", alpha=0.5)
        ax.text(
            0,
            1,
            txt,
            fontdict={"fontweight": "bold"},
            bbox=bbox,
            ha="left",
            va="top",
            transform=ax.transAxes,
        )
        return ax

    def filter(self, mask, keep_ltd=0):
        if isinstance(mask, pd.Series):
            mask = mask.values
        rets = self.rets.ix[mask]
        ltd = None
        if keep_ltd:
            ltd = self.ltd_rets.ix[mask]
        return CumulativeRets(rets=rets, ltd_rets=ltd)


class Performance(object):
    def __init__(self, txn_rets):
        if isinstance(txn_rets, pd.Series):
            txn_rets = CumulativeRets(txn_rets)
        self.txn_details = txn_rets

    txn = property(lambda self: self.txn_details.rets)
    ltd_txn = property(lambda self: self.txn_details.ltd_rets)

    dly_details = lazy_property(lambda self: self.txn_details.dly, "dly_details")
    dly = property(lambda self: self.dly_details.rets)
    ltd_dly = property(lambda self: self.dly_details.ltd_rets)
    ltd_dly_ann = property(lambda self: self.dly_details.ltd_rets_ann)

    weekly_details = lazy_property(
        lambda self: self.txn_details.weekly, "weekly_details"
    )
    weekly = property(lambda self: self.weekly_details.rets)
    ltd_weekly = property(lambda self: self.weekly_details.ltd_rets)
    ltd_weekly_ann = property(lambda self: self.weekly_details.ltd_rets_ann)

    monthly_details = lazy_property(
        lambda self: self.txn_details.monthly, "monthly_details"
    )
    monthly = property(lambda self: self.monthly_details.rets)
    ltd_monthly = property(lambda self: self.monthly_details.ltd_rets)
    ltd_monthly_ann = property(lambda self: self.monthly_details.ltd_rets_ann)

    quarterly_details = lazy_property(
        lambda self: self.txn_details.quarterly, "quarterly_details"
    )
    quarterly = property(lambda self: self.quarterly_details.rets)
    ltd_quarterly = property(lambda self: self.quarterly_details.ltd_rets)
    ltd_quarterly_ann = property(lambda self: self.quarterly_details.ltd_rets_ann)

    annual_details = lazy_property(
        lambda self: self.txn_details.annual, "annual_details"
    )
    annual = property(lambda self: self.annual_details.rets)
    ltd_annual = property(lambda self: self.annual_details.ltd_rets)
    ltd_annual_ann = property(lambda self: self.annual_details.ltd_rets_ann)

    def iter_by_year(self):
        """Split the return objects by year and iterate"""
        for yr, details in self.txn_details.iter_by_year():
            yield yr, Performance(details)

    def filter(self, txn_mask):
        details = self.txn_details.filter(txn_mask)
        return Performance(details)

    def truncate(self, before=None, after=None):
        details = self.txn_details.truncate(before, after)
        return Performance(details)

    def report_by_year(
        self,
        summary_fct=None,
        years=None,
        ltd=1,
        prior_n_yrs=None,
        first_n_yrs=None,
        ranges=None,
        bm_rets=None,
    ):
        """Summary the returns
        :param summary_fct: function(Rets) and returns a dict or Series
        :param years: int, array, boolean or None. If boolean and False, then show no years. If int or array
                      show only those years, else show all years if None
        :param ltd: include live to date summary
        :param prior_n_years: integer or list. Include summary for N years of return data prior to end date
        :param first_n_years: integer or list. Include summary for N years of return data after start date
        :param ranges: list of ranges. The range consists of a year start and year end
        :param dm_dly_rets: daily return series for the benchmark for beta/alpha calcs
        :return: DataFrame
        """
        if years and np.isscalar(years):
            years = [years]

        if summary_fct is None:

            def summary_fct(performance):
                monthly = performance.monthly_details
                dly = performance.dly_details
                data = OrderedDict()
                data["ltd ann"] = monthly.ltd_ann
                data["mret avg"] = monthly.mean
                data["mret std ann"] = monthly.std_ann
                data["sharpe ann"] = monthly.sharpe_ann
                data["sortino"] = monthly.sortino
                data["maxdd"] = dly.maxdd
                data["maxdd dt"] = dly.maxdd_dt
                if bm_rets is not None:
                    abseries = performance.get_alpha_beta(bm_rets)
                    prefix = {"weekly": "wkly ", "monthly": "mret "}.get(
                        abseries.name, abseries.name
                    )
                    data["{0}beta".format(prefix)] = abseries["beta"]
                    data["{0}alpha".format(prefix)] = abseries["alpha"]
                data["avg dd"] = dly.dd_avg
                data["best month"] = monthly.rets.max()
                data["worst month"] = monthly.rets.min()
                data["nmonths"] = monthly.cnt
                return data

        results = OrderedDict()

        if years is not False:
            for yr, robj in self.iter_by_year():
                if years is None or yr in years:
                    results[yr] = summary_fct(robj)

        # First n years
        if first_n_yrs:
            first_n_yrs = first_n_yrs if not np.isscalar(first_n_yrs) else [first_n_yrs]
            for first in first_n_yrs:
                after = "12/31/%s" % (self.dly.index[0].year + first)
                firstN = self.truncate(after=after)
                results["first {0}yrs".format(first)] = summary_fct(firstN)

        # Ranges
        if ranges:
            for range in ranges:
                yr_start, yr_end = range
                rng_rets = self.truncate("1/1/%s" % yr_start, "12/31/%s" % yr_end)
                results["{0}-{1}".format(yr_start, yr_end)] = summary_fct(rng_rets)

        # Prior n years
        if prior_n_yrs:
            prior_n_yrs = prior_n_yrs if not np.isscalar(prior_n_yrs) else [prior_n_yrs]
            for prior in prior_n_yrs:
                before = "1/1/%s" % (self.dly.index[-1].year - prior)
                priorN = self.truncate(before)
                results["past {0}yrs".format(prior)] = summary_fct(priorN)

        # LTD
        if ltd:
            results["ltd"] = summary_fct(self)

        return pd.DataFrame(results, index=list(results.values())[0].keys()).T