## @package pipeline
# Module caffe2.python.pipeline
from caffe2.python import core, queue_util
from caffe2.python.dataio import Reader, Writer
from caffe2.python.net_builder import NetBuilder, ops
from caffe2.python.schema import as_record, Field
from caffe2.python.task import Node, Task, TaskGroup
class Output(object):
    """
    Result of a processor function.

    A processor may build and return an Output itself, or it may return a
    plain record, in which case an Output is created for it afterwards.

    Attributes:
        nets:        list of nets produced by the processor (possibly empty).
        record:      the output record passed through `as_record`, or None.
        should_stop: whatever was passed as `should_stop` (stored untouched).
    """

    def __init__(self, nets=None, record=None, should_stop=None):
        # Whatever the currently active NetBuilder reports via `get()`;
        # presumably the ops/nets recorded through the `ops` syntax so far.
        recorded = NetBuilder.current().get()
        assert nets is None or len(recorded) == 0, (
            'Cannot both use `ops` syntax and return a list of nets.')

        # Explicit nets win; otherwise fall back to what the builder recorded.
        chosen = recorded if nets is None else nets
        if isinstance(chosen, core.Net):
            # A single net is accepted as shorthand for a one-element list.
            chosen = [chosen]

        if chosen is None:
            self.nets = []
        else:
            self.nets = list(chosen)

        if record is None:
            self.record = None
        else:
            self.record = as_record(record)

        self.should_stop = should_stop
# Capacity of the queue that `_init_output` creates when the caller supplies
# neither an output nor an explicit `capacity`.
DEFAULT_QUEUE_CAPACITY = 100
def _init_output(output, capacity, global_init_net, global_exit_net):
    """
    Resolve `output` into an `(out_queue, writer)` pair and set the writer up.

    Args:
        output: None, a Writer, or any object exposing a `writer()` method
            (e.g. a queue or stream).
        capacity: capacity for the queue created when `output` is None;
            falls back to DEFAULT_QUEUE_CAPACITY when None. Must be None
            when `output` is given, since it would be ignored.
        global_init_net: net handed to `writer.setup_ex` as the init net.
        global_exit_net: net handed to `writer.setup_ex` as the exit net.

    Returns:
        Tuple `(out_queue, writer)`. `out_queue` is the newly created queue
        when `output` is None, `output` itself when it exposes `writer()`,
        and None when `output` is already a Writer.

    Raises:
        ValueError: if `output` is none of the supported kinds.
        AssertionError: if `capacity` is passed together with an `output`.
    """
    if output is None:
        queue_capacity = (
            DEFAULT_QUEUE_CAPACITY if capacity is None else capacity)
        out_queue = queue_util.Queue(capacity=queue_capacity)
        writer = out_queue.writer()
    elif isinstance(output, Writer):
        assert capacity is None, 'capacity would not be used.'
        out_queue = None
        writer = output
    elif hasattr(output, 'writer'):
        assert capacity is None, 'capacity would not be used.'
        out_queue = output
        writer = output.writer()
    else:
        # The accepted kinds are Writer / queue / stream (not a reader);
        # include the offending type to make the failure actionable.
        raise ValueError(
            'output must be a writer, queue or stream. Got {}'.format(
                type(output)))
    writer.setup_ex(global_init_net, global_exit_net)
    return out_queue, writer
def make_processor(processor, reader=None):
    """
    Normalize `processor` into something callable on a record.

    Args:
        processor: None, a core.Net, or an arbitrary processor object.
        reader: optional reader; when given and the processor exposes
            `schema_func`, a zero-argument `schema` attribute bound to this
            reader is attached to the processor.

    Returns:
        - an identity function when `processor` is None;
        - a NetProcessor wrapping the net when `processor` is a core.Net;
        - otherwise `processor` itself (possibly with `schema` attached).
    """
    if processor is None:
        # Identity: hand the record back unchanged.
        return lambda rec: rec

    if isinstance(processor, core.Net):
        return NetProcessor(processor)

    can_bind_schema = reader is not None and hasattr(processor, "schema_func")
    if can_bind_schema:
        def processor_schema():
            return processor.schema_func(reader)

        processor.schema = processor_schema
    return processor
