cost optimization

Why Your Data Pipelines Need Closed-Loop Feedback Control

As data teams scale up on the cloud, data platform teams need to ensure the workloads they are responsible for are meeting business objectives.  At scale with dozens of data engineers building hundreds of production jobs, controlling their performance at scale is untenable for a myriad of reasons from technical to human.

The missing link today is the establishment of a closed loop feedback system that helps automatically drive pipeline infrastructure towards business goals.  That was a mouthful, so let’s dive in and get more concrete about this problem.

The problem for data platform teams today

Data platform teams have to manage fundamentally distinct shareholders from management to engineers.  Oftentimes these two teams have opposing goals, and platform managers can be squeezed by both ends.  

Many real conversations we’ve had with platform managers and data engineers typically go like this:


“Our CEO wants me to lower cloud costs and make sure our SLAs are hit to keep our customers happy.”

Okay, so what’s the problem?

“The problem is that I can’t actually change anything directly, I need other people to help and that is the bottleneck”

So basically, platform teams find themselves handcuffed and face enormous friction when trying to actually implement improvements.  Let’s zoom into the reasons why.

What’s holding back the platform team?

  • Data Teams are out of technical scope – Tuning clusters or complex configurations (Databricks, Snowflake) is a time consuming task where data teams would rather be focusing on actual pipelines and SQL code.  Many engineers don’t have the skillset, support structure, or even know what the costs are for their jobs.  Identifying and solving root cause problems is also a daunting task that gets in the way of just standing up a functional pipeline.

  • Too many layers of abstraction – Let’s just zoom in on one stack: Databricks runs their own version of Apache Spark, which runs on a cloud provider’s virtualized compute (AWS, Azure, GCP), with different network options, and different storage options (DBFS, S3, Blob), and by the way everything can be updated independently and randomly throughout the year.  The amount of options is overwhelming and it’s impossible for platform folks to ensure everything is up to date and optimal.

  • Legacy code – One unfortunate reality is simply just legacy code.  Oftentimes teams in a company can change, people come and go, and over time, the knowledge of any one particular job can fade away.  This effect makes it even more difficult to tune or optimize a particular job.

  • Change is scary – There’s an innate fear to change.  If a production job is flowing, do we want to risk tweaking it?  The old adage comes to mind: “if it ain’t broke, don’t fix it.”  Oftentimes this fear is real, if a job is not idempotent or there are other downstream effects, a botched job can cause a real headache.  This creates a psychological barrier to even trying to improve job performance.

  • At scale there are too many jobs – Typically platform managers oversee hundreds if not thousands of production jobs.  Future company growth ensures this number will only increase.  Given all of the points above, even if you had a local expert, going in and tweaking jobs one at a time is simply not realistic.  While this can work for a select few high priority jobs, it leaves the bulk of a company’s workloads more or less uncared for.  

Clearly it’s an uphill battle for data platform teams to quickly make their systems more efficient at scale.  We believe the solution is a paradigm shift in how pipelines are built.  Pipelines need a closed loop control system that constantly drives a pipeline towards business goals without humans in the loop.  Let’s dig in.

What does a closed loop control for a pipeline mean?

Today’s pipelines are what is known as an “open loop” system in which jobs just run without any feedback.  To illustrate what I’m talking about, pictured below shows where “Job 1” just runs every day, with a cost of $50 per run.  Let’s say the business goal is for that job to cost $30.  Well, until somebody actually does something, that cost will remain at $50 for the foreseeable future – as seen in the cost vs. time plot.

What if instead, we had a system that actually fed back the output statistics of the job so that the next day’s deployment can be improved?  It would look something like this:

What you see here is a classic feedback loop, where in this case the desired “set point” is a cost of $30.  Since this job is run every day, we can take the feedback of the real cost and send it to an “update config” block that takes in the cost differential (in this case $20) and as a result apply a change in “Job 1’s configurations.  For example, the “update config” block may reduce the number of nodes in the Databricks cluster.  

What does this look like in production?

In reality this doesn’t happen in a single shot.  The “update config” model is now responsible for tweaking the infrastructure to try to get the cost down to $30.  As you can imagine, over time the system will improve and eventually hit the desired cost of $30, as shown in the image below.

This may all sound fine and dandy, but you may be scratching your head and asking “what is this magical ‘update config’ block?”  Well that’s where the rubber meets the road.  That block is a mathematical model that can input a numerical goal delta, and output an infrastructure configuration or maybe code change.

It’s not easy to make and will vary depending on the goal (e.g. costs vs. runtime vs. utilization).  This model must fundamentally predict the impact of an infrastructure change on business goals – not an easy thing to do.

Nobody can predict the future

