How to Use Apache Beam with Python


Apache Beam

  • is a unified programming model for big data processing, open-sourced by Google in 2016
  • supports both batch and streaming data
  • is executable on many platforms such as
    • Spark
    • Flink
    • Dataflow etc.
  • has two SDK languages: Java and Python
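
To follow along, you need the Beam Python SDK. It is published on PyPI as apache-beam, so a plain pip install is enough for the local examples below (runner-specific extras such as apache-beam[gcp] exist but are optional here):

pip install apache-beam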

Apache Beam has three core concepts:
  • Pipeline, which implements a Directed Acyclic Graph (DAG) of tasks.
  • PCollection, the data structure in Beam: all data moving through the pipeline is held in PCollections.
  • Transform, where you implement your logic: each Transform takes a PCollection as input and outputs another PCollection.

The pipeline can be represented as follows, where | is the operator for applying transforms and >> attaches a human-readable label to each step.

[Output PCollection] = [Input PCollection] | [Label] >> [Transform]
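
For instance, assuming numbers is an existing PCollection of integers (a hypothetical name, just for illustration), a single squaring step would look like:

squares = numbers | "Square" >> beam.Map(lambda x: x * x)  # numbers is a hypothetical PCollection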





In this post, we provide a "hello world" example for word counting using Apache Beam in Python 3.


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# The with-block runs the pipeline on exit (on the local DirectRunner
# unless the options say otherwise)
with beam.Pipeline(options=PipelineOptions()) as p:
    # Create a PCollection from in-memory data
    lines = p | "Create" >> beam.Create(["cat dog", "snake cat", "dog cat cat"])
    counts = (
        lines
        | "Split" >> (beam.FlatMap(lambda x: x.split(" "))
                           .with_output_types(str))
        | "Pair with one" >> beam.Map(lambda x: (x, 1))
        | "Group and sum" >> beam.CombinePerKey(sum)
    )
    counts | "Print" >> beam.ParDo(
        lambda w_c: print('%s: %s' % (w_c[0], w_c[1]))
    )


which will print out (line order may vary):

cat: 4
dog: 2
snake: 1

Let's have a brief overview of what the code is doing:
  • beam.Create creates a PCollection from in-memory data, and is usually used for testing;
  • beam.FlatMap combines two actions, Map and Flatten: each line is mapped to a list of words, and the lists are flattened into one PCollection of words;
  • beam.Map is a one-to-one mapping action that maps a word string to (word, 1);
  • beam.CombinePerKey applies to two-element tuples: it groups by the first element and applies the provided function to the list of second elements (see the sketch after this list);
  • beam.ParDo here applies a basic transform to print out the counts.
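
To make CombinePerKey concrete, here is a minimal, self-contained sketch (with made-up pairs) of the same "Group and sum" step written as GroupByKey plus Map; both yield the same (word, count) pairs:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([("cat", 1), ("dog", 1), ("cat", 1)])
        # GroupByKey turns (word, 1) pairs into (word, iter<1, 1, ...>)
        | beam.GroupByKey()
        # summing each group's values matches beam.CombinePerKey(sum)
        | beam.Map(lambda kv: (kv[0], sum(kv[1])))
        | beam.Map(print)
    )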

Transforms


Clearly, Transforms play an important role in transforming data in the pipeline. Some commonly used higher-level transforms are listed below:

Transform            Meaning
-------------------  -------------------------------------------------------
Create(values)       Creates a PCollection from an iterable.
Filter(fn)           Uses callable fn as a predicate; keeps only the elements for which fn returns True.
Map(fn)              Uses callable fn to do a one-to-one transformation.
FlatMap(fn)          Similar to Map, but fn must return an iterable of zero or more elements; the iterables are flattened into one PCollection.
Flatten()            Merges several PCollections into a single one.
Partition(fn, n)     Splits a PCollection into n partitions. fn is a PartitionFn or a callable that accepts two arguments: element, num_partitions.
GroupByKey()         Works on a PCollection of key/value pairs (two-element tuples), groups by common key, and returns (key, iter<value>) pairs.
CoGroupByKey()       Groups results across several PCollections by key, e.g. input (k, v) and (k, w), output (k, (iter<v>, iter<w>)).
RemoveDuplicates()   Gets the distinct values in a PCollection (newer SDKs call this Distinct()).
CombinePerKey(fn)    Similar to GroupByKey, but combines the values with a CombineFn or a callable that takes an iterable, such as sum or max.
CombineGlobally(fn)  Reduces a PCollection to a single value by applying fn.
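
As a quick illustration, here is a minimal sketch (with made-up numbers) that exercises Filter, Partition, and Flatten from the table above:

import apache_beam as beam

with beam.Pipeline() as p:
    nums = p | "Create" >> beam.Create([1, 2, 3, 4, 5, 6])

    # Filter keeps only the elements for which the callable returns True
    evens = nums | "KeepEvens" >> beam.Filter(lambda x: x % 2 == 0)

    # Partition splits one PCollection into n pieces; the callable
    # receives (element, num_partitions) and returns a partition index
    small, large = nums | "SplitBySize" >> beam.Partition(
        lambda x, n: 0 if x <= 3 else 1, 2
    )

    # Flatten merges several PCollections back into one
    merged = (small, large) | "Merge" >> beam.Flatten()
    merged | "Print" >> beam.Map(print)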






So how do you write your own Transforms?


Good question! We can easily write our own Transforms in the following format, using Beam's DoFn:


class SplitFn(beam.DoFn):
    def process(self, element):
        # process() runs once per input element; every item in the
        # returned iterable becomes a separate output element
        return element.split(" ")
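
Note that process can also yield elements one at a time instead of returning a list; Beam only requires an iterable:

class SplitFn(beam.DoFn):
    def process(self, element):
        for word in element.split(" "):
            yield word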


You just need to subclass beam.DoFn and put your logic in the process function. Now we can use our own Transform instead of the built-in FlatMap with split():


        | "Split" >> (beam.ParDo(SplitFn())
                           .with_output_types(unicode))
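
For completeness, here is the whole word-count pipeline again with SplitFn swapped in for the built-in FlatMap; everything else is unchanged:

import apache_beam as beam

class SplitFn(beam.DoFn):
    def process(self, element):
        return element.split(" ")

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(["cat dog", "snake cat", "dog cat cat"])
        | "Split" >> (beam.ParDo(SplitFn())
                           .with_output_types(str))
        | "Pair with one" >> beam.Map(lambda x: (x, 1))
        | "Group and sum" >> beam.CombinePerKey(sum)
        | "Print" >> beam.Map(print)  # print each (word, count) pair
    )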