def normalize_processor_output(output):
    """
    Convert any of the supported processor return formats into an Output.

    Accepted formats:
        - an Output, returned unchanged;
        - a record (Field), wrapped as the Output's record;
        - a 2-tuple `(record, stop_blob)`;
        - any other tuple, expanded positionally as
          `(nets, record, stop_blob)`;
        - anything else, treated as the nets with no output record.

    TODO(azzolini): simplify once all processors use NetBuilder API.
    """
    # Processor returned an Output.
    if isinstance(output, Output):
        return output

    # Processor returned a record.
    if isinstance(output, Field):
        return Output(record=output)

    # Processor returned nets, no output.
    if not isinstance(output, tuple):
        return Output(output)

    record_with_stop_blob = (
        len(output) == 2
        and isinstance(output[0], Field)
        and isinstance(output[1], core.BlobReference)
    )
    if record_with_stop_blob:
        # Processor returned (record, stop_blob).
        record, stop_blob = output
        return Output(None, record, stop_blob)

    # Processor returned (nets, record, stop_blob).
    return Output(*output)
def pipe(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1):
    """
    Create a Task that pipes records from `input` into `output`.

    `input` is a Reader, Queue or DataStream; `output` is optionally a
    Writer, Queue or DataStream. When run, the Task moves records from the
    input to the output using multiple parallel threads. If a `processor`
    is given it is invoked between the read and the write step so it can
    transform the record.

    Args:
        input:       Reader, Queue or DataStream to read from. Reading goes
                     on until either the reader or the writer signals a
                     stop.
        output:      Writer, Queue or DataStream to write to, for as long
                     as neither reader nor writer signals a stop. When
                     omitted (None), a Queue of the given `capacity` is
                     created and written to.
        num_threads: number of concurrent threads used for processing and
                     piping. With 0, no Task is created; a reader is
                     returned instead, which reads from the reader passed
                     in and processes it.
                     ** DEPRECATED **. Use `num_runtime_threads` instead.
                     This option will be removed once all
                     readers/processors support `num_runtime_threads`.
        processor:   (optional) function taking an input record and
                     optionally returning a record; called between the
                     read and write steps. If it returns no record, no
                     writer is instantiated. May also be a core.Net with
                     input and output records properly set, in which case
                     a NetProcessor is instantiated and the net is cloned
                     for each of the threads.
        name:        (optional) name of the task to be created.
        capacity:    capacity of the queue created when `output` is not
                     passed.
        group:       (optional) TaskGroup to add the created Task to,
                     instead of the currently active one.
        num_runtime_threads: like `num_threads`, but the expansion happens
                     at runtime rather than through a python `for` loop.
                     Preferable to `num_threads`, although some
                     processors/readers still need to be called multiple
                     times in python.

    Returns:
        Output Queue, DataStream, Reader, or None, depending on the
        parameters passed.
    """
    # Only the first element of the step result is exposed here; the Task
    # itself is discarded.
    piped, _task = _pipe_step(
        input,
        output=output,
        num_threads=num_threads,
        processor=processor,
        name=name,
        capacity=capacity,
        group=group,
        num_runtime_threads=num_runtime_threads)
    return piped
def pipe_and_output(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1, final_outputs=None):
    """
    Like `pipe`, but the pipe Task can also return output values to the
    `Session` once it is done.

    Requires `num_threads > 0`.

    Returns:
        Tuple `(out_queue, task_outputs)`
            out_queue:    same as the return value of `pipe`.
            task_outputs: TaskOutput object(s), fetchable from the client
                          after session.run() returns. None when
                          `final_outputs` is None; a single item when
                          `final_outputs` is neither a list nor a tuple.
    """
    assert num_threads > 0
    out_queue, task = _pipe_step(
        input,
        output=output,
        num_threads=num_threads,
        processor=processor,
        name=name,
        capacity=capacity,
        group=group,
        num_runtime_threads=num_runtime_threads,
        final_outputs=final_outputs)

    if final_outputs is None:
        return out_queue, None

    task_outputs = task.outputs()
    # A scalar `final_outputs` yields a scalar result rather than a list.
    if type(final_outputs) not in (list, tuple):
        task_outputs = task_outputs[0]
    return out_queue, task_outputs
