Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

/ python / pipeline.py

## @package pipeline
# Module caffe2.python.pipeline





from caffe2.python import core, queue_util
from caffe2.python.dataio import Reader, Writer
from caffe2.python.net_builder import NetBuilder, ops
from caffe2.python.schema import as_record, Field
from caffe2.python.task import Node, Task, TaskGroup


class Output(object):
    """
    Result of a processor function.

    A processor may build and return an Output itself, or it may return a
    plain record and have an Output wrapped around it afterwards.

    Attributes:
        nets:        list of nets attached to this output. When `nets` is not
                     passed, the children of the current NetBuilder are used.
        record:      `record` passed through `as_record`, or None.
        should_stop: the `should_stop` value given, stored unchanged.
    """
    def __init__(self, nets=None, record=None, should_stop=None):
        pending = NetBuilder.current().get()
        if nets is not None:
            # Explicit nets and nets collected by the builder are mutually
            # exclusive.
            assert len(pending) == 0, (
                'Cannot both use `ops` syntax and return a list of nets.')
        else:
            nets = pending

        if isinstance(nets, core.Net):
            # A single net is accepted as shorthand for a one-element list.
            self.nets = [nets]
        elif nets is None:
            self.nets = []
        else:
            self.nets = list(nets)

        self.record = as_record(record) if record is not None else None
        self.should_stop = should_stop


# Capacity of the queue that `_init_output` creates when the caller provides
# neither an output nor an explicit capacity.
DEFAULT_QUEUE_CAPACITY = 100


def _init_output(output, capacity, global_init_net, global_exit_net):
    """
    Resolves `output` into a writer and sets that writer up.

    Args:
        output:          None, a Writer, or any object exposing a `writer()`
                         method. When None, a new queue_util.Queue is
                         created and written to.
        capacity:        capacity of the queue created when `output` is None;
                         DEFAULT_QUEUE_CAPACITY is used if this is None.
                         Must be None when `output` is given, since it would
                         not be used.
        global_init_net: net passed to `writer.setup_ex` as the init net.
        global_exit_net: net passed to `writer.setup_ex` as the exit net.

    Returns:
        Tuple (out_queue, writer). `out_queue` is the newly created queue,
        the `output` object itself when it exposes `writer()`, or None when
        `output` is already a Writer.

    Raises:
        ValueError: if `output` is none of the supported kinds.
    """
    if output is None:
        out_queue = queue_util.Queue(
            capacity=(
                capacity if capacity is not None
                else DEFAULT_QUEUE_CAPACITY))
        writer = out_queue.writer()
    elif isinstance(output, Writer):
        assert capacity is None, 'capacity would not be used.'
        out_queue = None
        writer = output
    elif hasattr(output, 'writer'):
        assert capacity is None, 'capacity would not be used.'
        out_queue = output
        writer = output.writer()
    else:
        # This function resolves the *output* side, so the accepted kinds are
        # writer / queue / stream; report the offending type to ease
        # debugging.
        raise ValueError(
            'output must be a writer, queue or stream. Got {}'.format(
                type(output)))
    writer.setup_ex(global_init_net, global_exit_net)
    return out_queue, writer


def make_processor(processor, reader=None):
    """
    Turns `processor` into something callable on a record.

    - None becomes an identity function.
    - A core.Net is wrapped in a NetProcessor.
    - Anything else is returned as is. If a `reader` is given and the
      processor has a `schema_func` attribute, a `schema` attribute is
      attached to the processor first; calling it returns
      `processor.schema_func(reader)`.
    """
    if processor is None:
        return lambda rec: rec

    if isinstance(processor, core.Net):
        return NetProcessor(processor)

    if reader is None or not hasattr(processor, "schema_func"):
        return processor

    def processor_schema():
        return processor.schema_func(reader)

    processor.schema = processor_schema
    return processor


