Repository URL to install this package:
|
Version:
0.3.2 ▾
|
from collections import OrderedDict
import pandas as pd
import numpy as np
from tia.util.decorator import lazy_property
from tia.analysis.model.interface import TxnPlColumns as TPL
from tia.analysis.perf import (
drawdown_info,
drawdowns,
guess_freq,
downside_deviation,
periodicity,
)
from tia.analysis.plots import plot_return_on_dollar
from tia.util.mplot import AxesFormat
from tia.util.fmt import PercentFormatter, new_percent_formatter, new_float_formatter
__all__ = [
"RoiiRetCalculator",
"AumRetCalculator",
"FixedAumRetCalculator",
"CumulativeRets",
"Performance",
]
def return_on_initial_capital(capital, period_pl, leverage=None):
"""Return the daily return series based on the capital"""
if capital <= 0:
raise ValueError("cost must be a positive number not %s" % capital)
leverage = leverage or 1.0
eod = capital + (leverage * period_pl.cumsum())
ltd_rets = (eod / capital) - 1.0
dly_rets = ltd_rets
dly_rets.iloc[1:] = (1.0 + ltd_rets).pct_change().iloc[1:]
return dly_rets
class RetCalculator(object):
def compute(self, txns):
raise NotImplementedError()
class RoiiRetCalculator(RetCalculator):
def __init__(self, leverage=None):
"""
:param leverage: {None, scalar, Series}, number to scale the position returns
:return:
"""
get_lev = None
if leverage is None:
pass
elif np.isscalar(leverage):
if leverage <= 0:
raise ValueError(
"leverage must be a positive non-zero number, not %s" % leverage
)
else:
get_lev = lambda ts: leverage
elif isinstance(leverage, pd.Series):
get_lev = lambda ts: leverage.asof(ts)
else:
raise ValueError(
"leverage must be {None, positive scalar, Datetime/Period indexed Series} not %s"
% type(leverage)
)
self.leverage = leverage
self.get_lev = get_lev
def compute(self, txns):
txnpl = txns.pl.txn_frame
txnrets = pd.Series(0, index=txnpl.index, name="ret")
get_lev = self.get_lev
for pid, pframe in txnpl[[TPL.OPEN_VAL, TPL.PID, TPL.PL, TPL.DT]].groupby(
TPL.PID
):
if pid != 0:
cost = abs(pframe[TPL.OPEN_VAL].iloc[0])
ppl = pframe[TPL.PL]
lev = None if get_lev is None else get_lev(pframe[TPL.DT].iloc[0])
ret = return_on_initial_capital(cost, ppl, lev)
txnrets[ppl.index] = ret
txnrets.index = txnpl[TPL.DT]
crets = CumulativeRets(txnrets)
return Performance(crets)
class FixedAumRetCalculator(RetCalculator):
def __init__(self, aum, reset_freq="M"):
self.aum = aum
self.reset_freq = reset_freq
# capture what cash flows would be needed on reset date to reset the aum
self.external_cash_flows = None
def compute(self, txns):
ltd = txns.pl.ltd_txn
grouper = pd.TimeGrouper(self.reset_freq)
period_rets = pd.Series(np.nan, index=ltd.index)
aum = self.aum
at = 0
cf = OrderedDict()
for key, grp in ltd.groupby(grouper):
if grp.empty:
continue
eod = aum + grp
sod = eod.shift(1)
sod.iloc[0] = aum
period_rets.iloc[at : at + len(grp.index)] = eod / sod - 1.0
at += len(grp.index)
# get aum back to fixed amount
cf[key] = eod.iloc[-1] - aum
self.external_cash_flows = pd.Series(cf)
crets = CumulativeRets(period_rets)
return Performance(crets)
class AumRetCalculator(RetCalculator):
def __init__(self, starting_aum, freq="M"):
self.starting_aum = starting_aum
self.freq = freq
self.txn_aum = None
def compute(self, txns):
ltd = txns.pl.ltd_txn
grouper = pd.TimeGrouper(self.freq)
period_rets = pd.Series(np.nan, index=ltd.index)
self.txn_aum = txn_aum = pd.Series(np.nan, index=ltd.index)
sop = self.starting_aum
at = 0
for key, grp in ltd.groupby(grouper):
if grp.empty:
continue
eod = sop + grp
sod = eod.shift(1)
sod.iloc[0] = sop
period_rets.iloc[at : at + len(grp.index)] = eod / sod - 1.0
txn_aum.iloc[at : at + len(grp.index)] = sod
at += len(grp.index)
sop = eod.iloc[-1]
crets = CumulativeRets(period_rets)
return Performance(crets)
class CumulativeRets(object):
def __init__(self, rets=None, ltd_rets=None):
if rets is None and ltd_rets is None:
raise ValueError("rets or ltd_rets must be specified")
if rets is None:
if ltd_rets.empty:
rets = ltd_rets
else:
rets = (1.0 + ltd_rets).pct_change()
rets.iloc[0] = ltd_rets.iloc[0]
if ltd_rets is None:
if rets.empty:
ltd_rets = rets
else:
ltd_rets = (1.0 + rets).cumprod() - 1.0
self.rets = rets
self.ltd_rets = ltd_rets
pds_per_year = property(lambda self: periodicity(self.rets))
def asfreq(self, freq):
other_pds_per_year = periodicity(freq)
if self.pds_per_year < other_pds_per_year:
msg = "Cannot downsample returns. Cannot convert from %s periods/year to %s"
raise ValueError(msg % (self.pds_per_year, other_pds_per_year))
if freq == "B":
rets = (1.0 + self.rets).groupby(self.rets.index.date).apply(
lambda s: s.prod()
) - 1.0
# If you do not do this, it will be an object index
rets.index = pd.DatetimeIndex([i for i in rets.index])
return CumulativeRets(rets)
else:
rets = (1.0 + self.rets).resample(freq, how="prod") - 1.0
return CumulativeRets(rets)
# -----------------------------------------------------------
# Resampled data
dly = lazy_property(lambda self: self.asfreq("B"), "dly")
weekly = lazy_property(lambda self: self.asfreq("W"), "weekly")
monthly = lazy_property(lambda self: self.asfreq("M"), "monthly")
quarterly = lazy_property(lambda self: self.asfreq("Q"), "quarterly")
annual = lazy_property(lambda self: self.asfreq("A"), "annual")
# -----------------------------------------------------------
# Basic Metrics
@lazy_property
def ltd_rets_ann(self):
return (1.0 + self.ltd_rets) ** (
self.pds_per_year / pd.expanding_count(self.rets)
) - 1.0
cnt = property(lambda self: self.rets.notnull().astype(int).sum())
mean = lazy_property(lambda self: self.rets.mean(), "avg")
mean_ann = lazy_property(lambda self: self.mean * self.pds_per_year, "avg_ann")
ltd = lazy_property(lambda self: self.ltd_rets.iloc[-1], name="ltd")
ltd_ann = lazy_property(lambda self: self.ltd_rets_ann.iloc[-1], name="ltd_ann")
std = lazy_property(lambda self: self.rets.std(), "std")
std_ann = lazy_property(
lambda self: self.std * np.sqrt(self.pds_per_year), "std_ann"
)
drawdown_info = lazy_property(
lambda self: drawdown_info(self.rets), "drawdown_info"
)
drawdowns = lazy_property(lambda self: drawdowns(self.rets), "drawdowns")
maxdd = lazy_property(lambda self: self.drawdown_info["maxdd"].min(), "maxdd")
dd_avg = lazy_property(lambda self: self.drawdown_info["maxdd"].mean(), "dd_avg")
kurtosis = lazy_property(lambda self: self.rets.kurtosis(), "kurtosis")
skew = lazy_property(lambda self: self.rets.skew(), "skew")
sharpe_ann = lazy_property(
lambda self: np.divide(self.ltd_ann, self.std_ann), "sharpe_ann"
)
downside_deviation = lazy_property(
lambda self: downside_deviation(self.rets, mar=0, full=0, ann=1),
"downside_deviation",
)
sortino = lazy_property(
lambda self: self.ltd_ann / self.downside_deviation, "sortino"
)
@lazy_property
def maxdd_dt(self):
ddinfo = self.drawdown_info
if ddinfo.empty:
return None
else:
return self.drawdown_info["maxdd dt"].ix[
self.drawdown_info["maxdd"].idxmin()
]
# -----------------------------------------------------------
# Expanding metrics
expanding_mean = property(
lambda self: pd.expanding_mean(self.rets), "expanding_avg"
)
expanding_mean_ann = property(
lambda self: self.expanding_mean * self.pds_per_year, "expanding_avg_ann"
)
expanding_std = lazy_property(
lambda self: pd.expanding_std(self.rets), "expanding_std"
)
expanding_std_ann = lazy_property(
lambda self: self.expanding_std * np.sqrt(self.pds_per_year),
"expanding_std_ann",
)
expanding_sharpe_ann = property(
lambda self: np.divide(self.ltd_rets_ann, self.expanding_std_ann)
)
# -----------------------------------------------------------
# Rolling metrics
rolling_mean = property(lambda self: pd.rolling_mean(self.rets), "rolling_avg")
rolling_mean_ann = property(
lambda self: self.rolling_mean * self.pds_per_year, "rolling_avg_ann"
)
def rolling_ltd_rets(self, n):
return pd.rolling_apply(self.rets, n, lambda s: (1.0 + s).prod() - 1.0)
def rolling_ltd_rets_ann(self, n):
tot = self.rolling_ltd_rets(n)
return tot ** (self.pds_per_year / n)
def rolling_std(self, n):
return pd.rolling_std(self.rets, n)
def rolling_std_ann(self, n):
return self.rolling_std(n) * np.sqrt(self.pds_per_year)
def rolling_sharpe_ann(self, n):
return self.rolling_ltd_rets_ann(n) / self.rolling_std_ann(n)
def iter_by_year(self):
"""Split the return objects by year and iterate"""
for key, grp in self.rets.groupby(lambda x: x.year):
yield key, CumulativeRets(rets=grp)
def truncate(self, before=None, after=None):
rets = self.rets.truncate(before=before, after=after)
return CumulativeRets(rets=rets)
@lazy_property
def summary(self):
d = OrderedDict()
d["ltd"] = self.ltd
d["ltd ann"] = self.ltd_ann
d["mean"] = self.mean
d["mean ann"] = self.mean_ann
d["std"] = self.std
d["std ann"] = self.std_ann
d["sharpe ann"] = self.sharpe_ann
d["sortino"] = self.sortino
d["maxdd"] = self.maxdd
d["maxdd dt"] = self.maxdd_dt
d["dd avg"] = self.dd_avg
d["cnt"] = self.cnt
return pd.Series(d, name=self.rets.index.freq or guess_freq(self.rets.index))
def _repr_html_(self):
from tia.util.fmt import new_dynamic_formatter
fmt = new_dynamic_formatter(
method="row", precision=2, pcts=1, trunc_dot_zeros=1, parens=1
)
df = self.summary.to_frame()
return fmt(df)._repr_html_()
def get_alpha_beta(self, bm_rets):
if isinstance(bm_rets, pd.Series):
bm = CumulativeRets(bm_rets)
elif isinstance(bm_rets, CumulativeRets):
bm = bm_rets
else:
raise ValueError(
"bm_rets must be series or CumulativeRetPerformace not %s"
% (type(bm_rets))
)
bm_freq = guess_freq(bm_rets)
if self.pds_per_year != bm.pds_per_year:
tgt = {
"B": "dly",
"W": "weekly",
"M": "monthly",
"Q": "quarterly",
"A": "annual",
}.get(bm_freq, None)
if tgt is None:
raise ValueError(
"No mapping for handling benchmark with frequency: %s" % bm_freq
)
tmp = getattr(self, tgt)
y = tmp.rets
y_ann = tmp.ltd_ann
else:
y = self.rets
y_ann = self.ltd_ann
x = bm.rets.truncate(y.index[0], y.index[-1])
x_ann = bm.ltd_ann
model = pd.ols(x=x, y=y)
beta = model.beta[0]
alpha = y_ann - beta * x_ann
return pd.Series({"alpha": alpha, "beta": beta}, name=bm_freq)
def plot_ltd(
self, ax=None, style="k", label="ltd", show_dd=1, title=True, legend=1
):
ltd = self.ltd_rets
ax = ltd.plot(ax=ax, style=style, label=label)
if show_dd:
dd = self.drawdowns
dd.plot(style="r", label="drawdowns", alpha=0.5, ax=ax)
ax.fill_between(dd.index, 0, dd.values, facecolor="red", alpha=0.25)
fmt = PercentFormatter
AxesFormat().Y.percent().X.label("").apply(ax)
legend and ax.legend(loc="upper left", prop={"size": 12})
# show the actualy date and value
mdt, mdd = self.maxdd_dt, self.maxdd
bbox_props = dict(boxstyle="round", fc="w", ec="0.5", alpha=0.25)
try:
dtstr = "{0}".format(mdt.to_period())
except:
# assume daily
dtstr = "{0}".format(hasattr(mdt, "date") and mdt.date() or mdt)
ax.text(
mdt,
dd[mdt],
"{1} \n {0}".format(fmt(mdd), dtstr).strip(),
ha="center",
va="top",
size=8,
bbox=bbox_props,
)
if title is True:
pf = new_percent_formatter(1, parens=False, trunc_dot_zeros=True)
ff = new_float_formatter(precision=1, parens=False, trunc_dot_zeros=True)
total = pf(self.ltd_ann)
vol = pf(self.std_ann)
sh = ff(self.sharpe_ann)
mdd = pf(self.maxdd)
title = (
"ret$\mathregular{_{ann}}$ %s vol$\mathregular{_{ann}}$ %s sharpe %s maxdd %s"
% (total, vol, sh, mdd)
)
title and ax.set_title(title, fontdict=dict(fontsize=10, fontweight="bold"))
return ax
def plot_ret_on_dollar(
self,
title=None,
show_maxdd=1,
figsize=None,
ax=None,
append=0,
label=None,
**plot_args
):
plot_return_on_dollar(
self.rets,
title=title,
show_maxdd=show_maxdd,
figsize=figsize,
ax=ax,
append=append,
label=label,
**plot_args
)
def plot_hist(self, ax=None, **histplot_kwargs):
pf = new_percent_formatter(precision=1, parens=False, trunc_dot_zeros=1)
ff = new_float_formatter(precision=1, parens=False, trunc_dot_zeros=1)
ax = self.rets.hist(ax=ax, **histplot_kwargs)
AxesFormat().X.percent(1).apply(ax)
m, s, sk, ku = pf(self.mean), pf(self.std), ff(self.skew), ff(self.kurtosis)
txt = (
"$\mathregular{\mu}$=%s $\mathregular{\sigma}$=%s skew=%s kurt=%s"
% (m, s, sk, ku)
)
bbox = dict(facecolor="white", alpha=0.5)
ax.text(
0,
1,
txt,
fontdict={"fontweight": "bold"},
bbox=bbox,
ha="left",
va="top",
transform=ax.transAxes,
)
return ax
def filter(self, mask, keep_ltd=0):
if isinstance(mask, pd.Series):
mask = mask.values
rets = self.rets.ix[mask]
ltd = None
if keep_ltd:
ltd = self.ltd_rets.ix[mask]
return CumulativeRets(rets=rets, ltd_rets=ltd)
class Performance(object):
def __init__(self, txn_rets):
if isinstance(txn_rets, pd.Series):
txn_rets = CumulativeRets(txn_rets)
self.txn_details = txn_rets
txn = property(lambda self: self.txn_details.rets)
ltd_txn = property(lambda self: self.txn_details.ltd_rets)
dly_details = lazy_property(lambda self: self.txn_details.dly, "dly_details")
dly = property(lambda self: self.dly_details.rets)
ltd_dly = property(lambda self: self.dly_details.ltd_rets)
ltd_dly_ann = property(lambda self: self.dly_details.ltd_rets_ann)
weekly_details = lazy_property(
lambda self: self.txn_details.weekly, "weekly_details"
)
weekly = property(lambda self: self.weekly_details.rets)
ltd_weekly = property(lambda self: self.weekly_details.ltd_rets)
ltd_weekly_ann = property(lambda self: self.weekly_details.ltd_rets_ann)
monthly_details = lazy_property(
lambda self: self.txn_details.monthly, "monthly_details"
)
monthly = property(lambda self: self.monthly_details.rets)
ltd_monthly = property(lambda self: self.monthly_details.ltd_rets)
ltd_monthly_ann = property(lambda self: self.monthly_details.ltd_rets_ann)
quarterly_details = lazy_property(
lambda self: self.txn_details.quarterly, "quarterly_details"
)
quarterly = property(lambda self: self.quarterly_details.rets)
ltd_quarterly = property(lambda self: self.quarterly_details.ltd_rets)
ltd_quarterly_ann = property(lambda self: self.quarterly_details.ltd_rets_ann)
annual_details = lazy_property(
lambda self: self.txn_details.annual, "annual_details"
)
annual = property(lambda self: self.annual_details.rets)
ltd_annual = property(lambda self: self.annual_details.ltd_rets)
ltd_annual_ann = property(lambda self: self.annual_details.ltd_rets_ann)
def iter_by_year(self):
"""Split the return objects by year and iterate"""
for yr, details in self.txn_details.iter_by_year():
yield yr, Performance(details)
def filter(self, txn_mask):
details = self.txn_details.filter(txn_mask)
return Performance(details)
def truncate(self, before=None, after=None):
details = self.txn_details.truncate(before, after)
return Performance(details)
def report_by_year(
self,
summary_fct=None,
years=None,
ltd=1,
prior_n_yrs=None,
first_n_yrs=None,
ranges=None,
bm_rets=None,
):
"""Summary the returns
:param summary_fct: function(Rets) and returns a dict or Series
:param years: int, array, boolean or None. If boolean and False, then show no years. If int or array
show only those years, else show all years if None
:param ltd: include live to date summary
:param prior_n_years: integer or list. Include summary for N years of return data prior to end date
:param first_n_years: integer or list. Include summary for N years of return data after start date
:param ranges: list of ranges. The range consists of a year start and year end
:param dm_dly_rets: daily return series for the benchmark for beta/alpha calcs
:return: DataFrame
"""
if years and np.isscalar(years):
years = [years]
if summary_fct is None:
def summary_fct(performance):
monthly = performance.monthly_details
dly = performance.dly_details
data = OrderedDict()
data["ltd ann"] = monthly.ltd_ann
data["mret avg"] = monthly.mean
data["mret std ann"] = monthly.std_ann
data["sharpe ann"] = monthly.sharpe_ann
data["sortino"] = monthly.sortino
data["maxdd"] = dly.maxdd
data["maxdd dt"] = dly.maxdd_dt
if bm_rets is not None:
abseries = performance.get_alpha_beta(bm_rets)
prefix = {"weekly": "wkly ", "monthly": "mret "}.get(
abseries.name, abseries.name
)
data["{0}beta".format(prefix)] = abseries["beta"]
data["{0}alpha".format(prefix)] = abseries["alpha"]
data["avg dd"] = dly.dd_avg
data["best month"] = monthly.rets.max()
data["worst month"] = monthly.rets.min()
data["nmonths"] = monthly.cnt
return data
results = OrderedDict()
if years is not False:
for yr, robj in self.iter_by_year():
if years is None or yr in years:
results[yr] = summary_fct(robj)
# First n years
if first_n_yrs:
first_n_yrs = first_n_yrs if not np.isscalar(first_n_yrs) else [first_n_yrs]
for first in first_n_yrs:
after = "12/31/%s" % (self.dly.index[0].year + first)
firstN = self.truncate(after=after)
results["first {0}yrs".format(first)] = summary_fct(firstN)
# Ranges
if ranges:
for range in ranges:
yr_start, yr_end = range
rng_rets = self.truncate("1/1/%s" % yr_start, "12/31/%s" % yr_end)
results["{0}-{1}".format(yr_start, yr_end)] = summary_fct(rng_rets)
# Prior n years
if prior_n_yrs:
prior_n_yrs = prior_n_yrs if not np.isscalar(prior_n_yrs) else [prior_n_yrs]
for prior in prior_n_yrs:
before = "1/1/%s" % (self.dly.index[-1].year - prior)
priorN = self.truncate(before)
results["past {0}yrs".format(prior)] = summary_fct(priorN)
# LTD
if ltd:
results["ltd"] = summary_fct(self)
return pd.DataFrame(results, index=list(results.values())[0].keys()).T