Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
tia / analysis / model / port.py
Size: Mime:
from collections import OrderedDict

import pandas as pd

from tia.analysis.model.interface import (
    CostCalculator,
    EodMarketData,
    PositionColumns as PC,
)
from tia.analysis.model.pos import Positions
from tia.analysis.model.ret import RoiiRetCalculator
from tia.analysis.model.txn import Txns
from tia.analysis.util import insert_level
from tia.util.decorator import lazy_property


__all__ = [
    "SingleAssetPortfolio",
    "PortfolioPricer",
    "PortfolioSubset",
    "PortfolioSummary",
]


class PortfolioPricer(CostCalculator, EodMarketData):
    def __init__(self, multiplier=1.0, closing_pxs=None, dvds=None):
        if not isinstance(closing_pxs, pd.Series):
            raise ValueError(
                "closing_pxs must be a Series not {0}".format(type(closing_pxs))
            )
        if dvds is not None and not isinstance(dvds, pd.Series):
            raise ValueError("dvds be a Series not {0}".format(type(dvds)))

        self._multiplier = multiplier
        self._dvds = dvds
        self._closing_pxs = closing_pxs

    multiplier = property(lambda self: self._multiplier)
    dvds = property(lambda self: self._dvds)

    def truncate(self, before=None, after=None):
        return PortfolioPricer(
            self.multiplier,
            self._closing_pxs.truncate(before=before, after=after),
            dvds=self._dvds,
        )

    def get_closing_pxs(self, start=None, end=None):
        pxs = self._closing_pxs
        if start or end:
            start = start or pxs.index[0]
            end = end or pxs.index[-1]
            pxs = pxs.ix[start:end]
        return pxs

    def get_mkt_val(self, pxs=None):
        """  return the market value series for the specified Series of pxs """
        pxs = self._closing_pxs if pxs is None else pxs
        return pxs * self.multiplier

    def get_premium(self, qty, px, ts=None):
        return -qty * px * self.multiplier

    def get_eod_frame(self):
        close = self.get_closing_pxs()
        mktval = self.get_mkt_val(close)
        dvds = self.dvds
        df = pd.DataFrame({"close": close, "mkt_val": mktval, "dvds": dvds})
        df.index.name = "date"
        # drop dvds outside the px range
        return df.truncate(before=close.index[0], after=close.index[-1])


class SingleAssetPortfolio(object):
    def __init__(self, pricer, trades, ret_calc=None):
        """
        :param pricer: PortfolioPricer
        :param trades: list of Trade objects
        """
        self.trades = tuple(trades)
        self.pricer = pricer
        self._ret_calc = ret_calc or RoiiRetCalculator()

    txns = lazy_property(
        lambda self: Txns(self.trades, self.pricer, self.ret_calc), "txns"
    )
    positions = lazy_property(lambda self: Positions(self.txns), "positions")
    pl = property(lambda self: self.txns.pl)
    performance = property(lambda self: self.txns.performance)

    # --------------------------------------------------
    # direct access to common attributes
    dly_pl = property(lambda self: self.pl.dly)
    monthly_pl = property(lambda self: self.pl.monthly)
    dly_rets = property(lambda self: self.performance.dly)
    monthly_rets = property(lambda self: self.performance.monthly)

    @property
    def ret_calc(self):
        return self._ret_calc

    @ret_calc.setter
    def ret_calc(self, calc):
        self._ret_calc = calc
        if hasattr(self, "_txns"):
            self.txns.ret_calc = calc

    def clear_cache(self):
        for attr in ["_txns", "_positions", "_long", "_short"]:
            if hasattr(self, attr):
                delattr(self, attr)

    def subset(self, pids):
        txns = self.txns
        stxns = txns.subset(pids)
        if stxns == txns:  # return same object
            return self
        else:
            # TODO: rethink logic - maybe split trades (l/s) in Portfolio constructor as now
            # passing split trades back to portfolio subset
            port = SingleAssetPortfolio(
                self.pricer, stxns.trades, ret_calc=self.ret_calc
            )
            port._txns = stxns
            if hasattr(self, "_positions"):
                port._positions = self.positions.subset(stxns)
            return port

    @lazy_property
    def long(self):
        return PortfolioSubset.longs(self)

    @lazy_property
    def short(self):
        return PortfolioSubset.shorts(self)

    winner = property(lambda self: PortfolioSubset.winners(self))
    loser = property(lambda self: PortfolioSubset.losers(self))

    def buy_and_hold(
        self, qty=1.0, start_dt=None, end_dt=None, start_px=None, end_px=None
    ):
        """Construct a portfolio which opens a position with size qty at start (or first data in pricer) and
        continues to the specified end date. It uses the end of day market prices defined by the pricer
        (or prices supplied)

        :param qty:
        :param start: datetime
        :param end: datetime
        :param which: which price series to use for inital trade px
        :param ret_cacls: portfolio return calculator
        :return: SingleAssetPortfolio
        """
        from tia.analysis.model.trd import TradeBlotter

        eod = self.pricer.get_eod_frame().close

        start_dt = start_dt and pd.to_datetime(start_dt) or eod.index[0]
        start_px = start_px or eod.asof(start_dt)
        end_dt = end_dt and pd.to_datetime(end_dt) or eod.index[-1]
        end_px = end_px or eod.asof(end_dt)

        pricer = self.pricer.trunace(start_dt, end_dt)
        blotter = TradeBlotter()
        blotter.ts = start_dt
        blotter.open(qty, start_px)
        blotter.ts = end_dt
        blotter.close(end_px)
        trds = blotter.trades
        return SingleAssetPortfolio(pricer, trds, ret_calc=self.ret_calc)


