Building production-ready data pipelines using Dataflow

This document explains best practices you should consider as you’re developing and testing your pipeline. It’s part of a series that helps you improve the production readiness of your data pipelines by using Dataflow. The series is intended for a technical audience whose responsibilities include developing, deploying, and monitoring Dataflow pipelines and who have a working understanding of Dataflow and Apache Beam.

The documents in the series include the following parts:

  • Overview
  • Planning data pipelines
  • Developing and testing data pipelines (this document)
  • Deploying data pipelines
  • Monitoring data pipelines

The way that the code for your pipeline is implemented has a significant influence on how well the pipeline performs in production. To help you create pipeline code that works correctly and efficiently, this document explains the following:

  • Pipeline runners to support code execution in the different stages of development and deployment.
  • Deployment environments that let you run pipelines during development, testing, preproduction, and production.
  • Open source pipeline code and templates that you can use as is, or as the basis for new pipelines to accelerate code development.
  • Coding best practices for developing your pipeline to improve pipeline observability and performance. Many of these practices are applicable to programming using the Apache Beam SDK (the examples focus on Java) and are not specific to Dataflow. However, in many cases, Dataflow provides features that complement these coding practices for improved production readiness.
  • A best-practices approach for testing pipeline code. First, this document provides an overview that includes the scope and relationship of different test types, such as unit tests, integration tests, and end-to-end tests. Second, each type of test is explored in detail, including methods to create and integrate with test data, and which pipeline runners to use for each test.

We recommend that you read the Testing your pipeline section of this document before you move to the next document in the series, Deploying data pipelines. That document further explains the role of deployment environments for different tests and how to drive test suite automation using continuous integration (CI).

During development and testing, you use different Beam runners to execute pipeline code. The Apache Beam SDK provides a Direct Runner for local development and testing. The Direct Runner can also be used by your release automation tooling (for example, within your CI pipeline) for unit tests and integration tests.

Pipelines that are deployed to Dataflow use the Dataflow Runner, which runs your pipeline in production-like environments. Additionally, the Dataflow Runner can be used for ad hoc development testing, and for end-to-end pipeline tests.

Although this document focuses on running pipelines that were built using the Apache Beam Java SDK, Dataflow also supports Beam pipelines that were developed using other languages. The Apache Beam Java and Python SDKs are generally available for Dataflow, and experimental support is provided for Go. SQL developers can also use Apache Beam SQL and Dataflow SQL to create pipelines that use familiar SQL dialects.

We recommend that you create deployment environments to separate users, data, code, and other resources across different stages of development. When possible, use separate Google Cloud projects to provide isolated environments for the different stages of pipeline development.

The following sections describe a minimal set of deployment environments. You typically establish additional environments based on your needs.

The local environment is a developer’s workstation. Developers should use the Direct Runner to execute their pipeline code locally for development and rapid testing.

Pipelines that are executed locally using the Direct Runner can interact with remote Google Cloud resources, such as Pub/Sub topics or BigQuery tables. Give individual developers separate Cloud projects, so that they have a sandbox for ad hoc testing with Google Cloud services.

Some Google Cloud services, such as Pub/Sub and Cloud Bigtable, provide emulators for local development. You can use these emulators with the Direct Runner to enable end-to-end local development and testing.

The sandbox environment is a Google Cloud project that provides developers with access to Google Cloud services during code development. Pipeline developers can share a Google Cloud project with other developers, or use their own individual projects. Using individual projects reduces planning complexity relating to shared resource usage and quota management.

Developers use the sandbox environment to perform ad hoc pipeline execution using the Dataflow Runner. This is useful for debugging and testing during the code development phase. For example, ad hoc pipeline execution lets developers do the following:

  • Observe the effect of code changes on scaling behavior.
  • Understand potential differences between the behavior of the Direct Runner and the Dataflow Runner.
  • Understand how Dataflow applies graph optimizations.

For ad hoc testing, developers can deploy code from their local environment to Dataflow directly (for example, using Maven) instead of submitting a remote build each time.

The preproduction environment is for development phases that should run in production-like conditions, such as end-to-end testing. Use a separate project for the preproduction environment, and configure it to be similar to production where possible. Similarly, to allow end-to-end tests with production-like scale, make Cloud project quotas for Dataflow and other services used by the pipeline as similar as possible to the production environment.