def normalize_processor_output(output):
    """
    Converts whatever a processor returned into an Output.

    Accepted forms:
        - an Output (returned unchanged),
        - a record (a Field),
        - a tuple (record, stop_blob),
        - a tuple (nets, record, stop_blob),
        - anything else, which is treated as nets with no record.
    """
    # TODO(azzolini): simplify once all processors use NetBuilder API.
    if isinstance(output, Output):
        return output

    if isinstance(output, Field):
        return Output(record=output)

    if not isinstance(output, tuple):
        # Nets only, no output record.
        return Output(output)

    if (len(output) == 2 and
            isinstance(output[0], Field) and
            isinstance(output[1], core.BlobReference)):
        record, stop_blob = output
        return Output(None, record, stop_blob)

    # Remaining tuples are unpacked positionally as (nets, record, stop_blob).
    return Output(*output)


def pipe(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1):
    """
    Creates a Task that, when run, moves records from `input` to `output`
    using multiple parallel threads, optionally transforming each record
    with `processor` between the read and the write.

    Args:
        input:       a Reader, Queue or DataStream to read from. Reading
                     goes on until either the reader or the writer signals
                     a stop.
        output:      a Writer, Queue or DataStream to write to, for as long
                     as neither side signals a stop. When omitted or None,
                     a Queue with the given `capacity` is created and
                     written to.
        num_threads: number of concurrent threads used for processing and
                     piping. With 0, no Task is created; a reader is
                     returned instead, which reads from the given input and
                     processes it.
                     ** DEPRECATED ** in favor of `num_runtime_threads`;
                     to be removed once all readers/processors support
                     `num_runtime_threads`.
        processor:   (optional) function taking an input record and
                     optionally returning a record, called between the read
                     and write steps. If it returns no record, no writer is
                     instantiated. May also be a core.Net with input and
                     output records properly set, in which case a
                     NetProcessor is instantiated and the net is cloned for
                     each thread.
        name:        (optional) name of the Task to create.
        capacity:    capacity of the Queue created when `output` is not
                     passed.
        group:       (optional) TaskGroup to add the created Task to,
                     instead of the currently active one.
        num_runtime_threads: like `num_threads`, except that the tasks are
                     expanded at runtime rather than with a python `for`
                     loop. Preferred over `num_threads`, although some
                     processors/readers still need to be called multiple
                     times in python.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the
        parameters passed.
    """
    step_result = _pipe_step(
        input, output, num_threads, processor, name, capacity, group,
        num_runtime_threads)
    # Only the first element is exposed; the Task is dropped here.
    piped_output, _ = step_result
    return piped_output


def pipe_and_output(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1, final_outputs=None):
    """
    Same as `pipe`, except that the pipe Task can also hand output values
    back to the `Session` once it is done.

    Returns:
        Tuple (out_queue, task_outputs)
            out_queue:    same as the return value of `pipe`.
            task_outputs: None when `final_outputs` is None. Otherwise the
                          Task's outputs, fetchable from the client after
                          session.run() returns: all of them when
                          `final_outputs` is a list or tuple, only the
                          first one otherwise.
    """
    assert num_threads > 0
    result, task = _pipe_step(
        input, output, num_threads, processor, name, capacity, group,
        num_runtime_threads, final_outputs)

    if final_outputs is None:
        return result, None

    task_outputs = task.outputs()
    if type(final_outputs) in (list, tuple):
        return result, task_outputs
    # A single (non-sequence) final output was requested: unwrap it.
    return result, task_outputs[0]


