Developing Data Processing Pipeline With Apache Beam

An implementation of an Apache Beam batch pipeline that cleans raw CSV data and writes the results to BigQuery for analysis.

The Anatomy of a Data Pipeline

Key Concepts of Pipeline
Developing Data Processing Pipeline With Apache Beam 2
Common Transforms in Pipeline

Basic flow of the pipeline

Developing Data Processing Pipeline With Apache Beam 3
Pipeline Flow
Developing Data Processing Pipeline With Apache Beam 4
Enabling API — Image By Author
Developing Data Processing Pipeline With Apache Beam 5
Creating GCS bucket — Image By Author
git clone https://github.com/aniket-g/batch-pipeline-using-apache-beam-python
gsutil cp beers.csv gs://ag-pipeline/batch/
sudo pip3 install apache_beam[gcp]
def discard_incomplete(data):
    """Return True only if the record has non-empty 'abv', 'id', 'name' and 'style'.

    Used as a beam.Filter predicate: records returning False are dropped.
    `data.get(field)` tolerates missing keys (the original `len(data[k])`
    raised KeyError), and truthiness covers both empty strings and None.
    """
    return all(data.get(field) for field in ('abv', 'id', 'name', 'style'))
def convert_types(data):
    """Cast the CSV string fields to their BigQuery schema types, in place.

    Blank or missing values become None instead of raising: the original's
    `'abv' in data` guard never fires (FormatToDict always sets the key),
    so `float('')` / `int('')` raised ValueError on blank fields.
    Returns the mutated dict so the function composes with beam.Map.
    NOTE(review): 'sr' is left as a string even though the SCHEMA declares
    it INTEGER -- confirm BigQuery coerces it as intended.
    """
    data['abv'] = float(data['abv']) if data.get('abv') else None
    data['id'] = int(data['id']) if data.get('id') else None
    data['name'] = str(data['name']) if data.get('name') else None
    data['style'] = str(data['style']) if data.get('style') else None
    data['ounces'] = float(data['ounces']) if data.get('ounces') else None
    return data
def del_unwanted_cols(data):
    """Drop columns that are not part of the BigQuery schema, in place.

    `pop(..., None)` tolerates records that never had the column; the
    original bare `del` raised KeyError (and the pipeline above this
    definition never maps 'ibu'/'brewery_id' at all).
    Returns the mutated dict so the function composes with beam.Map.
    """
    data.pop('ibu', None)
    data.pop('brewery_id', None)
    return data
# Build and run the batch pipeline: read the CSV from GCS, clean it, and
# append the rows to the BigQuery table <PROJECT_ID>:beer.beer_data.
# BUG FIX: the original fused the Pipeline construction and the transform
# chain into one unparseable statement, and its FormatToDict mapped only
# 6 columns, so del_unwanted_cols crashed on the missing 'ibu' and
# 'brewery_id' keys -- all 8 CSV columns are mapped here so the cleanup
# step has something to drop.
p = beam.Pipeline(options=PipelineOptions())

