Boosting Dataflow Efficiency: How We Reduced Processing Time from 1 Day to 30 Minutes

In this blog post, we share how we reduced the processing time of XCATS, our streaming application running on Google Dataflow. XCATS is a complex Apache Beam streaming job implemented in Java.

This job processes millions of events daily from two main data sources and four enrichment sources, calculates cross-charge amounts by joining these sources, and publishes the results to an output Pub/Sub topic. Additionally, all output messages are persisted in Google BigQuery for further analysis and reporting.

Picture1

As the scale of our data grew, we started encountering performance bottlenecks and inefficiencies in our processing pipeline.

We will share our experience optimizing one of our enrichment methods, which reduced the processing time for one flow from 1 day to just 30 minutes. We also provide sample Java code for both the old and new algorithms and show how the change affected our CPU and memory utilization and overall performance.

Problem


In our Dataflow pipeline, we were integrating a small enrichment source. Our initial strategy used Apache Beam's State and CoGroupByKey to pair this small dataset with the main data flows. However, this approach had some critical issues.

Problem: The pipeline was slow, taking a full day to process the data, and it was costly to run. The inefficiency showed up not only in processing power but also in cost, which made the solution expensive to maintain. Wasted compute is not just an economic burden; it also has implications for the environment, making the approach unsustainable in the long run.

Root Cause: The inefficiency was primarily due to a classic stream-processing pitfall: data skew combined with high fan-out. Our use of Apache Beam's State and CoGroupByKey caused the key partitions, which held only a sparse set of key-value pairs, to be assigned to a single worker. With millions of events arriving, this lone worker quickly became a bottleneck, leading to significant internal and external backlogs.

Despite an increase in the number of workers to the maximum permitted, their CPU and memory utilization remained surprisingly low. This indicated that our processing method was inefficient, as it was not optimally utilizing available resources.
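To make the skew concrete, here is a minimal, Beam-free sketch of what key-based routing does when a stream has very few distinct keys. The names (`KeySkewDemo`, `eventsPerWorker`) and the modulo routing are illustrative assumptions, not how Dataflow actually schedules keys. The only point is that every event sharing a key lands on the same worker, no matter how many workers are available.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of key-based routing; an illustration, not Dataflow's real scheduler. */
final class KeySkewDemo {

    /** Routes each event to a worker by key hash and returns the event count per worker. */
    static Map<Integer, Long> eventsPerWorker(List<String> eventKeys, int workers) {
        Map<Integer, Long> load = new TreeMap<>();
        for (String key : eventKeys) {
            // Same key -> same hash -> same worker, however many workers there are.
            int worker = Math.floorMod(key.hashCode(), workers);
            load.merge(worker, 1L, Long::sum);
        }
        return load;
    }

    public static void main(String[] args) {
        int workers = 15; // hypothetical worker count

        // Hypothetical skewed stream: 100,000 events that all carry the same join key.
        List<String> skewed = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            skewed.add("SHARED_KEY");
        }

        // Hypothetical well-spread stream: every event has its own key.
        List<String> spread = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            spread.add("key-" + i);
        }

        System.out.println("skewed: " + eventsPerWorker(skewed, workers));
        System.out.println("spread: " + eventsPerWorker(spread, workers));
    }
}
```

In the skewed case the map has a single entry holding all events, which mirrors the situation where adding workers does not help because only one of them has work to do.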

The following screenshot further illustrates the performance bottleneck of one of the related processes:

Picture2

Old Algorithm: Using CoGroupByKey

Here's a sample code snippet (without the state details) of our original approach using CoGroupByKey in Java. (In the production solution we use stateful processing.)

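import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// MainData, SmallEnrichmentData, EnrichedData and the parser DoFns are application classes not shown here.
public class OldAlgorithm {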

    public static void main(String[] args) {
        // Create the pipeline
        Pipeline pipeline = ...

        // Read the main data from Pub/Sub topic
        PCollection<String> mainDataInput = pipeline.apply("Read Main Data",
                PubsubIO.readStrings().fromTopic("projects/YOUR_PROJECT_ID/topics/YOUR_MAIN_DATA_TOPIC"));

        // Process the main data and convert it to a PCollection of KV<String, MainData>
        PCollection<KV<String, MainData>> mainDataFlow = mainDataInput.apply("Process Main Data", ParDo.of(new MainDataParser()));

        // Read the small enrichment data from Pub/Sub
        PCollection<String> smallEnrichmentInput = pipeline.apply("Read Small Enrichment Data", PubsubIO.readStrings().fromTopic(
                "projects/YOUR_PROJECT_ID/topics/YOUR_SMALL_ENRICHMENT_TOPIC"));


        // In the production code we use the Apache Beam State feature for this enrichment: the data is
        // stored in state, so we don't need to reread it from the source.

        // Process the small enrichment data and convert it to a PCollection of KV<String, SmallEnrichmentData>
        PCollection<KV<String, SmallEnrichmentData>> smallEnrichmentSource = smallEnrichmentInput.apply("Process Small Enrichment Data",
                ParDo.of(new SmallEnrichmentParser()));

        // Define TupleTags for CoGroupByKey
        TupleTag<MainData> mainDataTag = new TupleTag<>();
        TupleTag<SmallEnrichmentData> smallEnrichmentTag = new TupleTag<>();

        // Perform CoGroupByKey on main data flow and small enrichment source
        PCollection<KV<String, CoGbkResult>> joinedData = KeyedPCollectionTuple.of(mainDataTag, mainDataFlow)
                .and(smallEnrichmentTag, smallEnrichmentSource)
                .apply(CoGroupByKey.create());

        // Define a DoFn to process the joined data
        class ProcessJoinedDataFn extends DoFn<KV<String, CoGbkResult>, EnrichedData> {

            private final TupleTag<MainData> mainDataTag;
            private final TupleTag<SmallEnrichmentData> smallEnrichmentTag;

            public ProcessJoinedDataFn(TupleTag<MainData> mainDataTag, TupleTag<SmallEnrichmentData> smallEnrichmentTag) {
                this.mainDataTag = mainDataTag;
                this.smallEnrichmentTag = smallEnrichmentTag;
            }

            @ProcessElement
            public void processElement(ProcessContext context) {
                KV<String, CoGbkResult> element = context.element();
                String key = element.getKey();
                Iterable<MainData> mainDataList = element
                        .getValue()
                        .getAll(mainDataTag);
                Iterable<SmallEnrichmentData> smallEnrichmentDataList = element.getValue().getAll(smallEnrichmentTag);

                // Process the joined data and output EnrichedData instances
                for (MainData mainData : mainDataList) {
                    for (SmallEnrichmentData smallEnrichmentData : smallEnrichmentDataList) {
                        EnrichedData enrichedData = new EnrichedData(mainData, smallEnrichmentData);
                        context.output(enrichedData);
                    }
                }
            }
        }

        // Process the joined data
        PCollection<EnrichedData> enrichedData = joinedData.apply("Process Joined Data", ParDo.of(new ProcessJoinedDataFn(mainDataTag, smallEnrichmentTag)));

        // Write the enriched data to the desired output, for example, to a file or a database

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}

New Algorithm: Using SideInput and DoFn functions

After careful analysis of our data processing needs, we decided to use the Apache Beam SideInput feature and DoFn functions to optimize our Google Dataflow job. For those unfamiliar, a SideInput lets a DoFn read additional data, such as our enrichment data, while it processes each element of the main data stream. This is particularly beneficial when the enrichment data is relatively small: it is more efficient to bring the small dataset to the large main stream than the other way around.

In our case, the primary reason behind this decision was the nature of our enrichment dataset. It is comparatively small, with a size of less than 1 GB in memory, and does not change frequently. These characteristics make it a perfect candidate for the SideInput approach, allowing us to optimize our data processing by reducing the amount of data movement.
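The idea of bringing the small dataset to the data can be sketched without Beam as a broadcast join. In this minimal illustration, events are dealt out to workers regardless of their key, and each worker enriches its share from a local copy of the lookup table. The class, method, and key names are hypothetical, and the keyed lookup is an assumption made for the sketch; it is not the production code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy broadcast join: every worker gets a full copy of the small lookup table. */
final class BroadcastJoinSketch {

    /** Deals events out round-robin, ignoring their keys, so each worker gets an even share. */
    static List<List<String>> splitRoundRobin(List<String> eventKeys, int workers) {
        List<List<String>> shares = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            shares.add(new ArrayList<>());
        }
        for (int i = 0; i < eventKeys.size(); i++) {
            shares.get(i % workers).add(eventKeys.get(i));
        }
        return shares;
    }

    /** Enriches one worker's share using that worker's local copy of the lookup table. */
    static List<String> enrichLocally(List<String> share, Map<String, String> localLookup) {
        List<String> enriched = new ArrayList<>();
        for (String key : share) {
            String extra = localLookup.get(key);
            if (extra != null) { // events without a matching enrichment row are dropped here
                enriched.add(key + " -> " + extra);
            }
        }
        return enriched;
    }

    public static void main(String[] args) {
        // Hypothetical enrichment table and events; all names are made up.
        Map<String, String> enrichment = new HashMap<>();
        enrichment.put("EU", "cost-center-1");
        enrichment.put("US", "cost-center-2");
        List<String> events = List.of("EU", "EU", "EU", "EU", "US", "EU");

        for (List<String> share : splitRoundRobin(events, 3)) {
            // Each worker works from its own copy of the small table.
            System.out.println(enrichLocally(share, new HashMap<>(enrichment)));
        }
    }
}
```

Because the events no longer have to be grouped by key, a stream dominated by a single key is still spread evenly across workers. The price is that every worker must hold the whole lookup table in memory.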

To further improve efficiency, we also transitioned our enrichment dataset source from a streaming topic to a table. This decision was driven by the fact that our dataset is a slow-changing external dataset, and as such, it's more efficient to handle it as a static table that gets updated periodically, rather than a continuous stream. To ensure we're working with the most up-to-date data, we introduced a time ticker with GenerateSequence.from(0).withRate(1, Duration.standardMinutes(60L)) to read and refresh the data every hour.
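The hourly refresh can be illustrated outside of Beam as a small time-based cache. This is only a sketch of the concept: in the pipeline a ticker drives the reload, whereas this hypothetical `RefreshingLookup` class reloads lazily when a caller asks for data older than the interval. The loader and clock are injected, and all names here are assumptions made for the example.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

/** Reloads a slowly changing dataset at most once per refresh interval. */
final class RefreshingLookup<T> {
    private final Supplier<T> loader;      // e.g. a function that re-reads the table
    private final Supplier<Instant> clock; // injectable so the behavior is easy to test
    private final Duration refreshInterval;

    private T cached;
    private Instant loadedAt; // null until the first load

    RefreshingLookup(Supplier<T> loader, Supplier<Instant> clock, Duration refreshInterval) {
        this.loader = loader;
        this.clock = clock;
        this.refreshInterval = refreshInterval;
    }

    /** Returns the cached value, reloading it first if it is at least one interval old. */
    synchronized T get() {
        Instant now = clock.get();
        if (loadedAt == null || !now.isBefore(loadedAt.plus(refreshInterval))) {
            cached = loader.get();
            loadedAt = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        RefreshingLookup<String> lookup = new RefreshingLookup<>(
                () -> "snapshot loaded at " + Instant.now(), // hypothetical stand-in for a table read
                Instant::now,
                Duration.ofMinutes(60));
        System.out.println(lookup.get());
        System.out.println(lookup.get()); // served from the cache, no reload
    }
}
```

The trade-off is the same as in the pipeline: readers may see data that is up to one interval stale, which is acceptable only because the enrichment dataset changes slowly.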

Code:

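import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

// MainData, SmallEnrichmentData, EnrichedData and the parser DoFn are application classes not shown here.
public class NewAlgorithm {
    public static void main(String[] args) {
        // Create the pipeline from the command-line options
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);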

        // Read the main data from Pub/Sub topic
        PCollection<String> mainDataInput = pipeline.apply("Read Main Data",
                PubsubIO.readStrings().fromTopic("projects/YOUR_PROJECT_ID/topics/YOUR_MAIN_DATA_TOPIC"));

        // Process the main data and convert it to a PCollection of MainData
        PCollection<MainData> mainDataFlow = mainDataInput.apply("Process Main Data", ParDo.of(new MainDataParser()));

        // Generate sequence with a time ticker
        PCollection<Long> ticks = pipeline.apply("Generate Ticks", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(60L)));

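        // Read the small enrichment data from the BigQuery table.
        // Note: simplified for readability. BigQueryIO reads are root transforms that start from the
        // pipeline itself, so applying one to `ticks` will not compile as written; in a real job the
        // reload has to be done in a step (for example a DoFn) driven by `ticks`.
        PCollection<SmallEnrichmentData> smallEnrichmentSource = ticks.apply("Read Small Enrichment Data",
                BigQueryIO.read().from("YOUR_PROJECT_ID:YOUR_DATASET_ID.YOUR_TABLE_ID")
                        .usingStandardSql().withTemplateCompatibility()
                        .withCoder(SmallEnrichmentDataCoder.of()));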

        // Generate a PCollectionView from the small enrichment data
        PCollectionView<Iterable<SmallEnrichmentData>> smallEnrichmentSideInput = smallEnrichmentSource.apply("Window and AsIterable", Window.into(
                FixedWindows.of(Duration.standardHours(1)))).apply(View.asIterable());

        // Define a DoFn to process the main data with the small enrichment data.
        // (A class declared inside a method cannot be marked public or static.)
        class EnrichMainDataFn extends DoFn<MainData, EnrichedData> {

            private final PCollectionView<Iterable<SmallEnrichmentData>> smallEnrichmentSideInput;

            public EnrichMainDataFn(PCollectionView<Iterable<SmallEnrichmentData>> smallEnrichmentSideInput) {
                this.smallEnrichmentSideInput = smallEnrichmentSideInput;
            }

            @ProcessElement
            public void processElement(ProcessContext context) {
                MainData mainData = context.element();
                Iterable<SmallEnrichmentData> smallEnrichmentDataList = context.sideInput(smallEnrichmentSideInput);

                // Process the main data and small enrichment data and output EnrichedData instances
                for (SmallEnrichmentData smallEnrichmentData : smallEnrichmentDataList) {
                    EnrichedData enrichedData = new EnrichedData(mainData, smallEnrichmentData);
                    context.output(enrichedData);
                }
            }
        }

        // Process the main data with the small enrichment data
        PCollection<EnrichedData> enrichedData = mainDataFlow.apply("Enrich Main Data", ParDo.of(new EnrichMainDataFn(smallEnrichmentSideInput))
                .withSideInputs(smallEnrichmentSideInput));

        // Write the enriched data to the desired output, for example, to a file or a database

        // Run the pipeline
        pipeline.run().waitUntilFinish();
    }
}

Test Case:

To evaluate the effectiveness of our optimization efforts using the Apache Beam SideInput feature, we designed a comprehensive test to compare the performance of our old and new algorithms. The test setup and dataset details are as follows:

1. We published 5 million records to a Pub/Sub topic, which was used to fill the Apache Beam ValueState in the job for the stream-to-stream join.

2. We created a small table containing the enrichment dataset. For the small enrichment, the old algorithm uses ValueState and the new algorithm uses the SideInput feature.

3. We then used 5 million source records to generate the output for both the old and new jobs. It is important to note that these source records inflate in the application, resulting in a total of 15 million records that need to be processed.

4. For our Google Dataflow jobs, we set the minimum number of workers to 1 and the maximum number of workers to 15.

Results

We will examine the impact of our optimization efforts on the number of workers and CPU utilization in our Google Dataflow jobs. By comparing two screenshots taken during the first hour of job execution, we can see how the old algorithm without SideInput performs against the new implementation using SideInput.

Screenshot 1: Old Algorithm without SideInput

Picture3

This screenshot displays the performance of our old algorithm, which did not use the Apache Beam SideInput feature. In this scenario, we observe low CPU utilization despite having 15 workers deployed. The job scaled up to 15 workers because Google Dataflow's autoscaling is based on backlog size, yet those workers remained stuck.

Screenshot 2: New Algorithm with SideInput

Picture4

The second screenshot displays the performance of our new algorithm, which uses the SideInput feature. In this case, we can see that the Dataflow job uses high CPU when new events are received. Additionally, the maximum number of workers is only used temporarily, indicating a more efficient and dynamic allocation of resources.

To demonstrate the impact of our optimization, we've compared the metrics of the old job (without SideInput) and the new job (with SideInput). The table below shows a detailed comparison of these metrics:

Metrics

These metrics demonstrate impressive reductions in vCPU consumption, memory usage, and HDD PD time, highlighting the effectiveness of our optimization. Please refer to the 'Resource Metrics Comparison' image for more details.

Resource Metrics Comparison:

Picture5

The substantial improvements in these key metrics highlight the effectiveness of using the Apache Beam SideInput feature in our Google Dataflow jobs. These optimizations not only make processing more efficient but also yield significant cost savings for our data processing tasks.

In our previous implementation without SideInput, the job took approximately 24 hours to complete, whereas the new job with SideInput completed in about 30 minutes: a 97.92% reduction in execution time.
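For readers who want to check the arithmetic, the reduction follows from the two run times quoted in this post, roughly 24 hours and roughly 30 minutes. The helper names below are illustrative only.

```java
import java.util.Locale;

/** Checks the run-time reduction using the approximate figures quoted in this post. */
final class RuntimeReduction {

    /** Percentage by which the run time shrank: (before - after) / before * 100. */
    static double percentReduction(double beforeMinutes, double afterMinutes) {
        return (beforeMinutes - afterMinutes) / beforeMinutes * 100.0;
    }

    /** How many times faster the new run is. */
    static double speedup(double beforeMinutes, double afterMinutes) {
        return beforeMinutes / afterMinutes;
    }

    public static void main(String[] args) {
        double before = 24 * 60; // about 24 hours, in minutes
        double after = 30;       // about 30 minutes

        // (1440 - 30) / 1440 * 100 = 97.9166..., which rounds to 97.92
        System.out.println(String.format(Locale.ROOT, "reduction: %.2f%%", percentReduction(before, after)));
        System.out.println(String.format(Locale.ROOT, "speedup: %.0fx", speedup(before, after)));
    }
}
```

Put differently, the new job is about 48 times faster than the old one.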

As a result, we can maintain high performance while minimizing the cost and complexity of our data processing tasks.

Warning: Using SideInput for Large Datasets

Please be aware that using SideInput in Apache Beam is recommended only for small datasets that can fit into the worker's memory. The total amount of data that should be processed using SideInput should not exceed 1 GB.

Larger datasets can cause significant performance degradation and may even result in your pipeline failing due to memory constraints. If you need to process a dataset larger than 1 GB, consider alternative approaches like using CoGroupByKey, partitioning your data, or using a distributed database to perform the necessary join operations. Always evaluate the size of your dataset before deciding on using SideInput to ensure efficient and successful processing of your data.
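One way to apply this rule is a quick size estimate before choosing the join strategy. The sketch below is a rough heuristic under stated assumptions: the class and method names are hypothetical, the 1 GB limit is interpreted as 1 GiB, and the estimate is simply row count times average row size. The real in-memory footprint is usually larger than the raw data, so it is worth leaving headroom.

```java
/** Rough pre-check for whether an enrichment dataset is small enough for a side input. */
final class JoinStrategyChooser {

    enum Strategy { SIDE_INPUT, SHUFFLE_JOIN }

    /** The 1 GB guideline from this post, interpreted here as 1 GiB (an assumption). */
    static final long SIDE_INPUT_LIMIT_BYTES = 1L << 30;

    /** Estimates the dataset size; throws ArithmeticException if the product overflows. */
    static long estimateBytes(long rowCount, long averageRowBytes) {
        return Math.multiplyExact(rowCount, averageRowBytes);
    }

    /** Picks a side input only when the estimated size fits within the limit. */
    static Strategy choose(long rowCount, long averageRowBytes) {
        return estimateBytes(rowCount, averageRowBytes) <= SIDE_INPUT_LIMIT_BYTES
                ? Strategy.SIDE_INPUT
                : Strategy.SHUFFLE_JOIN;
    }

    public static void main(String[] args) {
        // Hypothetical datasets: 1 million and 50 million rows of about 200 bytes each.
        System.out.println(choose(1_000_000L, 200L));
        System.out.println(choose(50_000_000L, 200L));
    }
}
```

Here `SHUFFLE_JOIN` stands in for any of the alternatives mentioned in this section, such as CoGroupByKey.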

Conclusion

By switching from CoGroupByKey to SideInput and using DoFn functions, we were able to significantly improve the efficiency of our data processing pipeline. The new approach allowed us to distribute the small dataset across all workers and process millions of events much faster. As a result, we reduced the processing time for one flow from 1 day to just 30 minutes. This optimization also had a positive impact on our CPU utilization, ensuring that our resources were used more effectively.

If you're experiencing similar performance bottlenecks in your Apache Beam jobs on Dataflow, consider re-evaluating your enrichment methods and exploring options such as SideInput and DoFn to boost your processing efficiency.

Thank you for reading this blog. If you have any further questions or if there's anything else we can assist you with, feel free to ask.

On behalf of Team 77, Hazal and Eyyub

Some useful links:

** Google Dataflow

** Apache Beam

** Stateful processing