Repository URL to install this package:
|
Version:
0.1.7 ▾
|
deep-forest
/
_io.py
|
|---|
"""
Implement methods for dumping and loading large objects using joblib.

The `Buffer` class caches data, estimators and the predictor on the local
disk to support the partial mode in deep forest. The module-level
`model_mkdir`, `model_saveobj` and `model_loadobj` functions create a model
directory and save / load the components of a deep forest model in it.
"""
# NOTE: only `Buffer` is listed here, so the `model_*` helpers are not
# exported by a star import.
__all__ = ["Buffer"]
import os
import shutil
import warnings
import tempfile
from joblib import load, dump
class Buffer(object):
    """
    The class of dumping and loading large array objects including the data
    and estimators.

    Parameters
    ----------
    use_buffer : bool
        - If ``True``, a temporary buffer on the local disk is created to
          cache objects such as data and estimators.
        - If ``False``, all objects are directly stored in memory without
          extra processing.
    buffer_dir : str, default=None
        The directory in which the temporary buffer is created. If ``None``,
        the current working directory is used.
    store_est : bool, default=True
        Whether to cache the estimators to the local buffer.
    store_pred : bool, default=True
        Whether to cache the predictor to the local buffer.
    store_data : bool, default=False
        Whether to cache the intermediate data to the local buffer.

    Notes
    -----
    The ``store_*`` flags only take effect when ``use_buffer`` is ``True``;
    otherwise they are all disabled.
    """

    def __init__(
        self,
        use_buffer,
        buffer_dir=None,
        store_est=True,
        store_pred=True,
        store_data=False,
    ):
        self.use_buffer = use_buffer
        # Caching to disk is only possible when a buffer is actually used.
        self.store_est = store_est and use_buffer
        self.store_pred = store_pred and use_buffer
        self.store_data = store_data and use_buffer
        self.buffer_dir = os.getcwd() if buffer_dir is None else buffer_dir

        # Create buffer
        if self.use_buffer:
            self.buffer = tempfile.TemporaryDirectory(
                prefix="buffer_", dir=self.buffer_dir
            )
            if store_data:
                self.data_dir_ = tempfile.mkdtemp(
                    prefix="data_", dir=self.buffer.name
                )
            if store_est or store_pred:
                self.model_dir_ = tempfile.mkdtemp(
                    prefix="model_", dir=self.buffer.name
                )
                self.pred_dir_ = os.path.join(self.model_dir_, "predictor.est")

    @property
    def name(self):
        """Return the buffer name, or ``None`` when no buffer is used."""
        if self.use_buffer:
            return self.buffer.name
        else:
            return None

    def cache_data(self, layer_idx, X, is_training_data=True):
        """
        When ``X`` is a large array, it is not recommended to directly pass the
        array to all processors because the array will be copied multiple
        times and cause extra overheads. Instead, dumping the array to the
        local buffer and reading it as the ``numpy.memmap`` mode across
        processors is able to speed up the training and evaluating process.

        Parameters
        ----------
        layer_idx : int
            The index of the cascade layer that utilizes ``X``.
        X : ndarray of shape (n_samples, n_features)
            The training / testing data to be cached.
        is_training_data : bool, default=True
            Whether ``X`` is the training data.

        Returns
        -------
        X: {ndarray, ndarray in numpy.memmap mode}
            - If ``self.store_data`` is ``True``, return the memory-mapped
              object of `X` cached to the local buffer.
            - If ``self.store_data`` is ``False``, return the original ``X``.
        """
        if not self.store_data:
            return X

        split = "train" if is_training_data else "test"
        cache_dir = os.path.join(
            self.data_dir_, "joblib_{}_{}.mmap".format(split, layer_idx)
        )
        # Remove the cache left by a previous call for the same layer/split
        if os.path.exists(cache_dir):
            os.unlink(cache_dir)

        # Dump and reload data in the numpy.memmap mode
        dump(X, cache_dir)
        X_mmap = load(cache_dir, mmap_mode="r+")
        return X_mmap

    def cache_estimator(self, layer_idx, est_idx, est_name, est):
        """
        Dumping the fitted estimator to the buffer is highly recommended,
        especially when the python version is below 3.8. When the size of
        estimator is large, for instance, several gigabytes in the memory,
        sending it back from each processor will cause the struct error.

        Reference:
            https://bugs.python.org/issue17560

        Parameters
        ----------
        layer_idx : int
            The index of the cascade layer that contains the estimator to be
            cached.
        est_idx : int
            The index of the estimator in the cascade layer to be cached.
        est_name : {"rf", "erf", "custom"}
            The name of the estimator to be cached.
        est : object
            The object of base estimator.

        Returns
        -------
        cache_dir : {string, object}
            - If ``self.store_est`` is ``True``, return the absolute path to
              the location of the cached estimator.
            - If ``self.store_est`` is ``False``, return the estimator.
        """
        if not self.store_est:
            return est

        filename = "{}-{}-{}.est".format(layer_idx, est_idx, est_name)
        cache_dir = os.path.join(self.model_dir_, filename)
        dump(est, cache_dir)
        return cache_dir

    def cache_predictor(self, predictor):
        """
        Please refer to `cache_estimator`.

        Parameters
        ----------
        predictor : object
            The object of the predictor.

        Returns
        -------
        pred_dir : {string, object}
            - If ``self.store_pred`` is ``True``, return the absolute path to
              the location of the cached predictor.
            - If ``self.store_pred`` is ``False``, return the predictor.
        """
        if not self.store_pred:
            return predictor

        dump(predictor, self.pred_dir_)
        return self.pred_dir_

    def load_estimator(self, estimator_path):
        """
        Load an estimator previously cached by `cache_estimator`.

        Parameters
        ----------
        estimator_path : str
            The path returned by `cache_estimator`.

        Returns
        -------
        estimator : object
            The estimator loaded with ``joblib.load``.

        Raises
        ------
        FileNotFoundError
            If ``estimator_path`` does not exist.
        """
        if not os.path.exists(estimator_path):
            msg = "Missing estimator in the path: {}."
            raise FileNotFoundError(msg.format(estimator_path))
        estimator = load(estimator_path)
        return estimator

    def load_predictor(self, predictor):
        """
        Load the predictor cached by `cache_predictor`.

        Parameters
        ----------
        predictor : {str, object}
            Either the path returned by `cache_predictor` or the predictor
            object itself.

        Returns
        -------
        predictor : object
            The loaded predictor, or ``predictor`` unchanged when it is not
            a string.

        Raises
        ------
        FileNotFoundError
            If ``predictor`` is a path that does not exist.
        """
        # Since this function is always called from `cascade.py`, the input
        # `predictor` could be the actual predictor object. If so, this
        # function will directly return the predictor.
        if not isinstance(predictor, str):
            return predictor

        if not os.path.exists(predictor):
            msg = "Missing predictor in the path: {}."
            raise FileNotFoundError(msg.format(predictor))
        predictor = load(predictor)
        return predictor

    def del_estimator(self, layer_idx):
        """
        Delete every cached estimator of the cascade layer ``layer_idx``.

        Used for the early stopping stage in deep forest. Nothing is done
        when estimators are not cached to the buffer.
        """
        if not self.store_est:
            # No estimator file is ever written in this case, and
            # `model_dir_` may not even exist.
            return

        # Cached estimators are named "{layer_idx}-{est_idx}-{est_name}.est"
        # (see `cache_estimator`). Matching the trailing dash prevents, e.g.,
        # layer 1 from also matching the files of layers 10, 11, ...
        prefix = "{}-".format(layer_idx)
        for est_name in os.listdir(self.model_dir_):
            if est_name.startswith(prefix):
                try:
                    os.unlink(os.path.join(self.model_dir_, est_name))
                except OSError:
                    msg = (
                        "Permission denied when deleting the dumped"
                        " estimators during the early stopping stage."
                    )
                    warnings.warn(msg, RuntimeWarning)

    def close(self):
        """Clean up the buffer; a no-op when no buffer was created."""
        if not self.use_buffer:
            return

        try:
            self.buffer.cleanup()
        except OSError:
            msg = "Permission denied when cleaning up the local buffer."
            warnings.warn(msg, RuntimeWarning)