One subtle thing is that no “update config” model is 100% accurate.  In the 4th blue dot, you can actually see that the cost goes UP at one point.  This is because the model is trying to predict a change in the configurations that will lower costs, but because nothing can predict with 100% accuracy, sometimes it will be wrong locally, and as a result the cost may go up for a single run, while the system is “training.”

But, over time, we can see that the total cost does in fact go down.  You can think of it as an intelligent trial and error process, since predicting the impact of configuration changes with 100% accuracy is straight up impossible.

The big “so what?” – Set any goal and go

The approach above is a general strategy and not one that is limited to just cost savings.  The “set point” above is simply a goal that a data platform person puts in.  It can be any kind of goal, for example runtime is a great example.  

Let’s say we want a job to be under a 1 hour runtime (or SLA).  We can let the system above tweak the configurations until the SLA is hit.  Or what if it’s more complicated, a cost and SLA goal simultaneously?  No problem at all, the system can optimize to hit your goals over many parameters.  In addition to cost and runtime, other business use cases goals are:

  • Resource Utilization: Independent of cost and runtime, am I using the resources I have properly?
  • Energy Efficiency: Am I consuming the least amount of resources possible to minimize my carbon footprint?
  • Fault Tolerance: Is my job actually resilient to failure? Meaning do I want to over-spec it just in case I get preempted or just in case there are no SPOT instances available?
  • Scalability: Does my job scale? What if I have a spike in input data by 10x, will my job crash?
  • Latency: Are my jobs hitting my latency goals? Response time goals?

In theory, all a data platform person has to do is set goals, and then an automatic system can iteratively improve the infrastructure until the goals are hit.  There are no humans in the loop, no engineers to get on board.  The platform team just sets the goals they’ve received from their stakeholders.  Sounds like a dream.

So far we’ve been pretty abstract.  Let’s dive into a some concrete use cases that are hopefully compelling to people:

Example feature #1: Group jobs by business goals

Let’s say you’re a data platform manager and you oversee the operation of hundreds of production jobs.  Right now, they all have their own cost and runtime.  A simple graph below shows a cartoon example, where basically all of the jobs are randomly scattered across a cost and runtime graph.

What if you want to lower costs at scale?  What if you want to change the runtime (or SLA) of many jobs at once?  Right now you’d be stuck.

Now imagine if you had the closed loop control system above implemented for all of your jobs.  All you’d have to do is set the high level business goals of your jobs (in this case SLA runtime requirements), and the feedback control system would do its best to find the infrastructure that accomplishes your goals.  The end state will look like this:

Here we see each job’s color represents a different business goal, as defined by the SLA.  The closed loop feedback control system behind the scenes changed the cluster / warehouse size, various configurations, or even adjusted entire pipelines to try to hit the SLA runtime goals at the lowest cost.  Typically longer job runtimes lead to lower cost opportunities.

Example feature #2: Auto-healing jobs

As most data platform people can confirm, things are always changing in their data pipelines.  Two very popular scenarios are: data size growing over time, and code changes.  Both of which can cause erratic behavior when it comes to cost and runtime.

The illustration below shows the basic concept.  Let’s walk through the example from left to right:

  • Start:  Let’s say you have a job and over time the data size grows.  Normally your cluster stays the same and as a result both costs and runtime increases.
  • Start Feedback:  Over time the runtime approaches the SLA requirement and the feedback control system kicks in at the green arrow.  At this point, the control system changes the cluster to stay below the red line while minimizing costs.
  • Code Change:  At some point a developer pushes a new update to the code which causes a spike in the cost and runtime.  The feedback control system kicks in and adjusts the cluster to work better with the new code change.

Hopefully these two examples explain the potential benefit of how a closed loop control pipeline can be beneficial.  Of course in reality there are many details that have been left out and some design principles companies will have to adhere to.  One big one is a way for configurations to revert back to a previous state in case something goes wrong.  An idempotent pipeline would also be ideal here in case many iterations are needed.

Conclusion

Data pipelines are complex systems, and like any other complex system, they need feedback and control to ensure a stable performance.  Not only does this help solve technical or business problems, it will dramatically help free up data platform and engineering teams to focus on actually building pipelines.  

Like we mentioned before, a lot of this hinges on the performance of the “update config” block.  This is the critical piece of intelligence that is needed to the success of the feedback loop.  It is not trivial to build this block and is the main technical barrier today.  It can be an algorithm or a machine learning model, and utilize historical data.  It is the main technical component we’ve been working on over the past several years.

In our next post we’ll show an actual implementation of this system applied to Databricks Jobs, so you can believe that what we’re talking about is real!

Interested in learning more about closed loop controls for your Databricks pipelines? Reach out to Jeff Chou and the rest of the Sync Team.

Databricks driver sizing impact on cost and performance