Depending on your requirements, you can further separate preproduction into multiple environments. For example, a quality control environment can support the work of quality analysts to test SLIs such as data correctness, freshness, and performance under different workload conditions.

End-to-end tests include integration with data sources and sinks within the scope of testing. Consider how these can be made available within the preproduction environment. Test data can be stored in the preproduction environment itself—for example, test data is stored in a Cloud Storage bucket with your input data. In other cases, test data might originate from outside the preproduction environment, such as a Pub/Sub topic (through a separate subscription) that’s in the production environment.

You should also use the preproduction environment to test pipeline updates before any changes are made to production. It’s important to test and verify update procedures for streaming pipelines, particularly if it’s necessary to coordinate multiple steps (such as with running parallel pipelines) to avoid downtime.

The production environment lives in a dedicated Cloud project. Continuous delivery copies deployment artifacts to the production environment when all end-to-end tests have passed.

This section discusses coding and development best practices. Many of these practices complement and enhance aspects of pipeline development and operationalization, such as improving developer productivity, promoting pipeline testability, increasing performance, and enabling deeper insights with monitoring.

Before you begin development, you should set up deployment environments that support your development, testing, and delivery lifecycle.

To accelerate pipeline development, check to see if Google provides an existing Dataflow template that you can use directly with no coding effort. Some templates allow you to add custom logic as a pipeline step—for example, the Pub/Sub topic to BigQuery template provides a parameter to execute a JavaScript user-defined function (UDF) that’s stored in Cloud Storage. Google-provided templates are open source under the Apache License 2.0, so you can use them as the basis for new pipelines. The templates are also useful as code examples for reference.

The Beam programming model unifies batch and streaming data processing, so the reusability of transforms is a key benefit. Creating a shared library of common transforms promotes reusability, testability, and code ownership by different teams.