(p | 'ReadData' >> beam.io.ReadFromText('gs://purchases-3/beers.csv', skip_header_lines=1)
   | 'Split' >> beam.Map(lambda x: x.split(','))
   | 'format to dict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
   | 'DelIncompleteData' >> beam.Filter(discard_incomplete)
   | 'Convertypes' >> beam.Map(convert_types)
   | 'DelUnwantedData' >> beam.Map(del_unwanted_cols)
   | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
       '{0}:beer.beer_data'.format(PROJECT_ID),
       schema=SCHEMA,
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
result = p.run()
python3 batch.py --runner DataFlowRunner --project aniket-g --temp_location gs://ag-pipeline/batch/temp --staging_location gs://ag-pipeline/batch/stag --region asia-east1 --job_name drinkbeer
Developing Data Processing Pipeline With Apache Beam 6
Pipeline status — Image By Author
Developing Data Processing Pipeline With Apache Beam 7
DAG — pipeline steps — Image By Author
Developing Data Processing Pipeline With Apache Beam 8
BigQuery table — Image By Author
# Beer style with highest alcohol by volume
# NOTE(review): SUM(abv) yields the *total* abv across all rows of a style,
# which favors styles with many entries; AVG(abv) or MAX(abv) would match
# the stated "highest alcohol by volume" intent better. Confirm which
# insight is actually wanted before changing it.
SELECT
  style,
  SUM(abv) AS volume
FROM
  `aniket-g.beer.beer_data`
GROUP BY
  style
ORDER BY
  volume DESC
Developing Data Processing Pipeline With Apache Beam 9
Bigquery Insights — Image By Author
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from sys import argv

# GCP project that owns the destination BigQuery dataset.
PROJECT_ID = 'aniket-g'
# Destination table schema as comma-separated 'name:TYPE' pairs, the string
# form accepted by beam.io.WriteToBigQuery.
# NOTE(review): 'sr' is declared INTEGER but convert_types never casts it
# from string -- presumably BigQuery coerces it; worth confirming.
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT'

def discard_incomplete(data):
    """Return True only if the record has non-empty 'abv', 'id', 'name' and 'style'.

    Used as a beam.Filter predicate: records returning False are dropped.
    `data.get(field)` tolerates missing keys (the original `len(data[k])`
    raised KeyError), and truthiness covers both empty strings and None.
    """
    return all(data.get(field) for field in ('abv', 'id', 'name', 'style'))


def convert_types(data):
    """Cast the CSV string fields to their BigQuery schema types, in place.

    Blank or missing values become None instead of raising: the original's
    `'abv' in data` guard never fires (FormatToDict always sets the key),
    so `float('')` / `int('')` raised ValueError on blank fields.
    Returns the mutated dict so the function composes with beam.Map.
    NOTE(review): 'sr' is left as a string even though SCHEMA declares it
    INTEGER -- confirm BigQuery coerces it as intended.
    """
    data['abv'] = float(data['abv']) if data.get('abv') else None
    data['id'] = int(data['id']) if data.get('id') else None
    data['name'] = str(data['name']) if data.get('name') else None
    data['style'] = str(data['style']) if data.get('style') else None
    data['ounces'] = float(data['ounces']) if data.get('ounces') else None
    return data

def del_unwanted_cols(data):
    """Drop columns that are not part of the BigQuery schema, in place.

    `pop(..., None)` tolerates records that never had the column; the
    original bare `del` raised KeyError in that case.
    Returns the mutated dict so the function composes with beam.Map.
    """
    data.pop('ibu', None)
    data.pop('brewery_id', None)
    return data

if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    # BUG FIX: parse_known_args returns a (namespace, remaining_args) tuple;
    # the original bound the whole tuple to known_args and silently dropped
    # the runner flags (--runner, --project, --temp_location, ...), so they
    # never reached PipelineOptions below.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Forward the unparsed command-line flags so Beam actually sees them.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    # Read the raw CSV from GCS, clean it with the helpers above, and
    # append the rows to <PROJECT_ID>:beer.beer_data.
    (p | 'ReadData' >> beam.io.ReadFromText('gs://ag-pipeline/batch/beers.csv', skip_header_lines=1)
       | 'SplitData' >> beam.Map(lambda x: x.split(','))
       | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
       | 'DeleteIncompleteData' >> beam.Filter(discard_incomplete)
       | 'ChangeDataType' >> beam.Map(convert_types)
       | 'DeleteUnwantedData' >> beam.Map(del_unwanted_cols)
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:beer.beer_data'.format(PROJECT_ID),
           schema=SCHEMA,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

    result = p.run()
    # Block until the job completes so failures surface in this process
    # instead of the script exiting while the job is still running.
    result.wait_until_finish()

This article has been published from the source link without modifications to the text. Only the headline has been changed.

Source link