# -*- coding: utf-8 -*-
from collections import defaultdict
from functools import partial
import itertools
import operator
import re
import numpy as np
from pandas._libs import internals as libinternals, lib
from pandas.compat import map, range, zip
from pandas.util._validators import validate_bool_kwarg
from pandas.core.dtypes.cast import (
find_common_type, infer_dtype_from_scalar, maybe_convert_objects,
maybe_promote)
from pandas.core.dtypes.common import (
_NS_DTYPE, is_datetimelike_v_numeric, is_extension_array_dtype,
is_extension_type, is_list_like, is_numeric_v_string_like, is_scalar)
import pandas.core.dtypes.concat as _concat
from pandas.core.dtypes.generic import ABCExtensionArray, ABCSeries
from pandas.core.dtypes.missing import isna
import pandas.core.algorithms as algos
from pandas.core.arrays.sparse import _maybe_to_sparse
from pandas.core.base import PandasObject
from pandas.core.index import Index, MultiIndex, ensure_index
from pandas.core.indexing import maybe_convert_indices
from pandas.io.formats.printing import pprint_thing
from .blocks import (
Block, CategoricalBlock, DatetimeTZBlock, ExtensionBlock,
ObjectValuesExtensionBlock, _extend_blocks, _merge_blocks, _safe_reshape,
get_block_type, make_block)
from .concat import ( # all for concatenate_block_managers
combine_concat_plans, concatenate_join_units, get_mgr_concatenation_plan,
is_uniform_join_units)
# TODO: flexible with index=None and/or items=None
class BlockManager(PandasObject):
    """
    Core internal data structure to implement DataFrame, Series, Panel, etc.

    Manage a bunch of labeled 2D mixed-type ndarrays. Essentially it's a
    lightweight blocked set of labeled data to be manipulated by the DataFrame
    public API class.

    Attributes
    ----------
    shape
    ndim
    axes
    values
    items

    Methods
    -------
    set_axis(axis, new_labels)
    copy(deep=True)
    get_dtype_counts
    get_ftype_counts
    get_dtypes
    get_ftypes
    apply(f, axes, filter)
    get_bool_data
    get_numeric_data
    get_slice(slice_like, axis)
    get(label)
    iget(loc)
    take(indexer, axis)
    reindex_axis(new_labels, axis)
    reindex_indexer(new_labels, indexer, axis)
    delete(label)
    insert(loc, label, value)
    set(label, value)

    Parameters
    ----------
    blocks : iterable of Block
        Stored on the manager as a tuple.
    axes : iterable
        One entry per axis; each is converted with ``ensure_index``.
        The first axis is exposed as ``items``.
    do_integrity_check : bool, default True
        Whether to run ``_verify_integrity`` at construction time.

    Notes
    -----
    This is *not* a public API class
    """
    # Fixed attribute set: no per-instance __dict__ is created.
    __slots__ = ['axes', 'blocks', '_ndim', '_shape', '_known_consolidated',
                 '_is_consolidated', '_blknos', '_blklocs']
def __init__(self, blocks, axes, do_integrity_check=True):
    """
    Build a manager from blocks and axes and validate their dimensions.

    Parameters
    ----------
    blocks : iterable of Block
        Stored on the manager as a tuple.
    axes : iterable
        Each entry is converted with ``ensure_index``.
    do_integrity_check : bool, default True
        Whether to run ``_verify_integrity`` on the new manager.

    Raises
    ------
    AssertionError
        If a sparse block refers to more than one item, or if a
        non-sparse block's ``ndim`` differs from the number of axes.
    """
    self.axes = [ensure_index(ax) for ax in axes]
    self.blocks = tuple(blocks)

    # Validate the stored tuple, not the ``blocks`` argument: a one-shot
    # iterator (e.g. a generator) has already been consumed by
    # ``tuple(blocks)`` above, which would silently skip every check.
    for block in self.blocks:
        if block.is_sparse:
            if len(block.mgr_locs) != 1:
                raise AssertionError("Sparse block refers to multiple "
                                     "items")
        elif self.ndim != block.ndim:
            raise AssertionError(
                'Number of Block dimensions ({block}) must equal '
                'number of axes ({self})'.format(block=block.ndim,
                                                 self=self.ndim))

    if do_integrity_check:
        self._verify_integrity()

    self._consolidate_check()

    self._rebuild_blknos_and_blklocs()
def make_empty(self, axes=None):
    """
    Return a new manager of the same class holding no blocks.

    Parameters
    ----------
    axes : list, optional
        Axes for the new manager. When omitted, the items axis is replaced
        by an empty index and the remaining axes are carried over.
    """
    if axes is None:
        axes = [ensure_index([])]
        axes.extend(ensure_index(ax) for ax in self.axes[1:])

    # preserve dtype if possible
    if self.ndim != 1:
        blocks = []
    else:
        blocks = np.array([], dtype=self.array_dtype)

    klass = self.__class__
    return klass(blocks, axes)
def __nonzero__(self):
    """
    Always report the manager as truthy.

    The class defines ``__len__``, so without this override a manager
    with zero items would evaluate as False in a boolean context.
    """
    return True