def processor_name(processor):
    """
    Returns a descriptive name for `processor`, used to name tasks and
    profiler counters.

    Resolution order:
        1. `processor.name`, if the object has a `name` attribute.
        2. For objects carrying a function name (`func_name` on Python 2,
           `__name__` on Python 3):
             - the defining module's name for a lambda,
             - 'ClassName.method_name' for a method,
             - the function name otherwise.
        3. The name of the object's class.

    Args:
        processor: any object; typically a function, method, callable
                   object, reader or writer.

    Returns:
        The resolved name.
    """
    import inspect

    if hasattr(processor, 'name'):
        return processor.name

    # Python 2 functions/methods expose `func_name`; Python 3 dropped it in
    # favor of `__name__`. Without the fallback every Python 3 function would
    # be reported as 'function'.
    func_name = getattr(processor, 'func_name', None)
    if func_name is None:
        func_name = getattr(processor, '__name__', None)
    if func_name is None:
        return processor.__class__.__name__

    if func_name == '<lambda>':
        # A lambda has no useful name of its own; use its module instead.
        return processor.__module__

    # Python 2 methods carry `im_class`; on Python 3 the owning class has to
    # be derived from the object the method is bound to.
    owner = getattr(processor, 'im_class', None)
    if owner is None and inspect.ismethod(processor):
        bound_to = processor.__self__
        owner = bound_to if inspect.isclass(bound_to) else type(bound_to)
    if owner is not None:
        return '%s.%s' % (owner.__name__, func_name)
    return func_name


def _runtime_threads_task(name, group, final_outputs, reader, num_threads,
                          output, capacity):
    """
    Builds a Task that pipes `reader` into `output`, leaving the
    parallelism to the Task itself: a single set of nets is built and the
    Task is created with `num_instances=num_threads`.

    Args:
        name:          name of the Task; also part of the profiler counter
                       name.
        group:         TaskGroup passed to the Task.
        final_outputs: passed to the Task as its `outputs`.
        reader:        reader to read records from.
        num_threads:   number of Task instances.
        output:        None, a Writer, or an object exposing `writer()`;
                       resolved by `_init_output`.
        capacity:      forwarded to `_init_output`.

    Returns:
        Tuple (out_queue, task). `out_queue` is the queue returned by
        `_init_output`, or None when the reader produces no record (in which
        case nothing is written).
    """
    node_name = str(Node.current())
    # The source side is named after `reader`. (`input` is not a parameter of
    # this function, so referring to it would pick up the `input` builtin.)
    profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
        node_name,
        "pipe",
        name,
        processor_name(reader) if reader is not None else "NoInput",
        processor_name(output) if output else "NoOutput")

    with Task(name=name, group=group, outputs=final_outputs,
              num_instances=num_threads) as task:
        # Nets run once per task, shared by all instances.
        global_exit_net = core.Net('pipe:exit')
        global_init_net = core.Net('pipe:init')
        reader.setup_ex(global_init_net, global_exit_net)

        # Nets run once per task instance.
        init_net = core.Net('pipe:instance:init')
        exit_net = core.Net('pipe:instance:exit')
        read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
        # The stop flag starts out as False for each instance.
        init_net.ConstantFill(
            [], [status],
            shape=[],
            value=False,
            dtype=core.DataType.BOOL
        )

        if rec is not None:
            out_queue, writer = _init_output(
                output, capacity, global_init_net, global_exit_net)
            write_nets, _ = writer.write_record_ex(
                rec, init_net, exit_net, status)
        else:
            # No record to write: no writer is set up at all.
            out_queue = None
            write_nets = []

        with ops.task_init():
            ops.net(global_init_net)
        with ops.task_instance_init():
            ops.net(init_net)

        timer_start_net = core.Net('timer_start')
        timer = timer_start_net.TimerBegin([], counter_name=profiler_name)
        timer_end_net = core.Net('timer_end')
        timer_end_net.TimerEnd(timer, [])

        # Timed read/write step, with `status` as its should-stop blob.
        ops.net(core.execution_step(
            'body',
            [timer_start_net] + list(read_nets) + list(write_nets) +
            [timer_end_net],
            should_stop_blob=status))
        # The timer end net is run once more after the body step.
        ops.net(timer_end_net)

        with ops.task_instance_exit():
            ops.net(exit_net)
        with ops.task_exit():
            ops.net(global_exit_net)

    return out_queue, task


