
Why Your Data Pipelines Need Closed-Loop Feedback Control

As data teams scale up on the cloud, data platform teams need to ensure the workloads they are responsible for are meeting business objectives.  With dozens of data engineers building hundreds of production jobs, controlling performance at scale is untenable for a myriad of reasons, from technical to human.

The missing link today is a closed-loop feedback system that automatically drives pipeline infrastructure toward business goals.  That was a mouthful, so let’s dive in and get more concrete about the problem.

The problem for data platform teams today

Data platform teams have to manage fundamentally distinct stakeholders, from management to engineers.  Oftentimes these two groups have opposing goals, and platform managers can get squeezed from both ends.

Many real conversations we’ve had with platform managers and data engineers typically go like this:


“Our CEO wants me to lower cloud costs and make sure our SLAs are hit to keep our customers happy.”

Okay, so what’s the problem?

“The problem is that I can’t actually change anything directly, I need other people to help and that is the bottleneck”

So basically, platform teams find themselves handcuffed and face enormous friction when trying to actually implement improvements.  Let’s zoom into the reasons why.

What’s holding back the platform team?

  • Data teams are out of technical scope – Tuning clusters or complex configurations (Databricks, Snowflake) is time consuming, and data teams would rather focus on actual pipelines and SQL code.  Many engineers don’t have the skill set or support structure, or even know what their jobs cost.  Identifying and solving root-cause problems is also a daunting task that gets in the way of just standing up a functional pipeline.

  • Too many layers of abstraction – Let’s just zoom in on one stack: Databricks runs its own version of Apache Spark, which runs on a cloud provider’s virtualized compute (AWS, Azure, GCP), with different network options and different storage options (DBFS, S3, Blob), and by the way everything can be updated independently and randomly throughout the year.  The number of options is overwhelming, and it’s impossible for platform folks to ensure everything is up to date and optimal.

  • Legacy code – One unfortunate reality is legacy code.  Teams within a company change, people come and go, and over time the knowledge of any one particular job fades away.  This makes it even more difficult to tune or optimize that job.

  • Change is scary – There’s an innate fear of change.  If a production job is flowing, do we want to risk tweaking it?  The old adage comes to mind: “if it ain’t broke, don’t fix it.”  Oftentimes this fear is justified: if a job is not idempotent or there are downstream effects, a botched run can cause a real headache.  This creates a psychological barrier to even trying to improve job performance.

  • At scale there are too many jobs – Typically platform managers oversee hundreds if not thousands of production jobs.  Future company growth ensures this number will only increase.  Given all of the points above, even if you had a local expert, going in and tweaking jobs one at a time is simply not realistic.  While this can work for a select few high priority jobs, it leaves the bulk of a company’s workloads more or less uncared for.  

Clearly it’s an uphill battle for data platform teams to quickly make their systems more efficient at scale.  We believe the solution is a paradigm shift in how pipelines are built.  Pipelines need a closed loop control system that constantly drives a pipeline towards business goals without humans in the loop.  Let’s dig in.

What does a closed loop control for a pipeline mean?

Today’s pipelines are what is known as “open loop” systems, in which jobs just run without any feedback.  To illustrate, the picture below shows “Job 1” running every day at a cost of $50 per run.  Let’s say the business goal is for that job to cost $30.  Until somebody actually does something, that cost will remain at $50 for the foreseeable future – as seen in the cost vs. time plot.

What if, instead, we had a system that fed the output statistics of the job back so that the next day’s deployment can be improved?  It would look something like this:

What you see here is a classic feedback loop, where in this case the desired “set point” is a cost of $30.  Since this job runs every day, we can take the feedback of the real cost and send it to an “update config” block that takes in the cost differential (in this case $20) and, as a result, applies a change to “Job 1’s” configurations.  For example, the “update config” block may reduce the number of nodes in the Databricks cluster.
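To make the loop concrete, here is a minimal sketch in Python, assuming a hypothetical run_job_and_measure_cost() step and a toy update_config() rule that simply shrinks the cluster while the job is over budget.  Neither function is a real Sync API – the point is only the feedback structure.

# Toy closed-loop sketch (illustrative only, not a real Sync API).
TARGET_COST = 30.0  # the "set point" in dollars per run

def run_job_and_measure_cost(config: dict) -> float:
    """Stand-in for 'deploy Job 1 and read its real cost back from billing'.
    Here we simply pretend cost scales with the worker count."""
    return 5.0 * config["num_workers"]

def update_config(config: dict, cost_delta: float) -> dict:
    """Toy 'update config' block: shrink the cluster while we are over budget.
    A real model would weigh many knobs (instance type, node count, Spark configs)."""
    new_config = dict(config)
    if cost_delta > 0 and new_config["num_workers"] > 1:
        new_config["num_workers"] -= 1
    return new_config

config = {"instance_type": "i3.xlarge", "num_workers": 10}
for day in range(10):
    actual_cost = run_job_and_measure_cost(config)  # today's run, e.g. $50
    cost_delta = actual_cost - TARGET_COST          # the feedback signal, e.g. $20 over
    config = update_config(config, cost_delta)      # tomorrow's deployment is adjusted
    print(day, actual_cost, config["num_workers"])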

What does this look like in production?

In reality this doesn’t happen in a single shot.  The “update config” model is now responsible for tweaking the infrastructure to try to get the cost down to $30.  As you can imagine, over time the system will improve and eventually hit the desired cost of $30, as shown in the image below.

This may all sound fine and dandy, but you may be scratching your head and asking “what is this magical ‘update config’ block?”  Well, that’s where the rubber meets the road.  That block is a mathematical model that takes a numerical goal delta as input and outputs an infrastructure configuration, or perhaps a code change.

It’s not easy to build, and it will vary depending on the goal (e.g. cost vs. runtime vs. utilization).  This model must fundamentally predict the impact of an infrastructure change on business goals – not an easy thing to do.

Nobody can predict the future

One subtle point is that no “update config” model is 100% accurate.  At the 4th blue dot, you can actually see the cost go UP.  The model is trying to predict a configuration change that will lower costs, but because nothing can predict with 100% accuracy, it will sometimes be wrong locally, and the cost may go up for a single run while the system is “training.”

But, over time, we can see that the total cost does in fact go down.  You can think of it as an intelligent trial and error process, since predicting the impact of configuration changes with 100% accuracy is straight up impossible.

The big “so what?” – Set any goal and go

The approach above is a general strategy, not one limited to cost savings.  The “set point” above is simply a goal that a data platform person puts in, and it can be any kind of goal – runtime, for example.

Let’s say we want a job to finish under a 1 hour runtime (or SLA).  We can let the system above tweak the configurations until the SLA is hit.  What if it’s more complicated – a cost and an SLA goal simultaneously?  No problem at all: the system can optimize over many parameters to hit your goals (see the sketch after this list).  In addition to cost and runtime, other business goals include:

  • Resource Utilization: Independent of cost and runtime, am I using the resources I have properly?
  • Energy Efficiency: Am I consuming the least amount of resources possible to minimize my carbon footprint?
  • Fault Tolerance: Is my job actually resilient to failure? Meaning do I want to over-spec it just in case I get preempted or just in case there are no SPOT instances available?
  • Scalability: Does my job scale? What if I have a spike in input data by 10x, will my job crash?
  • Latency: Are my jobs hitting my latency goals? Response time goals?
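As a toy illustration of optimizing toward several goals at once, one common trick is to fold them into a single score for the “update config” block to minimize.  The weights and penalty form below are purely illustrative assumptions, not Sync’s actual model.

# Toy scalarization of a cost goal plus an SLA goal (illustrative only).
def objective(cost_usd: float, runtime_s: float, sla_s: float,
              penalty_per_s: float = 10.0) -> float:
    """Lower is better: the real cost, plus a penalty for every second past the SLA."""
    overrun = max(0.0, runtime_s - sla_s)
    return cost_usd + penalty_per_s * overrun

# A $42 run that finishes 300 s past a 3,600 s SLA scores far worse
# than a $55 run that lands inside the SLA.
print(objective(42.0, 3900.0, 3600.0))  # 42 + 10 * 300 = 3042.0
print(objective(55.0, 3400.0, 3600.0))  # 55.0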

In theory, all a data platform person has to do is set goals, and then an automatic system can iteratively improve the infrastructure until the goals are hit.  There are no humans in the loop, no engineers to get on board.  The platform team just sets the goals they’ve received from their stakeholders.  Sounds like a dream.

So far we’ve been pretty abstract.  Let’s dive into some concrete use cases that are hopefully compelling:

Example feature #1: Group jobs by business goals

Let’s say you’re a data platform manager and you oversee the operation of hundreds of production jobs.  Right now, they all have their own cost and runtime.  The cartoon example below shows all of the jobs scattered randomly across a cost vs. runtime plot.

What if you want to lower costs at scale?  What if you want to change the runtime (or SLA) of many jobs at once?  Right now you’d be stuck.

Now imagine if you had the closed loop control system above implemented for all of your jobs.  All you’d have to do is set the high level business goals of your jobs (in this case SLA runtime requirements), and the feedback control system would do its best to find the infrastructure that accomplishes your goals.  The end state will look like this:

Here we see each job’s color represents a different business goal, as defined by the SLA.  The closed loop feedback control system behind the scenes changed the cluster / warehouse size, various configurations, or even adjusted entire pipelines to try to hit the SLA runtime goals at the lowest cost.  Typically longer job runtimes lead to lower cost opportunities.

Example feature #2: Auto-healing jobs

As most data platform people can confirm, things are always changing in their data pipelines.  Two very common scenarios are data size growing over time and code changes, both of which can cause erratic behavior when it comes to cost and runtime.

The illustration below shows the basic concept.  Let’s walk through the example from left to right:

  • Start:  Let’s say you have a job and over time the data size grows.  Normally your cluster stays the same, and as a result both cost and runtime increase.
  • Start Feedback:  Over time the runtime approaches the SLA requirement and the feedback control system kicks in at the green arrow.  At this point, the control system changes the cluster to stay below the red line while minimizing costs.
  • Code Change:  At some point a developer pushes a new update to the code which causes a spike in the cost and runtime.  The feedback control system kicks in and adjusts the cluster to work better with the new code change.

Hopefully these two examples illustrate the potential benefits of a closed loop control pipeline.  Of course in reality there are many details that have been left out, and some design principles companies will have to adhere to.  One big one is a way for configurations to revert back to a previous state in case something goes wrong.  An idempotent pipeline would also be ideal here in case many iterations are needed.

Conclusion

Data pipelines are complex systems, and like any other complex system, they need feedback and control to ensure stable performance.  Not only does this help solve technical and business problems, it dramatically helps free up data platform and engineering teams to focus on actually building pipelines.

Like we mentioned before, a lot of this hinges on the performance of the “update config” block.  This is the critical piece of intelligence needed for the feedback loop to succeed.  It is not trivial to build and is the main technical barrier today.  It can be an algorithm or a machine learning model, and it can utilize historical data.  It is the main technical component we’ve been working on over the past several years.

In our next post we’ll show an actual implementation of this system applied to Databricks Jobs, so you can believe that what we’re talking about is real!

Interested in learning more about closed loop controls for your Databricks pipelines? Reach out to Jeff Chou and the rest of the Sync Team.

How to Use the Gradient CLI Tool to Optimize Databricks / EMR Programmatically

Introduction:

The Gradient Command Line Interface (CLI) is a powerful yet easy-to-use utility to automate the optimization of your Spark jobs from your terminal, command prompt, or automation scripts.

Whether you are a Data Engineer, SysDevOps administrator, or just an Apache Spark enthusiast, knowing how to use the Gradient CLI can be incredibly beneficial, as it can dramatically reduce the cost of your Spark workloads while helping you hit your pipeline SLAs.

If you are new to Gradient, you can learn more about it in the Sync Docs. In this tutorial, we’ll walk you through the Gradient CLI’s installation process and give you some examples of how to get started. This is meant to be a tour of the CLI’s overall capabilities. For an end to end recipe on how to integrate with Gradient take a look at our Quick Start and Integration Guides.

Pre Work

This tutorial assumes that you have already created a Gradient account and generated your Sync API keys. If you haven’t generated your key yet, you can do so on the Accounts tab of the Gradient UI.

Step 1: Setting up your Environment

Let’s start by making sure our environment meets all the prerequisites. The Gradient CLI is actually part of the Sync Library, which requires Python v3.7 or above and which only runs on Linux/Unix based systems.

python --version

I am on a Mac and running Python 3.10, so I am good to go, but before we get started I am going to create a Python virtual environment with venv. This is a good practice whenever you install any new Python tool, as it allows you to avoid conflicts between projects and makes environment management simpler. For this example, I am creating a virtual environment called gradient-cli that will reside under the ~/VirtualEnvironments path.

python -m venv ~/VirtualEnvironments/gradient-cli

Step 2: Install the Sync Library

Once you’ve confirmed that your system meets the prerequisites, it’s time to install the Sync Library. Start by activating your new virtual environment.

source ~/VirtualEnvironments/gradient-cli/bin/activate

Next use the pip package installer to install the latest version of the Sync Library.

pip install https://github.com/synccomputingcode/syncsparkpy/archive/latest.tar.gz

You can confirm that the installation was successful by viewing the CLI executable’s version using the --version or --help options.

sync-cli --help

Step 3. Configure the Sync Library

Configuring the CLI with your credentials and preferences is the final step for the installation and setup for the Sync CLI. To do this, run the configure command:

sync-cli configure

You will be prompted for the following values:

Sync API key ID:

Sync API key secret:

Default prediction preference (performance, balanced, economy) [economy]:

Would you like to configure a Databricks workspace? [y/n]:

Databricks host (prefix with https://):

Databricks token:

Databricks AWS region name:

If you remember from the Pre Work, your Sync API key & secret are found on the Accounts tab of the Gradient UI. For this tutorial we are running on Databricks, so you will need to provide a Databricks workspace and an access token.


Databricks recommends that you set up a service principal for automation tasks. As noted in their docs, service principals give automated tools and scripts API-only access to Databricks resources, providing greater security than using users or groups.

These values are stored in ~/.sync/config.

Congrats! You are now ready to interact with Gradient from your terminal, command prompt, or automation scripts.

Step 4. Example Uses

Below are some tasks you can complete using the CLI. This is useful when you want to automate Gradient processes and incorporate them into larger workflows.

Projects

All Gradient recommendations are stored in Projects. Projects are associated with a single Spark job or a group of jobs running on the same cluster. Here are some useful commands you can use to manage your projects with the CLI. For an exhaustive list of commands use the --help option.

Project Commands:

create – Create a project

sync-cli projects create --description [TEXT] --job-id [Databricks Job ID] PROJECT_NAME

delete – Delete a project

sync-cli projects delete PROJECT_ID

get – Get info on a project

sync-cli projects get PROJECT_ID

list – List all projects for account

sync-cli projects list
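If you would rather drive these commands from a Python script than type them by hand, a minimal sketch with the standard library’s subprocess module might look like the following. It only wraps commands shown in this post, and how you parse the output depends on what your CLI version prints.

import subprocess

def sync_cli(*args: str) -> str:
    """Run a sync-cli command (with the gradient-cli virtualenv activated) and return its stdout."""
    result = subprocess.run(["sync-cli", *args], capture_output=True, text=True, check=True)
    return result.stdout

# List all projects for the account; swap in other commands from this post as needed.
print(sync_cli("projects", "list"))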

Predictions

You can also use the CLI to manage, generate and retrieve predictions. This is useful when you want to automate the implementation of recommendations within your Databricks or EMR environments.

Prediction commands:

get – Retrieve a specific prediction

sync-cli predictions get --preference [performance|balanced|economy] PREDICTION_ID

list – List all predictions for account or project

sync-cli predictions list --platform [aws-emr|aws-databricks] --project TEXT

status – Get the status of a previously initiated prediction

sync-cli predictions status PREDICTION_ID

The CLI also provides platform specific commands to generate and retrieve predictions.

Databricks

For Databricks you can generate a recommendation for a previously completed job run with the following command:

sync-cli aws-databricks create-prediction --plan [Standard|Premium|Enterprise] --compute ['Jobs Compute'|'All Purpose Compute'] --project [Your Project ID] RUN_ID

If the run you provided was not already configured with the Gradient agent when it executed, you can still generate a recommendation, but the underlying metrics may be missing some time-sensitive information that is no longer available. To enable evaluation of prior runs executed without the Gradient agent, you can add the --allow-incomplete-cluster-report option. However, to avoid this issue altogether, you can implement the agent and re-run the job.

Alternatively, you can use the following command to run the job and request a recommendation with a single command:

sync-cli aws-databricks run-job --plan [Standard|Premium|Enterprise] --compute ['Jobs Compute'|'All Purpose Compute'] --project [Your Project ID] JOB_ID

This method is useful in cases when you are able to manually run your job without interfering with scheduled runs.

Finally, to implement a recommendation and run the job with the new configuration, you can issue the following command:

sync-cli aws-databricks run-prediction --preference [performance|balanced|economy] JOB_ID PREDICTION_ID

EMR

Similarly, for Spark EMR, you can generate a recommendation for a previously completed job.  EMR does not have the same issue with regard to ephemeral cost data not being available, so you can request a recommendation on a previous run without the Gradient agent.  Use the following command to do so:

sync-cli aws-emr create-prediction --region [Your AWS Region] CLUSTER_ID

If you want to manually rerun the EMR job and immediately request a Gradient recommendation, use the following command:

sync-cli aws-emr record-run --region [Your AWS Region] CLUSTER_ID PROJECT

To execute the EMR job using the recommended configuration, use the following command:

sync-cli aws-emr run-prediction --region [Your AWS Region] PREDICTION_ID

Products

Gradient is constantly working on adding support for new data engineering platforms. To see which platforms are supported by your version of the CLI, you can use the following command:

sync-cli products

Configuration

Should you ever need to update your CLI configurations, you can call configure again to change one or more of your values.

sync-cli configure --api-key-id TEXT --api-key-secret TEXT --prediction-preference TEXT --databricks-host TEXT --databricks-token TEXT --databricks-region TEXT

Token

The token command returns an access token that you can use against our REST API with clients like Postman.

sync-cli token

Conclusion

With these simple commands, you can automate the end-to-end optimization of all your Databricks or EMR workloads, dramatically reducing your costs and improving performance. For more information refer to our developer docs or reach out to us at info@synccomputing.com.

Do Graviton instances lower costs for Spark on EMR on AWS?

Here at Sync we are passionate about optimizing cloud infrastructure for Apache Spark workloads.  One question we receive a lot is

“Do Graviton instances help lower costs?”  


For a little background information, AWS built their own processors which promise to be a “major leap” in performance.  Specifically for Spark on EMR, AWS published a report that claimed Graviton can help reduce costs up to 30% and speed up performance up to 15%.  These are fantastic results, and who doesn’t love a company that builds their own hardware.

As an independent company, here at Sync, we of course always want to verify AWS’s claims on the performance of Graviton instances.  So in this blog post we run several experiments with the TPC-DS benchmark with various driver and worker count configurations on several different instance classes to see for ourselves how these instances stack up.

The Experiment

The goal of the experiment is to see how Graviton instances perform relative to other popular instances that people use.  There are of course hundreds of instance types, so we only selected 10 popular ones to make this a feasible study.

As for the workload, we selected the fan favorite benchmark, TPC-DS 1TB, with all 98 queries run in series.  This is different from AWS’s study, which looked at individual queries within the benchmark.  We decided to track the total job runtime of all queries since we’re just looking for the high level “average” performance to see if any interesting trends appear.  Results of course may vary query by query, and your individual code is a complete wildcard.  We make no claim that these results are generally true for all workloads or your specific workloads.

The details of the experimental sweeps are shown below:

  • Workload:  TPC-DS 1TB (queries 1-98 run in series)
  • EMR Version:  6.2.0
  • Instances: [r6g, m5dn, c5, i3, m6g, r5, m5d, m5, c6g, r5d] (r6g, m6g, and c6g are the Graviton instances)
  • Driver Node sizes: *.xlarge, *.2xlarge, *.4xlarge  (* = instances)
  • Worker Nodes: *.xlarge
  • Number of workers: [5,12,20,32,50]
  • Cores per executor: 4
  • Market: on-demand, list pricing
  • Cost data:  True AWS costs extracted from the cost and usage reports, includes both EC2 and EMR fees

The Result

Below is a global view of all of the experiments run showing cost vs. runtime.  Each dot represents a different configuration as described by the list above.  Points that are in the bottom left hand corner edge are ideal as they are both cheaper and faster.

At a high level, we see that the c6g instances (light green dots) were the lowest cost with comparable runtimes, which was interesting to see.  The other two Graviton instances (r6g and m6g) also skewed toward the lower left more than most of the other instances.

One deviation is that the c5 instances performed surprisingly well on both the cost and runtime curves.  They were quite similar to the best Graviton chip, the c6g.

To make the information a bit easier to digest, we take an average of the runtime and cost data to do a clear side by side comparison of the different instances.  The salmon colored bars are the Graviton enabled instances.  

In the graph below, the runtime of the Graviton instances was comparable with the other instances.  The r6g instances were the fastest, although not by much – only about 6.5% faster than m6g.  The one negative standout was the i3 instances, which took around 20% longer than all of the other instances.

More variation is seen in the cost breakdown, where the Graviton instances were typically lower cost than their non-Graviton counterparts, some by a wide margin.  What really stole the show were the “c” class instances: the c5 was actually about 10% cheaper than the m6g and r6g Graviton instances.

The global winner was the c6g instance, which was the absolute cheapest.  It’s interesting to see the gap between the most expensive (i3) and the cheapest (c6g) – a 70% cost difference!

Based on the data above, it’s interesting to see that the runtime of the Graviton instances was comparable to that of the non-Graviton instances.  So what, then, was the cause of the huge cost differential?  At the end of the day, the total job cost generally followed the list prices of the machines.  Let’s look deeper.

The table below shows the instances and their on-demand list prices, ordered from lowest to highest.  The lowest-priced instance was the Graviton c6g, which corresponds to the study above where the c6g had the lowest total job cost.

However, there were some exceptions where more expensive instances still had cheaper total job costs:

  1. c5.xlarge – had the 3rd lowest on-demand price, but the 2nd cheapest overall job cost
  2. r6g.xlarge – had the 5th lowest on-demand price, but the 3rd cheapest overall job cost

These two exceptions show that the actual list price of the instances doesn’t always guarantee overall total cost trends.  Sometimes the hardware is such a great fit for your job that it overcomes the higher cost.

Instance        On-Demand List Price ($/hr)
c6g.xlarge      0.136
m6g.xlarge      0.154
c5.xlarge       0.17
m5.xlarge       0.192
r6g.xlarge      0.2016
m5d.xlarge      0.226
r5.xlarge       0.252
m5dn.xlarge     0.272
r5d.xlarge      0.288
i3.xlarge       0.312

Conclusion

So at the end of the day, do Graviton instances save you money?  From this study, I’d say that on average their cost/performance numbers were in fact better than other popular instances.  However, as we saw above, it is not always true and, like most things we post – it depends.

If you’re able to explore different instance types, I’d definitely recommend trying out Graviton instances, as they look like a pretty solid bet.  

To revisit AWS’s claim that Graviton instances are up to 30% cheaper and up to 15% more performant: based on the data above, that is not always true and depends on a lot of cluster parameters.

For example, one thing we’ll note is that in the AWS study, they only used workers with *.2xlarge instances, whereas our study only looked at *.xlarge worker node instances.  I also have no idea what Apache Spark configurations they used and if they matched what we did or not.

At the end of the day, everything depends on your workload and what your job is trying to do.  There is no one-size-fits-all instance for your jobs.  That’s why we built the Apache Spark Gradient to help users easily optimize their Apache Spark configurations and instance types to help hit their cost and runtime needs.

Feel free to try out the Spark Gradient yourself!

How does the worker size impact costs for Apache Spark on EMR AWS?

Here at Sync, we are passionate about optimizing data infrastructure on the cloud, and one common point of confusion we hear from users is what worker instance size is best for their jobs.

Many companies run production data pipelines on Apache Spark in the Elastic MapReduce (EMR) platform on AWS.  As we’ve discussed in previous blog posts, wherever you run Apache Spark, whether it be on Databricks or EMR, the infrastructure you run it on can have a huge impact on the overall cost and performance.

To make matters even more complex, the infrastructure settings can change depending on your business goals.  Is there a service level agreement (SLA) time requirement?  Do you have a cost target?  What about both?  

One of the key tuning parameters is the instance size your workers should run on.  Should you use a few large nodes?  Or perhaps a lot of small nodes?  In this blog post, we take a deep dive into some of these questions utilizing the TPC-DS benchmark.

Before starting, we want to be clear that these results are very specific to the TPC-DS workload.  While it may be nice to generalize, we fully note that we cannot predict that these trends will hold true for other workloads, and we highly recommend people run their own tests to confirm.  Alternatively, we built the Gradient for Apache Spark to help accelerate this process (feel free to check it out yourself!).

With that said, let’s go!

The Experiment

The main question we seek to answer is – “How does the worker size impact cost and performance for Spark EMR jobs?”  Below are the fixed parameters we used when conducting this experiment:

  • EMR Version: 6.2
  • Driver Node: m5.xlarge
  • Driver EBS storage: 32 GB
  • Worker EBS storage: 128 GB 
  • Worker instance family: m5
  • Worker type: Core nodes only
  • Workload: TPC-DS 1TB (Queries 1-98 in series)
  • Cost structure: On-demand, list price (to avoid Spot node variability)
  • Cost data: Extracted from the AWS cost and usage reports, includes both the EC2 fees and the EMR management fees

Fixed Spark settings:

  • spark.executor.cores: 4
  • Number of executors: set to 100% cluster utilization based on the cluster size
  • spark.executor.memory: automatically set based on the number of cores

The fixed Spark settings we selected were meant to mimic safe “default” settings that an average Spark user might select at first.  To explain those parameters a bit more: since we are changing the worker instance size in this study, we decided to keep the number of cores per executor constant at 4.  The other parameters, such as the number of executors and executor memory, are automatically calculated to utilize the machines at 100%.

For example, if a machine (worker) has 16 cores, we would create 4 executors per machine (worker).  If the worker has 32 cores, we would create 8 executors.
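For the curious, here is that back-of-the-envelope math as a tiny Python sketch.  It assumes 4 cores per executor and 100% utilization of each worker, and it ignores the memory and cores that the OS and YARN daemons reserve in real deployments.

# Toy executor sizing under the fixed settings above (illustrative only).
def executors_per_worker(worker_cores: int, cores_per_executor: int = 4) -> int:
    return worker_cores // cores_per_executor

def executor_memory_gb(worker_memory_gb: float, worker_cores: int,
                       cores_per_executor: int = 4) -> float:
    """Split the worker's memory evenly across its executors."""
    return worker_memory_gb / executors_per_worker(worker_cores, cores_per_executor)

# m5.xlarge (4 vCPU, 16 GiB)   -> 1 executor with 16 GB
# m5.4xlarge (16 vCPU, 64 GiB) -> 4 executors with 16 GB each
print(executors_per_worker(16), executor_memory_gb(64, 16))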

The variables we are sweeping are outlined below:

  • Worker instance type: m5.xlarge, m5.2xlarge, m5.4xlarge
  • Number of workers: 1-50 nodes

Results

The figure below shows the Spark runtime versus the number and type of workers.  The trend here is pretty clear: larger clusters are in fact faster, and the 4xlarge workers outperformed the other worker sizes.  If speed is your goal, selecting larger workers could help.  If one were to pick a best instance based on the graph below, one might conclude that:

It looks like the 4xlarge is the fastest choice

The figure below shows the true total cost versus the number and type of workers.  On the cost metric, the story almost flips compared to the runtime graph above.  The smallest instance usually outperformed larger instances when it came to lowering costs.  For 20 or more workers, the xlarge instances were cheaper than the other two choices.

If one were to quickly look at the plot below, and look for the “lowest points” which correspond to lowest cost, one could draw a conclusion that:

It looks like the 2xlarge and xlarge instance are the lowest cost, depending on the number of workers

However, the real story comes when we merge those two plots together and simultaneously look at cost vs. runtime.  In this plot, it is more desirable to be toward the bottom left, this means the run is both lower cost and faster.  As the plot below shows, if one were to look at the lowest points, the conclusion to be drawn is:

It looks like 4xlarge instances are the lowest cost choice… what?

What’s going on here is that, for a given runtime, there is always a lower cost configuration with the 4xlarge instances.  When you put it into that perspective, there is little reason to use the xlarge size, as going to larger machines can get you something both faster and cheaper.

The only caveat is that there is a floor to how cheap and slow the 4xlarge cluster can go, and that is with a worker count of 1.  Meaning, you could get a cheaper cluster with a smaller 2xlarge cluster, but the runtime becomes quite long and may be unacceptable for real-world applications.

Here’s a general summary of how the “best worker” choice can change depending on your cost and runtime goals:

Runtime Goal          Cost Goal    Best Worker
< 20,000 seconds      Minimize     4xlarge
< 30,000 seconds      Minimize     2xlarge
< A very long time    Minimize     xlarge

A note on extracting EMR costs

Extracting the actual true costs of individual EMR jobs from the AWS billing information is not straightforward.  We had to write custom scripts to scan the low level cost and usage reports, looking for specific EMR cluster tags.  The exact mechanism for retrieving these costs will probably vary company to company, as different security permissions may alter the mechanics of how these costs can be extracted.  A rough sketch of the idea is shown below.
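As a hedged sketch (not our production script), the core of such a scan might look like this in Python with pandas.  The column names are assumptions based on a typical cost and usage report export, and the EMR tag must be activated as a cost-allocation tag – check both against your own report.

import pandas as pd

CLUSTER_ID = "j-XXXXXXXXXXXXX"  # hypothetical EMR cluster (job flow) ID
TAG_COLUMN = "resourceTags/aws:elasticmapreduce:job-flow-id"  # assumed CUR tag column

# Load one (already unzipped) cost and usage report file and keep only the
# line items tagged with our cluster ID.
cur = pd.read_csv("cur-report.csv", low_memory=False)
job_rows = cur[cur[TAG_COLUMN] == CLUSTER_ID]

# Split the EC2 instance fees from the EMR management fees, then total them.
by_product = job_rows.groupby("lineItem/ProductCode")["lineItem/UnblendedCost"].sum()
print(by_product)
print("Total job cost:", by_product.sum())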

If you work at a company and EMR costs are a high priority and you’d like help extracting your true EMR job level costs, feel free to reach out to us here at Sync, we’d be happy to work together.

Conclusion

The main takeaways here are the following points:

  • It Depends:  Selecting the “best” worker is highly dependent on both your cost and runtime goals.  It’s not straightforward what the best choice is.
  • It really depends:  Even with cost and runtime goals set, the “best” worker will also depend on the code, the data size, the data skew, Spot instance pricing, and availability, to name just a few.
  • Where even are the costs?  Extracting the actual cost per workload is not easy in AWS; it’s actually quite painful to capture both the EC2 and EMR management fees.

Of course here at Sync, we’re working on making this problem go away.  This is why we built the Spark Gradient product to help users quickly understand their infrastructure choices given business needs.  

Feel free to check out the Gradient yourself here!

You can also read our other blog posts here which go into other fundamental Spark infrastructure optimization questions.

Databricks driver sizing impact on cost and performance

As many previous blog posts have reported, tuning and optimizing the cluster configurations of Apache Spark is a notoriously difficult problem.  Especially when a data engineer needs to lower costs or accelerate runtimes on platforms such as EMR or Databricks on AWS, tuning these parameters becomes a high priority.  

Here at Sync, we will experimentally explore the impact of driver sizing in the Databricks platform on the TPC-DS 1TB benchmark, to see if we can obtain an understanding of the relationship between the driver instance size and cost/runtime of the job.

Driver node review

For those who may be less familiar with the driver node details in Apache Spark, there are many excellent previous blog posts, as well as the official documentation, on this topic, and we recommend reading those if you are not familiar.  As a quick summary, the driver is an important part of the Apache Spark system and effectively acts as the “brain” of the entire operation.

The driver program runs the main() function, creates the SparkContext, and schedules tasks onto the worker nodes.  Aside from these high level functions, we’d like to note that the driver node is also used in the execution of some operations, most famously the collect operation and broadcast joins.  During those operations, data is moved to the driver node, and if the driver is not appropriately sized, this can cause a driver-side out of memory error that can shut down the entire cluster.
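As a generic PySpark illustration of those two driver-heavy operations (this snippet is not part of the benchmark below):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("driver-pressure-demo").getOrCreate()

big = spark.range(0, 100_000_000)   # large distributed DataFrame
small = spark.range(0, 1_000)       # small dimension-style table

# collect() ships every row of `big` back to the driver; on an undersized
# driver this is the classic way to trigger a driver-side OOM.
# rows = big.collect()              # left commented out on purpose

# A broadcast join first materializes `small` on the driver before shipping
# it to every executor, so a not-so-small "small" table also pressures the driver.
joined = big.join(broadcast(small), on="id")
print(joined.count())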

As a quick side note, it looks like a ticket has been filed to change this behavior for broadcast joins in the open source Spark core, so people should be aware that this may change in the future.

How Does Driver Sizing Impact Performance As a Function of the Number Of Workers?

The main experimental question we want to ask is “how does driver sizing impact performance as a function of the number of workers?”  The reason why we want to correlate driver size with the number of workers is that the number of workers is a very important parameter when tuning systems for either cost or runtime goals.  Observing how the driver impacts the worker scaling of the job is a key part of understanding and optimizing a cluster.

Fundamentally, the maximum number of tasks that can be executed in parallel is determined by the number of workers and executors.  Since the driver node is responsible for scheduling these tasks, we wanted to see if the number of workers changes the hardware requirements of the driver.  For example, does scheduling 1 million tasks require a different driver instance type than scheduling 10 tasks?  

Experimental Setup

The technical parameters of the experiment are below:

  • Data Platform:  Databricks
  • Compute type: Jobs (ephemeral cluster, 1 job per cluster)
  • Photon Enabled: No
  • Fixed parameters:  All worker nodes are i3.xlarge, all configs default
  • Sweep parameters:  Driver instance size (r5a.large, r5a.xlarge, r5a.4xlarge), number of workers
  • AWS market:  On-demand (to eliminate spot fluctuations)
  • Workload: Databricks’ own benchmark on TPC-DS 1TB (all queries run sequentially)

For reference, here are the hardware specifications of the 3 different drivers used on AWS:

The result

We will break down the results into 3 main plots.  The first is below, where we look at runtime vs. number of workers for the 3 different driver types.  In the plot below we see that as the number of workers increases, the runtime decreases.  We note here that the scaling trend is not linear; there is a typical “elbow” in the scaling.  We previously published on the general concept of scaling jobs.  We observe here that the largest driver, r5a.4xlarge, yielded the fastest performance across all worker counts.

In the plot below we see the cost (DBUs in $) vs. the number of workers.  For the most part the medium sized driver, r5a.xlarge, is the most economical, except at the smallest number of workers, where the smallest driver, r5a.large, was the cheapest.

Putting both plots together, we can see the general summary when we plot cost vs. runtime.  The small numbers next to each point show the number of workers.  In general, the ideal points should be toward the bottom left, as that indicates a configuration that is both faster and cheaper.  Points that are higher up or to the right are more expensive and slower.  

Some companies are only concerned about service level agreement (SLA) timelines, and do not actually need the “fastest” possible runtime.  A more useful way to think about the plot below is to ask the question “what is the maximum time you want to spend running this job?”  Once that number is known, you can then select the configuration with the cheapest cost that matches your SLA.  

For example, consider the SLA scenarios below:

1)  SLA of 2500s – If you need your job to be completed in 2,500s or less, then you should select the r5a.4xlarge driver with a worker size of 50.

2)  SLA of 4000s – If you need your job to be completed in 4,000s or less, then you should select the r5a.xlarge driver with a worker size of 20.

3)  SLA of 10,000s – If you need your job to be completed in 10,000s or less, then you should select the r5a.large driver with a worker size of 5.

Key Insights

It’s very convenient to see the scaling trend of all 3 drivers plotted in this manner, as there are several key insights gained here:

  1. There is a general “good” optimal driver for TPC-DS 1TB – across the spectrum, it’s clear that r5a.xlarge is a good choice generally as it is usually cheaper and faster than the other driver sizes.  This shows the danger that if your driver is too big or too small, you could be wasting money and time.  
  2. At the extremes, driver size matters for TPC-DS 1TB  – At the wings of either large clusters (50 workers) or small clusters (5 workers) we can see that the best driver selection can swing between all 3 drivers.  
  3. Drivers can be too big – At 12 workers, the r5a.4xlarge performance is slightly faster but significantly more expensive than the other two driver types.  Unless that slight speedup is important, it’s clear to see that if a driver is too large, then the extra cost of the larger driver is not worth the slight speedup.  It’s like buying a Ferrari to just sit in traffic – definitely not worth it (although you will look cool).
  4. Small driver bottleneck – For the small driver curve (r5a.large), we see that the blue line’s elbow occurs at a higher runtime than the middle driver (r5a.xlarge).  This implies that the smaller driver is creating a runtime bottleneck for the entire workload as the cluster becomes larger.  The next section will dive into why.

Root cause analysis for the “small driver bottleneck”

To investigate the cause of the small driver bottleneck, we looked into the Spark eventlogs to see what values changed as we scaled the number of workers.  In the Spark UI in Databricks, the typical high level metrics for each task are shown below and plotted graphically.  The image below shows an example of a single task broken down into the 7 main metrics:

When we aggregated all of these values across all tasks, across all the different drivers and workers, the numbers were all pretty consistent, except for one number:  “Scheduler Delay”.   For those who may not be familiar, the formal definition from the Databricks Spark UI, is shown in the image below:

“Scheduler delay includes time to ship the task from the scheduler to the executor, and time to send the task result from the executor to the scheduler. If scheduler delay is large, consider decreasing the size of tasks or decreasing the size of task results.”

In the graph below, we plot the total aggregated scheduler delay of all tasks for each of the job configurations vs. the number of workers.  It is expected that the aggregated scheduler delay should increase for a larger number of workers since there are more tasks.  For example, if there are 100 tasks, each with 1s of scheduler delay, the total aggregated scheduler delay is 100s (even if all 100 tasks executed in parallel and the “wall clock” scheduler delay is only 1s).  Therefore, if there are 1000 tasks, the total aggregated scheduler delay should increase as well.

Theoretically this should scale roughly linearly with the number of workers for a “healthy” system.  For the “middle” and “large” sized drivers (r5a.xlarge and r5a.4xlarge respectively), we see the expected growth of the scheduler delay.  However, for the “small” r5a.large driver, we see a very non-linear growth of the total aggregated scheduler delay, which contributes to the overall longer job runtime.  This appears to be a large contributor to the “small driver bottleneck” issue.

To understand a bit more formally what Scheduler Delay is, let’s look at the Spark source code in AppStatusUtils.scala.  At a high level, scheduler delay is a simple calculation, as shown in the code below:

schedulerDelay = duration - runTime - deserializeTime - serializeTime - gettingResultTime
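As a sketch of how this can be aggregated from the Spark event logs, the snippet below reproduces the same formula per task and sums it.  The JSON field names are the ones recent Spark versions write for SparkListenerTaskEnd events – verify them against your own logs before trusting the numbers.

import json

def total_scheduler_delay_seconds(eventlog_path: str) -> float:
    """Sum the per-task scheduler delay (formula above) across a whole event log."""
    total_ms = 0
    with open(eventlog_path) as f:
        for line in f:
            event = json.loads(line)
            if event.get("Event") != "SparkListenerTaskEnd":
                continue
            info = event["Task Info"]
            metrics = event.get("Task Metrics")
            if not metrics:
                continue
            duration = info["Finish Time"] - info["Launch Time"]
            fetch_start = info.get("Getting Result Time", 0)
            getting_result = info["Finish Time"] - fetch_start if fetch_start > 0 else 0
            delay = (duration
                     - metrics["Executor Run Time"]
                     - metrics["Executor Deserialize Time"]
                     - metrics["Result Serialization Time"]
                     - getting_result)
            total_ms += max(0, delay)
    return total_ms / 1000.0

print(total_scheduler_delay_seconds("path/to/eventlog"))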

In plain terms, scheduler delay is basically a catch-all: it is the time a task spends doing something other than executing, serializing or deserializing data, or getting results.  A further question would be which one of these terms is increasing or decreasing due to the smaller driver.  Maybe duration is increasing, or maybe gettingResultTime is decreasing?

If we look at the apples to apples case of 32 workers for the “medium” r5a.xlarge driver and the “small” r5a.large driver, the runtime of the “small” driver was significantly longer.  One could hypothesize that the average duration per task is longer (vs. one of the other terms becoming smaller).  

In summary, our hypothesis here is that by reducing the driver size (number of VCPUs and memory), we are incurring an additional time “tax” on each task by taking, on average, slightly longer to ship a task from the scheduler on the driver to each executor.  

A simple analogy here is, imagine you’re sitting in bumper to bumper traffic on a highway, and then all of a sudden every car (a task in Spark) just grew 20% longer, if there are enough cars you could be set back miles.

Conclusion

Based on the data described above, the answer to the question above is that inappropriately sized drivers can lead to excess cost and degraded performance as workers scale up and down.  We present a hypothesis that a driver that is “too small,” with too few VCPUs and too little memory, could cause, on average, an increase in task duration via additional overhead in the scheduler delay.

This final conclusion is not terribly new to those familiar with Spark, but we hope seeing actual data helps create a quantitative understanding of the impact of driver sizing.  There are of course many other ways a poorly sized driver could elongate or even crash a job (as described earlier via the OOM errors).  This analysis was just a deep dive into one observation.

I’d like to put a large caveat here that this analysis was specific to the TPC-DS workload, and it would be difficult to generalize these findings across all workloads.  Although the TPC-DS benchmark is a collection of very common SQL queries, in reality individual code, or things like user defined functions, could throw these conclusions out the window.  The only way to know for sure about your workloads is to run some driver sizing experiments.

As we’ve mentioned many times before, distributed computing is complicated, and optimizing your cluster for your job needs to be done on an individual basis.  Which is why we built the Apache Spark Autotuner for EMR and Databricks on AWS to help data engineers quickly find the answers they are looking for.