def model_mkdir(dirname):
    """
    Create the directory layout used to save a model.

    Both ``dirname`` and its ``estimator`` sub-directory are created with
    ``os.mkdir``, so the parent of ``dirname`` must already exist.

    Raises
    ------
    RuntimeError
        If ``dirname`` is already an existing directory.
    """
    if os.path.isdir(dirname):
        template = "The directory to be created already exists {}."
        raise RuntimeError(template.format(dirname))

    estimator_dir = os.path.join(dirname, "estimator")
    for new_dir in (dirname, estimator_dir):
        os.mkdir(new_dir)
def model_saveobj(dirname, obj_type, obj, partial_mode=False):
    """
    Persist one component of a deep forest model into ``dirname``.

    Parameters
    ----------
    dirname : str
        An existing model directory (see `model_mkdir`).
    obj_type : {"param", "binner", "layer", "predictor"}
        Which component ``obj`` is.
    obj : object
        - ``"param"`` / ``"binner"``: a dict, written to ``{obj_type}.pkl``.
        - ``"layer"``: a dict whose values expose an ``estimators_`` dict;
          each entry is written to ``estimator/{key}.est``.
        - ``"predictor"``: written to ``estimator/predictor.est``.
    partial_mode : bool, default=False
        If ``True``, estimators and the predictor are given as paths to
        already-dumped files, which are moved into place with
        ``shutil.move``. Otherwise they are serialized with ``dump``.

    Raises
    ------
    RuntimeError
        If ``dirname`` (or, for layers, its ``estimator`` sub-directory)
        does not exist, or if ``obj`` is not a dict where one is required.
    ValueError
        If ``obj_type`` is not one of the supported types.
    """
    if not os.path.isdir(dirname):
        template = "Cannot find the target directory: {}. Please create it first."
        raise RuntimeError(template.format(dirname))

    if obj_type not in ("param", "binner", "layer", "predictor"):
        raise ValueError("Unknown object type: {}.".format(obj_type))

    estimator_dir = os.path.join(dirname, "estimator")

    if obj_type == "predictor":
        predictor_file = os.path.join(estimator_dir, "predictor.est")
        if partial_mode:
            # NOTE(review): `obj` is a path that is moved away here, so any
            # reference the caller keeps to it no longer points to a file.
            shutil.move(obj, predictor_file)
        else:
            dump(obj, predictor_file)
        return

    if obj_type == "layer":
        if not isinstance(obj, dict):
            raise RuntimeError(
                "The layer to be saved should be in the form of dict."
            )
        if not os.path.isdir(estimator_dir):
            template = "Cannot find the target directory: {}."
            raise RuntimeError(template.format(estimator_dir))
        for cascade_layer in obj.values():
            for key, estimator in cascade_layer.estimators_.items():
                target = os.path.join(estimator_dir, key + ".est")
                if partial_mode:
                    # `estimator` is the path to the estimator already dumped
                    # in the buffer, so it only needs to be moved.
                    # NOTE(review): `estimators_` keeps the old path, which
                    # no longer exists after the move.
                    shutil.move(estimator, target)
                else:
                    dump(estimator, target)
        return

    # Only "param" and "binner" remain at this point.
    if not isinstance(obj, dict):
        template = "{} to be saved should be in the form of dict."
        raise RuntimeError(template.format(obj_type))
    dump(obj, os.path.join(dirname, "{}.pkl".format(obj_type)))
