Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
aiofile / utils.py
Size: Mime:
import asyncio
import io

from .aio import AIOFile


class Reader:
    __slots__ = '_chunk_size', '__offset', 'file', '__lock'

    def __init__(self, aio_file: AIOFile,
                 offset: int = 0, chunk_size: int = 32 * 1024):

        self._chunk_size = int(chunk_size)
        self.__offset = int(offset)
        self.file = aio_file
        self.__lock = asyncio.Lock(loop=self.file.loop)

    @asyncio.coroutine
    def read_chunk(self):
        with (yield from self.__lock):
            chunk = yield from self.file.read(
                self._chunk_size,
                self.__offset
            )

            chunk_size = len(chunk)
            self.__offset += chunk_size

            return chunk

    @asyncio.coroutine
    def __anext__(self):
        chunk = yield from self.read_chunk()

        if not chunk:
            raise StopAsyncIteration(chunk)

        return chunk

    def __aiter__(self):
        return self

    @asyncio.coroutine
    def __next__(self):
        return (yield from self.read_chunk())

    def __iter__(self):
        return self


class Writer:
    __slots__ = '__chunk_size', '__offset', '__aio_file', '__lock'

    def __init__(self, aio_file: AIOFile, offset: int = 0):
        self.__offset = int(offset)
        self.__aio_file = aio_file
        self.__lock = asyncio.Lock(loop=self.__aio_file.loop)

    @asyncio.coroutine
    def __call__(self, data):
        with (yield from self.__lock):
            yield from self.__aio_file.write(data, self.__offset)
            self.__offset += len(data)


class LineReader:
    def __init__(self, aio_file: AIOFile, offset: int = 0,
                 chunk_size: int = 255, line_sep='\n'):

        self.__reader = Reader(aio_file, chunk_size=chunk_size, offset=offset)
        self._buffer = io.BytesIO() if aio_file.mode.binary else io.StringIO()
        self.linesep = (
            line_sep.encode() if self.__reader.file.mode.binary else line_sep
        )

    @asyncio.coroutine
    def readline(self):
        while True:
            chunk = yield from self.__reader.read_chunk()

            if chunk:
                if self.linesep not in chunk:
                    self._buffer.write(chunk)
                    continue

                self._buffer.write(chunk)

            self._buffer.seek(0)
            line = self._buffer.readline()
            tail = self._buffer.read()

            self._buffer.seek(0)
            self._buffer.truncate(0)
            self._buffer.write(tail)

            return line

    @asyncio.coroutine
    def __anext__(self):
        line = yield from self.readline()

        if not line:
            raise StopAsyncIteration(line)

        return line

    def __aiter__(self):
        return self

    @asyncio.coroutine
    def __next__(self):
        return (yield from self.readline())

    def __iter__(self):
        return self