Big Data Processing with Apache Beam
Big Data Processing with Apache Beam
06 October 2021
Introduction
In this world, daily every minute, every second, lots of data is generated from a variety of data sources. So, it is very tedious to extract and process information from it. In order to solve these problems, Apache Beam comes into the picture.
Apache Beam is an open-source unified programming model to define and execute data processing pipelines, transformation, including ETL and processing batch and streaming data. Using your favorite programming language (Python, Java and Go currently), you can use Apache Beam SDK for your jobs and execute your pipeline on your favorite runner like Apache Spark, Apache Flink, Cloud Dataflow, Amazon Kinesis, etc.
Data Ingestion & type of data
Our data is of two types; batch data and streaming data. Depending on the use cases we choose different architectural models to process our data. Here, we will move ahead by using Python code for further operations. Apache Beam SDK requires Python version 3.6 or higher. Now, Install the apache beam SDK using the following command.
Local
pip install apache-beam
Google Cloud Platform
pip install apache-beam[gcp]
Amazon Web Server
pip install apache-beam[aws]
For I/O operations you can read and write data from various data sources like Avro, Parquet, BigQuery, PubSub, MongoDB, TFRecord, etc.
Batch Data
First of all, collect historical data into data lakes where we put raw data (unprocessed data). To do some processing and transformation, put the data into a storage service (S3 bucket, Cloud storage, on-premise storage device, etc). This is called the extraction of data from data lakes.
beam.io.ReadFromText(‘’)
Stream Data
This is real-time data generated from data centers, automobiles, Maps, Health care, log devices, and sensors, etc. For ingesting streaming data, use Apache Kafka or any other messaging services (like Cloud Pub/Sub, SNS). In the Pub/Sub, you can filter data according to your need.
beam.io.ReadFromPubSub(subscription=subscription_name)
Processing & Transform
First, create a Pipeline object and set the pipeline execution environment (Apache Spark, Apache Flink, Cloud Dataflow and Amazon Kinesis, etc.). Now, create a Pcollection from some external storage or data source then apply PTransforms to transform each element in the Pcollection to produce output Pcollection.
You can filter, group, analyze or do other processing on data. Finally, store the final Pcollection to some external storage system using I/O libraries. When you run this pipeline, it creates a workflow graph of the pipeline, which executes asynchronously on the runner engine.
Pipelines – It encapsulates the entire process of reading bounded or unbounded data from the external sources, transforming it, and saving the output into external storage sources like BigQuery, etc.
Pcollections – It defines the data on which the data pipeline works, it could be either bounded data or unbounded data. We can create Pcollections from any external system (Data lakes, geographical data, health care).
PTransforms – It takes Pcollection as an input data, applies processing function(ParDo, Map, Filter etc) on it, and produces another Pcollection.
Pipeline IO – It enables you to read/write data from/to various external sources.
Windowing
Windowing is a mechanism for handling streaming or unbounded data. Windowing divides data based on the timestamp value. Windowing becomes especially important when you are creating a pipeline on unbounded data and do some aggregated transformations like groupByKey and CoGroupByKey because to aggregate data based on some key value.
These are 4 different kinds of windows that divide the elements of your Pcollection.
- Fixed Time Windows – Fixed-size with non-overlapping windows.
- Sliding Time Windows – Fixed-size with overlapping windows that has a window duration.
- Per-Session Windows – It contains elements that are within a certain gap duration of another element.
- Single Global Window – By default, all elements in your Pcollection are in a single global window.
from apache_beam.transforms.window import (
TimestampedValue,
Sessions,
Duration,
GlobalWindows, FixedWindows, SlidingWindows,
)
Trigger
Window results can be at different moments. If we receive partial results, they can be computed earlier; before the end of the window. Hence, it will produce early results. In order to control late events, there is a mechanism called triggers.
Apache Beam has 4 different types of triggers:
- Event time trigger – This trigger is based on the element’s event time property.
- Processing time trigger – This trigger is based on processing time.
- Data-driven trigger – It uses the number of data arrived to make the computation.
- SComposite trigger – It allows combining different types of triggers with predicates.
Deploy on Google Cloud Dataflow Engine
Once you complete the Beam pipeline you can run it on either an on-premise, cloud, or local system. For the GCP environment, Cloud Dataflow is a service offered by Google Cloud for executing the Apache Beam pipeline.
Now, open your notebook instances and create your beam pipeline in Python.
import apache_beam as beam
argv = [
‘–project={0}’.format(project),
‘–job_name=ch04timecorr’,
‘–save_main_session’,
‘–staging_location=gs://{0}/healthcare/staging/’.format(bucket),
‘–temp_location=gs://{0}/healthcare/temp/’.format(bucket),
‘–setup_file=./setup.py’,
‘–max_num_workers=8’,
‘–region={}’.format(region),
‘–autoscaling_algorithm=THROUGHPUT_BASED’,
‘–runner=DataflowRunner’
]
def run(project, bucket, dataset, region):
pipeline = beam.Pipeline(argv=argv)
# do beam processing and operations
pipeline.run()
if __name__ == ‘__main__’:
import argparse
parser = argparse.ArgumentParser(description=’Run pipeline on the GCP)
parser.add_argument(‘-p’,’–project’, help=’Unique project ID’, required=True)
parser.add_argument(‘-b’,’–bucket’, help=’Bucket where your data were ingested.’, required=True)
parser.add_argument(‘-r’,’–region’, help=’Region in which to run the job.’, required=True)
parser.add_argument(‘-d’,’–dataset’, help=’Google BigQuery dataset’, default=’healthcare’)
args = vars(parser.parse_args())
run(project=args[‘project’], bucket=args[‘bucket’], dataset=args[‘dataset’], region=args[‘region’])
Google Cloud Dataflow is a fully managed data real time data analytics service that minimizes latency. It costs you according to the demand. It is a fully managed service that is autoscale on-demand. Here you don’t need to worry about DevOps work and infrastructure overhead. Using google managed services like Cloud Pub/Sub, Dataflow, and Bigquery you can easily create a real-time data analytics pipeline for ETL kind of job.