As many previous blog posts have reported, tuning and optimizing the cluster configurations of Apache Spark is a notoriously difficult problem.  Especially when a data engineer needs to lower costs or accelerate runtimes on platforms such as EMR or Databricks on AWS, tuning these parameters becomes a high priority.  

Here at Sync, we will experimentally explore the impact of driver sizing in the Databricks platform on the TPC-DS 1TB benchmark, to see if we can obtain an understanding of the relationship between the driver instance size and cost/runtime of the job.

Driver node review

For those who may be less familiar with the driver node details in Apache Spark, there are many excellent previous blog posts as well as the official documentation on this topic and I will recommend users to read those if they are not familiar.  As a quick summary, the driver is an important part of the Apache Spark system and effectively acts as the “brain” of the entire operation.  

The driver program runs the main() function, creates the spark context, and schedules tasks onto the worker nodes.  Aside from these high level functions, we’d like to note that the driver node is also used in the execution of some functions, most famously when using the collect operation and broadcast joins.  During those functions, data is moved to the driver node and if it’s not appropriately sized, can cause a driver side out of memory error which can shut down the entire cluster.

As a quick side note, for broadcast joins, It looks like a ticket has been filed to change this behavior (at least for broadcast joins) in the open source Spark core.  So people should be aware that this may change in the future.

How Does Driver Sizing Impact Performance As a Function of the Number Of Workers?

The main experimental question we want to ask is “how does driver sizing impact performance as a function of the number of workers?”  The reason why we want to correlate driver size with the number of workers is that the number of workers is a very important parameter when tuning systems for either cost or runtime goals.  Observing how the driver impacts the worker scaling of the job is a key part of understanding and optimizing a cluster.

Fundamentally, the maximum number of tasks that can be executed in parallel is determined by the number of workers and executors.  Since the driver node is responsible for scheduling these tasks, we wanted to see if the number of workers changes the hardware requirements of the driver.  For example, does scheduling 1 million tasks require a different driver instance type than scheduling 10 tasks?  

Experimental Setup

The technical parameters of the experiment are below:

  • Data Platform:  Databricks
  • Compute type: Jobs (ephemeral cluster, 1 job per cluster)
  • Photon Enabled: No
  • Fixed parameters::  All worker nodes are i3.xlarge, all configs default
  • Sweep parameters:  Driver instance size (r5a.large, r5a.xlarge, r5a.4xlarge), number of workers
  • AWS market:  On-demand (to eliminate spot fluctuations)
  • Workload: Databrick’s own benchmark on  TPC-DS 1TB (all queries run sequentially)

For reference, here are the hardware specifications of the 3 different drivers used on AWS:

The result

We will break down the results into 3 main plots.  The first is below where we look at runtime vs. number of workers for the 3 different driver types.  In the plot below we see that as the number of workers increases the runtime decreases.  We note here that the scaling trend is not linear and there is a typical “elbow” scaling that occurs.  We published previously the general concept of scaling jobs.  We observe here that the largest driver, r5a.4xlarge, yielded the fastest performance across all worker sizes.

In the plot below we see the cost (DBU’s in $) vs. number of workers.  For the most part we see that the medium sized driver, r5a.xlarge is the most economical, except for the smallest number of workers where the smallest driver size r5a.large was the cheapest.

Putting both plots together, we can see the general summary when we plot cost vs. runtime.  The small numbers next to each point show the number of workers.  In general, the ideal points should be toward the bottom left, as that indicates a configuration that is both faster and cheaper.  Points that are higher up or to the right are more expensive and slower.  

Some companies are only concerned about service level agreement (SLA) timelines, and do not actually need the “fastest” possible runtime.  A more useful way to think about the plot below is to ask the question “what is the maximum time you want to spend running this job?”  Once that number is known, you can then select the configuration with the cheapest cost that matches your SLA.  

For example, consider the SLA scenarios below:

1)  SLA of 2500s – If you need your job to be completed in 2,500s or less, then you should select the r5a.4xlarge driver with a worker size of 50.

2)  SLA of 4000s – If you need your job to be completed in 4,000s or less, then you should select the r5a.xlarge driver with a worker size of 20.

3)  SLA of 10,000s – If you need your job to be completed in 10,000s or less, then you should select the r5a.large driver with a worker size of 5.

Key Insights

