HomeData EngineeringData DIYDeveloping Data Processing Pipeline With Apache Beam

Developing Data Processing Pipeline With Apache Beam

Implementation of an Apache Beam pipeline that cleans the raw data and writes it to BigQuery for analysis.

The Anatomy of a Data Pipeline

Key Concepts of Pipeline
Developing Data Processing Pipeline With Apache Beam 2
Common Transforms in Pipeline

Basic flow of the pipeline

Developing Data Processing Pipeline With Apache Beam 3
Pipeline Flow
Developing Data Processing Pipeline With Apache Beam 4
Enabling API — Image By Author
Developing Data Processing Pipeline With Apache Beam 5
Creating GCS bucket — Image By Author
git clone https://github.com/aniket-g/batch-pipeline-using-apache-beam-python
gsutil cp beers.csv gs://ag-pipeline/batch/
sudo pip3 install apache_beam[gcp]
def discard_incomplete(data):
    """Keep only records whose required fields are all non-empty strings."""
    required = ('abv', 'id', 'name', 'style')
    return all(len(data[field]) > 0 for field in required)
def convert_types(data):
    """Converts string values to their appropriate type.

    Each listed field is cast in place; a field absent from the record is
    set to None (matching the original per-field behavior).
    """
    casts = (
        ('abv', float),
        ('id', int),
        ('name', str),
        ('style', str),
        ('ounces', float),
    )
    for key, caster in casts:
        data[key] = caster(data[key]) if key in data else None
    return data
def del_unwanted_cols(data):
    """Drop columns not needed for analysis ('ibu', 'brewery_id').

    Uses dict.pop with a default so records that never had these keys pass
    through unchanged -- the original `del` raised KeyError, and the
    accompanying pipeline's dict mapping does not include these keys at all.
    """
    data.pop('ibu', None)
    data.pop('brewery_id', None)
    return data
# NOTE(review): the original snippet was syntactically invalid -- the pipeline
# expression was fused into the beam.Pipeline(...) call and `result = p.run()`
# was mis-indented.  Restructured into a valid construct-then-apply form.
p = beam.Pipeline(options=PipelineOptions())

(p | 'ReadData' >> beam.io.ReadFromText('gs://purchases-3/beers.csv', skip_header_lines=1)
   | 'Split' >> beam.Map(lambda x: x.split(','))
   # Map every CSV column, including 'ibu' and 'brewery_id': the later
   # DelUnwantedData step removes them, and the original 6-column mapping
   # would have made del_unwanted_cols fail on the missing keys.
   | 'format to dict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
   | 'DelIncompleteData' >> beam.Filter(discard_incomplete)
   | 'Convertypes' >> beam.Map(convert_types)
   | 'DelUnwantedData' >> beam.Map(del_unwanted_cols)
   | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
       '{0}:beer.beer_data'.format(PROJECT_ID),
       schema=SCHEMA,
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
result = p.run()
python3 batch.py --runner DataFlowRunner --project aniket-g --temp_location gs://ag-pipeline/batch/temp --staging_location gs://ag-pipeline/batch/stag --region asia-east1 --job_name drinkbeer
Developing Data Processing Pipeline With Apache Beam 6
Pipeline status — Image By Author
Developing Data Processing Pipeline With Apache Beam 7
DAG — pipeline steps — Image By Author
Developing Data Processing Pipeline With Apache Beam 8
BigQuery table — Image By Author
# Beer styles ranked by total alcohol by volume (SUM of abv over all beers
# of each style; use AVG(abv) instead if the strongest *average* style is wanted)
SELECT
  style,
  SUM(abv) AS volume
FROM
  `aniket-g.beer.beer_data`
GROUP BY
  style
ORDER BY
  volume DESC
Developing Data Processing Pipeline With Apache Beam 9
Bigquery Insights — Image By Author
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from sys import argv

# GCP project that owns the destination BigQuery dataset.
PROJECT_ID = 'aniket-g'
# BigQuery table schema as comma-separated "name:TYPE" pairs; must match the
# dict fields the pipeline emits after type conversion and column removal.
SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT'

def discard_incomplete(data):
    """Filter predicate: True only when every required field is non-empty."""
    # For strings, truthiness is equivalent to len(...) > 0; the chain
    # short-circuits on the first empty field, as the original did.
    return bool(data['abv']) and bool(data['id']) and bool(data['name']) and bool(data['style'])


def convert_types(data):
    """Converts string values to their appropriate type.

    Fields listed below are cast in place; a field missing from the record
    ends up as None, exactly as the original per-field conditionals did.
    """
    for field, to_type in (('abv', float),
                           ('id', int),
                           ('name', str),
                           ('style', str),
                           ('ounces', float)):
        if field in data:
            data[field] = to_type(data[field])
        else:
            data[field] = None
    return data

def del_unwanted_cols(data):
    """Delete the unwanted columns 'ibu' and 'brewery_id'.

    Uses dict.pop with a default instead of `del` so a record that lacks
    either key passes through unchanged rather than raising KeyError
    (e.g. if the upstream dict mapping is edited to drop those columns).
    """
    data.pop('ibu', None)
    data.pop('brewery_id', None)
    return data

if __name__ == '__main__':

    # Split CLI arguments into ones this script defines (none, currently)
    # and the remainder destined for Beam (--runner, --project, ...).
    # Fixes two defects in the original:
    #   * parse_known_args(argv) included argv[0] (the script name);
    #   * the (namespace, remaining) tuple was bound to a single variable
    #     and the remaining flags were never forwarded to PipelineOptions,
    #     which only worked via PipelineOptions' implicit sys.argv parsing.
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv[1:])

    # Pass the Beam-specific flags explicitly.
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p | 'ReadData' >> beam.io.ReadFromText('gs://ag-pipeline/batch/beers.csv', skip_header_lines=1)
       | 'SplitData' >> beam.Map(lambda x: x.split(','))
       # One dict per CSV row; 'ibu' and 'brewery_id' are removed downstream.
       | 'FormatToDict' >> beam.Map(lambda x: {"sr": x[0], "abv": x[1], "ibu": x[2], "id": x[3], "name": x[4], "style": x[5], "brewery_id": x[6], "ounces": x[7]})
       | 'DeleteIncompleteData' >> beam.Filter(discard_incomplete)
       | 'ChangeDataType' >> beam.Map(convert_types)
       | 'DeleteUnwantedData' >> beam.Map(del_unwanted_cols)
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:beer.beer_data'.format(PROJECT_ID),
           schema=SCHEMA,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    result = p.run()

This article has been published from the source link without modifications to the text. Only the headline has been changed.

Source link

Most Popular