class PortfolioSubset(object):
    @staticmethod
    def longs(port):
        return port.subset(port.positions.long_pids)

    @staticmethod
    def shorts(port):
        return port.subset(port.positions.short_pids)

    @staticmethod
    def winners(port):
        frame = port.positions.frame
        pids = frame[frame[PC.PL] >= 0].index
        return port.subset(pids)

    @staticmethod
    def losers(port):
        frame = port.positions.frame
        pids = frame[frame[PC.PL] < 0].index
        return port.subset(pids)

    @staticmethod
    def top_pl(port, n=10):
        pids = port.positions.frame[PC.PL].order()[-n:].index
        return port.subset(pids)

    @staticmethod
    def top_rets(port, n=10):
        pids = port.positions.frame[PC.RET].order()[-n:].index
        return port.subset(pids)

    @staticmethod
    def bottom_pl(port, n=10):
        pids = port.positions.frame[PC.PL].order()[:n].index
        return port.subset(pids)

    @staticmethod
    def bottom_rets(port, n=10):
        pids = port.positions.frame[PC.RET].order()[:n].index
        return port.subset(pids)

    @staticmethod
    def top_durations(port, n=10):
        pids = port.positions.frame[PC.DURATION].order()[-n:].index
        return port.subset(pids)

    @staticmethod
    def bottom_durations(port, n=10):
        pids = port.positions.frame[PC.DURATION].order()[:n].index
        return port.subset(pids)


