# fiftyone.utils.beam

Apache Beam utilities.

Copyright 2017-2022, Voxel51, Inc.

Classes:

ExportBatch(dataset_name[, view_stages, …])
ImportBatch(dataset_name[, parse_fcn, …])
MergeBatch(dataset_name[, parse_fcn, …])

Functions:

beam_export(sample_collection, num_shards[, …]) Exports the given sample collection in the specified number of shards via Apache Beam.
beam_import(dataset, samples[, parse_fcn, …]) Imports the given samples into the dataset via Apache Beam.
beam_merge(dataset, samples[, parse_fcn, …]) Merges the given samples into the dataset via Apache Beam.

fiftyone.utils.beam.beam_import(dataset, samples, parse_fcn=None, expand_schema=True, validate=True, options=None, verbose=False)

Imports the given samples into the dataset via Apache Beam.

This function is a parallelized alternative to fiftyone.core.dataset.Dataset.add_samples().

Note

The insertion order of the samples is not guaranteed.

Example:

import fiftyone as fo
import fiftyone.utils.beam as foub

samples = range(10000)

def make_sample(idx):
    return fo.Sample(filepath="image%d.png" % idx, uuid=idx)

#
# Option 1: build the samples on the workers
#

dataset = fo.Dataset()

foub.beam_import(dataset, samples, parse_fcn=make_sample)
print(dataset)

#
# Option 2: build the samples in the main thread
#
# This is generally not preferred but may be necessary if your
# parse_fcn is not serializable
#

dataset = fo.Dataset()

samples = map(make_sample, samples)

foub.beam_import(dataset, samples)
print(dataset)

Parameters
• dataset – a fiftyone.core.dataset.Dataset

• samples – an iterable of samples. If no parse_fcn is provided, these must be fiftyone.core.sample.Sample instances. If a parse_fcn is provided, these are passed to it for parsing

• parse_fcn (None) – an optional function that converts elements of samples to fiftyone.core.sample.Sample instances

• expand_schema (True) – whether to dynamically add new sample fields encountered to the dataset schema. If False, an error is raised if a sample’s schema is not a subset of the dataset schema

• validate (True) – whether to validate that the fields of each sample are compliant with the dataset schema before adding it

• options (None) – an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline. By default, the pipeline will be run via Beam’s direct runner using multiprocessing.cpu_count() threads

• verbose (False) – whether to log the Beam pipeline’s messages
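
The expand_schema behavior described above can be illustrated with a toy model. This is only a sketch under stated assumptions: the schema is a plain dict of field names, and add_to_schema is a hypothetical helper, not part of FiftyOne.

```python
def add_to_schema(schema, sample_fields, expand_schema=True):
    """Toy model of schema expansion (hypothetical; not FiftyOne's code).

    Both arguments are plain dicts mapping field name -> type name.
    """
    # Fields on the sample that the dataset schema does not know about yet
    new_fields = {k: v for k, v in sample_fields.items() if k not in schema}

    if new_fields and not expand_schema:
        # Mirrors "an error is raised if a sample's schema is not a subset
        # of the dataset schema"
        raise ValueError("Fields not in dataset schema: %s" % sorted(new_fields))

    schema.update(new_fields)
    return schema


schema = {"filepath": "str"}
add_to_schema(schema, {"filepath": "str", "uuid": "int"})
print(schema)  # {'filepath': 'str', 'uuid': 'int'}
```

With expand_schema=False, the same call would raise instead of adding the uuid field.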

fiftyone.utils.beam.beam_merge(dataset, samples, parse_fcn=None, options=None, verbose=False, **kwargs)

Merges the given samples into the dataset via Apache Beam.

This function is a parallelized alternative to fiftyone.core.dataset.Dataset.merge_samples().

Note

This function is only useful for merging in-memory samples into a dataset. If you are merging a sample collection, simply call fiftyone.core.dataset.Dataset.merge_samples().

Example:

import fiftyone as fo
import fiftyone.utils.beam as foub
import fiftyone.zoo as foz

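# Assumption: the "quickstart" zoo dataset, which has a "predictions" field
dataset = foz.load_zoo_dataset("quickstart").clone()

samples = iter(dataset.select_fields("predictions"))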

foub.beam_merge(dataset, samples, fields={"predictions": "predictions2"})

print(dataset.count("predictions.detections"))
print(dataset.count("predictions2.detections"))
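
The effect of the fields mapping in the example above can be pictured with a toy merge over plain dicts. This is a sketch only: merge_by_key is a hypothetical helper, samples are modeled as dicts, matching on "filepath" follows the key_field default shown in the MergeBatch signature, and inserting unmatched samples is an assumption of this toy model.

```python
def merge_by_key(dataset, samples, key_field="filepath", fields=None):
    """Toy merge over lists of dicts (hypothetical; not FiftyOne's code)."""
    index = {d[key_field]: d for d in dataset}

    for sample in samples:
        key = sample[key_field]
        target = index.get(key)
        if target is None:
            # Assumption: samples with no match are inserted as new entries
            target = {key_field: key}
            dataset.append(target)
            index[key] = target

        for name, value in sample.items():
            if name == key_field:
                continue
            if fields is not None:
                if name not in fields:
                    continue  # only the listed fields are merged
                name = fields[name]  # merge into the renamed field
            target[name] = value

    return dataset


dataset = [{"filepath": "a.png", "predictions": [1, 2]}]
samples = [{"filepath": "a.png", "predictions": [1, 2]}]

merge_by_key(dataset, samples, fields={"predictions": "predictions2"})
print(dataset)
# [{'filepath': 'a.png', 'predictions': [1, 2], 'predictions2': [1, 2]}]
```

This is why the example prints equal counts for predictions.detections and predictions2.detections: the second field is a merged copy of the first.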

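Parameters
• dataset – a fiftyone.core.dataset.Dataset

• samples – an iterable of samples. If no parse_fcn is provided, these must be fiftyone.core.sample.Sample instances. If a parse_fcn is provided, these are passed to it for parsing

• parse_fcn (None) – an optional function that converts elements of samples to fiftyone.core.sample.Sample instances

• options (None) – an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline

• verbose (False) – whether to log the Beam pipeline’s messages

• **kwargs – keyword arguments for fiftyone.core.dataset.Dataset.merge_samples()
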
fiftyone.utils.beam.beam_export(sample_collection, num_shards, options=None, verbose=False, render_kwargs=None, **kwargs)

Exports the given sample collection in the specified number of shards via Apache Beam.

This function is a parallelized alternative to fiftyone.core.collections.SampleCollection.export() that effectively performs the following sharded export in parallel:

for idx, (first, last) in enumerate(shards, 1):
    _kwargs = render_kwargs(kwargs, idx)
    sample_collection[first:last].export(**_kwargs)
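
The loop above takes the shards boundaries as given. One way they could be computed is sketched below; make_shards is a hypothetical helper, and splitting into near-equal contiguous ranges is an assumption rather than the library's documented behavior.

```python
def make_shards(num_samples, num_shards):
    """Split range(num_samples) into contiguous (first, last) slices.

    Hypothetical helper; `last` is exclusive, matching Python slicing.
    """
    edges = [i * num_samples // num_shards for i in range(num_shards + 1)]
    return list(zip(edges[:-1], edges[1:]))


shards = make_shards(10, 3)
print(shards)  # [(0, 3), (3, 6), (6, 10)]

# The slices cover every item exactly once, in order
items = list(range(10))
for idx, (first, last) in enumerate(shards, 1):
    print(idx, items[first:last])
```

Because the index starts at 1, a pattern such as %05d renders as 00001 for the first shard.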


Example:

from apache_beam.options.pipeline_options import PipelineOptions

import fiftyone as fo
import fiftyone.utils.beam as foub
import fiftyone.zoo as foz

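# Assumption: the "quickstart" zoo dataset, which has a "ground_truth" field
dataset = foz.load_zoo_dataset("quickstart")

# Use multithreading instead of the default multiprocessing
options = PipelineOptions(
    runner="direct",
    direct_num_workers=10,
    direct_running_mode="multi_threading",
)

foub.beam_export(
    dataset,
    num_shards=20,
    options=options,
    dataset_type=fo.types.TFObjectDetectionDataset,
    label_field="ground_truth",
    tf_records_path="/tmp/beam/tf.records-%05d-of-00020",
)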

Parameters
• sample_collection – a fiftyone.core.collections.SampleCollection

• num_shards – the number of shards to write

• options (None) – an apache_beam.options.pipeline_options.PipelineOptions that configures how to run the pipeline. By default, the pipeline will be run via Beam’s direct runner using min(num_shards, multiprocessing.cpu_count()) processes

• verbose (False) – whether to log the Beam pipeline’s messages

• render_kwargs (None) – a function that renders kwargs for the current shard. The function should have signature def render_kwargs(kwargs, idx) -> kwargs, where idx in [1, num_shards] is the shard index. By default, any string-valued arguments that contain format patterns like %05d will be rendered via value % idx

• **kwargs – keyword arguments for fiftyone.core.collections.SampleCollection.export()
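
The default rendering described for render_kwargs can be sketched in a few lines. The function name render_kwargs_sketch is hypothetical, and leaving strings unchanged when they contain no usable format pattern is an assumption about how such values are handled.

```python
def render_kwargs_sketch(kwargs, idx):
    """Render %-style patterns in string-valued kwargs for shard `idx`.

    Hypothetical sketch matching the signature
    def render_kwargs(kwargs, idx) -> kwargs.
    """
    rendered = {}
    for key, value in kwargs.items():
        if isinstance(value, str):
            try:
                value = value % idx
            except (TypeError, ValueError):
                # No usable format pattern in this string; keep it as-is
                pass
        rendered[key] = value
    return rendered


kwargs = {
    "label_field": "ground_truth",
    "tf_records_path": "/tmp/beam/tf.records-%05d-of-00020",
}
print(render_kwargs_sketch(kwargs, 3)["tf_records_path"])
# /tmp/beam/tf.records-00003-of-00020
```

A custom function with the same signature can be passed as render_kwargs when the default string formatting is not enough, for example to give each shard its own output directory.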

class fiftyone.utils.beam.ImportBatch(dataset_name, parse_fcn=None, expand_schema=True, validate=True)

Bases: apache_beam.transforms.core.DoFn

Attributes:

BundleFinalizerParam alias of apache_beam.transforms.core._BundleFinalizerParam
RestrictionParam alias of apache_beam.transforms.core._RestrictionDoFnParam
StateParam alias of apache_beam.transforms.core._StateDoFnParam
TimerParam alias of apache_beam.transforms.core._TimerDoFnParam
WatermarkEstimatorParam alias of apache_beam.transforms.core._WatermarkEstimatorParam

setup()

Called to prepare an instance for processing bundles of elements.

This is a good place to initialize transient in-memory resources, such as network connections. The resources can then be disposed of in DoFn.teardown.

start_bundle()

Called before a bundle of elements is processed on a worker.

Elements to be processed are split into bundles and distributed to workers. Before a worker calls process() on the first element of its bundle, it calls this method.

process(sample)

Method to use for processing elements.

This is invoked by DoFnRunner for each element of an input PCollection.

The following parameters can be used as default values on process arguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:

def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam):
    ...


The full set of parameters is:

• DoFn.ElementParam: element to be processed, should not be mutated.

• DoFn.SideInputParam: a side input that may be used when processing.

• DoFn.TimestampParam: timestamp of the input element.

• DoFn.WindowParam: Window the input element belongs to.

• DoFn.TimerParam: a userstate.RuntimeTimer object defined by the spec of the parameter.

• DoFn.StateParam: a userstate.RuntimeState object defined by the spec of the parameter.

• DoFn.KeyParam: key associated with the element.

• DoFn.RestrictionParam: an iobase.RestrictionTracker will be provided here to allow treatment as a Splittable DoFn. The restriction tracker will be derived from the restriction provider in the parameter.

• DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of Splittable DoFn implementations.

Parameters
• element – The element to be processed

• *args – side inputs

• **kwargs – other keyword arguments.

Returns

An Iterable of output elements or None.

finish_bundle()

Called after a bundle of elements is processed on a worker.
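
The lifecycle documented above (setup, then start_bundle, process per element, and finish_bundle for each bundle) lends itself to batching. The sketch below imitates that pattern in plain Python so it runs without Beam installed. BatchingFnSketch is a hypothetical class that does not subclass DoFn, and whether ImportBatch buffers elements this way is an assumption suggested only by its name and the methods it overrides.

```python
class BatchingFnSketch:
    """Plain-Python imitation of a batching DoFn (hypothetical)."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.flushed = []  # stands in for writes to a database
        self._batch = None

    def setup(self):
        # Called once per instance, e.g. to open a connection
        self.connected = True

    def start_bundle(self):
        # Called before the first element of each bundle
        self._batch = []

    def process(self, element):
        # Called once per element; flush when the buffer is full
        self._batch.append(element)
        if len(self._batch) >= self.batch_size:
            self._flush()

    def finish_bundle(self):
        # Called after the last element of each bundle; flush the remainder
        if self._batch:
            self._flush()

    def _flush(self):
        self.flushed.append(list(self._batch))
        self._batch = []


fn = BatchingFnSketch(batch_size=3)
fn.setup()
for bundle in ([1, 2, 3, 4], [5, 6]):
    fn.start_bundle()
    for element in bundle:
        fn.process(element)
    fn.finish_bundle()

print(fn.flushed)  # [[1, 2, 3], [4], [5, 6]]
```

Flushing in finish_bundle is what keeps a partial batch from being lost when a bundle ends before the buffer fills.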

class fiftyone.utils.beam.MergeBatch(dataset_name, parse_fcn=None, key_field='filepath', key_fcn=None, expand_schema=True, **kwargs)

Bases: apache_beam.transforms.core.DoFn

Attributes:

BundleFinalizerParam alias of apache_beam.transforms.core._BundleFinalizerParam
RestrictionParam alias of apache_beam.transforms.core._RestrictionDoFnParam
StateParam alias of apache_beam.transforms.core._StateDoFnParam
TimerParam alias of apache_beam.transforms.core._TimerDoFnParam
WatermarkEstimatorParam alias of apache_beam.transforms.core._WatermarkEstimatorParam

Methods:

display_data() Returns the display data associated to a pipeline component.
finish_bundle() Called after a bundle of elements is processed on a worker.
from_runner_api(fn_proto, context) Converts from a FunctionSpec to a Fn object.
get_input_batch_type(input_element_type) Determine the batch type expected as input to process_batch.
get_output_batch_type(input_element_type) Determine the batch type produced by this DoFn’s process_batch implementation and/or its process implementation with @yields_batch.
get_type_hints() Gets and/or initializes type hints for this object.
infer_output_type(input_type)
process(sample) Method to use for processing elements.
process_batch(batch, *args, **kwargs)
register_pickle_urn(pickle_urn) Registers and implements the given urn via pickling.
register_urn(urn, parameter_type[, fn]) Registers a urn with a constructor.
setup() Called to prepare an instance for processing bundles of elements.
start_bundle() Called before a bundle of elements is processed on a worker.
teardown() Called to clean up this instance before it is discarded.
to_runner_api(context) Returns a FunctionSpec encoding this Fn.
to_runner_api_parameter(context)
unbounded_per_element() A decorator on process fn specifying that the fn performs an unbounded amount of work per input element.
with_input_types(*arg_hints, **kwarg_hints)
with_output_types(*arg_hints, **kwarg_hints)
yields_batches(fn) A decorator to apply to process indicating it yields batches.
yields_elements(fn) A decorator to apply to process_batch indicating it yields elements.

setup()

Called to prepare an instance for processing bundles of elements.

This is a good place to initialize transient in-memory resources, such as network connections. The resources can then be disposed of in DoFn.teardown.

start_bundle()

Called before a bundle of elements is processed on a worker.

Elements to be processed are split into bundles and distributed to workers. Before a worker calls process() on the first element of its bundle, it calls this method.

process(sample)

Method to use for processing elements.

This is invoked by DoFnRunner for each element of an input PCollection.

The following parameters can be used as default values on process arguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:

def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam):
    ...


The full set of parameters is:

• DoFn.ElementParam: element to be processed, should not be mutated.

• DoFn.SideInputParam: a side input that may be used when processing.

• DoFn.TimestampParam: timestamp of the input element.

• DoFn.WindowParam: Window the input element belongs to.

• DoFn.TimerParam: a userstate.RuntimeTimer object defined by the spec of the parameter.

• DoFn.StateParam: a userstate.RuntimeState object defined by the spec of the parameter.

• DoFn.KeyParam: key associated with the element.

• DoFn.RestrictionParam: an iobase.RestrictionTracker will be provided here to allow treatment as a Splittable DoFn. The restriction tracker will be derived from the restriction provider in the parameter.

• DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of Splittable DoFn implementations.

Parameters
• element – The element to be processed

• *args – side inputs

• **kwargs – other keyword arguments.

Returns

An Iterable of output elements or None.

finish_bundle()

Called after a bundle of elements is processed on a worker.

class fiftyone.utils.beam.ExportBatch(dataset_name, view_stages=None, render_kwargs=None)

Bases: apache_beam.transforms.core.DoFn

Attributes:

BundleFinalizerParam alias of apache_beam.transforms.core._BundleFinalizerParam
RestrictionParam alias of apache_beam.transforms.core._RestrictionDoFnParam
StateParam alias of apache_beam.transforms.core._StateDoFnParam
TimerParam alias of apache_beam.transforms.core._TimerDoFnParam
WatermarkEstimatorParam alias of apache_beam.transforms.core._WatermarkEstimatorParam

Methods:

default_render_kwargs(kwargs, idx)
display_data() Returns the display data associated to a pipeline component.
finish_bundle() Called after a bundle of elements is processed on a worker.
from_runner_api(fn_proto, context) Converts from a FunctionSpec to a Fn object.
get_input_batch_type(input_element_type) Determine the batch type expected as input to process_batch.
get_output_batch_type(input_element_type) Determine the batch type produced by this DoFn’s process_batch implementation and/or its process implementation with @yields_batch.
get_type_hints() Gets and/or initializes type hints for this object.
infer_output_type(input_type)
process(element, **kwargs) Method to use for processing elements.
process_batch(batch, *args, **kwargs)
register_pickle_urn(pickle_urn) Registers and implements the given urn via pickling.
register_urn(urn, parameter_type[, fn]) Registers a urn with a constructor.
setup() Called to prepare an instance for processing bundles of elements.
start_bundle() Called before a bundle of elements is processed on a worker.
teardown() Called to clean up this instance before it is discarded.
to_runner_api(context) Returns a FunctionSpec encoding this Fn.
to_runner_api_parameter(context)
unbounded_per_element() A decorator on process fn specifying that the fn performs an unbounded amount of work per input element.
with_input_types(*arg_hints, **kwarg_hints)
with_output_types(*arg_hints, **kwarg_hints)
yields_batches(fn) A decorator to apply to process indicating it yields batches.
yields_elements(fn) A decorator to apply to process_batch indicating it yields elements.

static default_render_kwargs(kwargs, idx)

setup()

Called to prepare an instance for processing bundles of elements.

This is a good place to initialize transient in-memory resources, such as network connections. The resources can then be disposed of in DoFn.teardown.

process(element, **kwargs)

Method to use for processing elements.

This is invoked by DoFnRunner for each element of an input PCollection.

The following parameters can be used as default values on process arguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:

def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam):
    ...


The full set of parameters is:

• DoFn.ElementParam: element to be processed, should not be mutated.

• DoFn.SideInputParam: a side input that may be used when processing.

• DoFn.TimestampParam: timestamp of the input element.

• DoFn.WindowParam: Window the input element belongs to.

• DoFn.TimerParam: a userstate.RuntimeTimer object defined by the spec of the parameter.

• DoFn.StateParam: a userstate.RuntimeState object defined by the spec of the parameter.

• DoFn.KeyParam: key associated with the element.

• DoFn.RestrictionParam: an iobase.RestrictionTracker will be provided here to allow treatment as a Splittable DoFn. The restriction tracker will be derived from the restriction provider in the parameter.

• DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of Splittable DoFn implementations.

Parameters
• element – The element to be processed

• *args – side inputs

• **kwargs – other keyword arguments.

Returns

An Iterable of output elements or None.

