Why Your Data Pipelines Need Closed-Loop Feedback Control

As data teams scale up on the cloud, data platform teams need to ensure that the workloads they are responsible for meet business objectives.  With dozens of data engineers building hundreds of production jobs, controlling performance job by job becomes untenable for a myriad of reasons, from technical to human.

The missing link today is the establishment of a closed loop feedback system that helps automatically drive pipeline infrastructure towards business goals.  That was a mouthful, so let’s dive in and get more concrete about this problem.

The problem for data platform teams today

Data platform teams have to manage fundamentally distinct stakeholders, from management to engineers.  Oftentimes these two groups have opposing goals, and platform managers get squeezed from both ends.

Many real conversations we’ve had with platform managers and data engineers typically go like this:


“Our CEO wants me to lower cloud costs and make sure our SLAs are hit to keep our customers happy.”

Okay, so what’s the problem?

“The problem is that I can’t actually change anything directly. I need other people to help, and that is the bottleneck.”

So basically, platform teams find themselves handcuffed and face enormous friction when trying to actually implement improvements.  Let’s zoom into the reasons why.

What’s holding back the platform team?

  • Tuning is outside data teams’ technical scope – Tuning clusters or complex configurations (Databricks, Snowflake) is time consuming, and data teams would rather focus on actual pipelines and SQL code.  Many engineers don’t have the skill set or support structure, or don’t even know what their jobs cost.  Identifying and solving root-cause problems is also a daunting task that gets in the way of just standing up a functional pipeline.

  • Too many layers of abstraction – Let’s just zoom in on one stack: Databricks runs its own version of Apache Spark, which runs on a cloud provider’s virtualized compute (AWS, Azure, GCP), with different network options and different storage options (DBFS, S3, Blob), and by the way, every layer can be updated independently and unpredictably throughout the year.  The number of options is overwhelming, and it’s impossible for platform folks to ensure everything is up to date and optimal.

  • Legacy code – One unfortunate reality is simply just legacy code.  Oftentimes teams in a company can change, people come and go, and over time, the knowledge of any one particular job can fade away.  This effect makes it even more difficult to tune or optimize a particular job.

  • Change is scary – There’s an innate fear of change.  If a production job is flowing, do we want to risk tweaking it?  The old adage comes to mind: “if it ain’t broke, don’t fix it.”  Oftentimes this fear is justified: if a job is not idempotent or has downstream effects, a botched run can cause a real headache.  This creates a psychological barrier to even trying to improve job performance.

  • At scale there are too many jobs – Typically platform managers oversee hundreds if not thousands of production jobs.  Future company growth ensures this number will only increase.  Given all of the points above, even if you had a local expert, going in and tweaking jobs one at a time is simply not realistic.  While this can work for a select few high priority jobs, it leaves the bulk of a company’s workloads more or less uncared for.  

Clearly it’s an uphill battle for data platform teams to quickly make their systems more efficient at scale.  We believe the solution is a paradigm shift in how pipelines are built.  Pipelines need a closed loop control system that constantly drives a pipeline towards business goals without humans in the loop.  Let’s dig in.

What does a closed loop control for a pipeline mean?

Today’s pipelines are what is known as an “open loop” system, in which jobs just run without any feedback.  To illustrate, the picture below shows “Job 1” running every day at a cost of $50 per run.  Let’s say the business goal is for that job to cost $30.  Until somebody actually does something, that cost will remain at $50 for the foreseeable future, as seen in the cost vs. time plot.

What if instead, we had a system that actually fed back the output statistics of the job so that the next day’s deployment can be improved?  It would look something like this:

What you see here is a classic feedback loop, where in this case the desired “set point” is a cost of $30.  Since this job runs every day, we can feed the real cost back into an “update config” block that takes in the cost differential (in this case $20) and applies a change to “Job 1’s” configurations.  For example, the “update config” block may reduce the number of nodes in the Databricks cluster.

What does this look like in production?

In reality this doesn’t happen in a single shot.  The “update config” model is now responsible for tweaking the infrastructure to try to get the cost down to $30.  As you can imagine, over time the system will improve and eventually hit the desired cost of $30, as shown in the image below.
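To make the loop concrete, here is a minimal sketch under loudly stated assumptions: a toy cost model ($5 per node per run) and a deliberately naive “update config” rule that removes one node per run while the job is over budget. The function names and numbers are illustrative only and are not the model discussed in this post.

```python
from typing import List


def run_job(num_nodes: int) -> float:
    """Toy cost model (assumption): every node costs $5 per run."""
    return 5.0 * num_nodes


def update_config(num_nodes: int, cost: float, set_point: float) -> int:
    """Hypothetical 'update config' block: nudge the node count toward the goal."""
    error = cost - set_point  # e.g. $50 - $30 = $20 on the first run
    if error > 0:
        return max(1, num_nodes - 1)  # over budget: remove one node
    return num_nodes  # at or under budget: hold the configuration


def closed_loop(num_nodes: int, set_point: float, runs: int) -> List[float]:
    """Run the job repeatedly, feeding each run's cost into the next config."""
    costs = []
    for _ in range(runs):
        cost = run_job(num_nodes)
        costs.append(cost)
        num_nodes = update_config(num_nodes, cost, set_point)
    return costs


costs = closed_loop(num_nodes=10, set_point=30.0, runs=6)
print(costs)
```

In this toy setup the cost steps down from $50 and then holds at the $30 set point. A real “update config” block has to predict the effect of a change rather than assume a linear cost model, which is where the difficulty lies.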

This may all sound fine and dandy, but you may be scratching your head and asking “what is this magical ‘update config’ block?”  Well, that’s where the rubber meets the road.  That block is a mathematical model that takes a numerical goal delta as input and outputs an infrastructure configuration or maybe a code change.

It’s not easy to make and will vary depending on the goal (e.g. costs vs. runtime vs. utilization).  This model must fundamentally predict the impact of an infrastructure change on business goals – not an easy thing to do.

Nobody can predict the future

One subtle thing is that no “update config” model is 100% accurate.  In the 4th blue dot, you can actually see that the cost goes UP at one point.  This is because the model is trying to predict a change in the configurations that will lower costs, but because nothing can predict with 100% accuracy, sometimes it will be wrong locally, and as a result the cost may go up for a single run, while the system is “training.”

But, over time, we can see that the total cost does in fact go down.  You can think of it as an intelligent trial and error process, since predicting the impact of configuration changes with 100% accuracy is straight up impossible.

The big “so what?” – Set any goal and go

The approach above is a general strategy, not one limited to cost savings.  The “set point” above is simply a goal that a data platform person puts in.  It can be any kind of goal; runtime is a great example.

Let’s say we want a job to run in under 1 hour (its SLA).  We can let the system above tweak the configurations until the SLA is hit.  Or what if it’s more complicated, with a cost goal and an SLA goal simultaneously?  No problem at all: the system can optimize to hit your goals over many parameters.  In addition to cost and runtime, other business goals include:

  • Resource Utilization: Independent of cost and runtime, am I using the resources I have properly?
  • Energy Efficiency: Am I consuming the least amount of resources possible to minimize my carbon footprint?
  • Fault Tolerance: Is my job actually resilient to failure? Meaning do I want to over-spec it just in case I get preempted or just in case there are no SPOT instances available?
  • Scalability: Does my job scale? What if I have a spike in input data by 10x, will my job crash?
  • Latency: Are my jobs hitting my latency goals? Response time goals?

In theory, all a data platform person has to do is set goals, and then an automatic system can iteratively improve the infrastructure until the goals are hit.  There are no humans in the loop, no engineers to get on board.  The platform team just sets the goals they’ve received from their stakeholders.  Sounds like a dream.

So far we’ve been pretty abstract.  Let’s dive into some concrete use cases that are hopefully compelling:

Example feature #1: Group jobs by business goals

Let’s say you’re a data platform manager and you oversee the operation of hundreds of production jobs.  Right now, they all have their own cost and runtime.  A simple graph below shows a cartoon example, where basically all of the jobs are randomly scattered across a cost and runtime graph.

What if you want to lower costs at scale?  What if you want to change the runtime (or SLA) of many jobs at once?  Right now you’d be stuck.

Now imagine if you had the closed loop control system above implemented for all of your jobs.  All you’d have to do is set the high level business goals of your jobs (in this case SLA runtime requirements), and the feedback control system would do its best to find the infrastructure that accomplishes your goals.  The end state will look like this:

Here we see each job’s color represents a different business goal, as defined by the SLA.  The closed loop feedback control system behind the scenes changed the cluster / warehouse size, various configurations, or even adjusted entire pipelines to try to hit the SLA runtime goals at the lowest cost.  Typically longer job runtimes lead to lower cost opportunities.
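As a rough illustration of “hit the SLA at the lowest cost,” the sketch below picks the cheapest configuration whose runtime fits a given SLA. The measurements are made-up numbers and the field names are hypothetical; a real system would have to discover these points through repeated runs rather than read them from a table.

```python
from typing import Dict, List, Optional

# Hypothetical observations for one job (all numbers are illustrative).
observed_runs = [
    {"workers": 2, "runtime_min": 95, "cost_usd": 18.0},
    {"workers": 4, "runtime_min": 55, "cost_usd": 21.0},
    {"workers": 8, "runtime_min": 32, "cost_usd": 26.0},
    {"workers": 16, "runtime_min": 20, "cost_usd": 34.0},
]


def cheapest_within_sla(runs: List[Dict], sla_minutes: float) -> Optional[Dict]:
    """Return the lowest-cost configuration that meets the runtime SLA."""
    feasible = [run for run in runs if run["runtime_min"] <= sla_minutes]
    if not feasible:
        return None  # no observed configuration meets this SLA
    return min(feasible, key=lambda run: run["cost_usd"])


# Jobs grouped by business goal: a looser SLA allows a cheaper cluster.
relaxed = cheapest_within_sla(observed_runs, sla_minutes=60)
strict = cheapest_within_sla(observed_runs, sla_minutes=30)
print(relaxed["workers"], strict["workers"])
```

With these sample numbers the 60-minute SLA is met by the 4-worker cluster and the 30-minute SLA needs 16 workers, which mirrors the point that longer allowed runtimes open up lower-cost options.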

Example feature #2: Auto-healing jobs

As most data platform people can confirm, things are always changing in their data pipelines.  Two very popular scenarios are: data size growing over time, and code changes.  Both of which can cause erratic behavior when it comes to cost and runtime.

The illustration below shows the basic concept.  Let’s walk through the example from left to right:

  • Start:  Let’s say you have a job and over time the data size grows.  Normally your cluster stays the same, and as a result both cost and runtime increase.
  • Start Feedback:  Over time the runtime approaches the SLA requirement and the feedback control system kicks in at the green arrow.  At this point, the control system changes the cluster to stay below the red line while minimizing costs.
  • Code Change:  At some point a developer pushes a new update to the code which causes a spike in the cost and runtime.  The feedback control system kicks in and adjusts the cluster to work better with the new code change.
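The first two steps of this walkthrough can be sketched as a small simulation. Everything here is an assumption made for illustration: runtime is modeled as data size divided by worker count, the SLA is 60 minutes, and the controller adds one worker whenever the last observed runtime exceeds 90% of the SLA.

```python
from typing import List, Tuple


def runtime_minutes(data_gb: float, workers: int) -> float:
    """Toy model (assumption): runtime grows with data and shrinks with workers."""
    return data_gb / workers


def next_workers(workers: int, observed_runtime: float,
                 sla_minutes: float, headroom: float = 0.9) -> int:
    """Feedback rule: scale up once the observed runtime nears the SLA."""
    if observed_runtime > headroom * sla_minutes:
        return workers + 1
    return workers


def simulate(daily_data_gb: List[float], workers: int,
             sla_minutes: float) -> Tuple[List[float], int]:
    """Run one job per day, adjusting the cluster after each observation."""
    runtimes = []
    for data_gb in daily_data_gb:
        observed = runtime_minutes(data_gb, workers)
        runtimes.append(observed)
        workers = next_workers(workers, observed, sla_minutes)
    return runtimes, workers


runtimes, final_workers = simulate([200, 220, 240, 260, 280], workers=4, sla_minutes=60)
print(runtimes, final_workers)
```

In this run the data grows every day, yet the runtime never crosses the 60-minute line because the controller reacts as soon as a run lands above the 54-minute headroom mark. Scaling back down to minimize cost, and reacting to a code change, would need additional rules that are left out here.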

Hopefully these two examples illustrate the potential benefits of a closed loop control pipeline.  Of course, in reality many details have been left out, and there are design principles companies will have to adhere to.  One big one is a way for configurations to revert to a previous state in case something goes wrong.  An idempotent pipeline would also be ideal here in case many iterations are needed.

Conclusion

Data pipelines are complex systems, and like any other complex system, they need feedback and control to ensure a stable performance.  Not only does this help solve technical or business problems, it will dramatically help free up data platform and engineering teams to focus on actually building pipelines.  

Like we mentioned before, a lot of this hinges on the performance of the “update config” block.  This is the critical piece of intelligence needed for the success of the feedback loop.  It is not trivial to build, and it is the main technical barrier today.  It can be an algorithm or a machine learning model, and it can utilize historical data.  It is the main technical component we’ve been working on over the past several years.

In our next post we’ll show an actual implementation of this system applied to Databricks Jobs, so you can believe that what we’re talking about is real!

Interested in learning more about closed loop controls for your Databricks pipelines? Reach out to Jeff Chou and the rest of the Sync Team.

Are Databricks clusters with Photon and Graviton instances worth it?

Configuring Databricks clusters can seem more like art than science.  We’ve reported in the past about ways to optimize worker and driver nodes, and how the proper selection of instances impacts a job’s cost and performance.  We’ve also discussed how autoscaling performs, and how it’s not always the most efficient choice for static jobs.  

In this blog post, we look across a few other popular questions and options we see from folks:

  1. How do Graviton instances impact cost and performance?
  2. How does the price and performance of Photon compare to standard instances?

What are Graviton instances?

Graviton instances on AWS contain custom AWS built processors, which promise to be a “major leap” in performance. Specifically for Spark, AWS published a report that claimed Graviton can help reduce costs up to 30% and speed up performance up to 15% for Apache Spark on EMR.   Although Databricks clusters can use Graviton, there haven’t been any performance metrics reported (that we know of).   There’s no extra surcharge for Graviton instances, and they are typically moderately priced compared to other instances.

What is Photon in Databricks?

Photon is a vectorized query engine written in C++, developed by the creators of Apache Spark and available within the Databricks platform.  Photon is an amazing technical feat with a multitude of features and considerations that extend well beyond the scope of this blog.  For full details, we encourage readers to check out the original Photon academic paper here.  Unfortunately, Photon is not free and typically comes with a 2x DBU cost increase compared to non-Photon.  So users have to decide if the cost increase is “worth it.”

At the highest level for most end users, as cited by the original academic paper:

  • Photon is great for CPU heavy operations such as joins, aggregations, and SQL expression evaluations.  
  • The academic paper claims about a 3x speedup on the TPC-H benchmark compared to standard Databricks runtime
  • Photon is not expected to provide a speedup to workloads that are I/O or network bound.

Yes, you can even run Photon on Graviton instances!  What happens with this powerful combo?  The data below shows the results.

How do I use Graviton and/or Photon?

Graviton instances typically have the “g” letter in the instance names, such as “m6g.xlarge” or “c7g.xlarge” and are selected during the cluster creation step within Databricks under “Worker type” and “Driver type”.

Photon is enabled by simply checking the box “Use Photon Acceleration” in the cluster creation step.  An image of the UI is shown below.

Experimental setup

In our analysis we utilize the TPC-DS 1TB benchmark, with all queries run sequentially.  We then look at the total runtime of all queries summed together.  To keep things simple and fair, every cluster has identical driver and worker instances.  We sampled 28 different instances, covering Photon-enabled, Graviton, memory, compute, I/O, network, and storage optimized instances.  A full list of the parameters of each cluster is below:

  1. Driver:  [instance].xlarge
  2. Worker:  [instance].xlarge
  3. Number of workers: 10
  4. EBS volume: 64
  5. Databricks runtime version:  11.3.x-scala2.12
  6. Market:  On-demand
  7. Cloud provider: AWS
  8. Instances:  28 different instances on AWS

For the cost, we utilize only the DBU cost of each cluster.  We did not include the AWS costs for various reasons:

  • Cloud cost attribution difficulty:  Databricks internally re-uses clusters of adjacent jobs.  Meaning, AWS clusters for one job may be reused for a second job if they require the same machine.  This makes it difficult to determine which job was using which cluster in AWS.  This is a niche problem, and only matters to people who want to determine the true cost of a single job.
  • AWS costs depend on the market:  The AWS costs, or cloud costs in general, depend on the market.  Specifically, if users are using on-demand vs. spot nodes, it will drastically change the relative cost performance.  Furthermore, spot prices can fluctuate daily, so extracting fair comparisons would be difficult here.
  • AWS costs depend on contracts:  Large companies negotiate their own costs for their instances, thus again, making an overall apples to apples comparison difficult.

For the reasons above, the DBU costs are utilized because they are exact, easy to identify, and do not fluctuate depending on the market.  However, we will say that DBU costs can also depend on contracts.  But for the sake of this study, we’ll just use the list prices of DBUs.  As you can tell by these thoughts, doing actual cost comparisons is not a trivial task, and is highly dependent on each company’s use case.

Results

The graph below shows the cost vs. runtime plots of all 28 different clusters.  They are grouped into 4 sections: “Graviton” instances, “Photon” enabled instances, “Standard” instances (no Photon, no Graviton), and “Graviton + Photon” instances.  Points that are closer to the bottom left hand corner of the graph are both “faster and cheaper.”

In the graph below, we can see two clear “clusters”, basically with and without Photon.  It’s clear from this data that Photon is legitimately faster.  Unfortunately, it doesn’t appear any cheaper, so if your goal is to save money these results are a bit of a downer.  If you’re trying to run faster, Photon may be exactly what you’re looking for.

The two bar graphs below contain the same data as the XY plot above, but they break out the data into runtime and DBU costs separately.  Also, we present the individual instances used, in case people would like a more granular view into the data.

After perusing through the data, our main observations are outlined below.  I’d like to heavily caution that these observations are purely from the experiment we ran above.  We urge people to exercise caution when trying to generalize these results, as individual jobs can have wildly different results than the ones we showed above.  With that said, these are the main takeaways:

  • Photon is generally 2x faster – Across the board Photon was about 2x faster than their non-photon counterparts (same instances).  This was great to see.  Although not as high as some of the claims reported by Databricks, we understand that it is highly dependent on the workload.  In my opinion a 2x speedup is pretty impressive.
  • Graviton was neutral  – The runtime for graviton was perhaps a bit faster than standard instances, but it’s unclear if it’s statistically significant.  There doesn’t seem much risk to using Graviton, and they are newer chips so maybe they will be faster for your jobs? 
  • Photon’s total cost is cheaper (with this data) – In the data above, since the DBU costs were about the same across all 3 types, and Photon’s runtimes were about 2x faster, one can logically conclude that the cloud portion of the costs (the AWS fees) will be less with Photon.  As a result, the total cost for an end user was cheapest with Photon enabled.
  • Photon pricing makes for complex cost ROI –  Because of the previous point, determining the ROI of Photon is difficult.  It basically boils down to whether the speedup is large enough to offset the increased cost.  If it is not, then users are essentially paying more money for a potentially faster job.  If the Photon speedup is large enough, then it will be cheaper.  What that threshold is will depend on the market and any discounts.  For the sake of this study, the crossover point for on-demand instances was around 20%.  Meaning, Photon needs to be at least 20% faster than Standard to observe any cost savings.

Formula for determining Photon ROI

For those that are mathematically inclined, here is a simple formula to help determine the “speedup threshold” which is the minimum speedup Photon needs to achieve for your job in order to break even.  If your speedup is greater than this threshold, then you are saving money.

For a simple example, let’s say all of the machine and DBU costs are 1, the Photon cost increase is a factor of 2, and we have 10 workers.  With these very simple numbers, we get a Psth value of 1.5.  Plugging in 1.5 for Psth, setting R_orig = 1, and solving for R_photon gives R_photon = 1 / 1.5 ≈ 0.67, which means Photon needs to be 33% faster to break even.  Clearly this value is heavily dependent on a lot of factors, all of which are shown in the equation above.
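The worked example can be reproduced with a short calculation. This sketch assumes the speedup threshold is the ratio of the hourly cluster cost with Photon to the hourly cost without it, with the Photon factor applied only to the DBU portion and with identical driver and worker instances as in the experiment. That form is an assumption chosen because it reproduces the 1.5 value above; the function and parameter names are hypothetical.

```python
def photon_speedup_threshold(machine_cost: float, dbu_cost: float,
                             photon_dbu_factor: float, num_workers: int,
                             num_drivers: int = 1) -> float:
    """Minimum speedup (R_orig / R_photon) at which Photon breaks even.

    Assumed form: hourly cost with Photon divided by hourly cost without it.
    """
    nodes = num_workers + num_drivers
    cost_without_photon = nodes * (machine_cost + dbu_cost)
    cost_with_photon = nodes * (machine_cost + photon_dbu_factor * dbu_cost)
    return cost_with_photon / cost_without_photon


# Numbers from the simple example: all costs are 1, Photon doubles the DBU cost.
threshold = photon_speedup_threshold(machine_cost=1.0, dbu_cost=1.0,
                                     photon_dbu_factor=2.0, num_workers=10)
r_orig = 1.0
r_photon = r_orig / threshold          # runtime Photon must reach to break even
percent_faster = (1 - r_photon / r_orig) * 100
print(threshold, round(percent_faster))
```

Note that with identical node types the node count cancels out of the ratio, so under this assumed form the threshold depends only on how large the DBU charge is relative to the machine charge.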

Conclusion

Overall, the answers to the original two questions really come down to “it depends.”  The data points we showed above are a tiny slice of what workloads actually look like.  Based simply on the data above, here are the answers:

1)  Photon will probably be faster than non-photon, but whether or not it’s cheaper will depend on how much faster it is relative to the costs.  To understand if the 2x DBU cost increase with Photon is worth it, it all depends on the markets and pricing of your cloud instances.

2)  On average Graviton was about the same for cost and runtime compared to standard instances.  We did not see any significant advantage of using Graviton here, but we didn’t see any downside either.  Maybe these new chips will be perfect for your workload, or maybe not.  It’s hard to tell.

However, with the data above, specifically around Photon, I can’t help but ask the question:

Is Databricks motivated to make Spark run faster? 

This is an interesting philosophical question where the tech enthusiast may clash with the business units.  The faster Databricks makes Spark, the less revenue they get, since they charge per minute.  Photon is an interesting case study in which, yes, they made Spark 2x faster – but then had to double their costs to not lose money.  This is at least one data point that shows you where Databricks basically sits: “Yes we can make Spark faster, but not cheaper.”

In my opinion, Databricks, and other cloud providers, are fundamentally motivated to increase revenue.  So making Spark run faster and/or cheaper is not in alignment with what they need to do as a business.  They will, however, make the product easier to use, or expand to other use cases, which fundamentally increases revenue.

We of course respect the fact that any business needs to make money, so I don’t think anything improper is happening here.  But it does reveal an interesting conflict between technology and business and how that fundamentally impacts the end user.

How to Use the Gradient CLI Tool to Optimize Databricks / EMR Programmatically

Introduction:

The Gradient Command Line Interface (CLI) is a powerful yet easy-to-use utility for automating the optimization of your Spark jobs from your terminal, command prompt, or automation scripts.

Whether you are a Data Engineer, SysDevOps administrator, or just an Apache Spark enthusiast, knowing how to use the Gradient CLI can be incredibly beneficial, as it can dramatically reduce the cost of your Spark workloads while helping you hit your pipeline SLAs.

If you are new to Gradient, you can learn more about it in the Sync Docs. In this tutorial, we’ll walk you through the Gradient CLI’s installation process and give you some examples of how to get started. This is meant to be a tour of the CLI’s overall capabilities. For an end to end recipe on how to integrate with Gradient take a look at our Quick Start and Integration Guides.

Pre Work

This tutorial assumes that you have already created a Gradient account and generated your Sync API keys. If you haven’t generated your key yet, you can do so on the Accounts tab of the Gradient UI.

Step 1: Setting up your Environment

Let’s start by making sure our environment meets all the prerequisites. The Gradient CLI is actually part of the Sync Library, which requires Python v3.7 or above and which only runs on Linux/Unix based systems.

python --version

I am on a Mac running Python 3.10, so I am good to go, but before we get started I am going to create a Python virtual environment with venv. This is a good practice whenever you install a new Python tool, as it allows you to avoid conflicts between projects and makes environment management simpler. For this example, I am creating a virtual environment called gradient-cli that will reside under the ~/VirtualEnvironments path.

python -m venv ~/VirtualEnvironments/gradient-cli
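If you would rather script the prerequisite check from this step, a small Python sketch might look like the following. The function name is hypothetical, and treating macOS ("Darwin") and Linux as the accepted Unix-based systems is an assumption based on the requirements stated above.

```python
import platform
import sys


def meets_prerequisites(version_info=sys.version_info, system=None) -> bool:
    """Check for Python 3.7+ on a Linux/Unix based system."""
    system = system or platform.system()
    python_ok = tuple(version_info[:2]) >= (3, 7)
    # Assumption: Linux and macOS are the Unix-based systems of interest here.
    os_ok = system in ("Linux", "Darwin")
    return python_ok and os_ok


if __name__ == "__main__":
    print("Ready to install:", meets_prerequisites())
```

The optional arguments exist only so the check can be exercised with other versions and platforms; calling it with no arguments inspects the current interpreter.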

Step 2: Install the Sync Library

Once you’ve confirmed that your system meets the prerequisites, it’s time to install the Sync Library. Start by activating your new virtual environment.

source ~/VirtualEnvironments/gradient-cli/bin/activate

Next use the pip package installer to install the latest version of the Sync Library.

pip install https://github.com/synccomputingcode/syncsparkpy/archive/latest.tar.gz

You can confirm that the installation was successful by checking the CLI executable with the --version or --help options.

sync-cli --help

Step 3: Configure the Sync Library

Configuring the CLI with your credentials and preferences is the final step for the installation and setup for the Sync CLI. To do this, run the configure command:

sync-cli configure

You will be prompted for the following values:

Sync API key ID:

Sync API key secret:

Default prediction preference (performance, balanced, economy) [economy]:

Would you like to configure a Databricks workspace? [y/n]:

Databricks host (prefix with https://):

Databricks token:

Databricks AWS region name:

If you remember from the Pre Work, your Sync API key & secret are found on the Accounts tab of the Gradient UI. For this tutorial we are running on Databricks, so you will need to provide a Databricks Workspace and an Access token.


Databricks recommends that you set up a service principal for automation tasks. As noted in their docs, service principals give automated tools and scripts API-only access to Databricks resources, providing greater security than using users or groups.

These values are stored in ~/.sync/config.

Congrats! You are now ready to interact with Gradient from your terminal, command prompt, or automation scripts.

Step 4: Example Uses

Below are some tasks you can complete using the CLI. This is useful when you want to automate Gradient processes and incorporate them into larger workflows.

Projects

All Gradient recommendations are stored in Projects. Projects are associated with a single Spark job or a group of jobs running on the same cluster. Here are some useful commands you can use to manage your projects with the CLI. For an exhaustive list of commands use the --help option.

Project Commands:

create – Create a project

sync-cli projects create --description [TEXT] --job-id [Databricks Job ID] PROJECT_NAME

delete – Delete a project

sync-cli projects delete PROJECT_ID

get – Get info on a project

sync-cli projects get PROJECT_ID

list – List all projects for account

sync-cli projects list
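When calling these commands from an automation script, it can help to build the argument list in code rather than concatenating strings. The sketch below mirrors the create command shown above; the helper names are hypothetical, and only the flags that appear in this tutorial are used.

```python
import shlex
from typing import List


def projects_create_cmd(project_name: str, description: str, job_id: str) -> List[str]:
    """Build the argv for `sync-cli projects create` as shown in this tutorial."""
    return [
        "sync-cli", "projects", "create",
        "--description", description,
        "--job-id", job_id,
        project_name,
    ]


def render(cmd: List[str]) -> str:
    """Quote each argument so the command can be logged or pasted into a shell."""
    return " ".join(shlex.quote(arg) for arg in cmd)


cmd = projects_create_cmd("nightly-etl", "Nightly ETL", "1234")
print(render(cmd))
```

Passing a list like this to `subprocess.run(cmd, check=True)` avoids shell quoting problems with descriptions that contain spaces. The job ID and project name here are placeholders.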

Predictions

You can also use the CLI to manage, generate and retrieve predictions. This is useful when you want to automate the implementation of recommendations within your Databricks or EMR environments.

Prediction commands:

get – Retrieve a specific prediction

sync-cli predictions get --preference [performance|balanced|economy] PREDICTION_ID

list – List all predictions for account or project

sync-cli predictions list --platform [aws-emr|aws-databricks] --project TEXT

status – Get the status of a previously initiated prediction

sync-cli predictions status PREDICTION_ID

The CLI also provides platform specific commands to generate and retrieve predictions.

Databricks

For Databricks you can generate a recommendation for a previously completed job run with the following command:

sync-cli aws-databricks create-prediction --plan [Standard|Premium|Enterprise] --compute ['Jobs Compute'|'All Purpose Compute'] --project [Your Project ID] RUN_ID

If the run you provided was not already configured with the Gradient agent when it executed, you can still generate a recommendation, but the basis metrics may be missing some time-sensitive information that is no longer available. To enable evaluation of prior logs from runs executed without the Gradient agent, you can add the --allow-incomplete-cluster-report option. However, to avoid this issue altogether, you can implement the agent and re-run the job.

Alternatively, you can use the following command to run the job and request a recommendation with a single command:

sync-cli aws-databricks run-job --plan [Standard|Premium|Enterprise] --compute ['Jobs Compute'|'All Purpose Compute'] --project [Your Project ID] JOB_ID

This method is useful in cases when you are able to manually run your job without interfering with scheduled runs.

Finally, to implement a recommendation and run the job with the new configuration, you can issue the following command:

sync-cli aws-databricks run-prediction --preference [performance|balanced|economy] JOB_ID PREDICTION_ID
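The Databricks commands above take a few options with a fixed set of choices, so a wrapper script can validate them before invoking the CLI. This is only a sketch: the helper names are hypothetical, and the allowed values are copied from the bracketed choices shown in the commands above.

```python
from typing import List

PLANS = ("Standard", "Premium", "Enterprise")
COMPUTE_TYPES = ("Jobs Compute", "All Purpose Compute")
PREFERENCES = ("performance", "balanced", "economy")


def _require(value: str, allowed: tuple, name: str) -> str:
    """Fail early with a clear message instead of letting the CLI reject it."""
    if value not in allowed:
        raise ValueError(f"{name} must be one of {allowed}, got {value!r}")
    return value


def create_prediction_cmd(run_id: str, project_id: str,
                          plan: str, compute: str) -> List[str]:
    """argv for generating a recommendation from a completed run."""
    return [
        "sync-cli", "aws-databricks", "create-prediction",
        "--plan", _require(plan, PLANS, "plan"),
        "--compute", _require(compute, COMPUTE_TYPES, "compute"),
        "--project", project_id,
        run_id,
    ]


def run_prediction_cmd(job_id: str, prediction_id: str, preference: str) -> List[str]:
    """argv for running a job with a recommended configuration."""
    return [
        "sync-cli", "aws-databricks", "run-prediction",
        "--preference", _require(preference, PREFERENCES, "preference"),
        job_id,
        prediction_id,
    ]


create_cmd = create_prediction_cmd("987", "my-project-id", "Premium", "Jobs Compute")
apply_cmd = run_prediction_cmd("1234", "my-prediction-id", "economy")
print(create_cmd)
print(apply_cmd)
```

All IDs here are placeholders. In a real workflow the prediction ID would come from the output of the first command, which this sketch does not attempt to parse.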

EMR

Similarly, for Spark EMR, you can generate a recommendation for a previously completed job. EMR does not have the same issue with regard to ephemeral cost data not being available, so you can request a recommendation on a previous run without the Gradient agent.

Use the following command to do so:

sync-cli aws-emr create-prediction --region [Your AWS Region] CLUSTER_ID

If you want to manually rerun the EMR job and immediately request a Gradient recommendation, use the following command:

sync-cli aws-emr record-run --region [Your AWS Region] CLUSTER_ID PROJECT

To execute the EMR job using the recommended configuration, use the following command:

sync-cli aws-emr run-prediction --region [Your AWS Region] PREDICTION_ID

Products

Gradient is constantly working on adding support for new data engineering platforms. To see which platforms are supported by your version of the CLI, you can use the following command:

sync-cli products

Configuration

Should you ever need to update your CLI configuration, you can run configure again to change one or more of your values.

sync-cli configure --api-key-id TEXT --api-key-secret TEXT --prediction-preference TEXT --databricks-host TEXT --databricks-token TEXT --databricks-region TEXT

Token

The token command returns an access token that you can use against our REST API with clients like Postman.

sync-cli token

Conclusion

With these simple commands, you can automate the end to end optimization of all your Databricks or EMR workloads, dramatically reducing your costs and improving the performance. For more information refer to our developer docs or reach out to us at info@synccomputing.com.

Integrating Gradient into Apache Airflow

Summary

In this blog post, we’ll explore how you can integrate Sync’s Gradient with Airflow. We’ll walk through the steps to create a DAG that will submit a run to Databricks, and then make a call through Sync’s library to generate a recommendation for an optimized cluster for that task. This DAG example can be used to automate the process of requesting recommendations for tasks that are submitted as jobs to Databricks.

A Common Use Case And Its Challenges

Use Case:

A common implementation of Databricks within Airflow consists of using the DatabricksSubmitRunOperator to submit a pre-configured notebook to Databricks.

Challenges:

  • Due to orchestration outside of Databricks’ ecosystem, these jobs are reflected as one-time runs
  • It’s difficult to track cluster performance across multiple runs
  • This is exacerbated by the fact that a DAG can have multiple tasks that submit these one-off ‘jobs’ to Databricks.

How Can We Fix This?

We’ll set up a Python operator that uses Sync’s Library so we can generate recommendations and view them in Gradient’s UI. From there we can see the changes we need to make to reduce the cost of our cluster definitions. Let’s dive in!

Preparing Your Airflow Environment

Prerequisites

  • Airflow (This tutorial uses 2.0+)
  • Python 3.7+
  • Sync Library installed and environment variables configured on the Airflow instance (details below)
  • An S3 path you would like to use for cluster logs – your Databricks ARN will need access to this path so it can save the cluster logs there.
  • An account with Sync and a Project created to track the task you would like to optimize.

Sync Account Setup And Library Installation

Quick start instructions on how to create an account and a project, and how to install the Sync Library, can be found here. Please configure the CLI on your Airflow instance. When going through the configuration steps, be sure to choose yes when prompted to configure the Databricks variables.

Note: In the quickstart above, there are instructions on using an init script. Copy the contents of the init script into a file on a shared or personal workspace accessible by the account the Databricks job will run as.

Variables

Certain variables are generated and stored during installation of the sync library. For transparency, they are:

Besides the variables generated by the library, you’ll need the following environment variables. These are necessary to use the AWS API to retrieve cluster logs when requesting a prediction. DBFS is supported; however, it is not recommended because it goes against Databricks’ best practices. As mentioned in the quick start, it’s best to set these via the AWS CLI.

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_DEFAULT_REGION
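Since the cluster configuration later in this post reads these values from os.environ, it can help to check that they are present before the DAG is parsed. The following is a minimal sketch of such a check; the helper name missing_aws_vars is our own for illustration and is not part of the Sync Library.

```python
import os

# The three AWS variables listed above.
REQUIRED_AWS_VARS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
)


def missing_aws_vars(environ=None):
    """Return the names of required AWS variables that are unset or empty.

    `environ` defaults to os.environ; passing a plain dict makes the
    function easy to try out without touching the real environment.
    """
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_aws_vars()
    if missing:
        print("Missing AWS environment variables: " + ", ".join(missing))
    else:
        print("All required AWS environment variables are set.")
```

Running this on the Airflow instance gives a quick signal before the first run is submitted, rather than a KeyError when the cluster dictionary is built.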

Cluster Configuration

Referring back to our common use case, a static cluster configuration is often either defined within the DAG or built dynamically by a helper function that returns the cluster dictionary to be passed into the DatabricksSubmitRunOperator. In preparation for the first run, some specific cluster details need to be configured.

What are we adding?

  • cluster_log_conf: an S3 path to send our cluster logs to. These logs will be used to generate an optimized recommendation.
  • custom_tags: the sync:project-id tag is added so we can assign the run to a Sync project.
  • init_scripts: identifies the path of the init script that we copied into our Databricks workspace during the quick start setup.
  • spark_env_vars: environment variables passed to the cluster that the init script will use. Note: the retrieval of tokens/keys in this tutorial is simplified to use the information configured during the sync-cli setup process. Passing them in this manner will result in tokens being visible in plaintext when viewing the cluster in Databricks. Please use Databricks Secrets when moving this code to production.

The rest of the cluster configuration dictionary comprises the typical settings you normally pass into the DatabricksSubmitRunOperator.

import os

from sync.config import DatabricksConf as sync_databricks_conf
from sync.config import get_api_key

{
    "spark_version": "13.0.x-scala2.12",
    ...
    "cluster_log_conf": {
        "s3": {
            "destination": "", # Add the s3 path for the cluster logs
            "enable_encryption": True,
            "region": "", # Add your aws region ie: us-east-1
            "canned_acl": "bucket-owner-full-control",
        }
    },
    "custom_tags": {"sync:project-id": "",}, # Add the project id from Gradient
    ...
    "init_scripts": [
        {"workspace": {
            "destination": "" # Path to the init script in the workspace ie: Shared/init_scripts/init.sh
            }
        }
    ],
    "spark_env_vars": {
        "DATABRICKS_HOST": f"{sync_databricks_conf().host}",
        "DATABRICKS_TOKEN": f"{sync_databricks_conf().token}",
        "SYNC_API_KEY_ID": f"{get_api_key().id}",
        "SYNC_API_KEY_SECRET": f"{get_api_key().secret}",
        "AWS_DEFAULT_REGION": f"{os.environ['AWS_DEFAULT_REGION']}",
        "AWS_ACCESS_KEY_ID": f"{os.environ['AWS_ACCESS_KEY_ID']}",
        "AWS_SECRET_ACCESS_KEY": f"{os.environ['AWS_SECRET_ACCESS_KEY']}",
    }
}

Reminder: the Databricks ARN attached to the cluster will need access to the S3 path specified in the cluster_log_conf.
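If your cluster dictionary already comes from a helper function, one option is to layer the Sync-specific keys on top of it rather than editing every definition by hand. Below is a minimal sketch of that idea. The function add_sync_settings and its parameters are hypothetical names chosen for this example; only the dictionary keys come from the configuration shown above.

```python
import copy


def add_sync_settings(base_cluster, log_destination, region, project_id,
                      init_script_path, env_vars):
    """Return a copy of base_cluster with the Sync-specific settings added."""
    cluster = copy.deepcopy(base_cluster)  # leave the caller's dictionary untouched

    # Where Databricks should deliver the cluster logs.
    cluster["cluster_log_conf"] = {
        "s3": {
            "destination": log_destination,
            "enable_encryption": True,
            "region": region,
            "canned_acl": "bucket-owner-full-control",
        }
    }

    # Keep any tags, init scripts, and environment variables already defined.
    cluster["custom_tags"] = {
        **cluster.get("custom_tags", {}),
        "sync:project-id": project_id,
    }
    cluster["init_scripts"] = cluster.get("init_scripts", []) + [
        {"workspace": {"destination": init_script_path}}
    ]
    cluster["spark_env_vars"] = {**cluster.get("spark_env_vars", {}), **env_vars}
    return cluster


if __name__ == "__main__":
    # Placeholder values for illustration only.
    base = {"spark_version": "13.0.x-scala2.12", "custom_tags": {"team": "data"}}
    updated = add_sync_settings(
        base,
        log_destination="s3://example-bucket/cluster-logs",
        region="us-east-1",
        project_id="Project_Id_Goes_Here",
        init_script_path="Shared/init_scripts/init.sh",
        env_vars={"AWS_DEFAULT_REGION": "us-east-1"},
    )
    print(sorted(updated))
```

Returning a copy keeps the original definition reusable for tasks that should not be tracked in a Sync project.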

Databricks Submit Run Operator Changes

Next, we’ll ensure the Databricks operator passes the run_id of the created job back to XCom. The subsequent task needs it to request a prediction for the run. To do so, just enable the do_xcom_push parameter.

# DAG code
    ...
    # Submit the Databricks run
    run_operator = DatabricksSubmitRunOperator(
        task_id=...,
        do_xcom_push=True,
    )
    ...

Create A Recommendation You Can View In Gradient!

Upon successful completion of the DatabricksSubmitRunOperator task, we’ll have the run_id we need to create a recommendation for optimal cluster configuration. We’ll utilize the PythonOperator to call the create_prediction_for_run method from the Sync Library. Within the library, this method will connect to the Databricks instance to gather the cluster log location, fetch the logs, and generate the recommendation.

Below is an example of how to call the create_prediction_for_run method from the Sync Library. 

from sync.awsdatabricks import create_prediction_for_run


def submit_run_for_recommendation(task_to_submit: str, **kwargs):
    run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
    project_id = "Project_Id_Goes_Here"
    create_prediction_for_run(
        run_id=run_id,
        plan_type="Premium",
        project_id=project_id,
        compute_type="Jobs Compute",
    )

What this code block does:

  • Wraps create_prediction_for_run in a function that Airflow can call.
  • Pulls the run_id of the previous task from XCom. We supply task_to_submit as the task_id we gave the DatabricksSubmitRunOperator.
  • Assigns the project id, found on the project details page in Gradient, to the project_id variable.
  • Passes the run_id and project_id to the Sync library method.

Optionally, add a project_id parameter to submit_run_for_recommendation if you’d like to pass the value in from the PythonOperator instead. Edit plan_type and compute_type as needed; they should match your Databricks settings.
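As a rough sketch of that optional change, the version below takes project_id, plan_type, and compute_type as arguments instead of hard-coding them. The predict_fn argument is our own addition so the function can be exercised without a Databricks workspace; it is not part of the Sync Library. When it is left unset, the function falls back to create_prediction_for_run with the same keyword arguments used above.

```python
def submit_run_for_recommendation(
    task_to_submit,
    project_id,
    plan_type="Premium",
    compute_type="Jobs Compute",
    predict_fn=None,
    **kwargs,
):
    """Request a recommendation for the run submitted by `task_to_submit`."""
    if predict_fn is None:
        # Imported lazily so this module can be loaded (for example in unit
        # tests) on machines where the Sync Library is not installed.
        from sync.awsdatabricks import create_prediction_for_run as predict_fn

    # Airflow supplies the task instance ("ti") through the context kwargs.
    run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
    if run_id is None:
        raise ValueError(f"No run_id found in XCom for task '{task_to_submit}'")

    return predict_fn(
        run_id=run_id,
        plan_type=plan_type,
        project_id=project_id,
        compute_type=compute_type,
    )
```

With this version, the project id is supplied through op_kwargs next to task_to_submit, so the same function can serve several tasks that belong to different Gradient projects.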

To call the submit_run_for_recommendation function we defined, implement the PythonOperator as follows:

    submit_for_recommendation = PythonOperator(
        task_id="submit_for_recommendation",
        python_callable=submit_run_for_recommendation,
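        op_kwargs={
            # task_id of the DatabricksSubmitRunOperator to generate a recommendation for
            "task_to_submit": "",
        },
        # Airflow 2.0+ passes the task context (including "ti") automatically,
        # so the deprecated provide_context argument is not needed.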
        retries=0,
    )

Putting It All Together

Let’s combine all of the above in a DAG. The DAG will submit a run to Databricks, and then make a call through Sync’s library to generate a prediction for an optimized cluster for that task.

# DAG .py code
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from sync.awsdatabricks import create_prediction_for_run
from sync.config import DatabricksConf as sync_databricks_conf
from sync.config import get_api_key


with DAG(
    dag_id="example_dag",
    default_args=default_args,
    tags=["example"],
    ...
) as dag:

    # define the cluster configuration
    cluster_config = {
        "spark_version": "13.0.x-scala2.12",
        ...
        "cluster_log_conf": {
            "s3": {
                "destination": "", # Add the s3 path for the cluster logs
                "enable_encryption": True,
                "region": "", # Add your aws region ie: us-east-1
                "canned_acl": "bucket-owner-full-control",
            }
        },
        "custom_tags": {"sync:project-id": "",}, # Add the project id from Gradient
        ...
        "init_scripts": [
            {"workspace": {
                "destination": "" # Path to the init script in the workspace ie: Shared/init_scripts/init.sh
                }
            }
        ],
        "spark_env_vars": {
            "DATABRICKS_HOST": "", # f"{sync_databricks_conf().host}"
            "DATABRICKS_TOKEN": "", # f"{sync_databricks_conf().token}"
            "SYNC_API_KEY_ID": "", # f"{get_api_key().id}"
            "SYNC_API_KEY_SECRET": "", # f"{get_api_key().secret}"
            "AWS_DEFAULT_REGION": "", # f"{os.environ['AWS_DEFAULT_REGION']}"
            "AWS_ACCESS_KEY_ID": "", # f"{os.environ['AWS_ACCESS_KEY_ID']}"
            "AWS_SECRET_ACCESS_KEY": "", # f"{os.environ['AWS_SECRET_ACCESS_KEY']}",
        }
    }

    # define your databricks operator
    dbx_operator = DatabricksSubmitRunOperator(
        task_id="dbx_operator",
        do_xcom_push=True,
        ...
        new_cluster=cluster_config,
        ...
    )

    # define the submit function to pass to the PythonOperator
    def submit_run_for_recommendation(task_to_submit: str, **kwargs):
        run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
        project_id = "Project_Id_Goes_Here"
        create_prediction_for_run(
            run_id=run_id,
            plan_type="Premium",
            project_id=project_id,
            compute_type="Jobs Compute",
        )

    # define the python operator
    submit_for_recommendation = PythonOperator(
        task_id="submit_for_recommendation",
        python_callable=submit_run_for_recommendation,
        op_kwargs={
            "task_to_submit": "dbx_operator",
        },
        # provide_context is not needed in Airflow 2.0+; the context is passed automatically
        retries=0,
    )

    # define dag dependency
    dbx_operator >> submit_for_recommendation

Viewing Your Recommendation

Once the code above is implemented in your DAG, head over to the Projects dashboard in Gradient. There you can review the recommendations and make changes to the cluster configuration as needed.

Developing Gradient Part II

Introduction: Using Gradient in a Workflow

Gradient, the latest product release from Sync Computing, helps customers manage the infrastructure behind their recurring Apache Spark applications. Gradient gives infrastructure recommendations for each job to lower the cost of their production jobs while hitting their target SLAs. We’ve been hard at work on this project for a long time and we’re excited for people to use it and realize real cost savings in their Apache Spark jobs running on EMR or Databricks!

The key feature of Gradient is a Project, which in Databricks manages the lifecycle of a single Job. With the integration of Sync’s Python library, each Job run produces: 

  1. An Apache Spark eventlog
  2. A “Cluster Report” that includes Databricks and EC2 API response data about that run and its associated cluster

These outputs then get fed into Gradient’s recommendation engine, which predicts the runtime and cost of that same job if it were to run on different hardware. Within Gradient’s recommendation engine there are two key steps: (1) runtime prediction modeling, and (2) cost estimation modeling. These two steps are repeated for a variety of potential hardware configurations, and then the one that yields the lowest cost within a given time constraint is recommended for the job run.

Figure: Diagram of a Gradient Project. After each Job Run a cluster report and Eventlog are produced which get fed into Gradient’s recommendation engine. The output of this process is a new cluster recommendation, one that should reduce cost while maintaining a runtime requirement, that informs the subsequent Job run.
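To make the selection step described above more concrete, here is a deliberately simplified sketch. It is not Gradient’s implementation: the Candidate class, the recommend function, the flat hourly cost model, and all numbers are assumptions made up for illustration. The runtime predictor is passed in as a plain function so the example stays self-contained.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    name: str           # label for a hardware configuration (hypothetical)
    hourly_cost: float  # assumed flat cost per hour for the whole cluster


def recommend(candidates, predict_runtime_hours, max_runtime_hours):
    """Pick the cheapest candidate whose predicted runtime meets the limit.

    Returns (candidate, estimated_cost), or (None, None) if no candidate
    satisfies the runtime constraint.
    """
    best, best_cost = None, None
    for candidate in candidates:
        runtime = predict_runtime_hours(candidate)  # step 1: runtime prediction
        cost = runtime * candidate.hourly_cost      # step 2: cost estimation
        if runtime > max_runtime_hours:
            continue  # too slow for the time constraint
        if best_cost is None or cost < best_cost:
            best, best_cost = candidate, cost
    return best, best_cost


if __name__ == "__main__":
    # Made-up candidates and predicted runtimes, for illustration only.
    candidates = [
        Candidate("small", 2.0),
        Candidate("medium", 4.0),
        Candidate("large", 12.0),
    ]
    predicted = {"small": 3.0, "medium": 1.25, "large": 0.5}
    choice, cost = recommend(candidates, lambda c: predicted[c.name], 2.0)
    print(choice, cost)
```

In this toy setup the smallest cluster is ruled out by the time constraint rather than by cost, which is why the constraint has to be checked alongside the cost comparison.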

Internal Testing

In a separate blog post [link to Part I] we discuss in some detail the internal testing process we used to develop Gradient. A huge benefit of internal testing is that we have access to the actual cost data of each Job we run via Databricks and AWS cost and usage reporting. This data is critical in assessing whether or not Gradient behaves as it should from a customer’s perspective. In other words, do customers still save money in spite of imperfect runtime prediction and cost estimation? The answer, we found, is yes!

In the following figure we show a snapshot of data we generated that gave us great confidence in the solution we developed for Gradient. The figure displays a histogram of the percent change in cost between an initial “parent run” (with a random hardware configuration) and a subsequent “child run” informed by Gradient’s recommendation engine. In total, 80% of runs showed a cost reduction, and the median cost reduction was 30%.

It’s worth emphasizing that these results are the single-shot improvements going from an initial cold-start run to the first recommendation produced by Gradient. Given how complex predicting runtime is, these are excellent results, and it’s not surprising that 20% of the runs increased in cost (a less than ideal outcome). That said, it’s the feedback loop that really unlocks the capability of Gradient. Every time your Job runs, more data is added to your project, and that history of runs will be used to improve the quality of future recommendations. It’s a sure bet that when you spin up your first Job there’ll be a Sync engineer assessing the quality of your recommendations!

Figure: Internal testing results after one iteration through Gradient, showing the change in job cost before and after applying Gradient’s recommendation. Cost was reduced in 80% of runs, with a median reduction of 30% compared to the parent run cost.
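For readers who want to run the same kind of comparison on their own jobs, the sketch below shows one way to compute the percent change in cost between parent and child runs, the share of runs that got cheaper, and the median change. The function names and the cost pairs are made up for illustration and are not the data behind the figure. Taking the median over all runs, rather than only the runs that improved, is also our assumption.

```python
from statistics import median


def percent_change(parent_cost, child_cost):
    """Percent change in cost from parent run to child run (negative = cheaper)."""
    return 100.0 * (child_cost - parent_cost) / parent_cost


def summarize(cost_pairs):
    """Summarize a list of (parent_cost, child_cost) pairs."""
    changes = [percent_change(parent, child) for parent, child in cost_pairs]
    reduced = [change for change in changes if change < 0]
    return {
        "share_reduced": len(reduced) / len(changes),
        "median_change_pct": median(changes),
    }


if __name__ == "__main__":
    # Made-up (parent_cost, child_cost) pairs in dollars, for illustration only.
    pairs = [(100.0, 50.0), (100.0, 70.0), (100.0, 120.0), (200.0, 150.0)]
    print(summarize(pairs))
```

The list of per-run changes is also what you would bin to draw a histogram like the one described above.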

External Savings

When it comes to demonstrating value, nothing beats a positive user experience. Early customer interactions have yielded huge savings and positive experiences at a number of different companies. Want to see their stories using Gradient? Check out some of the blogs they’ve written!

Conclusion

Since we first got started, Sync has felt confident that we’re tackling a real issue that’s pervasive in the cloud infrastructure world. Making infrastructure decisions while being uncertain about how to reduce costs is a ubiquitous story when we talk to folks. With Gradient we feel we finally have a real solution to this problem: one that will make the lives of developers around the world easier, enabling them to focus their efforts on more important tasks, all while saving companies money on their Apache Spark workloads.