# Python3 compat
__bool__ = __nonzero__
@property
def shape(self):
    """Tuple with the length of every axis, in axis order."""
    return tuple([len(axis) for axis in self.axes])
@property
def ndim(self):
    """Number of axes held by the manager (``len(self.axes)``)."""
    return len(self.axes)
def set_axis(self, axis, new_labels):
    """
    Replace the labels of one axis in place.

    Parameters
    ----------
    axis : int
        Position in ``self.axes`` of the axis to overwrite.
    new_labels : list-like
        Converted with ``ensure_index``; must have the same length as the
        axis being replaced.

    Raises
    ------
    ValueError
        If the number of new labels differs from the current axis length.
    """
    new_labels = ensure_index(new_labels)
    expected = len(self.axes[axis])
    received = len(new_labels)

    if expected == received:
        self.axes[axis] = new_labels
        return

    raise ValueError(
        'Length mismatch: Expected axis has {old} elements, new '
        'values have {new} elements'.format(old=expected, new=received))
def rename_axis(self, mapper, axis, copy=True, level=None):
    """
    Return a copy of the manager with one axis relabelled.

    The new labels are produced by ``_transform_index`` from the current
    axis, ``mapper`` and ``level``.

    Parameters
    ----------
    mapper : unary callable
    axis : int
    copy : boolean, default True
        Passed to ``self.copy`` as ``deep``.
    level : int, default None
    """
    renamed = self.copy(deep=copy)
    relabelled = _transform_index(self.axes[axis], mapper, level)
    renamed.set_axis(axis, relabelled)
    return renamed
@property
def _is_single_block(self):
    """
    Whether the manager's data lives in exactly one contiguous block.

    A 1-dimensional manager always qualifies. Otherwise there must be a
    single block whose placement is slice-like and equal to
    ``slice(0, len(self), 1)``.
    """
    if self.ndim == 1:
        return True

    blocks = self.blocks
    if len(blocks) != 1:
        return False

    placement = blocks[0].mgr_locs
    sliceable = placement.is_slice_like
    if not sliceable:
        return sliceable
    return placement.as_slice == slice(0, len(self), 1)
def _rebuild_blknos_and_blklocs(self):
    """
    Update mgr._blknos / mgr._blklocs.

    For every position along the items axis, ``_blknos`` records the index
    of the block whose ``mgr_locs`` covers it and ``_blklocs`` records the
    offset inside that block.

    Raises
    ------
    AssertionError
        If some position is not covered by any block.
    """
    n_items = self.shape[0]
    # -1 marks "not assigned yet"; anything still -1 afterwards is a gap
    blknos = np.full(n_items, -1, dtype=np.int64)
    blklocs = np.full(n_items, -1, dtype=np.int64)

    for number, blk in enumerate(self.blocks):
        placement = blk.mgr_locs
        blknos[placement.indexer] = number
        blklocs[placement.indexer] = np.arange(len(placement))

    if (blknos == -1).any():
        raise AssertionError("Gaps in blk ref_locs")

    self._blknos = blknos
    self._blklocs = blklocs
@property
def items(self):
    """The first axis of the manager (``self.axes[0]``)."""
    return self.axes[0]
def _get_counts(self, f):
    """
    Tally block sizes grouped by the value of ``f(block)``.

    The manager is consolidated in place first. Each block contributes
    ``block.shape[0]`` to the bucket keyed by ``f(block)``.

    Returns
    -------
    dict
    """
    self._consolidate_inplace()

    tally = {}
    for blk in self.blocks:
        key = f(blk)
        size = blk.shape[0]
        if key in tally:
            tally[key] += size
        else:
            tally[key] = size
    return tally
def get_dtype_counts(self):
    """Counts from ``_get_counts`` keyed by each block's ``dtype.name``."""
    dtype_name = operator.attrgetter('dtype.name')
    return self._get_counts(dtype_name)
def get_ftype_counts(self):
    """Counts from ``_get_counts`` keyed by each block's ``ftype``."""
    block_ftype = operator.attrgetter('ftype')
    return self._get_counts(block_ftype)
def get_dtypes(self):
    """
    Collect each block's ``dtype`` and take them at ``self._blknos``.

    ``_blknos`` holds one block number per item position, so the result
    is aligned with the items axis.
    """
    per_block = [blk.dtype for blk in self.blocks]
    return algos.take_1d(np.array(per_block), self._blknos,
                         allow_fill=False)
def get_ftypes(self):
    """
    Collect each block's ``ftype`` and take them at ``self._blknos``.

    ``_blknos`` holds one block number per item position, so the result
    is aligned with the items axis.
    """
    per_block = [blk.ftype for blk in self.blocks]
    return algos.take_1d(np.array(per_block), self._blknos,
                         allow_fill=False)