It’s very convenient to see the scaling trend of all 3 drivers plotted in this manner, as there are several key insights gained here:

  1. There is a general “good” optimal driver for TPC-DS 1TB – across the spectrum, it’s clear that r5a.xlarge is a good choice generally as it is usually cheaper and faster than the other driver sizes.  This shows the danger that if your driver is too big or too small, you could be wasting money and time.  
  2. At the extremes, driver size matters for TPC-DS 1TB  – At the wings of either large clusters (50 workers) or small clusters (5 workers) we can see that the best driver selection can swing between all 3 drivers.  
  3. Drivers can be too big – At 12 workers, the r5a.4xlarge performance is slightly faster but significantly more expensive than the other two driver types.  Unless that slight speedup is important, it’s clear to see that if a driver is too large, then the extra cost of the larger driver is not worth the slight speedup.  It’s like buying a Ferrari to just sit in traffic – definitely not worth it (although you will look cool).
  4. Small driver bottleneck – For the small driver curve (r5a.large), we see that the blue line’s elbow occurs at a higher runtime than the middle driver (r5a.xlarge).  This implies that the smaller driver is creating a runtime bottleneck for the entire workload as the cluster becomes larger.  The next section will dive into why.

Root cause analysis for the “small driver bottleneck”

To investigate the cause of the small driver bottleneck, we looked into the Spark eventlogs to see what values changed as we scaled the number of workers.  In the Spark UI in Databricks, the typical high level metrics for each task are shown below and plotted graphically.  The image below shows an example of a single task broken down into the 7 main metrics:

When we aggregated all of these values across all tasks, across all the different drivers and workers, the numbers were all pretty consistent, except for one number:  “Scheduler Delay”.   For those who may not be familiar, the formal definition from the Databricks Spark UI, is shown in the image below:

“Scheduler delay includes time to ship the task from the scheduler to the executor, and time to send the task result from the executor to the scheduler. If scheduler delay is large, consider decreasing the size of tasks or decreasing the size of task results.”

In the graph below, we plot the total aggregated scheduler delays of all tasks for each of the job configurations vs the number of workers.  It is expected that the aggregated scheduler delay should increase for a larger number of workers since there are more tasks.  For example, if there are 100 tasks, each with 1s of scheduler delay, the total aggregated scheduler day is 100s (even if all 100 tasks executed in parallel and the “wall clock” scheduler delay is only 1s).  Therefore, if there are 1000 tasks, the total aggregated scheduler should increase as well.  

Theoretically this should scale roughly linearly with the number of workers for a “healthy” system.  For the “middle” and “large” sized drivers (r5a.xlarge and r5a.4xlarge respectively), we see the expected growth of the scheduler delay.  However, for the “small” r5a.large driver, we see a very non-linear growth of the total aggregated scheduler delay, which contributes to the overall longer job runtime.  This appears to be a large contributor to the “small driver bottleneck” issue.

To understand a bit deeper as to what is the formal definition of Scheduler Delay, let’s look at the Spark source code inside the function AppStatusUtils.scala.  At a high level, scheduler delay is a simple calculation as shown in the code below:

schedulerDelay = duration – runTime – deserializeTime – serlializeTime – gettingResultTime

To put it in normal text, scheduler delay is basically a catch-all term, that is the time the task is spent doing something that is not executing, serializing data, or getting results.  A further question would be to see which one of these terms is increasing or decreasing due to the smaller driver?  Maybe duration is increasing, or maybe gettingResultTime is decreasing?  

If we look at the apples to apples case of 32 workers for the “medium” r5a.xlarge driver and the “small” r5a.large driver, the runtime of the “small” driver was significantly longer.  One could hypothesize that the average duration per task is longer (vs. one of the other terms becoming smaller).  

In summary, our hypothesis here is that by reducing the driver size (number of VCPUs and memory), we are incurring an additional time “tax” on each task by taking, on average, slightly longer to ship a task from the scheduler on the driver to each executor.  

A simple analogy here is, imagine you’re sitting in bumper to bumper traffic on a highway, and then all of a sudden every car (a task in Spark) just grew 20% longer, if there are enough cars you could be set back miles.

Conclusion

Based on the data described above, the answer to the question above is that inappropriately sized drivers can lead to excess cost and performance as workers scale up and down.  We present a hypothesis that a driver that is “too small” with too few VCPUs and memory, could cause, on average, an increase in the task duration via an additional overhead in the scheduler delay.  

This final conclusion is not terribly new to those familiar with Spark, but we hope seeing actual data can help create a quantitative understanding on the impact of driver sizing.  There are of course many other things that could cause a poor driver to elongate or even crash a job, (as described earlier via the OOM errors).  This analysis was just a deep dive into one observation.

I’d like to put a large caveat here that this analysis was specific to the TPC-DS workload, and it would be difficult to generalize these findings across all workloads.  Although the TPC-DS benchmark is a collection of very common SQL queries, in reality individual code, or things like user defined functions, could throw these conclusions out the window.  The only way to know for sure about your workloads is to run some driver sizing experiments.

As we’ve mentioned many times before, distributed computing is complicated, and optimizing your cluster for your job needs to be done on an individual basis.  Which is why we built the Apache Spark Autotuner for EMR and Databricks on AWS to help data engineers quickly find the answers they are looking for.