Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
modin / core / io / text / excel_dispatcher.py
Size: Mime:
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership.  The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Module houses `ExcelDispatcher` class, that is used for reading excel files."""

import pandas
import re
import sys
import warnings

from modin.core.io.text.text_file_dispatcher import TextFileDispatcher
from modin.config import NPartitions

# Number of bytes read per step while scanning the worksheet XML for the end of
# the first row (the closing `</row>` tag), which is where the column names are.
EXCEL_READ_BLOCK_SIZE = 4096


class ExcelDispatcher(TextFileDispatcher):
    """Class handles utils for reading excel files."""

    @classmethod
    def _read(cls, io, **kwargs):
        """
        Read data from `io` according to the passed `read_excel` `kwargs` parameters.

        Parameters
        ----------
        io : str, bytes, ExcelFile, xlrd.Book, path object, or file-like object
            `io` parameter of `read_excel` function.
        **kwargs : dict
            Parameters of `read_excel` function.

        Returns
        -------
        new_query_compiler : BaseQueryCompiler
            Query compiler with imported data for further processing.

        Raises
        ------
        ValueError
            If the requested sheet cannot be located inside the archive.

        Notes
        -----
        The read is delegated to ``cls.single_worker_read`` when:

        * an engine other than ``openpyxl`` is requested;
        * the interpreter is older than Python 3.7;
        * ``sheet_name`` is ``None`` or a list (multiple sheets);
        * the worksheet XML contains no complete ``<row>...</row>`` element
          (for example an empty sheet), so there is no header row to parse.
        """
        if (
            kwargs.get("engine", None) is not None
            and kwargs.get("engine") != "openpyxl"
        ):
            warnings.warn(
                "Modin only implements parallel `read_excel` with `openpyxl` engine, "
                + 'please specify `engine=None` or `engine="openpyxl"` to '
                + "use Modin's parallel implementation."
            )
            return cls.single_worker_read(io, **kwargs)
        if sys.version_info < (3, 7):
            warnings.warn("Python 3.7 or higher required for parallel `read_excel`.")
            return cls.single_worker_read(io, **kwargs)

        from zipfile import ZipFile
        from openpyxl.worksheet.worksheet import Worksheet
        from openpyxl.worksheet._reader import WorksheetReader
        from openpyxl.reader.excel import ExcelReader
        from modin.core.storage_formats.pandas.parsers import PandasExcelParser

        sheet_name = kwargs.get("sheet_name", 0)
        if sheet_name is None or isinstance(sheet_name, list):
            warnings.warn(
                "`read_excel` functionality is only implemented for a single sheet at a "
                + "time. Multiple sheet reading coming soon!"
            )
            return cls.single_worker_read(io, **kwargs)

        warnings.warn(
            "Parallel `read_excel` is a new feature! If you run into any "
            + "problems, please visit https://github.com/modin-project/modin/issues. "
            + "If you find a new issue and can't file it on GitHub, please "
            + "email bug_reports@modin.org."
        )

        # NOTE: ExcelReader() in read-only mode does not close file handle by itself
        # work around that by passing file object if we received some path
        io_file = open(io, "rb") if isinstance(io, str) else io
        try:
            ex = ExcelReader(io_file, read_only=True)
            ex.read()
            wb = ex.wb

            # Get shared strings
            ex.read_manifest()
            ex.read_strings()
            ws = Worksheet(wb)
        finally:
            if isinstance(io, str):
                # close only if it were us who opened the object
                io_file.close()

        # Snapshot of the caller's arguments taken before `kwargs` is augmented with
        # worker-only entries; used whenever the read is handed over to pandas.
        pandas_kw = dict(kwargs)  # preserve original kwargs
        with ZipFile(io) as z:
            from io import BytesIO

            # Convert index to sheet name in file
            if isinstance(sheet_name, int):
                sheet_name = "sheet{}".format(sheet_name + 1)
            else:
                sheet_name = "sheet{}".format(wb.sheetnames.index(sheet_name) + 1)
            if any(sheet_name.lower() in name for name in z.namelist()):
                sheet_name = sheet_name.lower()
            elif any(sheet_name.title() in name for name in z.namelist()):
                sheet_name = sheet_name.title()
            else:
                raise ValueError("Sheet {} not found".format(sheet_name.lower()))
            # Pass this value to the workers
            kwargs["sheet_name"] = sheet_name

            # Load the whole sheet XML into memory and release the archive member
            # handle right away instead of leaving it open.
            with z.open("xl/worksheets/{}.xml".format(sheet_name)) as sheet_file:
                f = BytesIO(sheet_file.read())
            total_bytes = cls.file_size(f)

            # Read some bytes from the sheet so we can extract the XML header and first
            # line. We need to make sure we get the first line of the data as well
            # because that is where the column names are. The header information will
            # be extracted and sent to all of the nodes.
            sheet_block = f.read(EXCEL_READ_BLOCK_SIZE)
            end_of_row_tag = b"</row>"
            while end_of_row_tag not in sheet_block:
                next_block = f.read(EXCEL_READ_BLOCK_SIZE)
                if not next_block:
                    # End of the sheet reached without finding a single closed row
                    # (e.g. an empty sheet). Reading on would loop forever because
                    # `read` keeps returning empty bytes, so hand the original
                    # request to the non-parallel path instead.
                    return cls.single_worker_read(io, **pandas_kw)
                sheet_block += next_block
            idx_of_header_end = sheet_block.index(end_of_row_tag) + len(end_of_row_tag)
            sheet_header = sheet_block[:idx_of_header_end]
            # Reset the file pointer to begin at the end of the header information.
            f.seek(idx_of_header_end)
            kwargs["_header"] = sheet_header
            footer = b"</sheetData></worksheet>"
            # Use openpyxml to parse the data
            reader = WorksheetReader(
                ws, BytesIO(sheet_header + footer), ex.shared_strings, False
            )
            # Attach cells to the worksheet
            reader.bind_cells()
            data = PandasExcelParser.get_sheet_data(
                ws, kwargs.get("convert_float", True)
            )
            # Extract column names from parsed data.
            column_names = pandas.Index(data[0])
            index_col = kwargs.get("index_col", None)
            # Remove column names that are specified as `index_col`
            if index_col is not None:
                column_names = column_names.drop(column_names[index_col])

            if not all(column_names):
                # some column names are empty, use pandas reader to take the names from it
                pandas_kw["nrows"] = 1
                df = pandas.read_excel(io, **pandas_kw)
                column_names = df.columns

            # Compute partition metadata upfront so it is uniform for all partitions
            chunk_size = max(1, (total_bytes - f.tell()) // NPartitions.get())
            column_widths, num_splits = cls._define_metadata(
                pandas.DataFrame(columns=column_names), column_names
            )
            kwargs["fname"] = io
            # Skiprows will be used to inform a partition how many rows come before it.
            kwargs["skiprows"] = 0
            row_count = 0
            data_ids = []
            index_ids = []
            dtypes_ids = []

            kwargs["num_splits"] = num_splits

            while f.tell() < total_bytes:
                # `args` deliberately aliases `kwargs`: `skiprows` accumulates the
                # number of rows handed out to all previous partitions.
                args = kwargs
                args["skiprows"] = row_count + args["skiprows"]
                args["start"] = f.tell()
                chunk = f.read(chunk_size)
                # This edge case can happen when we have reached the end of the data
                # but not the end of the file.
                if b"<row" not in chunk:
                    break
                row_count = re.subn(end_of_row_tag, b"", chunk)[1]

                # Make sure we are reading at least one row.
                while row_count == 0:
                    extra = f.read(chunk_size)
                    if not extra:
                        # Nothing left to read: stop instead of spinning forever on
                        # empty reads.
                        break
                    chunk += extra
                    row_count = re.subn(end_of_row_tag, b"", chunk)[1]

                # If there is no complete row left, exit before triggering
                # computation.
                if row_count == 0:
                    break

                # Rewind so that the next partition starts right after the last
                # complete row contained in this chunk.
                last_index = chunk.rindex(end_of_row_tag)
                f.seek(-(len(chunk) - last_index) + len(end_of_row_tag), 1)
                args["end"] = f.tell()

                # The last two returned objects are the partition's index
                # information and dtypes; the rest are the data splits.
                remote_results_list = cls.deploy(
                    cls.parse, num_returns=num_splits + 2, **args
                )
                data_ids.append(remote_results_list[:-2])
                index_ids.append(remote_results_list[-2])
                dtypes_ids.append(remote_results_list[-1])

                # The end of the spreadsheet
                if b"</sheetData>" in chunk:
                    break

        # Compute the index based on a sum of the lengths of each partition (by default)
        # or based on the column(s) that were requested.
        if index_col is None:
            row_lengths = cls.materialize(index_ids)
            new_index = pandas.RangeIndex(sum(row_lengths))
        else:
            index_objs = cls.materialize(index_ids)
            row_lengths = [len(o) for o in index_objs]
            new_index = index_objs[0].append(index_objs[1:])

        # Compute dtypes by getting collecting and combining all of the partitions. The
        # reported dtypes from differing rows can be different based on the inference in
        # the limited data seen by each worker. We use pandas to compute the exact dtype
        # over the whole column for each column. The index is set below.
        dtypes = cls.get_dtypes(dtypes_ids)

        data_ids = cls.build_partition(data_ids, row_lengths, column_widths)
        # Set the index for the dtypes to the column names
        if isinstance(dtypes, pandas.Series):
            dtypes.index = column_names
        else:
            dtypes = pandas.Series(dtypes, index=column_names)
        new_frame = cls.frame_cls(
            data_ids,
            new_index,
            column_names,
            row_lengths,
            column_widths,
            dtypes=dtypes,
        )
        new_query_compiler = cls.query_compiler_cls(new_frame)
        if index_col is None:
            new_query_compiler._modin_frame.synchronize_labels(axis=0)
        return new_query_compiler