def processor_name(processor):
    """
    Return a human-readable name for `processor`.

    Resolution order:
        1. `processor.name`, when the attribute exists;
        2. for lambdas, the module in which the lambda was defined;
        3. for methods, `'<ClassName>.<method name>'`;
        4. for plain functions, the function name;
        5. otherwise, the name of the processor's class.

    Both the Python 2 introspection attributes (`func_name`, `im_class`)
    and their Python 3 counterparts (`__name__`, `__self__`) are honored.
    Without the latter, every function and method would fall through to
    step 5 on Python 3 and be reported as 'function' / 'method'.
    """
    if hasattr(processor, 'name'):
        return processor.name

    func_name = getattr(processor, 'func_name', None)
    if func_name is None and hasattr(processor, '__code__'):
        # Python 3 function or bound method. `__code__` is required so that
        # classes and builtins (which also carry `__name__`) keep being
        # reported by their class name, as before.
        func_name = getattr(processor, '__name__', None)

    if func_name is None:
        return processor.__class__.__name__

    if func_name == '<lambda>':
        return processor.__module__

    owner = getattr(processor, 'im_class', None)
    if owner is None:
        bound_to = getattr(processor, '__self__', None)
        if bound_to is not None:
            # Bound classmethods carry the class itself in `__self__`.
            owner = bound_to if isinstance(bound_to, type) else type(bound_to)
    if owner is not None:
        return '%s.%s' % (owner.__name__, func_name)
    return func_name
def _runtime_threads_task(name, group, final_outputs, reader, num_threads,
                          output, capacity):
    """
    Build a pipe Task whose parallelism is expressed through
    `num_instances=num_threads` on the Task, rather than by building one
    set of nets per thread in python.

    Args:
        name: task name; also used as a component of the timer counter name.
        group: TaskGroup passed through to the Task.
        final_outputs: outputs passed through to the Task.
        reader: reader to pull records from.
        num_threads: number of task instances.
        output: None, Writer, queue or stream; resolved by `_init_output`.
        capacity: queue capacity forwarded to `_init_output`.

    Returns:
        Tuple `(out_queue, task)`. `out_queue` is None when the reader
        yields no record (in which case nothing is written).
    """
    node_name = str(Node.current())
    # NOTE: this used to reference `input`, which is not a parameter of this
    # function and therefore resolved to the builtin `input()`; the counter
    # name then never reflected the actual source. Use the reader instead.
    profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
        node_name,
        "pipe",
        name,
        processor_name(reader) if reader is not None else "NoInput",
        processor_name(output) if output else "NoOutput")

    with Task(name=name, group=group, outputs=final_outputs,
              num_instances=num_threads) as task:
        # Nets registered once per task.
        global_exit_net = core.Net('pipe:exit')
        global_init_net = core.Net('pipe:init')
        reader.setup_ex(global_init_net, global_exit_net)

        # Nets registered once per task instance.
        init_net = core.Net('pipe:instance:init')
        exit_net = core.Net('pipe:instance:exit')
        read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
        # Start every instance with the stop flag cleared.
        init_net.ConstantFill(
            [], [status],
            shape=[],
            value=False,
            dtype=core.DataType.BOOL
        )

        if rec is not None:
            out_queue, writer = _init_output(
                output, capacity, global_init_net, global_exit_net)
            write_nets, _ = writer.write_record_ex(
                rec, init_net, exit_net, status)
        else:
            # No record produced: nothing to write, no output queue.
            out_queue = None
            write_nets = []

        with ops.task_init():
            ops.net(global_init_net)
        with ops.task_instance_init():
            ops.net(init_net)

        # Timer around the read/write body, reported under `profiler_name`.
        timer_start_net = core.Net('timer_start')
        timer = timer_start_net.TimerBegin([], counter_name=profiler_name)
        timer_end_net = core.Net('timer_end')
        timer_end_net.TimerEnd(timer, [])

        ops.net(core.execution_step(
            'body',
            [timer_start_net] + list(read_nets) + list(write_nets) +
            [timer_end_net],
            should_stop_blob=status))
        # NOTE(review): timer_end_net is run once more after the body step,
        # presumably to close a timer left open when the step stops before
        # reaching its last net -- confirm.
        ops.net(timer_end_net)

        with ops.task_instance_exit():
            ops.net(exit_net)
        with ops.task_exit():
            ops.net(global_exit_net)

    return out_queue, task
