Implementation of an Apache Beam pipeline that cleans the data and writes it to BigQuery for analysis.
There are various big data technologies on the market, such as Hadoop, Apache Spark, and Apache Flink, and maintaining them is a big challenge for both developers and businesses. Which tool is best for batch and streaming data? Are the performance and speed of one particular tool enough for our use case? How should you integrate different data sources? If these questions often come up in your business, you may want to consider Apache Beam.
Apache Beam is an open-source, unified model for constructing both batch and streaming data processing pipelines. Beam provides language-specific SDKs for writing pipelines against the Beam Model, such as Java, Python, and Go, as well as runners for executing them on distributed processing backends, including Apache Flink, Apache Spark, Google Cloud Dataflow, and Hazelcast Jet.
We will be running this pipeline on Google Cloud Platform, so you will need to take advantage of the free tier, which lets you use these products up to their specified free usage limits. New users also get $300 to spend on Google Cloud Platform products during the free trial.
Here we are going to use the Python SDK and Cloud Dataflow to run the pipeline.
The Anatomy of a Data Pipeline
- Pipeline: manages a directed acyclic graph (DAG) of PTransforms and PCollections that is ready for execution.
- PCollection: represents a collection of bounded or unbounded data.
- PTransform: transforms input PCollections into output PCollections.
- PipelineRunner: represents where and how the pipeline should execute.
- I/O transform: Beam comes with a number of “IOs” — library PTransforms that read or write data to various external storage systems.
I have clipped some commonly used higher-level transforms (PTransforms) below; we are going to use some of them in our pipeline.
ParDo is a primary Beam transform for generic parallel processing, which is not in the above image. The ParDo processing paradigm is similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm: a ParDo transform considers each element in the input PCollection, performs some processing on that element, and emits zero, one, or multiple elements to an output PCollection.
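To make this concrete, here is a small, purely illustrative ParDo sketch (not part of our pipeline) in which each input line yields zero or more output elements:

import apache_beam as beam

class SplitWords(beam.DoFn):
    # A DoFn emits elements by yielding them; an empty line yields nothing.
    def process(self, element):
        for word in element.strip().split():
            yield word

with beam.Pipeline() as p:
    (p
     | 'CreateLines' >> beam.Create(['hello beam', '', 'batch and streaming'])
     | 'SplitIntoWords' >> beam.ParDo(SplitWords())
     | 'PrintWords' >> beam.Map(print))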
The pipe operator ‘|’ applies transforms, and each transform can optionally be given a unique label. Transforms can be chained, and we can compose arbitrary shapes of transforms; at runtime they are represented as a DAG.
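As a tiny illustrative sketch of such chaining and branching (the data and labels here are made up):

import apache_beam as beam

with beam.Pipeline() as p:
    numbers = p | 'CreateNumbers' >> beam.Create([1, 2, 3, 4, 5])
    # Two labelled transforms branch off the same PCollection,
    # so the resulting graph is a small DAG rather than a straight line.
    squares = numbers | 'Square' >> beam.Map(lambda x: x * x)
    evens = numbers | 'KeepEvens' >> beam.Filter(lambda x: x % 2 == 0)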
These concepts are the core of building an Apache Beam pipeline, so let's move on and create our first batch pipeline, which will clean the dataset and write it to BigQuery.
Basic flow of the pipeline
- Read the data from a Google Cloud Storage bucket (batch).
- Apply some transformations, such as splitting the data on the comma separator, dropping unwanted columns, converting data types, etc.
- Write the data to the data sink (BigQuery) and analyze it.
Here we are going to use Craft Beers Dataset from Kaggle.
Description of the beer dataset
abv: The alcoholic content by volume with 0 being no alcohol and 1 being pure alcohol
ibu: International bittering units, which specify how bitter a drink is
name: The name of the beer
style: Beer style (lager, ale, IPA, etc.)
brewery_id: Unique identifier for a brewery that produces this beer
ounces: Size of beer in ounces
We will upload this dataset to a Google Cloud Storage bucket.
Before we run the pipeline, we need to enable the Dataflow and BigQuery APIs. Type Dataflow API in the GCP search box and enable it.
Similarly, you need to enable BigQuery API.
Dataflow will use a Cloud Storage bucket as a staging location to store temporary files. We will create a Cloud Storage bucket and choose the nearest location (region).
For example, if you are in Asia, you should select an Asia region for the speed and performance of the computation (the Dataflow job).
We will create a BigQuery dataset and table with the appropriate schema as the data sink where the output of the Dataflow job will reside. The dataset region should be your nearest location; it is asia-south1 (Mumbai) in our case. You need to provide the output schema (already given in batch.py) while creating the table in BigQuery.
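If you prefer the command line to the console, the same dataset and table can be created with the bq tool; the names below match the ones used in batch.py, but treat this as a sketch and substitute your own project ID:

bq --location=asia-south1 mk --dataset aniket-g:beer
bq mk --table aniket-g:beer.beer_data sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT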
Next, open the Cloud Shell editor and set your project property if it is not already set, then clone the GitHub repository, which has all the supporting files and data.
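Setting the project property, if needed, looks something like this (use your own project ID):

gcloud config set project aniket-g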
git clone https://github.com/aniket-g/batch-pipeline-using-apache-beam-python
Once it is done, change into the directory where all files reside.
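Assuming the default clone directory name, that is:

cd batch-pipeline-using-apache-beam-python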
Now copy the beers.csv file into our bucket using the command given below.
gsutil cp beers.csv gs://ag-pipeline/batch/
Alternatively, you can upload the CSV file through the storage bucket page in the console.
To run the pipeline, you need to have the Apache Beam library installed on the virtual machine.
sudo pip3 install 'apache-beam[gcp]'
That’s all.
Now we will walk through the pipeline code to understand how it works. We will mostly look at the PTransforms in the pipeline.
def discard_incomplete(data):
    """Filters out records that are missing information."""
    return (len(data['abv']) > 0 and len(data['id']) > 0
            and len(data['name']) > 0 and len(data['style']) > 0)
This function filters out records that have missing or empty values.
def convert_types(data):
    """Converts string values to their appropriate type."""
    data['abv'] = float(data['abv']) if 'abv' in data else None
    data['id'] = int(data['id']) if 'id' in data else None
    data['name'] = str(data['name']) if 'name' in data else None
    data['style'] = str(data['style']) if 'style' in data else None
    data['ounces'] = float(data['ounces']) if 'ounces' in data else None
    return data
The above function will convert the string values to their appropriate data type.
def del_unwanted_cols(data):
    """Deleting unwanted columns"""
    del data['ibu']
    del data['brewery_id']
    return data
In the above function, we delete the columns we don't need, so that only the cleaned fields remain.
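To see how a single record flows through these three functions, here is a quick standalone check with made-up values in the pipeline's dictionary format:

# Hypothetical record, in the dictionary format the pipeline builds from each CSV row.
sample = {"sr": "1", "abv": "0.066", "ibu": "60", "id": "100",
          "name": "Sample IPA", "style": "American IPA",
          "brewery_id": "42", "ounces": "12.0"}

if discard_incomplete(sample):
    cleaned = del_unwanted_cols(convert_types(sample))
    print(cleaned)
    # {'sr': '1', 'abv': 0.066, 'id': 100, 'name': 'Sample IPA',
    #  'style': 'American IPA', 'ounces': 12.0}
    # Note that 'sr' stays a string because convert_types does not cast it.

With the cleaning functions in place, the pipeline itself looks like this: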
p = beam.Pipeline(options=PipelineOptions())

(p
 | 'ReadData' >> beam.io.ReadFromText('gs://ag-pipeline/batch/beers.csv', skip_header_lines=1)
 | 'SplitData' >> beam.Map(lambda x: x.split(','))
 | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
 | 'DeleteIncompleteData' >> beam.Filter(discard_incomplete)
 | 'ChangeDataType' >> beam.Map(convert_types)
 | 'DeleteUnwantedData' >> beam.Map(del_unwanted_cols)
 | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
     '{0}:beer.beer_data'.format(PROJECT_ID),
     schema=SCHEMA,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

result = p.run()
beam.io.ReadFromText — reads data from an external source (here, a text file) into a PCollection, one element per line.
beam.Map — works like ParDo in a simplified form: it applies a function to every element of the PCollection and returns exactly one output element for each input element.
beam.Filter — accepts a function that keeps elements that return True, and filters out the remaining elements.
beam.io.WriteToBigQuery — a write transform to a BigQuery sink that accepts a PCollection of dictionaries. It requires the following arguments (a sketch after this list adds one optional argument):
- TableReference can be a PROJECT:DATASET.TABLE or DATASET.TABLE string.
- TableSchema can be a NAME:TYPE{,NAME:TYPE}* string (e.g. ‘month:STRING,event_count:INTEGER’).
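For reference, WriteToBigQuery also accepts an optional create_disposition, which lets Beam create the table if it does not already exist. A sketch of the write step with it added (otherwise identical to the one used in batch.py) would be:

beam.io.WriteToBigQuery(
    '{0}:beer.beer_data'.format(PROJECT_ID),
    schema=SCHEMA,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)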
Now we run the pipeline on the Dataflow runner with the following command.
python3 batch.py --runner DataflowRunner --project aniket-g --temp_location gs://ag-pipeline/batch/temp --staging_location gs://ag-pipeline/batch/stag --region asia-east1 --job_name drinkbeer
Currently, Dataflow provides regional endpoints for only some regions, which do not include asia-south1, hence I chose asia-east1 as the region.
- project — The ID of your Google Cloud project.
- runner — The pipeline runner that will parse your program and construct your pipeline. It can also be the direct runner if you want to debug your pipeline locally (see the example after this list); here we are using the Dataflow runner.
- staging_location — A Cloud Storage path for Dataflow to stage code packages needed by workers executing the job.
- temp_location — A Cloud Storage path for Dataflow to stage temporary job files created during the execution of the pipeline.
- region — The regional endpoint where you want your Dataflow job to run.
- job_name (Optional) — Give any name to the dataflow pipeline.
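For local debugging with the direct runner, a command along these lines should work; the project and temp location are still needed because the pipeline writes to BigQuery:

python3 batch.py --runner DirectRunner --project aniket-g --temp_location gs://ag-pipeline/batch/temp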
Now go to Dataflow in the console; you can see your job running as a batch job.
Once it has completed successfully, you will see the results in the BigQuery beer_data table.
Now we can query the data to get some insights.
# Beer styles with the highest average alcohol by volume
SELECT
  style,
  AVG(abv) AS avg_abv
FROM
  `aniket-g.beer.beer_data`
GROUP BY
  style
ORDER BY
  avg_abv DESC
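Another example query, listing the ten strongest individual beers (columns as defined in our schema):

# Ten strongest beers by alcohol content
SELECT
  name,
  style,
  abv
FROM
  `aniket-g.beer.beer_data`
ORDER BY
  abv DESC
LIMIT
  10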
Review the code:
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from sys import argv

PROJECT_ID = 'aniket-g'
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT'


def discard_incomplete(data):
    """Filters out records that are missing information."""
    return (len(data['abv']) > 0 and len(data['id']) > 0
            and len(data['name']) > 0 and len(data['style']) > 0)


def convert_types(data):
    """Converts string values to their appropriate type."""
    data['abv'] = float(data['abv']) if 'abv' in data else None
    data['id'] = int(data['id']) if 'id' in data else None
    data['name'] = str(data['name']) if 'name' in data else None
    data['style'] = str(data['style']) if 'style' in data else None
    data['ounces'] = float(data['ounces']) if 'ounces' in data else None
    return data


def del_unwanted_cols(data):
    """Delete the unwanted columns"""
    del data['ibu']
    del data['brewery_id']
    return data


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    known_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions())

    (p
     | 'ReadData' >> beam.io.ReadFromText('gs://ag-pipeline/batch/beers.csv', skip_header_lines=1)
     | 'SplitData' >> beam.Map(lambda x: x.split(','))
     | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
     | 'DeleteIncompleteData' >> beam.Filter(discard_incomplete)
     | 'ChangeDataType' >> beam.Map(convert_types)
     | 'DeleteUnwantedData' >> beam.Map(del_unwanted_cols)
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:beer.beer_data'.format(PROJECT_ID),
         schema=SCHEMA,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    result = p.run()
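Optionally, if you want the script to block until the Dataflow job reaches a terminal state, you can add one more line after result = p.run():

result.wait_until_finish()  # blocks until the job completes and returns its final state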
The main objective of this article is to demonstrate how we can create a cleaning pipeline using Apache Beam. I have used only the dataset with beer information; a second dataset with brewery information could have given more insights.