Consider the following two Java code examples, which both read payment events. The first one is from an unbounded source (Pub/Sub):

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments = 
    p.apply("Read from topic", 
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

The second one is from a bounded source (a relational database):

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments = 
    p.apply(
        "Read from database table", 
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

Assuming that both pipelines perform the same processing, they can use the same transforms through a shared library for the remaining processing steps. How you implement code reusability best practices varies by programming language and build tool. For example, if you use Maven, you can separate transform code into its own module. You can then include the module as a submodule in larger multi-module projects for different pipelines, as shown in the following code example:

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

For more information, see the Apache Beam documentation for best practices on writing user code for Beam transforms and for the recommended style guide for PTransforms.

Your pipeline might encounter situations where it’s not possible to process elements. This can occur for different reasons, but a common cause is data issues. For example, an element that contains badly formatted JSON can cause parsing failures.

In this situation, one approach is to catch an exception within the DoFn.ProcessElement method. In your exception block, you log the error and drop the element. However, this causes the data to be lost and prevents the data from being inspected later for manual handling or troubleshooting.

A better approach is to use a pattern called a dead letter queue (or dead letter file). You catch exceptions in the DoFn.ProcessElement method and log errors as you normally would. But instead of dropping the failed element, you use branching outputs to write failed elements into a separate PCollection object. These elements can then be written to a data sink for later inspection and handling by using a separate transform. For more information, see Handling Invalid Inputs in Dataflow on the Google Cloud blog.

The following Java code example from the blog post shows how to implement the dead letter queue pattern.

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

You can use Cloud Monitoring to apply different monitoring and alerting policies for your pipeline’s dead letter queue. For example, you can visualize the number and size of elements processed by your dead letter transform and configure alerting to trigger if certain threshold conditions are met.

You can handle data that has unexpected (but valid) schemas using a dead letter pattern, which just writes failed elements to a separate PCollection object. However, there could be cases where you want to automatically handle elements that reflect a mutated schema as valid elements. For example, if an element’s schema reflects a mutation like the addition of new fields, you can adapt the schema of the data sink to accommodate mutations.

Automatic schema mutation relies on the branching-output approach used by the dead letter pattern. However, in this case it triggers a transform that mutates the destination schema whenever additive schemas are encountered. For an example of this approach, see How to handle mutating JSON schemas in a streaming pipeline, with Square Enix on the Google Cloud blog.

Be careful when you use automatic schema mutation, because there are circumstances where you don’t want a schema mutation. This can include situations where unintended inputs into the pipeline have incorrect schemas (not just additive ones) that should not trigger schema mutations in data sinks. You can help avoid unintended consequences by manually handling schema mutations using a dead letter queue. You probably want to use manual handling if the data sources are not fully trusted or if you want to be sure that you don’t automatically mutate the schema.

Joining datasets is a common use case for data pipelines. Side inputs provide a flexible way to solve common data processing problems such as data enrichment and keyed lookups. Unlike PCollection objects, side inputs are also mutable, and they can be determined at runtime. For example, the values in a side input might be computed by another branch in your pipeline or determined by calling a remote service.

Dataflow supports side inputs by persisting data into persistent storage (similar to a shared disk), which makes the complete side input available to all workers. This means that side input sizes can be very large and might not fit into worker memory. Reading from a large side input can cause performance issues if workers need to constantly read from persistent storage.

The CoGroupByKey transform is a core Beam transform that merges (flattens) multiple PCollection objects and groups elements that have a common key. Unlike a side input, which makes the entire side input data available to each worker, CoGroupByKey performs a shuffle (grouping) operation to distribute data across workers. CoGroupByKey is therefore ideal when the PCollection objects you want to join are very large and don’t fit into worker memory.

Follow these guidelines to help decide whether to use side inputs or CoGroupByKey:

  • Use side inputs when one of the PCollection objects you are joining is disproportionately smaller than the other, and where the smaller PCollection object fits into worker memory. Caching the side input entirely into memory makes it fast and efficient to fetch elements.
  • Use CoGroupByKey if you need to fetch a large proportion of a PCollection object that significantly exceeds worker memory.
  • Use side inputs when you have a PCollection object that should be joined multiple times in your pipeline. Instead of using multiple CoGroupByKey transforms, you can create a single side input that can be reused by multiple ParDo transforms.

For more information, see the Side input metrics section of Using the Dataflow monitoring interface.

DoFn instance processes batches of elements called bundles, which are atomic units of work that consist of zero or more elements. Individual elements are then processed by the DoFn.ProcessElement method, which is executed for every element. Because the DoFn.ProcessElement method is called for every element, any time-consuming or computationally expensive operations that are invoked by that method cause these operations to be executed for every single element processed by the method.

If you need to perform costly operations only once for a batch of elements, include those operations in the DoFn.Setup and DoFn.StartBundle methods instead of in DoFn.ProcessElement. Examples include the following:

  • Parsing a configuration file that controls some aspect of the DoFn instance’s behavior. This action should be invoked only once when the DoFn instance is initialized by using the DoFn.Setup method.
  • Instantiating a short-lived client that should be reused across all elements in a bundle—for example, if all elements in the bundle should be sent over a single network connection. This action can be invoked once per bundle by using the DoFn.StartBundle method.

When you call external services, you can reduce per-call overheads by using the GroupIntoBatches transform to create batches of elements of a specified size. Batching allows elements to be sent to an external service as one payload instead of individually.

In combination with batching, you can limit the maximum number of parallel (concurrent) calls to the external service by choosing appropriate keys to partition the incoming data. The maximum parallelization is determined by the number of partitions. For example, if every element is given the same key, a downstream transform for calling the external service would not run in parallel.

Consider one of the following approaches to produce keys for elements:

  • Choose an attribute of the dataset to use as data keys, such as user IDs.
  • Generate data keys to randomly split elements over a fixed number of partitions, where the number of possible key values determines the number of partitions. You need to create enough partitions for parallelism, and each partition also needs to have enough elements for GroupIntoBatches to be useful.

The following Java code sample shows how to randomly split elements over 10 partitions:

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API
// (which has a rate limit but allows large payloads)
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

Dataflow builds a graph of steps that represents your pipeline, based on the transforms and data that you used to construct it. This is called the pipeline execution graph.

When you deploy your pipeline, Dataflow might modify your pipeline’s execution graph to improve performance. This can include fusing some operations together, a process known as fusion optimization, to avoid the performance and cost impact of writing every intermediate PCollection object in your pipeline.

There are cases in which Dataflow might incorrectly determine the optimal way to fuse operations in the pipeline. This can limit the Dataflow service’s ability to make use of all available workers. In those cases, you might want to prevent some operations from being fused.

Consider the following example Beam code. A GenerateSequence transform creates a small bounded PCollection object, which is then further processed by two downstream ParDo transforms.

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number", 
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

The Find Primes Less-than-N transform can be computationally expensive and is likely to execute slowly for large numbers. In contrast, you would expect the Increment Number transform to complete quickly.

The following diagram shows a graphical representation of the pipeline in the Dataflow monitoring interface.

Monitoring the job using the Dataflow monitoring interfaces shows the same slow rate of processing for both transforms, namely 13 elements per second. You might expect the Increment Number transform to process elements quickly, but instead it appears to be tied to the same rate of processing as Find Primes Less-than-N.

The reason is that Dataflow has fused the steps into a single stage, which prevents them from executing independently. You can use the following gcloud command to find out or confirm which steps have been fused:

gcloud dataflow jobs describe --full job-id --format json

In the resulting output, the fused steps are described in the ExecutionStageSummary object in the ComponentTransform array:

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

In this scenario, the Find Primes Less-than-N transform is the slow step, so breaking the fusion before that step is an appropriate strategy. One method to unfuse steps is to insert a GroupByKey transform and ungroup before the step, as shown in the following Java code sample:

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

You can also combine these unfusing steps into a reusable composite transform.

After you unfuse the steps, when you run the pipeline, you see that Increment Number completes in a matter of seconds, and the much longer-running Find Primes Less-than-N transform executes in a separate stage.

This is one example of applying a group and ungroup operation to unfuse steps. You can use other approaches for other circumstances. In this case, handling duplicate output is not an issue, given the consecutive output of the GenerateSequence transform. KV objects with duplicate keys are deduplicated to a single key in the group (GroupByKey) and ungroup (Keys) transforms. To retain duplicates after the group and ungroup operations, you can create KV pairs by using a random key and the original input as the value, group using the random key, and then emit the values for each key as the output.

Beam metrics is a utility class for producing various metrics for reporting the properties of an executing pipeline. When you use Cloud Monitoring, Beam metrics are available as Cloud Monitoring custom metrics (also called User Counters).

The following Java snippet is an example of Counter metrics used in a DoFn subclass.

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead letter queue
      c.output(errorTag, c.element());
    }
  }
}