def _static_threads_task(name, group, final_outputs, reader, num_threads,
                         output, capacity):
    """
    Build a pipe Task in which each thread is expanded in python: the
    reader (and writer) are asked for a fresh set of nets `num_threads`
    times, and the resulting steps run as concurrent substeps.

    Args:
        name: task name; also used as a component of the timer counter name.
        group: TaskGroup passed through to the Task.
        final_outputs: outputs passed through to the Task.
        reader: reader to pull records from.
        num_threads: number of per-thread steps to build.
        output: None, Writer, queue or stream; resolved by `_init_output`.
        capacity: queue capacity forwarded to `_init_output`.

    Returns:
        Tuple `(out_queue, task)`. `out_queue` stays None when no thread's
        read yields a record.
    """
    node_name = str(Node.current())
    # NOTE: this used to reference `input`, which is not a parameter of this
    # function and therefore resolved to the builtin `input()`; the counter
    # name then never reflected the actual source. Use the reader instead.
    profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
        node_name,
        "pipe",
        name,
        processor_name(reader) if reader is not None else "NoInput",
        processor_name(output) if output else "NoOutput")

    with Task(name=name, group=group, outputs=final_outputs) as task:
        global_exit_net = core.Net('exit')
        global_init_net = core.Net('init')
        reader.setup_ex(global_init_net, global_exit_net)

        # The writer is created lazily, once, and shared by all threads.
        out_queue = None
        writer = None

        steps = []
        for thread_id in range(num_threads):
            with NetBuilder(name='t:%d' % thread_id) as nb:
                init_net = core.Net('init')
                exit_net = core.Net('exit')
                read_nets, status, rec = reader.read_record_ex(
                    init_net, exit_net)
                # Start every thread with the stop flag cleared.
                init_net.ConstantFill(
                    [], [status],
                    shape=[],
                    value=False,
                    dtype=core.DataType.BOOL
                )

                if rec is not None:
                    if writer is None:
                        # hack so that the out queue gets the right name prefix
                        # (otherwise they would be prefixed with the thread id)
                        with NetBuilder(_fullname=task.name):
                            out_queue, writer = _init_output(
                                output, capacity, global_init_net,
                                global_exit_net)
                    write_nets, _ = writer.write_record_ex(
                        rec, init_net, exit_net, status)
                else:
                    write_nets = []

                # Timer around the read/write body, reported under
                # `profiler_name`.
                timer_start_net = core.Net('timer_start')
                timer = timer_start_net.TimerBegin(
                    [], counter_name=profiler_name)
                timer_end_net = core.Net('timer_end')
                timer_end_net.TimerEnd(timer, [])

                ops.net(init_net)
                ops.net(core.execution_step(
                    'body',
                    [timer_start_net] + list(read_nets) + list(write_nets) +
                    [timer_end_net],
                    should_stop_blob=status))
                # NOTE(review): timer_end_net is run once more after the
                # body step, presumably to close a timer left open when the
                # step stops before reaching its last net -- confirm.
                ops.net(timer_end_net)
                ops.net(exit_net)
            steps.append(core.to_execution_step(nb))
        ops.net(global_init_net)
        ops.net(core.execution_step('body', steps, concurrent_substeps=True))
        ops.net(global_exit_net)
    return out_queue, task
def _pipe_step(
input, output=None, num_threads=1, processor=None, name=None,
capacity=None, group=None, num_runtime_threads=None, final_outputs=None):
"""
"""
assert num_threads <= 1 or num_runtime_threads <= 1, (
'Only one of num_threads or num_runtime_threads must be set.')
if isinstance(input, Reader):
reader = input
elif hasattr(input, 'reader'):
reader = input.reader()
else:
raise ValueError(
'Input must be a reader, queue or stream. Got {}'.format(type(input)))
if processor is not None:
reader = ProcessingReader(reader, processor)
if num_threads == 0 or num_runtime_threads == 0:
assert output is None
return reader, None
if name is None and processor is not None:
name = processor_name(processor)
if name is None and output is not None:
name = 'pipe_into:%s' % processor_name(output)
if name is None:
name = 'pipe_from:%s' % processor_name(input)
if num_threads > 1:
return _static_threads_task(
Loading ...