def _load_estimator_file(dest, partial_mode):
    """Check ``dest`` exists, then return its absolute path (partial mode) or
    the loaded estimator."""
    if not os.path.isfile(dest):
        msg = "Missing estimator in the path: {}."
        raise RuntimeError(msg.format(dest))
    if partial_mode:
        # Absolute paths keep the layer usable even if the working
        # directory changes before prediction.
        return os.path.abspath(dest)
    return load(dest)


def _load_layers(dirname, d):
    """Rebuild every cascade layer described by ``d`` from ``dirname``."""
    from ._layer import (
        ClassificationCascadeLayer,
        RegressionCascadeLayer,
        CustomCascadeLayer,
    )

    if not isinstance(d, dict):
        msg = "Loading layers requires the dict from `param.pkl`."
        raise RuntimeError(msg)

    n_estimators = d["n_estimators"]
    n_layers = d["n_layers"]
    layers = {}
    for layer_idx in range(n_layers):
        if not d["use_custom_estimator"]:
            layer_cls = (
                ClassificationCascadeLayer
                if d["is_classifier"]
                else RegressionCascadeLayer
            )
            layer_ = layer_cls(
                layer_idx=layer_idx,
                n_outputs=d["n_outputs"],
                criterion=d["criterion"],
                n_estimators=n_estimators,
                partial_mode=d["partial_mode"],
                buffer=d["buffer"],
                verbose=d["verbose"],
            )
            # NOTE(review): keys are registered type-major (all "rf", then
            # all "erf"); confirm this matches the order the layer relies on
            # when iterating `estimators_`.
            est_keys = [
                "{}-{}-{}".format(layer_idx, est_idx, est_type)
                for est_type in ("rf", "erf")
                for est_idx in range(n_estimators)
            ]
        else:
            layer_ = CustomCascadeLayer(
                layer_idx=layer_idx,
                n_splits=1,  # will not be used
                n_outputs=d["n_outputs"],
                estimators=[None] * n_estimators,  # will not be used
                partial_mode=d["partial_mode"],
                buffer=d["buffer"],
                verbose=d["verbose"],
            )
            est_keys = [
                "{}-{}-custom".format(layer_idx, est_idx)
                for est_idx in range(n_estimators)
            ]

        for est_key in est_keys:
            dest = os.path.join(dirname, "estimator", est_key + ".est")
            layer_.estimators_.update(
                {est_key: _load_estimator_file(dest, d["partial_mode"])}
            )

        layers.update({"layer_{}".format(layer_idx): layer_})
    return layers