The example code uses two counters, one counter to track JSON parsing failures (malformedCounter), and another counter to track whether the JSON message is valid but contains an empty payload (emptyCounter). The custom metric names in Cloud Monitoring are custom.googleapis.com/dataflow/malformedJson andcustom.googleapis.com/dataflow/emptyPayload. The custom metrics can be used to create visualizations and alerting policies in Cloud Monitoring.

In software development, unit tests, integration tests, and end-to-end tests are common types of software testing. These testing types are also applicable to data pipelines.

The Apache Beam SDK provides functionality to enable these tests, which should target different deployment environments. The following diagram illustrates how unit tests, integration tests, and end-to-end tests apply to different parts of your pipeline and data.

Building production-ready data pipelines using Dataflow 2

The diagram shows the scope of different tests and how they relate to transforms (DoFn and PTransform subclasses), pipelines, data sources, and data sinks.

The following sections describe how various formal software tests apply to data pipelines using Dataflow. As you read through this section, refer back to this diagram to understand how the different types of tests are related.

Unit tests assess the correct functioning of DoFn subclasses and composite transforms (PTransform subclasses) by comparing the output of those transforms with a verified set of data inputs and outputs. Typically, developers can run these tests in the local environment. The tests can also run automatically through unit-test automation using continuous integration (CI) in the build environment.

The Direct Runner is used to run unit tests using a smaller subset of reference test data that focuses on testing the business logic of your transforms. The test data should be small enough to fit into local memory on the machine that runs the test.

The Apache Beam SDK provides a JUnit rule called TestPipeline for unit-testing individual transforms (DoFn subclasses), composite transforms (PTransform subclasses), and entire pipelines. You can use TestPipeline on a Beam pipeline runner such as the Direct Runner or the Dataflow Runner to apply assertions on the contents of PCollection objects using PAssert, as shown in the following code snippet of a JUnit test class:

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

By factoring your code into reusable transforms (for example, as top-level or static nested classes), you can create targeted tests for different parts of your pipeline. Aside from the benefits of testing, reusable transforms promote code maintainability and reusability by naturally encapsulating the business logic of your pipeline into component parts. In contrast, testing individual parts of your pipeline can be difficult if the pipeline uses anonymous inner classes to implement transforms.

The following Java snippet shows the implementation of transforms as anonymous inner classes, which doesn’t easily allow testing.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output = 
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