def __getstate__(self):
    """
    Build the pickle state for the manager.

    Returns
    -------
    tuple
        ``(axes, block_values, block_items, extra_state)``. The fourth
        element maps ``'0.14.1'`` to a dict holding the axes and, per
        block, its values and ``mgr_locs`` indexer.
    """
    axes_array = list(self.axes)

    block_values = []
    block_items = []
    block_records = []
    for blk in self.blocks:
        indexer = blk.mgr_locs.indexer
        block_values.append(blk.values)
        block_items.append(self.items[indexer])
        block_records.append(dict(values=blk.values, mgr_locs=indexer))

    extra_state = {
        '0.14.1': {
            'axes': axes_array,
            'blocks': block_records,
        }
    }

    # First three elements of the state are to maintain forward
    # compatibility with 0.13.1.
    return axes_array, block_values, block_items, extra_state
def __setstate__(self, state):
    """
    Restore the manager from pickle state.

    Two layouts are accepted: a tuple whose fourth element carries a
    ``'0.14.1'`` entry (axes plus per-block values and ``mgr_locs``), and
    the older three-element form of axes, block values and block items.
    """
    is_current_format = (isinstance(state, tuple) and len(state) >= 4 and
                         '0.14.1' in state[3])

    if is_current_format:
        payload = state[3]['0.14.1']
        self.axes = [ensure_index(ax) for ax in payload['axes']]
        self.blocks = tuple(
            make_block(entry['values'], placement=entry['mgr_locs'])
            for entry in payload['blocks'])
    else:
        # discard anything after 3rd, support beta pickling format for a
        # little while longer
        ax_arrays, bvalues, bitems = state[:3]

        self.axes = [ensure_index(ax) for ax in ax_arrays]
        mgr_items = self.axes[0]

        if len(bitems) == 1 and mgr_items.equals(bitems[0]):
            # This is a workaround for pre-0.14.1 pickles that didn't
            # support unpickling multi-block frames/panels with non-unique
            # columns/items, because given a manager with items ["a", "b",
            # "a"] there's no way of knowing which block's "a" is where.
            #
            # Single-block case can be supported under the assumption that
            # block items corresponded to manager items 1-to-1.
            all_mgr_locs = [slice(0, len(bitems[0]))]
        else:
            all_mgr_locs = [mgr_items.get_indexer(blk_items)
                            for blk_items in bitems]

        self.blocks = tuple(
            make_block(values, placement=mgr_locs)
            for values, mgr_locs in zip(bvalues, all_mgr_locs))

    self._post_setstate()
def _post_setstate(self):
    """Clear both consolidation flags, then rebuild ``_blknos``/``_blklocs``."""
    # Consolidation status is not part of the restored state, so mark it
    # as unknown before recomputing the block lookup arrays.
    self._is_consolidated = self._known_consolidated = False
    self._rebuild_blknos_and_blklocs()
def __len__(self):
    """Number of entries in ``self.items`` (the first axis)."""
    return len(self.items)
def __unicode__(self):
    """
    Text representation of the manager.

    Consists of the class name, one line per axis (the first labelled
    ``Items``, the rest ``Axis <i>``), and one line per block.
    """
    pieces = [pprint_thing(self.__class__.__name__)]

    for position, axis in enumerate(self.axes):
        if position:
            pieces.append(u'\nAxis {i}: {ax}'.format(i=position, ax=axis))
        else:
            pieces.append(u'\nItems: {ax}'.format(ax=axis))

    pieces.extend(u'\n{block}'.format(block=pprint_thing(blk))
                  for blk in self.blocks)
    return u''.join(pieces)
def _verify_integrity(self):
    """
    Check that the blocks are consistent with the manager's axes.

    Every block that opts in through its ``_verify_integrity`` flag must
    match the manager's shape on all axes after the first; a mismatch is
    handed to ``construction_error``. The summed lengths of all block
    placements must equal the number of items.

    Raises
    ------
    AssertionError
        If the number of items differs from the total placement length.
    """
    expected_tail = self.shape[1:]
    tot_items = sum(len(blk.mgr_locs) for blk in self.blocks)

    for blk in self.blocks:
        if not blk._verify_integrity:
            continue
        tail = blk.shape[1:]
        if tail != expected_tail:
            construction_error(tot_items, tail, self.axes)

    n_items = len(self.items)
    if n_items == tot_items:
        return

    raise AssertionError('Number of manager items must equal union of '
                         'block items\n# manager items: {0}, # '
                         'tot_items: {1}'.format(n_items, tot_items))
def apply(self, f, axes=None, filter=None, do_integrity_check=False,
consolidate=True, **kwargs):
"""
iterate over the blocks, collect and create a new block manager
Parameters
----------
f : the callable or function name to operate on at the block level
axes : optional (if not supplied, use self.axes)
filter : list, if supplied, only call the block if the filter is in
the block
do_integrity_check : boolean, default False. Do the block manager
integrity check
consolidate: boolean, default True. Join together blocks having same
dtype
Returns
-------
Block Manager (new object)
"""
result_blocks = []
# filter kwarg is used in replace-* family of methods
if filter is not None:
filter_locs = set(self.items.get_indexer_for(filter))
if len(filter_locs) == len(self.items):
Loading ...