def _static_threads_task(name, group, final_outputs, reader, num_threads,
                         output, capacity):
    """
    Builds a Task that pipes `reader` into `output` by replicating the
    read/write nets in Python: one NetBuilder is created per thread and the
    resulting steps are run as concurrent substeps.

    Args:
        name:          name of the Task; also part of the profiler counter
                       name.
        group:         TaskGroup passed to the Task.
        final_outputs: passed to the Task as its `outputs`.
        reader:        reader to read records from; `read_record_ex` is
                       called once per thread.
        num_threads:   number of per-thread steps to build.
        output:        None, a Writer, or an object exposing `writer()`;
                       resolved by `_init_output`.
        capacity:      forwarded to `_init_output`.

    Returns:
        Tuple (out_queue, task). `out_queue` is the queue returned by
        `_init_output`, or None when no thread produced a record to write.
    """
    node_name = str(Node.current())
    # The source side is named after `reader`. (`input` is not a parameter of
    # this function, so referring to it would pick up the `input` builtin.)
    profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
        node_name,
        "pipe",
        name,
        processor_name(reader) if reader is not None else "NoInput",
        processor_name(output) if output else "NoOutput")

    with Task(name=name, group=group, outputs=final_outputs) as task:
        # Nets shared by all threads.
        global_exit_net = core.Net('exit')
        global_init_net = core.Net('init')
        reader.setup_ex(global_init_net, global_exit_net)

        # The writer is created lazily, once, by the first thread that has a
        # record to write, and then shared by the following threads.
        out_queue = None
        writer = None

        steps = []
        for thread_id in range(num_threads):
            with NetBuilder(name='t:%d' % thread_id) as nb:
                init_net = core.Net('init')
                exit_net = core.Net('exit')
                read_nets, status, rec = reader.read_record_ex(
                    init_net, exit_net)
                # The stop flag starts out as False for each thread.
                init_net.ConstantFill(
                    [], [status],
                    shape=[],
                    value=False,
                    dtype=core.DataType.BOOL
                )

                if rec is not None:
                    if writer is None:
                        # hack so that the out queue gets the right name prefix
                        # (otherwise they would be prefixed with the thread id)
                        with NetBuilder(_fullname=task.name):
                            out_queue, writer = _init_output(
                                output, capacity, global_init_net,
                                global_exit_net)
                    write_nets, _ = writer.write_record_ex(
                        rec, init_net, exit_net, status)
                else:
                    write_nets = []

                timer_start_net = core.Net('timer_start')
                timer = timer_start_net.TimerBegin(
                    [], counter_name=profiler_name)
                timer_end_net = core.Net('timer_end')
                timer_end_net.TimerEnd(timer, [])

                ops.net(init_net)
                # Timed read/write step, with `status` as its should-stop
                # blob.
                ops.net(core.execution_step(
                    'body',
                    [timer_start_net] + list(read_nets) + list(write_nets) +
                    [timer_end_net],
                    should_stop_blob=status))
                # The timer end net is run once more after the body step.
                ops.net(timer_end_net)
                ops.net(exit_net)
            steps.append(core.to_execution_step(nb))
        ops.net(global_init_net)
        # All per-thread steps run concurrently inside a single step.
        ops.net(core.execution_step('body', steps, concurrent_substeps=True))
        ops.net(global_exit_net)
    return out_queue, task


def _pipe_step(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=None, final_outputs=None):
    """
    """
    assert num_threads <= 1 or num_runtime_threads <= 1, (
        'Only one of num_threads or num_runtime_threads must be set.')

    if isinstance(input, Reader):
        reader = input
    elif hasattr(input, 'reader'):
        reader = input.reader()
    else:
        raise ValueError(
            'Input must be a reader, queue or stream. Got {}'.format(type(input)))

    if processor is not None:
        reader = ProcessingReader(reader, processor)

    if num_threads == 0 or num_runtime_threads == 0:
        assert output is None
        return reader, None

    if name is None and processor is not None:
        name = processor_name(processor)
    if name is None and output is not None:
        name = 'pipe_into:%s' % processor_name(output)
    if name is None:
        name = 'pipe_from:%s' % processor_name(input)

    if num_threads > 1:
        return _static_threads_task(
Loading ...