Compare the previous example with the following one, where the anonymous inner classes have been refactored into named concrete DoFn subclasses. Individual unit tests can be created for each concrete DoFn subclass that makes up the end-to-end pipeline.

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output = 
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

In effect, testing each DoFn subclass is similar to unit-testing a batch pipeline that contains a single transform. You use the Create transform to create a PCollection object of test data, which is then passed to the DoFn object. You use PAssert to assert that the contents of the PCollection object are correct. The following Java code sample uses the PAssert class to check for correct output form.

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams = 
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

Integration tests verify the correct functioning of your entire pipeline. You should consider the following types of integration tests:

  • A transform integration test that assesses the integrated functionality of all the individual transforms that make up your data pipeline. You can think of transform integration tests as a unit test for your entire pipeline, excluding integration with external data sources and sinks. The Beam SDK provides methods to supply test data to your data pipeline and to verify the results of processing. The Direct Runner is used to run transform integration tests.
  • A system integration test that assesses your data pipeline’s integration with external data sources and sinks. For your pipeline to communicate with external systems, you need to configure your tests with appropriate credentials to access external services. Streaming pipelines also run indefinitely, so you need to decide when and how to terminate the running pipeline. By using the Direct Runner to run system integration tests, you quickly verify the integration between your pipeline and other systems without needing to submit a Dataflow job and waiting for it to finish.

As with unit tests, you should design transform and system integration tests to provide rapid defect detection and feedback without slowing developer productivity. Longer-running tests, such as those that run as Dataflow jobs, might be better suited as an end-to-end test that runs less frequently.

You can think of a data pipeline simply as one or more related transforms. You can therefore create an encapsulating composite transform for your pipeline and use TestPipeline to perform an integration test of your entire pipeline. Depending on whether you want to test the pipeline in batch or streaming mode, you supply test data using either the Create or TestStream transforms.

In your production environment, your pipeline likely integrates with different data sources and sinks. However, for unit tests and transformation integration tests, you should focus on verifying the business logic of your pipeline code by providing test inputs and verifying the output directly. In addition to simplifying your tests, this approach allows you to isolate pipeline-specific issues from those that might be caused by data sources and sinks.

For batch pipelines, you use the Create transform to create a PCollection object of your input test data out of a standard in-memory collection, such as a Java List object. Using the Create transform is appropriate if your test data is small enough to include in code. You can then use PAssert on the output PCollection objects to determine the correctness of your pipeline code. This approach is supported by the Direct Runner and by the Dataflow Runner.

If test data is read from an external data source, you use a read transform to store your test data in a PCollection object, such as TextIO.Read. We recommend this approach for larger test data sets.

The following Java code snippet shows assertions against output PCollection objects from a composite transform that includes some or all of the individual transforms that constitute a pipeline (WeatherStatsPipeline). The approach is similar to unit-testing individual transforms in a pipeline.

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

To test windowing behavior, you can also use the Create transform to create elements with timestamps, as shown in the following code snippet:

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

Testing streaming pipelines

Streaming pipelines contain assumptions that define how unbounded data should be handled. These assumptions are often about the timeliness of data in real-world conditions, and therefore have an impact on correctness depending on whether the assumptions prove to be true or false. Integration tests for streaming pipelines should therefore include tests that simulate the non-deterministic nature of streaming data arrival.

To enable such tests, the Beam SDK provides the TestStream class to model the effects of element timings (early, on-time, or late data) on the results of your data pipeline. You can use these tests together with the PAssert class to verify against expected results.

TestStream is supported by the Direct Runner and the Dataflow Runner. The following code sample creates a TestStream transform:

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount = 
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

For more information about TestStream, see Testing Unbounded Pipelines in Apache Beam. For more information about how to use the Beam SDK for unit testing, see the Apache Beam documentation.

Using Google Cloud services in integration tests

The Direct Runner can integrate with Google Cloud services, so your system integration tests can use Pub/Sub, BigQuery, and other services as needed. When you use the Direct Runner, your pipeline runs as the user account that you configured by using the gcloud command-line tool or as a service account that you specified using the GOOGLE_APPLICATION_CREDENTIALS environment variable. Therefore, you must ensure that you’ve granted sufficient permissions to this account for any required resources before you run your pipeline.

For entirely local integration tests, you can use local emulators for some Google Cloud services. Local emulators are available for Pub/Sub and Bigtable.

