# -*- coding: utf-8 -*-
# TODO: Needs a better name; too many modules are already called "concat"
from collections import defaultdict
import copy
import numpy as np
from pandas._libs import internals as libinternals, tslibs
from pandas.util._decorators import cache_readonly
from pandas.core.dtypes.cast import maybe_promote
from pandas.core.dtypes.common import (
_get_dtype, is_categorical_dtype, is_datetime64_dtype,
is_datetime64tz_dtype, is_extension_array_dtype, is_float_dtype,
is_numeric_dtype, is_sparse, is_timedelta64_dtype)
import pandas.core.dtypes.concat as _concat
from pandas.core.dtypes.missing import isna
import pandas.core.algorithms as algos
def get_mgr_concatenation_plan(mgr, indexers):
"""
Construct concatenation plan for given block manager and indexers.
Parameters
----------
mgr : BlockManager
indexers : dict of {axis: indexer}
Returns
-------
plan : list of (BlockPlacement, JoinUnit) tuples
"""
# Calculate post-reindex shape , save for item axis which will be separate
# for each block anyway.
mgr_shape = list(mgr.shape)
for ax, indexer in indexers.items():
mgr_shape[ax] = len(indexer)
mgr_shape = tuple(mgr_shape)
if 0 in indexers:
ax0_indexer = indexers.pop(0)
blknos = algos.take_1d(mgr._blknos, ax0_indexer, fill_value=-1)
blklocs = algos.take_1d(mgr._blklocs, ax0_indexer, fill_value=-1)
else:
if mgr._is_single_block:
blk = mgr.blocks[0]
return [(blk.mgr_locs, JoinUnit(blk, mgr_shape, indexers))]
ax0_indexer = None
blknos = mgr._blknos
blklocs = mgr._blklocs
plan = []
for blkno, placements in libinternals.get_blkno_placements(blknos,
mgr.nblocks,
group=False):
assert placements.is_slice_like
join_unit_indexers = indexers.copy()
shape = list(mgr_shape)
shape[0] = len(placements)
shape = tuple(shape)
if blkno == -1:
unit = JoinUnit(None, shape)
else:
blk = mgr.blocks[blkno]
ax0_blk_indexer = blklocs[placements.indexer]
unit_no_ax0_reindexing = (len(placements) == len(blk.mgr_locs) and
# Fastpath detection of join unit not
# needing to reindex its block: no ax0
# reindexing took place and block
# placement was sequential before.
((ax0_indexer is None and
blk.mgr_locs.is_slice_like and
blk.mgr_locs.as_slice.step == 1) or
# Slow-ish detection: all indexer locs
# are sequential (and length match is
# checked above).
(np.diff(ax0_blk_indexer) == 1).all()))
# Omit indexer if no item reindexing is required.
if unit_no_ax0_reindexing:
join_unit_indexers.pop(0, None)
else:
join_unit_indexers[0] = ax0_blk_indexer
unit = JoinUnit(blk, shape, join_unit_indexers)
plan.append((placements, unit))
return plan
class JoinUnit(object):
def __init__(self, block, shape, indexers=None):
# Passing shape explicitly is required for cases when block is None.
if indexers is None:
indexers = {}
self.block = block
self.indexers = indexers
self.shape = shape
def __repr__(self):
return '{name}({block!r}, {indexers})'.format(
name=self.__class__.__name__, block=self.block,
indexers=self.indexers)
@cache_readonly
def needs_filling(self):
for indexer in self.indexers.values():
# FIXME: cache results of indexer == -1 checks.
if (indexer == -1).any():
return True
return False
@cache_readonly
def dtype(self):
if self.block is None:
raise AssertionError("Block is None, no dtype")
if not self.needs_filling:
return self.block.dtype
else:
return _get_dtype(maybe_promote(self.block.dtype,
self.block.fill_value)[0])
@cache_readonly
def is_na(self):
if self.block is None:
return True
if not self.block._can_hold_na:
return False
# Usually it's enough to check but a small fraction of values to see if
# a block is NOT null, chunks should help in such cases. 1000 value
# was chosen rather arbitrarily.
values = self.block.values
if self.block.is_categorical:
values_flat = values.categories
elif is_sparse(self.block.values.dtype):
return False
elif self.block.is_extension:
values_flat = values
else:
values_flat = values.ravel(order='K')
total_len = values_flat.shape[0]
chunk_len = max(total_len // 40, 1000)
for i in range(0, total_len, chunk_len):
if not isna(values_flat[i:i + chunk_len]).all():
return False
return True
def get_reindexed_values(self, empty_dtype, upcasted_na):
if upcasted_na is None:
# No upcasting is necessary
fill_value = self.block.fill_value
values = self.block.get_values()
else:
fill_value = upcasted_na
if self.is_na:
if getattr(self.block, 'is_object', False):
# we want to avoid filling with np.nan if we are
# using None; we already know that we are all
# nulls
values = self.block.values.ravel(order='K')
if len(values) and values[0] is None:
fill_value = None
if (getattr(self.block, 'is_datetimetz', False) or
is_datetime64tz_dtype(empty_dtype)):
if self.block is None:
array = empty_dtype.construct_array_type()
return array(np.full(self.shape[1], fill_value.value),
dtype=empty_dtype)
pass
elif getattr(self.block, 'is_categorical', False):
pass
elif getattr(self.block, 'is_sparse', False):
pass
elif getattr(self.block, 'is_extension', False):
pass
else:
missing_arr = np.empty(self.shape, dtype=empty_dtype)
missing_arr.fill(fill_value)
return missing_arr
if not self.indexers:
if not self.block._can_consolidate:
# preserve these for validation in _concat_compat
return self.block.values
if self.block.is_bool and not self.block.is_categorical:
# External code requested filling/upcasting, bool values must
# be upcasted to object to avoid being upcasted to numeric.
values = self.block.astype(np.object_).values
elif self.block.is_extension:
values = self.block.values
else:
# No dtype upcasting is done here, it will be performed during
# concatenation itself.
values = self.block.get_values()
if not self.indexers:
# If there's no indexing to be done, we want to signal outside
# code that this array must be copied explicitly. This is done
# by returning a view and checking `retval.base`.
values = values.view()
else:
for ax, indexer in self.indexers.items():
values = algos.take_nd(values, indexer, axis=ax,
fill_value=fill_value)
return values
def concatenate_join_units(join_units, concat_axis, copy):
"""
Concatenate values from several join units along selected axis.
"""
if concat_axis == 0 and len(join_units) > 1:
# Concatenating join units along ax0 is handled in _merge_blocks.
raise AssertionError("Concatenating join units along axis0")
empty_dtype, upcasted_na = get_empty_dtype_and_na(join_units)
to_concat = [ju.get_reindexed_values(empty_dtype=empty_dtype,
upcasted_na=upcasted_na)
for ju in join_units]
if len(to_concat) == 1:
# Only one block, nothing to concatenate.
concat_values = to_concat[0]
if copy:
if isinstance(concat_values, np.ndarray):
# non-reindexed (=not yet copied) arrays are made into a view
# in JoinUnit.get_reindexed_values
if concat_values.base is not None:
concat_values = concat_values.copy()
else:
concat_values = concat_values.copy()
else:
concat_values = _concat._concat_compat(to_concat, axis=concat_axis)
return concat_values
def get_empty_dtype_and_na(join_units):
"""
Return dtype and N/A values to use when concatenating specified units.
Returned N/A value may be None which means there was no casting involved.
Returns
-------
dtype
na
"""
if len(join_units) == 1:
blk = join_units[0].block
if blk is None:
return np.float64, np.nan
if is_uniform_reindex(join_units):
# XXX: integrate property
empty_dtype = join_units[0].block.dtype
upcasted_na = join_units[0].block.fill_value
return empty_dtype, upcasted_na
has_none_blocks = False
dtypes = [None] * len(join_units)
for i, unit in enumerate(join_units):
if unit.block is None:
has_none_blocks = True
else:
dtypes[i] = unit.dtype
upcast_classes = defaultdict(list)
null_upcast_classes = defaultdict(list)
for dtype, unit in zip(dtypes, join_units):
if dtype is None:
continue
if is_categorical_dtype(dtype):
upcast_cls = 'category'
elif is_datetime64tz_dtype(dtype):
upcast_cls = 'datetimetz'
elif issubclass(dtype.type, np.bool_):
upcast_cls = 'bool'
elif issubclass(dtype.type, np.object_):
upcast_cls = 'object'
elif is_datetime64_dtype(dtype):
upcast_cls = 'datetime'
elif is_timedelta64_dtype(dtype):
upcast_cls = 'timedelta'
elif is_sparse(dtype):
upcast_cls = dtype.subtype.name
elif is_extension_array_dtype(dtype):
upcast_cls = 'object'
elif is_float_dtype(dtype) or is_numeric_dtype(dtype):
upcast_cls = dtype.name
else:
upcast_cls = 'float'
# Null blocks should not influence upcast class selection, unless there
# are only null blocks, when same upcasting rules must be applied to
# null upcast classes.
if unit.is_na:
null_upcast_classes[upcast_cls].append(dtype)
else:
upcast_classes[upcast_cls].append(dtype)
if not upcast_classes:
upcast_classes = null_upcast_classes
# create the result
if 'object' in upcast_classes:
return np.dtype(np.object_), np.nan
elif 'bool' in upcast_classes:
if has_none_blocks:
return np.dtype(np.object_), np.nan
else:
return np.dtype(np.bool_), None
elif 'category' in upcast_classes:
return np.dtype(np.object_), np.nan
elif 'datetimetz' in upcast_classes:
# GH-25014. We use NaT instead of iNaT, since this eventually
# ends up in DatetimeArray.take, which does not allow iNaT.
dtype = upcast_classes['datetimetz']
return dtype[0], tslibs.NaT
elif 'datetime' in upcast_classes:
return np.dtype('M8[ns]'), tslibs.iNaT
Loading ...