Apache Beam
- is a unified big data processing model, open-sourced by Google in 2016
- supports both batch and streaming data
- is executable on many platforms such as
- Spark
- Flink
- Google Cloud Dataflow, etc.
- has two SDK languages: Java and Python
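The target platform is chosen through pipeline options. As a minimal sketch (the DirectRunner is Beam's local runner; other runners need their own extra options, which are omitted here):

from apache_beam.options.pipeline_options import PipelineOptions

# Run locally with the DirectRunner; swapping in e.g. "--runner=FlinkRunner"
# or "--runner=DataflowRunner" (plus runner-specific options) targets
# another platform.
options = PipelineOptions(["--runner=DirectRunner"])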
Apache Beam has three core concepts:
- Pipeline, which implements a Directed Acyclic Graph (DAG) of tasks.
- PCollection, the data structure in Beam; all data moving through a pipeline is represented as PCollections
- Transform, where you implement your logic; each Transform takes a PCollection as input and outputs another PCollection
The pipeline can be represented as follows, where | is the operator for applying transforms and >> attaches an optional label:
[Output PCollection] = [Input PCollection] | [Label] >> [Transform]
In this post, we provide a "hello world" example for word counting using Apache Beam in Python 3.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as p:
    # Create a PCollection from an in-memory list.
    lines = p | "Create" >> beam.Create(["cat dog", "snake cat", "dog cat cat"])
    counts = (
        lines
        # Split each line into words and flatten them into one PCollection.
        | "Split" >> (beam.FlatMap(lambda x: x.split(" "))
                      .with_output_types(str))
        # Map each word to a (word, 1) pair.
        | "Pair with one" >> beam.Map(lambda x: (x, 1))
        # Group by word and sum the counts.
        | "Group and sum" >> beam.CombinePerKey(sum)
    )
    counts | "Print" >> beam.ParDo(
        lambda w_c: print("%s: %s" % (w_c[0], w_c[1]))
    )
which will print out (the order of the lines may vary)
cat: 4
dog: 2
snake: 1
Let's have a brief overview of what the code is doing:
- beam.Create creates a PCollection from in-memory data, and is usually used for testing;
- beam.FlatMap combines two actions, Map and Flatten: the callable returns an iterable per element, and the iterables are flattened into one PCollection;
- beam.Map is a one-to-one mapping, here mapping a word string to (word, 1);
- beam.CombinePerKey applies to two-element tuples; it groups by the first element and applies the provided function to the iterable of second elements;
- beam.ParDo here applies a basic per-element transform to print out the counts.
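To make the difference between Map and FlatMap concrete, here is a minimal sketch; the names and data are made up, the comments list the resulting elements, and PCollection ordering is not guaranteed:

import apache_beam as beam

with beam.Pipeline() as p:
    lines = p | beam.Create(["cat dog", "snake cat"])
    # Map produces exactly one output element per input element,
    # so each line becomes a single list of words.
    lists = lines | "Map" >> beam.Map(lambda x: x.split(" "))
    # elements: ["cat", "dog"], ["snake", "cat"]
    # FlatMap flattens the returned iterables, so every word becomes
    # its own element.
    words = lines | "FlatMap" >> beam.FlatMap(lambda x: x.split(" "))
    # elements: "cat", "dog", "snake", "cat"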
Transforms
Clearly, Transforms play an important role in transforming the data in the pipeline. Some commonly used higher-level transforms are listed below:
Transform | Meaning
---|---
Create(value) | Creates a PCollection from an iterable.
Filter(fn) | Uses callable fn to filter out elements.
Map(fn) | Uses callable fn to do a one-to-one transformation.
FlatMap(fn) | Similar to Map, but fn needs to return an iterable of zero or more elements, and these iterables are flattened into one PCollection.
Flatten() | Merges several PCollections into a single one.
Partition(fn) | Splits a PCollection into several partitions. fn is a PartitionFn or a callable that accepts two arguments: element, num_partitions.
GroupByKey() | Works on a PCollection of key/value pairs (two-element tuples), groups by common key, and returns (key, iter<value>) pairs.
CoGroupByKey() | Groups results across several PCollections by key, e.g. input (k, v) and (k, w), output (k, (iter<v>, iter<w>)).
RemoveDuplicates() | Gets the distinct values in a PCollection.
CombinePerKey(fn) | Similar to GroupByKey, but combines the values by a CombineFn or a callable that takes an iterable, such as sum or max.
CombineGlobally(fn) | Reduces a PCollection to a single value by applying fn.
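As a quick illustration of a few of these, here is a sketch with made-up data; the expected elements are noted in comments (ordering is not guaranteed, and GroupByKey actually yields an iterable of values, shown as a list here):

import apache_beam as beam

with beam.Pipeline() as p:
    nums = p | "Nums" >> beam.Create([1, 2, 3, 4, 5])
    # Filter keeps only the elements for which the callable returns True.
    evens = nums | "Evens" >> beam.Filter(lambda x: x % 2 == 0)
    # elements: 2, 4
    # Partition returns a tuple of PCollections; the callable maps each
    # element to a partition index in [0, num_partitions).
    even_part, odd_part = nums | "Parts" >> beam.Partition(
        lambda x, num_partitions: x % num_partitions, 2)
    # even_part: 2, 4   odd_part: 1, 3, 5
    pairs = p | "Pairs" >> beam.Create([("cat", 1), ("cat", 2), ("dog", 3)])
    # GroupByKey turns (key, value) pairs into (key, iter<value>) pairs.
    grouped = pairs | "Group" >> beam.GroupByKey()
    # elements: ("cat", [1, 2]), ("dog", [3])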
How do you write your own Transforms, then?
Good question! We can easily write our own transform in the following format with Beam's DoFn:
class SplitFn(beam.DoFn):
    def process(self, element):
        return element.split(" ")
You just need to subclass beam.DoFn and put your logic in the process function. Now we can use our own transform in place of the built-in FlatMap with split():
| "Split" >> (beam.ParDo(SplitFn())
.with_output_types(unicode))
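Putting it all together, a sketch of the complete word-count pipeline using the custom SplitFn (same data as before) might look like this:

import apache_beam as beam

class SplitFn(beam.DoFn):
    def process(self, element):
        return element.split(" ")

with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(["cat dog", "snake cat", "dog cat cat"])
        # ParDo applies our DoFn; the returned iterables are flattened,
        # just like with FlatMap.
        | "Split" >> beam.ParDo(SplitFn())
        | "Pair with one" >> beam.Map(lambda x: (x, 1))
        | "Group and sum" >> beam.CombinePerKey(sum)
        | "Print" >> beam.Map(print)
    )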