Case Study

Do Graviton instances lower costs for Spark on EMR on AWS?

Here at Sync we are passionate about optimizing cloud infrastructure for Apache Spark workloads.  One question we receive a lot is

“Do Graviton instances help lower costs?”  


For a little background information, AWS built their own processors, which promise to be a “major leap” in performance.  Specifically for Spark on EMR, AWS published a report that claimed Graviton can help reduce costs by up to 30% and improve performance by up to 15%.  These are fantastic results, and who doesn’t love a company that builds their own hardware?

As an independent company, here at Sync, we of course always want to verify AWS’s claims on the performance of Graviton instances.  So in this blog post we run several experiments with the TPC-DS benchmark with various driver and worker count configurations on several different instance classes to see for ourselves how these instances stack up.

The Experiment

The goal of the experiment is to see how Graviton instances perform relative to other popular instances that people use.  There are of course hundreds of instance types, so we selected only 10 popular instances to make this a feasible study.

As for the workload, we selected the fan favorite benchmark, TPC-DS 1TB, with all 98 queries run in series.  This is different compared to what AWS used in their study, which was to look at individual queries within the benchmark.  We decided to track the total job runtime of all queries since we’re just looking for the high level “average” performance to see if any interesting trends appear.  Results of course may vary query by query, and of course your individual code is a complete wildcard.  We make no claim that these results are generally true for all workloads or your specific workloads.

The details of the experimental sweeps are shown below:

  • Workload:  TPC-DS 1TB (queries 1-98 run in series)
  • EMR Version:  6.2.0
  • Instances: [r6g, m5dn, c5, i3, m6g, r5, m5d, m5, c6g, r5d] (r6g, m6g, and c6g are the Graviton instances)
  • Driver Node sizes: *.xlarge, *.2xlarge, *.4xlarge  (* = instances)
  • Worker Nodes: *.xlarge
  • Number of workers: [5,12,20,32,50]
  • Cores per executor (spark.executor.cores): 4
  • Market: on-demand, list pricing
  • Cost data:  True AWS costs extracted from the cost and usage reports, includes both EC2 and EMR fees
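To make the size of the sweep concrete, here is a minimal sketch that enumerates the configurations from the list above.  It assumes a full cross product of instance family, driver size, and worker count, and that the driver and workers share the same instance family; both are our reading of the list rather than something the list states outright.  The function and field names are hypothetical.

```python
from itertools import product

# Values copied from the parameter list above.
INSTANCE_FAMILIES = ["r6g", "m5dn", "c5", "i3", "m6g", "r5", "m5d", "m5", "c6g", "r5d"]
GRAVITON_FAMILIES = {"r6g", "m6g", "c6g"}
DRIVER_SIZES = ["xlarge", "2xlarge", "4xlarge"]
WORKER_SIZE = "xlarge"
WORKER_COUNTS = [5, 12, 20, 32, 50]


def build_sweep():
    """Return one dict per cluster configuration (assumes a full cross product)."""
    configs = []
    for family, driver_size, n_workers in product(
        INSTANCE_FAMILIES, DRIVER_SIZES, WORKER_COUNTS
    ):
        configs.append({
            # Assumption: driver and workers come from the same instance family.
            "driver": f"{family}.{driver_size}",
            "worker": f"{family}.{WORKER_SIZE}",
            "num_workers": n_workers,
            "graviton": family in GRAVITON_FAMILIES,
        })
    return configs


sweep = build_sweep()
print(len(sweep))  # 10 families x 3 driver sizes x 5 worker counts = 150
```

Under these assumptions the sweep has 150 configurations, 45 of which use Graviton instances.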

The Result

Below is a global view of all of the experiments run showing cost vs. runtime.  Each dot represents a different configuration as described by the list above.  Points toward the bottom-left corner are ideal, as they are both cheaper and faster.

At a high level, we see that the c6g instances (light green dots) were the lowest cost with comparable runtimes, which was interesting to see.  The other two Graviton instances (r6g and m6g) skewed lower-left than most of the other instances as well.

One deviation is that the c5 instances performed surprisingly well on both the cost and runtime curves.  They were quite similar to the best Graviton instance, the c6g.

To make the information a bit easier to digest, we take an average of the runtime and cost data to do a clear side by side comparison of the different instances.  The salmon colored bars are the Graviton enabled instances.  

In the graph below, the runtimes of the Graviton instances were comparable with those of the other instances.  The r6g instances were the fastest, although not by much – only about 6.5% faster than m6g.  The one negative standout was the i3 instances, which took around 20% longer than all of the other instances.

More variation is seen in the cost breakdown, where we see that the Graviton instances were typically lower cost than their non-Graviton counterparts, some by a wide margin.  What really stole the show were the “c” class instances, where c5 actually was cheaper by about 10% than the m6g and r6g Graviton instances.  

The global winner was the c6g instance, which was the absolute cheapest.  It’s interesting to see the significant cost difference between the max (i3) and min (c6g), which shows a 70% cost difference!  

Based on the data above, it’s interesting to see that the runtime of Graviton instances was comparable to other non-Graviton instances.  So, what then was the cause of the huge cost differential?  It seems at the end of the day the total job cost generally followed the trends of the list prices of the machines.  Let’s look deeper.

The table below shows the on-demand list price of each instance, in order of lowest to highest cost.  We can see the lowest instance cost was the Graviton instance c6g, which corresponds to the study above where the c6g was the lowest cost.

However, there were some exceptions where more expensive instances still had cheaper total job costs:

  1. c5.xlarge – Had the 3rd lowest on-demand price, but the 2nd cheapest overall job cost
  2. r6g.xlarge – Had the 5th lowest on-demand price, but the 3rd cheapest overall job cost

These two exceptions show that the actual list price of the instances doesn’t always guarantee overall total cost trends.  Sometimes the hardware is such a great fit for your job that it overcomes the higher cost.

Instance        List Price On-Demand ($/hr)
c6g.xlarge      0.136
m6g.xlarge      0.154
c5.xlarge       0.17
m5.xlarge       0.192
r6g.xlarge      0.2016
m5d.xlarge      0.226
r5.xlarge       0.252
m5dn.xlarge     0.272
r5d.xlarge      0.288
i3.xlarge       0.312
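As a quick sanity check on the price ranks quoted above, here is a small sketch built from the table.  It also shows why a pricier instance can still win on total job cost: it only has to finish proportionally faster.  The prices are taken to be USD per hour, and the cost formula is a rough EC2-only approximation (an assumption on our part) that ignores the EMR fee and storage.  All function names are hypothetical.

```python
# On-demand list prices copied from the table above (assumed USD per hour).
LIST_PRICE = {
    "c6g.xlarge": 0.136,
    "m6g.xlarge": 0.154,
    "c5.xlarge": 0.17,
    "m5.xlarge": 0.192,
    "r6g.xlarge": 0.2016,
    "m5d.xlarge": 0.226,
    "r5.xlarge": 0.252,
    "m5dn.xlarge": 0.272,
    "r5d.xlarge": 0.288,
    "i3.xlarge": 0.312,
}


def price_rank(instance):
    """1-based rank of an instance when sorted from cheapest to most expensive."""
    ordered = sorted(LIST_PRICE, key=LIST_PRICE.get)
    return ordered.index(instance) + 1


def job_cost(instance, num_nodes, runtime_s):
    """Rough EC2-only job cost: hourly price x nodes x hours (no EMR fee, no EBS)."""
    return LIST_PRICE[instance] * num_nodes * runtime_s / 3600.0


def breakeven_runtime_ratio(pricier, cheaper):
    """Fraction of the cheaper instance's runtime the pricier one must hit to tie on cost."""
    return LIST_PRICE[cheaper] / LIST_PRICE[pricier]


print(price_rank("c5.xlarge"))   # 3
print(price_rank("r6g.xlarge"))  # 5
print(round(breakeven_runtime_ratio("r6g.xlarge", "m5.xlarge"), 3))
```

In this simplified model, r6g.xlarge only needs to finish in roughly 95% of the m5.xlarge runtime to match it on cost, which is the kind of small runtime edge that can reorder the job cost ranking.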

Conclusion

So at the end of the day, do Graviton instances save you money?  From this study, I’d say that on average their cost/performance numbers were in fact better than other popular instances.  However, as we saw above, it is not always true and, like most things we post – it depends.

If you’re able to explore different instance types, I’d definitely recommend trying out Graviton instances, as they look like a pretty solid bet.  

To revisit the claims that AWS had about Graviton instances being 30% cheaper and 15% more performant, based on the data above that is not always true and depends on a lot of cluster parameters.  

For example, one thing we’ll note is that in the AWS study, they only used workers with *.2xlarge instances, whereas our study only looked at *.xlarge worker node instances.  I also have no idea what Apache Spark configurations they used and if they matched what we did or not.

At the end of the day, everything depends on your workload and what your job is trying to do.  There is no one-size-fits-all instance for your jobs.  That’s why we built the Apache Spark autotuner to help users easily optimize their Apache Spark configurations and instance types to help hit their cost and runtime needs.

Feel free to try out the Spark Autotuner yourself!

How does the worker size impact costs for Apache Spark on EMR AWS?

Here at Sync, we are passionate about optimizing data infrastructure on the cloud, and one common point of confusion we hear from users is what worker instance size is best to use for their job.

Many companies run production data pipelines on Apache Spark on the Elastic MapReduce (EMR) platform on AWS.  As we’ve discussed in previous blog posts, wherever you run Apache Spark, whether it be on Databricks or EMR, the infrastructure you run it on can have a huge impact on the overall cost and performance.

To make matters even more complex, the infrastructure settings can change depending on your business goals.  Is there a service level agreement (SLA) time requirement?  Do you have a cost target?  What about both?  

One of the key tuning parameters is which instance size should your workers run on?  Should you use a few large nodes?  Or perhaps a lot of small nodes?  In this blog post, we take a deep dive into some of these questions utilizing the TPC-DS benchmark.  

Before starting, we want to be clear that these results are very specific to the TPC-DS workload.  While it may be nice to generalize, we cannot predict that these trends will hold true for other workloads.  We highly recommend people run their own tests to confirm.  Alternatively, we built the Autotuner for Apache Spark to help accelerate this process (feel free to check it out yourself!).

With that said, let’s go!

The Experiment

The main question we seek to answer is – “How does the worker size impact cost and performance for Spark EMR jobs?”  Below are the fixed parameters we used when conducting this experiment:

  • EMR Version: 6.2
  • Driver Node: m5.xlarge
  • Driver EBS storage: 32 GB
  • Worker EBS storage: 128 GB 
  • Worker instance family: m5
  • Worker type: Core nodes only
  • Workload: TPC-DS 1TB (Queries 1-98 in series)
  • Cost structure: On-demand, list price (to avoid Spot node variability)
  • Cost data: Extracted from the AWS cost and usage reports, includes both the EC2 fees and the EMR management fees

Fixed Spark settings:

  • spark.executor.cores: 4
  • Number of executors: set to 100% cluster utilization based on the cluster size
  • spark.executor.memory: automatically set based on number of cores

The fixed Spark settings we selected were meant to mimic safe “default” settings that an average Spark user may select at first.  To explain those parameters a bit more, since we are changing the worker instance size in this study, we decided to keep the number of cores per executor to be constant at 4.  The other parameters such as number of executors and executor memory are automatically calculated to utilize the machines to 100%.

For example, if a machine (worker) has 16 cores, we would create 4 executors per machine (worker).  If the worker has 32 cores, we would create 8 executors.
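The executor arithmetic above can be sketched in a few lines.  This is only an illustration of the rule described in this post (4 cores per executor, 100% of worker cores used); it ignores any cores or memory that the cluster manager may reserve in practice.  The vCPU counts are the published m5 specifications, and the function names are hypothetical.

```python
CORES_PER_EXECUTOR = 4  # spark.executor.cores, held constant in this study

# vCPU counts for the three worker sizes swept in this post.
M5_VCPUS = {"m5.xlarge": 4, "m5.2xlarge": 8, "m5.4xlarge": 16}


def executors_per_worker(worker_cores, cores_per_executor=CORES_PER_EXECUTOR):
    """Executors that fit on one worker when every core is handed to an executor."""
    return worker_cores // cores_per_executor


def total_executors(num_workers, worker_cores):
    """Executors across the whole cluster (driver excluded)."""
    return num_workers * executors_per_worker(worker_cores)


print(executors_per_worker(16))  # 4
print(executors_per_worker(32))  # 8
print(total_executors(20, M5_VCPUS["m5.2xlarge"]))  # 40
```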

The variables we are sweeping are outlined below:

  • Worker instance type: m5.xlarge, m5.2xlarge, m5.4xlarge
  • Number of workers: 1-50 nodes

Results

The figure below shows the Spark runtime versus the number and type of workers.  The trend here is pretty clear, in that larger clusters are in fact faster.  The 4xlarge size outperformed all other cluster sizes.  If speed is your goal, selecting larger workers could help.  If one were to pick a best instance based on the graph below, one may draw the conclusion that:

It looks like the 4xlarge is the fastest choice

The figure below shows the true total cost versus the number and type of workers.  On the cost metric, the story almost flips compared to the runtime graph above.  The smallest instance usually outperformed larger instances when it came to lowering costs.  For 20 or more workers, the xlarge instances were cheaper than the other two choices.

If one were to quickly look at the plot below, and look for the “lowest points” which correspond to lowest cost, one could draw a conclusion that:

It looks like the 2xlarge and xlarge instances are the lowest cost, depending on the number of workers

However, the real story comes when we merge those two plots together and simultaneously look at cost vs. runtime.  In this plot, it is more desirable to be toward the bottom left, this means the run is both lower cost and faster.  As the plot below shows, if one were to look at the lowest points, the conclusion to be drawn is:

It looks like 4xlarge instances are the lowest cost choice… what?

What’s going on here is that for a given runtime, there is always a lower cost configuration with the 4xlarge instances.  When you put it into that perspective, there is little reason to use xlarge sizes, as going to larger machines can get you something both faster and cheaper.

The only caveat here is there is a floor to how cheap and slow the 4xlarge cluster can give you, and that’s with a worker count of 1.  Meaning, you could get a cheaper cluster with a smaller 2xlarge cluster, but the runtime becomes quite long and may be unacceptable for real-world applications.
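One way to formalize "for a given runtime there is always a cheaper option" is to keep only the runs that no other run beats on both cost and runtime (the Pareto frontier).  The sketch below does that.  The run data is entirely made up for illustration and is not the measured data from this study; it was chosen only so the outcome has the same shape as the plot described above.

```python
def pareto_frontier(runs):
    """Return runs that no other run beats on both runtime and cost (lower is better)."""
    frontier = []
    for r in runs:
        dominated = any(
            o["runtime_s"] <= r["runtime_s"]
            and o["cost"] <= r["cost"]
            and (o["runtime_s"] < r["runtime_s"] or o["cost"] < r["cost"])
            for o in runs
        )
        if not dominated:
            frontier.append(r)
    return sorted(frontier, key=lambda r: r["runtime_s"])


# Hypothetical (runtime, cost) points, NOT the measured results.
RUNS = [
    {"label": "xlarge x20", "runtime_s": 9000, "cost": 30.0},
    {"label": "xlarge x50", "runtime_s": 6000, "cost": 45.0},
    {"label": "2xlarge x5", "runtime_s": 12000, "cost": 29.0},
    {"label": "2xlarge x1", "runtime_s": 40000, "cost": 20.0},
    {"label": "4xlarge x1", "runtime_s": 25000, "cost": 22.0},
    {"label": "4xlarge x5", "runtime_s": 8000, "cost": 28.0},
    {"label": "4xlarge x10", "runtime_s": 5000, "cost": 32.0},
]

for run in pareto_frontier(RUNS):
    print(run["label"], run["runtime_s"], run["cost"])
```

With these invented numbers, every xlarge run is dominated by a 4xlarge run, and the only smaller-instance point left on the frontier is the very slow, very cheap single 2xlarge worker, which mirrors the caveat above.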

Here’s a general summary of how the “best worker” choice can change depending on your cost and runtime goals:

Runtime Goal          Cost Goal    Best Worker
<20,000 seconds       Minimize     4xlarge
<30,000 seconds       Minimize     2xlarge
<A very long time     Minimize     xlarge

A note on extracting EMR costs

Extracting the actual true costs for individual EMR jobs from the AWS billing information is not straightforward.  We had to write custom scripts to scan the low level cost and usage reports, looking for specific EMR cluster tags.  The exact mechanism for retrieving these costs will probably vary company to company, as different security permissions may alter the mechanics of how these costs can be extracted.
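To give a flavor of what such a script looks like, here is a minimal sketch that sums line items per EMR cluster from a cost and usage report exported as CSV.  This is not our production script.  The column names below, in particular the tag column carrying the EMR cluster ID, are assumptions about how the report is configured; they depend on which cost allocation tags are activated in your account, so check your own report's header before relying on them.  The sample rows are invented.

```python
import csv
import io
from collections import defaultdict

# Assumed column names; verify them against your own report header.
TAG_COLUMN = "resourceTags/aws:elasticmapreduce:job-flow-id"
COST_COLUMN = "lineItem/UnblendedCost"
PRODUCT_COLUMN = "lineItem/ProductCode"


def cost_per_cluster(report_file):
    """Sum cost per EMR cluster ID, broken down by product code."""
    totals = defaultdict(lambda: defaultdict(float))
    for row in csv.DictReader(report_file):
        cluster_id = row.get(TAG_COLUMN, "")
        if not cluster_id:
            continue  # line item is not tagged with an EMR cluster
        totals[cluster_id][row[PRODUCT_COLUMN]] += float(row[COST_COLUMN])
    return {
        cid: dict(by_product, total=sum(by_product.values()))
        for cid, by_product in totals.items()
    }


# Invented sample rows standing in for a real report file.
SAMPLE = """lineItem/ProductCode,lineItem/UnblendedCost,resourceTags/aws:elasticmapreduce:job-flow-id
AmazonEC2,1.50,j-EXAMPLE1
ElasticMapReduce,0.40,j-EXAMPLE1
AmazonEC2,2.00,j-EXAMPLE2
AmazonS3,0.10,
"""

costs = cost_per_cluster(io.StringIO(SAMPLE))
for cluster_id, breakdown in sorted(costs.items()):
    print(cluster_id, round(breakdown["total"], 2))
```

Grouping by product code keeps the EC2 charge and the EMR management fee visible separately, which matters because both are part of the true job cost.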

If you work at a company and EMR costs are a high priority and you’d like help extracting your true EMR job level costs, feel free to reach out to us here at Sync, we’d be happy to work together.

Conclusion

The main takeaways here are the following points:

  • It Depends:  Selecting the “best” worker is highly dependent on both your cost and runtime goals.  It’s not straightforward what the best choice is.
  • It really depends:  Even with cost and runtime goals set, the “best” worker will also depend on the code, the data size, the data skew, Spot instance pricing, and availability, to name just a few.
  • Where even are the costs?  Extracting the actual cost per workload is not easy in AWS, and is actually quite painful to capture both the EC2 and EMR management fees.

Of course here at Sync, we’re working on making this problem go away.  This is why we built the Spark Autotuner product to help users quickly understand their infrastructure choices given business needs.  

Feel free to check out the Autotuner yourself here!

You can also read our other blog posts here which go into other fundamental Spark infrastructure optimization questions.

Databricks driver sizing impact on cost and performance

As many previous blog posts have reported, tuning and optimizing the cluster configurations of Apache Spark is a notoriously difficult problem.  Especially when a data engineer needs to lower costs or accelerate runtimes on platforms such as EMR or Databricks on AWS, tuning these parameters becomes a high priority.  

Here at Sync, we will experimentally explore the impact of driver sizing in the Databricks platform on the TPC-DS 1TB benchmark, to see if we can obtain an understanding of the relationship between the driver instance size and cost/runtime of the job.

Driver node review

For those who may be less familiar with the driver node in Apache Spark, there are many excellent blog posts, as well as the official documentation, on this topic, and I recommend reading those first.  As a quick summary, the driver is an important part of the Apache Spark system and effectively acts as the “brain” of the entire operation.

The driver program runs the main() function, creates the spark context, and schedules tasks onto the worker nodes.  Aside from these high level functions, we’d like to note that the driver node is also used in the execution of some functions, most famously when using the collect operation and broadcast joins.  During those functions, data is moved to the driver node and if it’s not appropriately sized, can cause a driver side out of memory error which can shut down the entire cluster.

As a quick side note, it looks like a ticket has been filed to change this behavior for broadcast joins in the open source Spark core.  So people should be aware that this may change in the future.

Experimental Question

The main question we want to ask is “how does driver sizing impact performance as a function of the number of workers?”  The reason why we want to correlate driver size with the number of workers is that the number of workers is a very important parameter when tuning systems for either cost or runtime goals.  Observing how the driver impacts the worker scaling of the job is a key part of understanding and optimizing a cluster.

Fundamentally, the maximum number of tasks that can be executed in parallel is determined by the number of workers and executors.  Since the driver node is responsible for scheduling these tasks, we wanted to see if the number of workers changes the hardware requirements of the driver.  For example, does scheduling 1 million tasks require a different driver instance type than scheduling 10 tasks?  
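As a rough illustration of how the parallel task count grows with the cluster, here is a small sketch.  It assumes every worker core is handed to an executor and uses Spark's spark.task.cpus setting (default 1) to convert cores into task slots.  The 4 vCPU figure is the i3.xlarge specification; treating each worker as a single 4-core executor is our assumption, not a setting confirmed in this post.  The function name is hypothetical.

```python
def max_parallel_tasks(num_workers, cores_per_worker, cores_per_executor, task_cpus=1):
    """Upper bound on tasks running at once across the cluster."""
    executors_per_worker = cores_per_worker // cores_per_executor
    slots_per_executor = cores_per_executor // task_cpus
    return num_workers * executors_per_worker * slots_per_executor


# i3.xlarge workers have 4 vCPUs; assume one 4-core executor per worker.
for workers in (5, 12, 20, 32, 50):
    print(workers, max_parallel_tasks(workers, cores_per_worker=4, cores_per_executor=4))
```

Going from 5 to 50 workers raises the ceiling from 20 to 200 concurrent tasks under these assumptions, so the driver has ten times as many task slots to keep fed.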

Experimental Setup

The technical parameters of the experiment are below:

  • Data Platform:  Databricks
  • Compute type: Jobs (ephemeral cluster, 1 job per cluster)
  • Photon Enabled: No
  • Fixed parameters:  All worker nodes are i3.xlarge, all configs default
  • Sweep parameters:  Driver instance size (r5a.large, r5a.xlarge, r5a.4xlarge), number of workers
  • AWS market:  On-demand (to eliminate spot fluctuations)
  • Workload: Databricks’ own benchmark on TPC-DS 1TB (all queries run sequentially)

For reference, here are the hardware specifications of the 3 different drivers used on AWS:

The result

We will break down the results into 3 main plots.  The first is below where we look at runtime vs. number of workers for the 3 different driver types.  In the plot below we see that as the number of workers increases the runtime decreases.  We note here that the scaling trend is not linear and there is a typical “elbow” scaling that occurs.  We published previously the general concept of scaling jobs.  We observe here that the largest driver, r5a.4xlarge, yielded the fastest performance across all worker sizes.

In the plot below we see the cost (DBUs in $) vs. number of workers.  For the most part we see that the medium sized driver, r5a.xlarge, is the most economical, except for the smallest number of workers, where the smallest driver size, r5a.large, was the cheapest.

Putting both plots together, we can see the general summary when we plot cost vs. runtime.  The small numbers next to each point show the number of workers.  In general, the ideal points should be toward the bottom left, as that indicates a configuration that is both faster and cheaper.  Points that are higher up or to the right are more expensive and slower.  

Some companies are only concerned about service level agreement (SLA) timelines, and do not actually need the “fastest” possible runtime.  A more useful way to think about the plot below is to ask the question “what is the maximum time you want to spend running this job?”  Once that number is known, you can then select the configuration with the cheapest cost that matches your SLA.  

For example, consider the SLA scenarios below:

1)  SLA of 2500s – If you need your job to be completed in 2,500s or less, then you should select the r5a.4xlarge driver with a worker size of 50.

2)  SLA of 4000s – If you need your job to be completed in 4,000s or less, then you should select the r5a.xlarge driver with a worker size of 20.

3)  SLA of 10,000s – If you need your job to be completed in 10,000s or less, then you should select the r5a.large driver with a worker size of 5.
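The selection rule behind these scenarios is simple: keep the runs that meet the SLA, then take the cheapest.  Here is a minimal sketch of that rule.  The runtimes and costs below are made-up placeholders, not the measured values from the plot; they were chosen only so the rule reproduces the three scenarios above.

```python
def cheapest_within_sla(runs, sla_s):
    """Return the lowest-cost run whose runtime meets the SLA, or None if none does."""
    eligible = [r for r in runs if r["runtime_s"] <= sla_s]
    if not eligible:
        return None
    return min(eligible, key=lambda r: r["cost"])


# Hypothetical measurements, NOT the data behind the plot.
RUNS = [
    {"driver": "r5a.4xlarge", "workers": 50, "runtime_s": 2400, "cost": 14.0},
    {"driver": "r5a.xlarge", "workers": 50, "runtime_s": 2700, "cost": 12.5},
    {"driver": "r5a.4xlarge", "workers": 20, "runtime_s": 3700, "cost": 10.5},
    {"driver": "r5a.xlarge", "workers": 20, "runtime_s": 3900, "cost": 9.0},
    {"driver": "r5a.xlarge", "workers": 5, "runtime_s": 9800, "cost": 7.4},
    {"driver": "r5a.large", "workers": 5, "runtime_s": 9500, "cost": 7.0},
]

for sla in (2500, 4000, 10000):
    best = cheapest_within_sla(RUNS, sla)
    print(sla, best["driver"], best["workers"])
```

Returning None when nothing meets the SLA makes it obvious when the swept configurations are simply not fast enough and a larger cluster needs to be tested.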

Key Insights

It’s very convenient to see the scaling trend of all 3 drivers plotted in this manner, as there are several key insights gained here:

  1. There is a general “good” optimal driver for TPC-DS 1TB – across the spectrum, it’s clear that r5a.xlarge is a good choice generally as it is usually cheaper and faster than the other driver sizes.  This shows the danger that if your driver is too big or too small, you could be wasting money and time.  
  2. At the extremes, driver size matters for TPC-DS 1TB  – At the wings of either large clusters (50 workers) or small clusters (5 workers) we can see that the best driver selection can swing between all 3 drivers.  
  3. Drivers can be too big – At 12 workers, the r5a.4xlarge performance is slightly faster but significantly more expensive than the other two driver types.  Unless that slight speedup is important, it’s clear to see that if a driver is too large, then the extra cost of the larger driver is not worth the slight speedup.  It’s like buying a Ferrari to just sit in traffic – definitely not worth it (although you will look cool).
  4. Small driver bottleneck – For the small driver curve (r5a.large), we see that the blue line’s elbow occurs at a higher runtime than the middle driver (r5a.xlarge).  This implies that the smaller driver is creating a runtime bottleneck for the entire workload as the cluster becomes larger.  The next section will dive into why.

Root cause analysis for the “small driver bottleneck”

To investigate the cause of the small driver bottleneck, we looked into the Spark eventlogs to see what values changed as we scaled the number of workers.  In the Spark UI in Databricks, the typical high level metrics for each task are shown below and plotted graphically.  The image below shows an example of a single task broken down into the 7 main metrics:

When we aggregated all of these values across all tasks, across all the different drivers and workers, the numbers were all pretty consistent, except for one number:  “Scheduler Delay”.   For those who may not be familiar, the formal definition from the Databricks Spark UI, is shown in the image below:

“Scheduler delay includes time to ship the task from the scheduler to the executor, and time to send the task result from the executor to the scheduler. If scheduler delay is large, consider decreasing the size of tasks or decreasing the size of task results.”

In the graph below, we plot the total aggregated scheduler delay of all tasks for each of the job configurations vs. the number of workers.  It is expected that the aggregated scheduler delay should increase for a larger number of workers since there are more tasks.  For example, if there are 100 tasks, each with 1s of scheduler delay, the total aggregated scheduler delay is 100s (even if all 100 tasks executed in parallel and the “wall clock” scheduler delay is only 1s).  Therefore, if there are 1000 tasks, the total aggregated scheduler delay should increase as well.

Theoretically this should scale roughly linearly with the number of workers for a “healthy” system.  For the “middle” and “large” sized drivers (r5a.xlarge and r5a.4xlarge respectively), we see the expected growth of the scheduler delay.  However, for the “small” r5a.large driver, we see a very non-linear growth of the total aggregated scheduler delay, which contributes to the overall longer job runtime.  This appears to be a large contributor to the “small driver bottleneck” issue.

To understand the formal definition of Scheduler Delay a bit more deeply, let’s look at the Spark source code in the file AppStatusUtils.scala.  At a high level, scheduler delay is a simple calculation, as shown below:

schedulerDelay = duration - runTime - deserializeTime - serializeTime - gettingResultTime

To put it in normal text, scheduler delay is basically a catch-all term, that is the time the task is spent doing something that is not executing, serializing data, or getting results.  A further question would be to see which one of these terms is increasing or decreasing due to the smaller driver?  Maybe duration is increasing, or maybe gettingResultTime is decreasing?  
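To make the formula and the aggregation concrete, here is a small Python sketch of the same calculation.  It is a re-expression for illustration, not the Spark source itself.  The clamp at zero is our recollection of how Spark guards against negative values, so treat it as an assumption, and the task timings are invented.

```python
def scheduler_delay_ms(duration, run_time, deserialize_time, serialize_time,
                       getting_result_time):
    """Time a task spent on anything other than running, (de)serializing, or fetching results."""
    delay = duration - run_time - deserialize_time - serialize_time - getting_result_time
    return max(0, delay)  # assumption: never report a negative delay


def total_scheduler_delay_ms(tasks):
    """Aggregate scheduler delay across tasks (sums even if tasks ran in parallel)."""
    return sum(scheduler_delay_ms(**t) for t in tasks)


# One invented task: 1200 ms total, 1000 ms executing, 100 ms of other accounted time.
task = {
    "duration": 1200,
    "run_time": 1000,
    "deserialize_time": 50,
    "serialize_time": 10,
    "getting_result_time": 40,
}

print(scheduler_delay_ms(**task))             # 100
print(total_scheduler_delay_ms([task] * 100)) # 10000
```

Because the aggregate is a plain sum over tasks, a small extra delay per task adds up quickly once a job has many thousands of tasks.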

If we look at the apples to apples case of 32 workers for the “medium” r5a.xlarge driver and the “small” r5a.large driver, the runtime of the “small” driver was significantly longer.  One could hypothesize that the average duration per task is longer (vs. one of the other terms becoming smaller).  

In summary, our hypothesis here is that by reducing the driver size (number of vCPUs and memory), we are incurring an additional time “tax” on each task by taking, on average, slightly longer to ship a task from the scheduler on the driver to each executor.

A simple analogy: imagine you’re sitting in bumper to bumper traffic on a highway, and all of a sudden every car (a task in Spark) grows 20% longer.  If there are enough cars, you could be set back miles.

Conclusion

Based on the data described above, the answer to the question above is that inappropriately sized drivers can lead to excess cost and longer runtimes as workers scale up and down.  We present a hypothesis that a driver that is “too small”, with too few vCPUs and too little memory, could cause, on average, an increase in the task duration via additional overhead in the scheduler delay.

This final conclusion is not terribly new to those familiar with Spark, but we hope seeing actual data can help create a quantitative understanding on the impact of driver sizing.  There are of course many other things that could cause a poor driver to elongate or even crash a job, (as described earlier via the OOM errors).  This analysis was just a deep dive into one observation.

I’d like to put a large caveat here that this analysis was specific to the TPC-DS workload, and it would be difficult to generalize these findings across all workloads.  Although the TPC-DS benchmark is a collection of very common SQL queries, in reality individual code, or things like user defined functions, could throw these conclusions out the window.  The only way to know for sure about your workloads is to run some driver sizing experiments.

As we’ve mentioned many times before, distributed computing is complicated, and optimizing your cluster for your job needs to be done on an individual basis.  Which is why we built the Apache Spark Autotuner for EMR and Databricks on AWS to help data engineers quickly find the answers they are looking for.

Is Databricks’s autoscaling cost efficient?

Here at Sync we are always trying to learn and optimize complex cloud infrastructure, with the goal of contributing more knowledge to the community.  In our previous blog post we outlined a few high level strategies companies employ to squeeze out more efficiency in their cloud data platforms.  One very popular response we hear from mid-sized to large enterprise companies is:

“We use Autoscaling to minimize costs”

We wanted to zoom into this statement to really understand how true it is, and to get a better understanding of the fundamental question 

“Is autoscaling Apache Spark cost efficient?”  

To explain in more detail, we wanted to investigate the technical side of Autoscaling and really dive deep into a specific example.  Because of this we chose to begin with a gold standard workload to analyze, the TPC-DS benchmark, just to minimize any argument being made that we cherry picked a weird workload to skew the final answer.  Our goal here is to be as technical and informative as possible about a few workloads – we are not trying to perform a broad comprehensive study (that would take a long time).  So let’s begin:

What is Autoscaling?

For those who may not know, Autoscaling is the general concept that a cluster should automatically tune the number of workers (or instances on AWS) based on the needs of your job.  The basic message told to companies is, autoscaling will optimize the cluster for your workload and minimize costs.  

Technically, Autoscaling is usually a reactive algorithm that measures some utilization metric inside your cluster to determine if more or less resources are needed.  While this makes logical sense, in reality the complexity of Apache Spark and constantly changing cloud infrastructure make this problem highly unpredictable.  
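To illustrate what "reactive" means here, below is a toy threshold rule.  It is emphatically not the Databricks or EMR algorithm, which is not public; the thresholds, the one-node step size, and the utilization trace are all invented for illustration.

```python
def desired_workers(current, utilization, min_workers, max_workers,
                    scale_up_at=0.8, scale_down_at=0.3):
    """Toy reactive rule: add a worker when busy, remove one when idle, stay in bounds."""
    if utilization > scale_up_at:
        target = current + 1
    elif utilization < scale_down_at:
        target = current - 1
    else:
        target = current
    return max(min_workers, min(max_workers, target))


# Invented utilization samples, one per evaluation interval.
trace = [0.90, 0.95, 0.85, 0.50, 0.20, 0.10, 0.90]

workers = 2
history = []
for utilization in trace:
    workers = desired_workers(workers, utilization, min_workers=2, max_workers=8)
    history.append(workers)

print(history)  # [3, 4, 5, 5, 4, 3, 4]
```

Even this toy rule shows the core weakness of a reactive approach: it only responds after utilization has already changed, so a workload with alternating busy and quiet phases makes the cluster size chase the load up and down.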

In the Databricks UI, autoscaling is just a simple checkbox that many people may overlook.  The choice people make by selecting that box could impact their overall performance significantly.

Many people use Databricks or EMR, but the exact autoscaling algorithm each platform employs is behind closed doors, so we don’t know the exact details of their logic.  The only thing we can do is measure their performance.

Experiment Setup

Our goal is to provide a technical study of Autoscaling from a novice’s point of view.  Meaning, our base case to compare against will be whatever “default” settings Databricks suggests.  We are not comparing against the global best or against an expert who has spent many days optimizing a particular cluster (who we think would probably do an awesome job).

  • Data Platform:  Databricks
  • Compute type: Jobs (ephemeral cluster, 1 job per cluster)
  • Photon Enabled: No
  • Baseline configuration:  Default params given to users at spin up
  • AWS market:  Driver on-demand, workers on spot with 100% on-demand fall back
  • Workload: Databricks’ own benchmark on TPC-DS 100GB (all 99 queries run sequentially)

To keep things simple, we ran 3 comparison job runs:

  1. Fixed 8 Nodes – a fixed 8 node cluster using the default machine types suggested to us in the Databricks UI.
  2. Fixed 2 Nodes w/ Autotuner – We use our Apache Spark Autotuner product to recommend an optimized fixed cluster to give us the lowest cost option (runtime not optimized).  The recommendation was to use 2 nodes (with different instance types than default)
  3. Autoscaler 2-8 Nodes – We used the default UI settings in Databricks here.

                  Fixed Cluster    Fixed Cluster (Autotuner)    Autoscaler 2-8 Nodes
No. of Workers    8                2                            2-8
Driver Node       i3.xlarge        r5a.large                    i3.xlarge
Worker Nodes      i3.xlarge        i3.2xlarge                   i3.xlarge
Runtime [s]       1593             2441                         2834
DBU Cost [$]      0.6              0.39                         0.73
AWS Cost [$]      0.92             0.92                         1.35
Total Cost [$]    1.52             1.31                         2.08

The results

To our surprise, of the 3 jobs run, the default autoscaler performed the worst in both runtime and cost.  Both fixed clusters, with 8 nodes and with 2 nodes, outperformed autoscaling in both time and cost.  The Sync optimized cluster outperformed autoscaling by 37% in terms of cost and 14% in runtime.
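For readers who want to check the percentages, here is the arithmetic written out.  The runtimes and costs are copied from the results table; the helper names are hypothetical.  Savings are expressed relative to the autoscaled run.

```python
# Values copied from the results table (runtime in seconds, costs in dollars).
RESULTS = {
    "fixed_8": {"runtime_s": 1593, "dbu_cost": 0.60, "aws_cost": 0.92},
    "fixed_2_autotuner": {"runtime_s": 2441, "dbu_cost": 0.39, "aws_cost": 0.92},
    "autoscaler_2_8": {"runtime_s": 2834, "dbu_cost": 0.73, "aws_cost": 1.35},
}


def total_cost(run):
    """Total job cost is the DBU charge plus the AWS charge."""
    return run["dbu_cost"] + run["aws_cost"]


def pct_reduction(baseline, value):
    """Percent by which `value` is lower than `baseline`."""
    return 100.0 * (baseline - value) / baseline


baseline = RESULTS["autoscaler_2_8"]
tuned = RESULTS["fixed_2_autotuner"]

cost_saving = pct_reduction(total_cost(baseline), total_cost(tuned))
runtime_saving = pct_reduction(baseline["runtime_s"], tuned["runtime_s"])

print(round(cost_saving))     # 37
print(round(runtime_saving))  # 14
```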

To examine why the autoscaled cluster performed poorly, let’s look at the number of workers created and shut down over time, in comparison to the fixed 2 node cluster.  The figure below tells the basic story: the autoscaled cluster spent a lot of time scaling up and down, tuning itself to the workload.  At first glance, that is exactly what autoscaling is supposed to do, so why did the autoscaled cost and runtime perform so poorly?

The main reason, from what we can tell, is that there is a time penalty for changing the cluster size – specifically in upsizing the cluster.  We can see from the cluster event log below, that the time between “RESIZING” and “UPSIZE_COMPLETED” can span several minutes.  Based on the Spark UI, the executors don’t get launched until “UPSIZE_COMPLETED” occurs, so no new computing occurs until this step is achieved.  

Another observation here is that in order for us to run the TPC-DS benchmark, we had to run an init_script to install some code at the start of the job.  Based on the cluster event log below, it looks like every time it upsizes new machines, they have to reinstall all the init_scripts each time which costs time and money.  This is something to consider, where if your job requires you to load specific init_scripts, this would certainly negatively impact the autoscaling performance.

So to summarize, you are paying for the “ramp up time” of new workers during autoscaling, where no computing is occurring.  The more often your cluster upsizes, the more you will be waiting and paying.  
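A back-of-the-envelope way to think about this ramp-up penalty is to count the node-seconds that new workers spend starting up and multiply by the hourly rate.  The sketch below does exactly that.  The resize events and the hourly rate are illustrative placeholders, not values read from our cluster event log, and the model ignores the extra time existing workers spend waiting.

```python
def idle_ramp_cost(resize_events, hourly_rate_per_node):
    """Cost of nodes that are billed but not yet computing during upsizes.

    resize_events: list of (nodes_added, ramp_seconds) tuples, one per upsize.
    """
    idle_node_seconds = sum(nodes * seconds for nodes, seconds in resize_events)
    return idle_node_seconds / 3600.0 * hourly_rate_per_node


# Hypothetical upsizes: (nodes added, seconds from RESIZING to UPSIZE_COMPLETED).
events = [(2, 240), (4, 300), (2, 180)]

# Illustrative hourly rate per node; substitute your own blended instance + DBU rate.
cost = idle_ramp_cost(events, hourly_rate_per_node=0.312)
print(round(cost, 4))
```

The absolute number is small for a single short job, but it scales with how often the cluster upsizes, which is why a workload that repeatedly triggers resizing pays this penalty over and over.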

Databricks mentions that using pools can help speed up autoscaling, by creating a pool of “warm” instances ready to be kicked off.  Although you are not charged DBU’s, you do still have to pay AWS’s fees for those machines.  So in the end, it still depends on your workload, size of cluster, and use case if the pools solution makes sense.

Another issue is the question of optimizing for throughput.  If 3 nodes process the data at the same rate as 8 nodes, then ideally autoscaling should stop at 3 nodes.  But it doesn’t seem like that’s the case here, as autoscaling just went up to the max workers set by the user.

The optimized fixed cluster looks at cost and throughput to find the best cluster, which is another reason why it is able to outperform the autoscaling solution.

Some follow up questions:

  • Is this just a TPC-DS specific artifact? 

We ran the same tests with two other internal Spark jobs, which we call Airline Delay and Gen Data, and observed the same trend – that the Autoscale cluster was more expensive than fixed clusters.  The amount of Autoscaling fluctuation was much less for Airline delay, so we noticed the advantage of a fixed cluster was reduced.   Gen Data is a very I/O intense job, and the autoscaler actually did not scale up the cluster beyond 2 nodes.  For the sake of brevity, we won’t show those details here (feel free to reach out if there are more questions).  

We just wanted to confirm that these results weren’t specific to TPC-DS, and if we had more time we could do a large scale test with a diverse set of workloads.  Here we observed the optimized fixed cluster (using the Sync Apache Spark Autotuner) achieved a 28% and 65% cost savings over default autoscaling for Airline Delay and Gen Data respectively.

  • What if we just set Autoscaling to 1-2 nodes (instead of 2-8)?

We thought that if we just changed the autoscaling min and max to be near what the “Fixed 2 node autotuner” cluster was, then it should get about the same runtime and cost.  To our surprise, what happened was the autoscaler bounced back and forth between 1 and 2 nodes, which caused a longer job run than the fixed cluster.  You can see in the plot below, we added the autoscaling job from 1-2 nodes on the worker plot.  Overall the cost of the fixed 2 nodes cluster was still 12% cheaper than the autoscaled version of the same cluster with 1-2 nodes.  

What this result indicates is that the min/max worker settings of the autoscaler are themselves parameters to optimize for cost, and they require experimentation.

  • How does the cost and runtime of the job change vs. varying the autoscaling max worker count? 

If the cost and runtime of your job changes based on the input into max and min worker count, then autoscaling actually becomes a new tuning parameter.  

The data below shows what happens if we keep min_worker = 2 but sweep max_worker from 3 to 8 workers.  Clearly both cost and runtime vary quite a bit with the max worker count, and the shapes of these curves depend on the workload.  The bumpiness of the total cost can be attributed to fluctuating spot prices.

The black dashed line shows the runtime and cost performance of the optimized fixed 2 node cluster.  We note that the fixed cluster was able to outperform the best autoscaling configuration on both cost and runtime for the TPC-DS workload.
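For readers who want to repeat this kind of sweep, the sketch below generates the worker portion of each cluster spec.  We assume the `autoscale` / `min_workers` / `max_workers` and `num_workers` field names used by the Databricks Clusters API; all other required fields (instance type, Spark version, and so on) are omitted, and the helper name is our own.

```python
def autoscale_sweep(min_workers=2, max_range=range(3, 9)):
    """Build one autoscaling fragment per max_workers value (3 through 8)."""
    return [
        {"autoscale": {"min_workers": min_workers, "max_workers": m}}
        for m in max_range
    ]


# Fixed-size baseline: a constant worker count instead of an autoscale range.
fixed_baseline = {"num_workers": 2}

sweep = autoscale_sweep()
for spec in sweep:
    print(spec)
```

Each fragment would be merged into an otherwise identical job cluster definition so that the worker settings are the only thing that changes between runs.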

  • How did we get the cost of the jobs?

It turns out obtaining the actual cost charged for your jobs is pretty tedious and time consuming.  As a quick summary, below are the steps we took to obtain the actual observed costs of each job:

  1. Obtain the Databricks ClusterId of each completed job.  (this can be found in the cluster details of the completed job under “Automatically added tags”)
  2. In the Databricks console, go to the “manage account>usage tab”, filter results by tags, and search for the specific charge for each ClusterId.  (one note: the cost data is only updated every couple of hours, so you can’t retrieve this information right after your run completes)
  3. In AWS, go to your cost explorer, filter by tags, and type in the same cluster-id to obtain the AWS costs for that job (this tag is automatically transferred to your AWS account).  (Another note, AWS updates this cost data once a day, so you’ll have to wait)
  4. Add together your DBU and AWS EC2 costs to obtain your total job cost.

So to obtain the actual observed total cost (DBU and AWS), you have to wait around 24 hours for all of the cost data to hit their final end points.  We were disappointed to see we couldn’t see the actual cost in real time.  
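Once both numbers are in hand, step 4 is simple addition.  The sketch below assumes the DBU and EC2 charges have already been copied out of the two consoles into dictionaries keyed by ClusterId; the IDs and dollar amounts are hypothetical, and no Databricks or AWS API is called.

```python
def total_job_costs(dbu_costs, ec2_costs):
    """Sum DBU and EC2 charges per cluster ID.

    Both inputs map a cluster ID to dollars. Clusters missing from either
    source are skipped, since their total would be incomplete.
    """
    return {
        cluster_id: dbu_costs[cluster_id] + ec2_costs[cluster_id]
        for cluster_id in dbu_costs
        if cluster_id in ec2_costs
    }


# Hypothetical cluster IDs and dollar amounts for illustration only.
dbu = {"cluster-a": 1.20, "cluster-b": 0.80}
ec2 = {"cluster-a": 2.30}  # cluster-b's AWS cost has not landed yet
totals = total_job_costs(dbu, ec2)
print(totals)
```

Skipping clusters with only one of the two charges avoids reporting a partial cost while the AWS data is still catching up.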

Conclusion

In our analysis, we saw that a fixed cluster could outperform an autoscaled cluster in both runtime and cost for the 3 workloads that we looked at, by 37%, 28%, and 65%.  Our experiments showed that by just sticking to a fixed cluster, we eliminated all of the overhead that came with autoscaling, which resulted in faster runtimes and lower costs.  So ultimately, the net cost efficiency depends on whether the scaling benefits outweigh the overhead costs.

To be fair to the autoscaling algorithm, it’s very difficult to build a universal algorithm that reactively works for all workloads.  One has to analyze the specifics of each job in order to truly optimize the cluster underneath and then still experiment to really know what’s best.  This point is also not specific to Databricks, as many data platforms (EMR, Snowflake, etc) also have autoscaling policies that may work similarly.

To summarize our findings, here are a few high level takeaways:

  • Autoscaling is not one size fits all –  Cluster configuration is an extremely complicated topic that is highly dependent on the details of your workload.  A reactive autoscaling algorithm, with the overheads associated with changing the cluster, is a good attempt but does not solve the problem of cluster optimization.
  • Autoscaling still requires tuning – Since Autoscaling is not a “set and forget” solution, it still requires tuning and experimentation to see what min and max worker settings are optimal for your application.  Unfortunately, since the autoscaling algorithm is opaque to users, the fastest way to determine the best settings is to manually experiment.
  • So when is autoscaling good to use for batch jobs?  It’s difficult to provide a general answer because, as mentioned above, it all depends on your workload.  But two scenarios I could see are (1) your job has long periods of idle time, in which case autoscaling should shut down the nodes correctly, or (2) you are running ad-hoc data science experiments and are prioritizing productivity over costs.  Scenarios (1) and (2) could be the same thing!
  • So what should people do?  If cost efficiency of your production level Databricks jobs is a priority, I would heavily consider performing an experiment where you select a few jobs, switch them to fixed clusters, and then extract the costs to do a before and after analysis – just like we did here.

The challenge of the last bullet is, what is the optimal fixed cluster?  This is an age-old question that required a lot of manual experimentation to determine in the past, which is why we built the Apache Spark Autotuner to figure that out quickly.  In this study, that is how I found the optimal fixed clusters with a single file upload, without having to run numerous experiments.  

Maybe autoscaling is great for your workloads, maybe it isn’t.  Unfortunately, the answer is really “it depends.”  There’s only one way to really find out: you need to experiment.  

Top 3 trends we’ve learned about the scaling of Apache Spark (EMR and Databricks)

We launched the Autotuner for Apache Spark several months ago, and have worked with many companies on analyzing and optimizing their Apache Spark workloads for EMR and Databricks. In this article, we summarize cluster scaling trends we’ve seen with customers, as well as the theory behind them. The truth is, cluster sizing and configuration is a very complex topic and is different for each workload. Some cloud providers ignore all of the complexities and offer simple “T-Shirt” sizes (e.g. small, large, xlarge), which, although great for quick testing of jobs, will lead to massive cost inefficiencies in production environments.

The Sync Autotuner for Apache Spark makes it easy to understand the complex tradeoffs of clusters, and enables data engineers to make the best cloud infrastructure decisions for their production environments.


The Theory

In any distributed computing system (even beyond Apache Spark), there exist well known scaling trends (runtime vs. number of nodes), as illustrated in the images below. These trends are universal and fundamental to computer science, so even if you’re running Tensorflow, OpenFOAM (computational fluid dynamics solver), or MonteCarlo simulations on many nodes, they will all follow one of the three scaling trends below:

Standard Scaling: As more and more nodes are added, the runtime of the job decreases, but the cost also increases. The reason is that adding more nodes is not free computationally; there are usually additional overheads to runtime such as being network bound (e.g. shuffles in Spark), compute bound, I/O bound, or memory bound. As an example, if a job exhibits standard scaling, doubling the number of nodes results in a runtime that is more than half of the original runtime.

At some point, adding more nodes has diminishing returns and the job stops running faster, but obviously cloud costs start rising (since more nodes are being added). We can see point B here is running on let’s say, 5 nodes, but point A is running on 25 nodes. Running your job at point A is significantly less cost efficient and you may be wasting your money.
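To see why cost rises under standard scaling, here is a toy model in the spirit of Amdahl’s law: a fixed portion of the job that does not speed up, plus a portion that divides evenly across nodes. The model and its numbers are our own simplifying assumptions for illustration, not fitted to any customer job.

```python
def runtime_hours(nodes, serial_hours=1.0, parallel_hours=10.0):
    """Toy model: a fixed non-parallel part plus a part that divides across nodes."""
    return serial_hours + parallel_hours / nodes


def cost_usd(nodes, price_per_node_hour=1.0):
    """Cost is nodes x runtime x price per node-hour (hypothetical price)."""
    return nodes * runtime_hours(nodes) * price_per_node_hour


# Compare a 5 node cluster (point B) with a 25 node cluster (point A).
for n in (5, 25):
    print(n, round(runtime_hours(n), 2), round(cost_usd(n), 2))
```

In this toy model, going from 5 to 25 nodes cuts the runtime from 3.0 to 1.4 hours, but the cost more than doubles from 15 to 35 node-hours, which is the “elbow” behavior described above.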

Embarrassingly Parallel: This is the case when adding more nodes actually does linearly decrease your runtime, and as a result we see a “flat” cost curve. This is traditionally known in the industry as “embarrassingly parallel” because there are no penalties for adding more nodes. This is usually because there is very little communication between nodes (e.g. no shuffles in Spark), and each node just acts independently.

For example, at point B we are running on 5 nodes, but at point A we’re running on 25 nodes. It turns out that although the number of nodes from B to A went up by 5x, the runtime also went down by 5x. The two cancel out and you basically have a flat cost curve. In this case, you are free to increase your cluster size and decrease your runtime for no extra cost! Due to the computational overheads mentioned above, though, this case is quite rare and will eventually stop at large enough node counts (when exactly depends on your code).
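The cancellation is easy to check with a few lines of arithmetic. The 10 hour runtime and $1 per node-hour price below are hypothetical; only the 5 and 25 node counts come from the example above.

```python
def job_cost(nodes, runtime_hours, price_per_node_hour):
    """Cost of a job billed per node-hour."""
    return nodes * runtime_hours * price_per_node_hour


# Point B: 5 nodes. Point A: 5x the nodes, one fifth of the runtime.
cost_b = job_cost(5, 10.0, 1.0)
cost_a = job_cost(25, 10.0 / 5, 1.0)
print(cost_b, cost_a)
```

Both points cost the same, so the only thing that changes is how long you wait for the result.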

Negative Scaling: This is the interesting case when running with more nodes is both cheaper and faster (the complete opposite of “Standard Scaling”). The reason here is that some overheads could actually decrease with larger cluster sizes. For example, there could be a network or disk I/O bound issue (e.g. fetch time waiting for data), where having more nodes increases the effective network or I/O bandwidth and makes your jobs run a lot faster. If you have too few nodes, then network or I/O will be your bottleneck as your Spark application gets hung up on fetching data. Memory bound jobs could also exhibit this behavior if the cluster is too small and doesn’t have enough memory, and there exists significant memory overhead.

For example, at point B we are running on 5 nodes, so only 5 machines are performing data reads and writes. At point A we have 25 nodes and therefore 5x more read/write bandwidth, so the job runs much faster.
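Given two runs of the same job at different cluster sizes, the three trends can be told apart by comparing cost in node-hours. The sketch below is a simplified illustration with hypothetical measurements; it assumes the same instance type in both runs, so price cancels out, and it is not how the Autotuner works internally.

```python
def classify_scaling(small, large, flat_tolerance=0.05):
    """Classify the cost trend between two runs of the same job.

    small, large: (nodes, runtime_hours) tuples, with `large` using more nodes.
    Cost is compared in node-hours, so the instance price cancels out.
    """
    cost_small = small[0] * small[1]
    cost_large = large[0] * large[1]
    change = (cost_large - cost_small) / cost_small
    if change > flat_tolerance:
        return "standard"
    if change < -flat_tolerance:
        return "negative"
    return "embarrassingly parallel"


# Hypothetical (nodes, runtime in hours) pairs for points B and A.
print(classify_scaling((5, 3.0), (25, 1.4)))
print(classify_scaling((5, 10.0), (25, 2.0)))
print(classify_scaling((5, 10.0), (25, 1.0)))
```

Two points are only enough to label the local trend; as noted above, the behavior can change at larger cluster sizes, so more measurements give a more trustworthy picture.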

Real Customer Plots

The 3 scaling trends are universal behaviors of any distributed compute system, Apache Spark applications included. These scaling curves exist whether you’re running open source Spark, EMR, or Databricks — this is fundamental computer science stuff here.

When we actually started processing customer logs, we noticed that the jobs weren’t even on the proper scaling curve, due to the improper configurations of Spark. As a result, we saw that customers were actually located in the “Land of Inefficiency” (as shown by the striped region below), in which they were observing both larger costs and runtime, for no good reason.

For example if you set your workers and memory settings improperly, the result you’d see in the autotuner is a black “current” dot in the “Land of Inefficiency.” The entire goal of the autotuner is to provide an easy and automatic way for customers to achieve an efficient Spark cluster.

Standard Scaling: In the 3 screen shots below, we see the classic standard scaling for customer jobs, with the “elbow” curve described above. In all 3 cases, the users were in the “Land of Inefficiency.” Some of the runtime and cost savings reached up to 90%, which was amazing to see. Users can also tune the cost/runtime tradeoff based on their company’s goals.

Embarrassingly Parallel: In the screen shots below, we see almost flat curves for these jobs. In these cases the jobs were almost entirely CPU bound, meaning there was little communication between nodes. As a result, adding more nodes linearly decreased the runtime. These jobs were still in the “Land of Inefficiency”, so substantial cost/runtime savings could still be achieved.

Negative Scaling: In the screen shots below, we see the negative scaling behavior. The issue here is a large amount of fetch wait time (e.g. network I/O) that causes larger clusters to be substantially more efficient than smaller clusters. As a result, going to larger clusters will be more advantageous for both cost and runtime.

Conclusion

We hope this was a useful blog for data engineers. As readers hopefully see, the scaling of your big data jobs is not straightforward, and is highly dependent on the particularities of your job. The big question is always, what is the bottleneck of your job? Is it CPU, network, disk I/O, or memory bound? Or perhaps it is a combination of a few things. The truth is, “it depends” and requires workload specific optimization. The Autotuner for Apache Spark is an easy way to understand your workload, bring you out of the “Land of Inefficiency”, and optimize your job depending on the type of scaling behavior it exhibits.

One question we get a lot is: what about multi-tenant situations where one cluster is running hundreds or thousands of jobs? How does the Autotuner take into account other simultaneous jobs? This solution requires another level of optimization, one on which we recently published a paper entitled “Global Optimization of Data Pipelines on the Cloud.”

References

  1. Autotuner post for EMR on AWS
  2. Autotuner post for Databricks on AWS
  3. Global Optimization of Data Pipelines on the Cloud
  4. https://synccomputing.com/autotuner/