"""
Tests multithreading behaviour for reading and
parsing files for each parser defined in parsers.py
"""
from io import BytesIO
from multiprocessing.pool import ThreadPool

import numpy as np

import pandas as pd
from pandas import DataFrame
import pandas.util.testing as tm
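
# ``all_parsers`` is assumed to be the pytest fixture from this suite's
# conftest.py, parametrizing each test over the available read_csv engines
# (C and Python). The C engine releases the GIL while parsing CSV text, so
# the concurrent read_csv calls below can genuinely overlap across threads.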


def _construct_dataframe(num_rows):
    """
    Construct a DataFrame for testing.

    Parameters
    ----------
    num_rows : int
        The number of rows for our DataFrame.

    Returns
    -------
    df : DataFrame
    """
    df = DataFrame(np.random.rand(num_rows, 5), columns=list("abcde"))
    df["foo"] = "foo"
    df["bar"] = "bar"
    df["baz"] = "baz"
    df["date"] = pd.date_range("20000101 09:00:00", periods=num_rows, freq="s")
    df["int"] = np.arange(num_rows, dtype="int64")
    return df
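
# Note: the frame above has ten columns -- "a" through "e", "foo", "bar",
# "baz", "date", then "int" -- so once it is written to CSV together with
# its index, "date" sits at position 9. The positional parse_dates=[9] in
# the chunked reader below relies on exactly this layout.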


def test_multi_thread_string_io_read_csv(all_parsers):
    # see gh-11786
    parser = all_parsers
    max_row_range = 10000
    num_files = 100

    # Build identical three-column CSV payloads in memory; every thread
    # parses the same bytes, so all resulting frames should be equal.
    bytes_to_df = [
        "\n".join(
            ["{i:d},{i:d},{i:d}".format(i=i) for i in range(max_row_range)]
        ).encode()
        for _ in range(num_files)
    ]
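    # Give each thread its own BytesIO so no file handle or read position
    # is shared between workers.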
    files = [BytesIO(b) for b in bytes_to_df]

    # Read all files in many threads; close the pool afterwards so worker
    # threads do not leak between tests.
    with ThreadPool(8) as pool:
        results = pool.map(parser.read_csv, files)

    first_result = results[0]

    for result in results:
        tm.assert_frame_equal(first_result, result)


def _generate_multi_thread_dataframe(parser, path, num_rows, num_tasks):
    """
    Generate a DataFrame via multi-thread.

    Parameters
    ----------
    parser : BaseParser
        The parser object to use for reading the data.
    path : str
        The location of the CSV file to read.
    num_rows : int
        The number of rows to read per task.
    num_tasks : int
        The number of tasks to use for reading this DataFrame.

    Returns
    -------
    df : DataFrame
    """

    def reader(arg):
        """
        Create a reader for part of the CSV.

        Parameters
        ----------
        arg : tuple
            A tuple of the following:

            * start : int
                The starting row to start for parsing CSV
            * nrows : int
                The number of rows to read.

        Returns
        -------
        df : DataFrame
        """
        start, nrows = arg

        # The first chunk starts at row 0, so it reads the header line and
        # can parse the "date" column by name.
        if not start:
            return parser.read_csv(
                path, index_col=0, header=0, nrows=nrows, parse_dates=["date"]
            )

        # Later chunks skip the header plus every row already covered by
        # earlier chunks; with header=None there are no column names, so
        # "date" has to be addressed by position (column 9).
        return parser.read_csv(
            path,
            index_col=0,
            header=None,
            skiprows=int(start) + 1,
            nrows=nrows,
            parse_dates=[9],
        )

    # Split the read into num_tasks chunks of (start_row, row_count); any
    # remainder rows are dropped unless num_tasks divides num_rows evenly.
    tasks = [
        (num_rows * i // num_tasks, num_rows // num_tasks) for i in range(num_tasks)
    ]

    with ThreadPool(processes=num_tasks) as pool:
        results = pool.map(reader, tasks)

    # Chunks after the first were read with header=None and carry integer
    # column labels; realign them with the named columns from the first
    # chunk before concatenating.
    header = results[0].columns

    for r in results[1:]:
        r.columns = header

    final_dataframe = pd.concat(results)
    return final_dataframe


def test_multi_thread_path_multipart_read_csv(all_parsers):
    # see gh-11786
    num_tasks = 4
    num_rows = 100000

    parser = all_parsers
    file_name = "__thread_pool_reader__.csv"
    df = _construct_dataframe(num_rows)

    with tm.ensure_clean(file_name) as path:
        df.to_csv(path)

        final_dataframe = _generate_multi_thread_dataframe(
            parser, path, num_rows, num_tasks
        )
        tm.assert_frame_equal(df, final_dataframe)
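

# A minimal sketch for running just this module, assuming a pandas source
# checkout where this file lives at pandas/tests/io/parser/test_multi_thread.py:
#
#   pytest pandas/tests/io/parser/test_multi_thread.py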