For system integration testing of streaming pipelines, you can use the setBlockOnRun method (defined in the DirectOptions interface) to have the Direct Runner execute your pipeline asynchronously. Otherwise, pipeline execution will block the calling parent process (for example, a script in your build pipeline) until the pipeline is manually terminated. If you execute the pipeline asynchronously, you can use the returned PipelineResult instance to cancel execution of the pipeline, as shown in the following code sample:

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned 
    result.cancel();
}

End-to-end tests

End-to-end tests verify the correct operation of your end-to-end pipeline by executing it on the Dataflow Runner under conditions that closely resemble production. This includes verifying that the business logic functions correctly using the Dataflow Runner, and testing whether the pipeline performs as expected under production-like loads. You typically run end-to-end tests in a preproduction Google Cloud environment.

You should use different types of end-to-end tests to test your pipeline at different scales, for example:

  • Run small-scale end-to-end tests using a small proportion (such as 1%) of your test dataset to quickly validate pipeline functionality in the preproduction environment.
  • Run large-scale end-to-end tests using a full test dataset to validate pipeline functionality under production-like data volumes and conditions.

For streaming pipelines, we recommend that you run test pipelines in parallel with your production pipeline if they can use the same data. This lets you compare results and operational behavior such as autoscaling and performance.

End-to-end tests help to predict how well your pipeline will meet your requirements in production, including your SLOs. The preproduction environment allows your pipeline to be tested under production-like conditions. Within end-to-end tests, pipelines run using the Dataflow Runner to process complete reference datasets that match (or closely resemble) datasets in production.

It might not be possible for you to generate synthetic data for testing that accurately simulates real data. To address this problem, one approach is to use cleansed extracts from production data sources to create reference datasets, in which any sensitive data is de-identified through appropriate transformations. We recommend using Cloud Data Loss Prevention (DLP) for this purpose. Cloud DLP can detect sensitive data from a range of content types and data sources and apply a range of de-identification techniques including redaction, masking, format preserving encryption, and date-shifting.

Differences in end-to-end tests for batch and streaming pipelines

Before you execute a full end-to-end test against a large test dataset, you might want to execute a dry run with a smaller percentage of the test data (such as a 1% dry run) and verify expected behavior in a shorter amount of time. As with integration tests using the Direct Runner, you can use PAssert on PCollection objects when you execute pipelines using the Dataflow Runner. For more information about PAssert, see the Unit testing section.

Depending on your use case, verifying very large output from end-to-end tests might be impractical, costly, or otherwise challenging. In that case, you can verify representative samples from the output result set instead. For example, you can use BigQuery to sample and compare output rows with a reference dataset of expected results.

For streaming pipelines, simulating realistic streaming conditions with synthetic data might be challenging. Therefore, a common way to provide streaming data for end-to-end tests is to integrate testing with production data sources. If you’re using Pub/Sub as a data source, you can enable a separate datastream for end-to-end tests through additional subscriptions to existing topics. You can then compare the results of different pipelines that consume the same data, which is useful for verifying candidate pipelines against other preproduction and production pipelines.

The following diagram shows how this method allows a production pipeline and test pipeline to run in parallel in different deployment environments.

Running a test pipeline in parallel with a production pipeline using a single Pub/Sub streaming source.

In the diagram, both pipelines read from the same Pub/Sub topic, but they use separate subscriptions. This allows the two pipelines to process the same data independently and allows you to compare the results. The test pipeline uses a separate service account from the production project, and therefore avoids using the Pub/Sub subscriber quota for the production project.

Unlike batch pipelines, streaming pipelines continue to run until they’re explicitly cancelled. In end-to-end tests, you should decide whether to leave the pipeline running (perhaps until the next end-to-end test is run), or cancel the pipeline at a point that represents test completion so that you can inspect the results.

The type of test data you use influences this decision. For example, if you use a bounded set of test data that’s fed to the streaming pipeline, such as a table that’s converted into a stream, you will likely cancel the pipeline when all elements have completed processing. Alternatively, If you use a real data source (such as an existing Pub/Sub topic that’s used in production) or if you otherwise generate test data continually, you might want to keep test pipelines running over a longer period. This lets you compare the behavior against the production environment, or even against other test pipelines.

This article has been published from the source link without modifications to the text. Only the headline has been changed.

Source link