class PortfolioSummary(object):
    def __init__(self):
        self.total_key = "All"
        self.iter_fcts = []

    def __call__(self, port, analyze_fct=None):
        """analyze_fct: fct(port) which can return Series, or map of key to Series. If key to series, then
        the key is used as an additional index value.

        :param port: Portfolio or dict of key->Portfolio
        :param analyze_fct:
        :return:
        """
        iter_fcts = self.iter_fcts
        lvls = len(iter_fcts)

        analyze_fct = self.analyze_returns if analyze_fct is None else analyze_fct

        def _iter_all_lvls(lvl, keys, parent, results):
            if lvl < (lvls - 1):
                # exhaust combinations
                for key, child in iter_fcts[lvl](parent):
                    _iter_all_lvls(lvl + 1, keys + [key], child, results)
            else:
                # at the bottom
                for key, child in iter_fcts[lvl](parent):
                    idx_names = ["lvl{0}".format(i + 1) for i in range(lvls)]
                    idx_vals = [[k] for k in keys + [key]]
                    idx = pd.MultiIndex.from_arrays(idx_vals, names=idx_names)
                    res = analyze_fct(child)
                    if isinstance(res, pd.Series):
                        res = res.to_frame().T
                        res.index = idx
                        results.append(res)
                    else:
                        for k, v in res.items():
                            # prepend current levels to key name
                            v = v.to_frame().T
                            idx = pd.MultiIndex.from_arrays(
                                idx_vals + [k], names=idx_names + ["lvl%s" % lvls]
                            )
                            v.index = idx
                            results.append(v)

        if lvls == 0:

            def _get_res(p):
                res = analyze_fct(p)
                return res.to_frame().T if isinstance(res, pd.Series) else res

            if hasattr(port, "iteritems"):
                pieces = []
                for k, p in port.items():
                    res = _get_res(p)
                    defidx = res.index.nlevels == 1 and (res.index == 0).all()
                    res = insert_level(res, k, axis=1, level_name="lvl1")
                    if defidx:
                        res.index = res.index.droplevel(1)
                    pieces.append(res)
                return pd.concat(pieces)
            else:
                return _get_res(port)
        else:
            if hasattr(port, "iteritems"):
                pieces = []
                for k, p in port.items():
                    results = []
                    _iter_all_lvls(0, [], p, results)
                    tmp = pd.concat(results)
                    tmp.index.names = [
                        "lvl%s" % (i + 2) for i in range(len(tmp.index.names))
                    ]
                    tmp = insert_level(tmp, k, level_name="lvl1", axis=1)
                    pieces.append(tmp)
                return pd.concat(pieces)
            else:
                results = []
                _iter_all_lvls(0, [], port, results)
                return pd.concat(results)

    def add_iter_fct(self, siter):
        self.iter_fcts.append(siter)
        return self

    def include_win_loss(self, total=1):
        def _split_port(port):
            if total:
                yield self.total_key, port
            yield "winner", PortfolioSubset.winners(port)
            yield "loser", PortfolioSubset.losers(port)

        self.add_iter_fct(_split_port)
        return self

    def include_long_short(self, total=1):
        def _split_port(port):
            if total:
                yield self.total_key, port
            yield "long", port.long
            yield "short", port.short

        self.add_iter_fct(_split_port)
        return self

    @staticmethod
    def analyze_returns(port):
        monthly = port.performance.monthly_details
        dly = port.performance.dly_details
        stats = port.positions.stats
        data = OrderedDict()
        data[("port", "ltd ann")] = monthly.ltd_ann
        data[("port", "mret avg")] = monthly.mean
        data[("port", "mret avg ann")] = monthly.mean_ann
        data[("port", "mret std ann")] = monthly.std_ann
        data[("port", "sharpe ann")] = monthly.sharpe_ann
        data[("port", "sortino")] = monthly.sortino
        data[("port", "maxdd")] = dly.maxdd
        data[("port", "maxdd dt")] = dly.maxdd_dt
        data[("port", "avg dd")] = dly.dd_avg
        data[("port", "nmonths")] = monthly.cnt
        # pos data
        data[("pos", "cnt")] = stats.cnt
        data[("pos", "win cnt")] = stats.win_cnt
        data[("pos", "lose cnt")] = stats.lose_cnt
        data[("pos", "winpct")] = stats.win_pct
        data[("pos", "ret avg")] = stats.ret_avg
        data[("pos", "ret std")] = stats.ret_std
        data[("pos", "ret min")] = stats.ret_min
        data[("pos", "ret max")] = stats.ret_max
        data[("pos", "dur avg")] = stats.duration_avg
        data[("pos", "dur max")] = stats.duration_max
        return pd.Series(data, index=pd.MultiIndex.from_tuples(list(data.keys())))

    @staticmethod
    def analyze_pl(port):
        monthly = port.pl.monthly_details
        dstats = port.pl.dly_details
        stats = port.positions.stats
        data = OrderedDict()
        data[("port", "ltd")] = monthly.ltd_frame.pl.iloc[-1]
        data[("port", "mpl avg")] = monthly.mean
        data[("port", "mpl std")] = monthly.std
        data[("port", "mpl std ann")] = monthly.std_ann
        data[("port", "mpl max")] = monthly.frame.pl.max()
        data[("port", "mpl min")] = monthly.frame.pl.min()
        data[("port", "maxdd")] = dstats.maxdd
        data[("port", "maxdd dt")] = dstats.maxdd_dt
        data[("port", "avg dd")] = dstats.dd_avg
        data[("port", "nmonths")] = monthly.cnt
        # pos data
        data[("pos", "cnt")] = stats.cnt
        data[("pos", "win cnt")] = stats.win_cnt
        data[("pos", "lose cnt")] = stats.lose_cnt
        data[("pos", "winpct")] = stats.win_pct
        data[("pos", "pl avg")] = stats.pl_avg
        data[("pos", "pl std")] = stats.pl_std
        data[("pos", "pl min")] = stats.pl_min
        data[("pos", "pl max")] = stats.pl_max
        return pd.Series(data, index=pd.MultiIndex.from_tuples(list(data.keys())))