def model_loadobj(dirname, obj_type, d=None):
    """
    Load objects of the deep forest from the given directory.

    Parameters
    ----------
    dirname : str
        The directory the model was saved to (see `model_saveobj`).
    obj_type : {"param", "binner", "layer", "predictor"}
        The type of the object to be loaded.
    d : dict, default=None
        The parameter dict loaded from ``param.pkl``. Required when
        ``obj_type`` is ``"layer"`` or ``"predictor"``.

    Returns
    -------
    obj : object
        - ``"param"`` / ``"binner"``: the object stored in ``{obj_type}.pkl``.
        - ``"layer"``: a dict mapping ``"layer_{idx}"`` to cascade layers.
          In partial mode each layer's ``estimators_`` holds absolute paths
          to the estimator files instead of the loaded estimators.
        - ``"predictor"``: the predictor, or its absolute path when
          ``d["partial_mode"]`` is ``True``.

    Raises
    ------
    RuntimeError
        If ``dirname`` or a required file is missing, or if ``d`` is not a
        dict when it is required.
    ValueError
        If ``obj_type`` is unknown.
    """
    if not os.path.isdir(dirname):
        msg = "Cannot find the target directory: {}."
        raise RuntimeError(msg.format(dirname))

    if obj_type in ("param", "binner"):
        obj = load(os.path.join(dirname, "{}.pkl".format(obj_type)))
        return obj
    elif obj_type == "layer":
        return _load_layers(dirname, d)
    elif obj_type == "predictor":
        if not isinstance(d, dict):
            msg = "Loading the predictor requires the dict from `param.pkl`."
            raise RuntimeError(msg)
        pred_path = os.path.join(dirname, "estimator", "predictor.est")
        if not os.path.isfile(pred_path):
            msg = "Missing predictor in the path: {}."
            raise RuntimeError(msg.format(pred_path))
        if d["partial_mode"]:
            return os.path.abspath(pred_path)
        return load(pred_path)
    else:
        raise ValueError("Unknown object type: {}.".format(obj_type))