Integrating Gradient into Apache Airflow

In this blog post, we’ll learn how you can integrate Sync’s Gradient with Airflow to optimize Databricks jobs.

Summary

In this blog post, we’ll explore how you can integrate Sync’s Gradient with Airflow. We’ll walk through the steps to create a DAG that will submit a run to Databricks, and then make a call through Sync’s library to generate a recommendation for an optimized cluster for that task. This DAG example can be used to automate the process of requesting recommendations for tasks that are submitted as jobs to Databricks.

A Common Use Case And Its Challenges

Use Case:

A common implementation of Databricks within Airflow consists of using the DatabricksSubmitRunOperator to submit a pre-configured notebook to Databricks.

Challenges:

  • Because orchestration happens outside of the Databricks ecosystem, these jobs appear as one-time runs.
  • As a result, it’s difficult to track cluster performance across multiple runs.
  • This is exacerbated by the fact that a DAG can have multiple tasks that submit these one-off ‘jobs’ to Databricks.

How Can We Fix This?

We’ll set up a PythonOperator that uses the Sync Library to generate recommendations, which we can then view in Gradient’s UI. From there we can see which changes to our cluster definitions will reduce costs. Let’s dive in!

Preparing Your Airflow Environment

Prerequisites

  • Airflow (This tutorial uses 2.0+)
  • Python 3.7+
  • Sync Library installed and environment variables configured on the Airflow instance (details below)
  • An S3 path you would like to use for cluster logs. Your Databricks ARN will need access to this path so it can save the cluster logs there.
  • An account with Sync and a Project created to track the task you would like to optimize.

Sync Account Setup And Library Installation

Instructions on how to create an account, create a project, and install the Sync Library are covered in the Sync quick start guide. Please configure the CLI on your Airflow instance. When going through the configuration steps, be sure to choose yes when prompted to configure the Databricks variables.

Note: The quick start includes instructions on using an init script. Copy the contents of the init script into a file in a shared or personal workspace that is accessible by the account the Databricks job will run as.

Variables

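Certain variables are generated and stored during installation of the Sync Library. For transparency, these include the Sync API key (ID and secret) and the Databricks host and token, which the cluster configuration later in this post reads through sync.config.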

Besides the variables generated by the library, you’ll need the following environment variables. These are necessary to use the AWS API to retrieve cluster logs when requesting a prediction. DBFS is supported; however, it is not recommended because it goes against Databricks’ best practices. As mentioned in the quick start, it’s best to set these via the AWS CLI.

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_DEFAULT_REGION
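
A missing variable tends to surface only once the DAG is already running, so it can help to check for all three up front. The snippet below is a minimal sketch of such a check. The helper name missing_env_vars is hypothetical and not part of the Sync Library, and the check assumes the values are exposed as environment variables to the Airflow process (the cluster configuration later in this post reads them from os.environ).

```python
import os

# The three AWS variables listed above
REQUIRED_AWS_VARS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_DEFAULT_REGION",
)


def missing_env_vars(required=REQUIRED_AWS_VARS, environ=None):
    """Return the names in `required` that are unset or empty.

    Hypothetical helper for illustration; `environ` defaults to os.environ.
    """
    env = os.environ if environ is None else environ
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All AWS environment variables are set.")
```

Running this on the Airflow instance before deploying the DAG gives a quick signal on whether the log retrieval step has the variables it expects.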

Cluster Configuration

Referring back to our common use case, a static cluster configuration is often either defined within the DAG or built dynamically by a helper function that returns the cluster dictionary to be passed into the DatabricksSubmitRunOperator. In preparation for the first run, some specific cluster details need to be configured.

What are we adding?

  • cluster_log_conf: an S3 path to send our cluster logs to. These logs will be used to generate an optimized recommendation.
  • custom_tags: the sync:project-id tag is added so we can assign the run to a Sync project.
  • init_scripts: identifies the path of the init script that we copied into our Databricks workspace during the quick start setup.
  • spark_env_vars: environment variables passed to the cluster for the init script to use. Note: to keep this tutorial simple, tokens and keys are read from the information configured during the sync-cli setup process. Passing them in this manner will result in tokens being visible in plaintext when viewing the cluster in Databricks. Please use Databricks Secrets when moving this code to production.

The rest of the cluster configuration dictionary comprises the typical settings you normally pass into the DatabricksSubmitRunOperator.

import os

from sync.config import DatabricksConf as sync_databricks_conf
from sync.config import get_api_key

cluster_config = {
    "spark_version": "13.0.x-scala2.12",
    ...
    "cluster_log_conf": {
        "s3": {
            "destination": "",  # Add the S3 path for the cluster logs
            "enable_encryption": True,
            "region": "",  # Add your AWS region, e.g. us-east-1
            "canned_acl": "bucket-owner-full-control",
        }
    },
    "custom_tags": {"sync:project-id": ""},  # Add the project id from Gradient
    ...
    "init_scripts": [
        {
            "workspace": {
                "destination": ""  # Path to the init script in the workspace, e.g. Shared/init_scripts/init.sh
            }
        }
    ],
    # Values read from the sync-cli configuration and the local environment
    "spark_env_vars": {
        "DATABRICKS_HOST": f"{sync_databricks_conf().host}",
        "DATABRICKS_TOKEN": f"{sync_databricks_conf().token}",
        "SYNC_API_KEY_ID": f"{get_api_key().id}",
        "SYNC_API_KEY_SECRET": f"{get_api_key().secret}",
        "AWS_DEFAULT_REGION": f"{os.environ['AWS_DEFAULT_REGION']}",
        "AWS_ACCESS_KEY_ID": f"{os.environ['AWS_ACCESS_KEY_ID']}",
        "AWS_SECRET_ACCESS_KEY": f"{os.environ['AWS_SECRET_ACCESS_KEY']}",
    },
}

Reminder: the Databricks ARN attached to the cluster will need access to the S3 path specified in cluster_log_conf.
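
If your DAG already builds a cluster dictionary, one option is to layer the Gradient settings on top of it rather than editing the dictionary by hand. The sketch below illustrates that idea and also shows one way to follow the Databricks Secrets advice above: Databricks lets an environment variable reference a secret with the {{secrets/<scope>/<key>}} syntax. The function name with_gradient_settings, the secret scope "sync", and the choice to name each secret key after its environment variable are all assumptions made for illustration, so adjust them to match how your secrets are actually stored.

```python
from copy import deepcopy

# Environment variables the init script expects, per the list above
SYNC_ENV_VARS = (
    "DATABRICKS_HOST",
    "DATABRICKS_TOKEN",
    "SYNC_API_KEY_ID",
    "SYNC_API_KEY_SECRET",
    "AWS_DEFAULT_REGION",
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
)


def with_gradient_settings(base_cluster, project_id, log_destination, region,
                           init_script_path, secret_scope="sync"):
    """Return a copy of base_cluster with the Gradient-specific settings added.

    Hypothetical helper: secret_scope and the secret key names are assumptions.
    """
    cluster = deepcopy(base_cluster)  # leave the caller's dictionary untouched
    cluster["cluster_log_conf"] = {
        "s3": {
            "destination": log_destination,
            "enable_encryption": True,
            "region": region,
            "canned_acl": "bucket-owner-full-control",
        }
    }
    cluster.setdefault("custom_tags", {})["sync:project-id"] = project_id
    cluster.setdefault("init_scripts", []).append(
        {"workspace": {"destination": init_script_path}}
    )
    env = cluster.setdefault("spark_env_vars", {})
    for name in SYNC_ENV_VARS:
        # Secret reference instead of a plaintext value
        env[name] = "{{secrets/" + secret_scope + "/" + name + "}}"
    return cluster
```

Because the helper copies the base dictionary, the same base configuration can be reused for several tasks that each report to a different Gradient project.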

Databricks Submit Run Operator Changes

Next, we’ll ensure the DatabricksSubmitRunOperator passes the run_id of the created job back to XCom. The subsequent task needs it to request a prediction for the run. Just enable the do_xcom_push parameter.

# DAG code
    ...
    # Submit the Databricks run
    run_operator = DatabricksSubmitRunOperator(
        task_id=...,
        do_xcom_push=True,
    )
    ...

Create A Recommendation You Can View In Gradient!

Upon successful completion of the DatabricksSubmitRunOperator task, we’ll have the run_id we need to create a recommendation for optimal cluster configuration. We’ll utilize the PythonOperator to call the create_prediction_for_run method from the Sync Library. Within the library, this method will connect to the Databricks instance to gather the cluster log location, fetch the logs, and generate the recommendation.

Below is an example of how to call the create_prediction_for_run method from the Sync Library. 

from sync.awsdatabricks import create_prediction_for_run


def submit_run_for_recommendation(task_to_submit: str, **kwargs):
    run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
    project_id = "Project_Id_Goes_Here"
    create_prediction_for_run(
        run_id=run_id,
        plan_type="Premium",
        project_id=project_id,
        compute_type="Jobs Compute",
    )

What this code block does:

  • Wraps the call to create_prediction_for_run in a function that the PythonOperator can run.
  • Pulls the run_id of the previous task from XCom. The task_to_submit argument is the task_id that we gave the DatabricksSubmitRunOperator.
  • Assigns the project id, found on the project details page in Gradient, to the project_id variable.
  • Passes the run_id and project_id to the Sync Library method.

Optionally, add a project_id parameter to submit_run_for_recommendation if you’d like to supply the project id from the PythonOperator instead. Edit plan_type and compute_type as needed; these reflect your Databricks settings.

To call the submit_run_for_recommendation function we defined, implement the PythonOperator as follows:

    submit_for_recommendation = PythonOperator(
        task_id="submit_for_recommendation",
        python_callable=submit_run_for_recommendation,
        op_kwargs={
            "task_to_submit": "task_id of the DatabricksSubmitRunOperator to generate a recommendation for",
        },
        # provide_context is not needed in Airflow 2.0+; the context is passed automatically
        retries=0,
    )
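
The optional parameterization mentioned above might look like the following sketch. It moves project_id, plan_type, and compute_type into function arguments and fails with a clear message when no run_id is found in XCom, which usually means do_xcom_push was not enabled on the Databricks task. The helper pull_run_id is a hypothetical name introduced here, and importing the Sync Library inside the function is a design choice of this sketch, not something the library requires.

```python
def pull_run_id(ti, task_id):
    """Fetch the run_id that `task_id` pushed to XCom, or fail loudly.

    `ti` is the Airflow task instance taken from the task context.
    Hypothetical helper for illustration.
    """
    run_id = ti.xcom_pull(task_ids=task_id, key="run_id")
    if run_id is None:
        raise ValueError(
            f"No run_id in XCom for task '{task_id}'; is do_xcom_push enabled?"
        )
    return run_id


def submit_run_for_recommendation(task_to_submit, project_id,
                                  plan_type="Premium",
                                  compute_type="Jobs Compute", **context):
    # Imported here so the DAG file still parses where the Sync Library is absent
    from sync.awsdatabricks import create_prediction_for_run

    run_id = pull_run_id(context["ti"], task_to_submit)
    create_prediction_for_run(
        run_id=run_id,
        plan_type=plan_type,
        project_id=project_id,
        compute_type=compute_type,
    )

# With this variant, op_kwargs carries the project id as well:
# op_kwargs={"task_to_submit": "...", "project_id": "Project_Id_Goes_Here"}
```

Keeping the XCom lookup in its own small function also makes it easy to test without a running Airflow instance, since any object with an xcom_pull method can stand in for the task instance.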

Putting It All Together

Let’s combine everything above in a single DAG. The DAG will submit a run to Databricks, and then make a call through Sync’s library to generate a prediction for an optimized cluster for that task.

# DAG .py code
import os

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from sync.awsdatabricks import create_prediction_for_run
from sync.config import DatabricksConf as sync_databricks_conf
from sync.config import get_api_key


with DAG(
    dag_id="example_dag",
    default_args=default_args,
    tags=["example"],
    ...
) as dag:

    # define the cluster configuration
    cluster_config = {
        "spark_version": "13.0.x-scala2.12",
        ...
        "cluster_log_conf": {
            "s3": {
                "destination": "",  # Add the S3 path for the cluster logs
                "enable_encryption": True,
                "region": "",  # Add your AWS region, e.g. us-east-1
                "canned_acl": "bucket-owner-full-control",
            }
        },
        "custom_tags": {"sync:project-id": ""},  # Add the project id from Gradient
        ...
        "init_scripts": [
            {
                "workspace": {
                    "destination": ""  # Path to the init script in the workspace, e.g. Shared/init_scripts/init.sh
                }
            }
        ],
        "spark_env_vars": {
            "DATABRICKS_HOST": "",  # f"{sync_databricks_conf().host}"
            "DATABRICKS_TOKEN": "",  # f"{sync_databricks_conf().token}"
            "SYNC_API_KEY_ID": "",  # f"{get_api_key().id}"
            "SYNC_API_KEY_SECRET": "",  # f"{get_api_key().secret}"
            "AWS_DEFAULT_REGION": "",  # f"{os.environ['AWS_DEFAULT_REGION']}"
            "AWS_ACCESS_KEY_ID": "",  # f"{os.environ['AWS_ACCESS_KEY_ID']}"
            "AWS_SECRET_ACCESS_KEY": "",  # f"{os.environ['AWS_SECRET_ACCESS_KEY']}"
        },
    }

    # define your databricks operator
    dbx_operator = DatabricksSubmitRunOperator(
        task_id="dbx_operator",
        do_xcom_push=True,
        ...
        new_cluster=cluster_config,
        ...
    )

    # define the submit function to pass to the PythonOperator
    def submit_run_for_recommendation(task_to_submit: str, **kwargs):
        # pull the run_id that the Databricks task pushed to XCom
        run_id = kwargs["ti"].xcom_pull(task_ids=task_to_submit, key="run_id")
        project_id = "Project_Id_Goes_Here"
        create_prediction_for_run(
            run_id=run_id,
            plan_type="Premium",
            project_id=project_id,
            compute_type="Jobs Compute",
        )

    # define the python operator
    submit_for_recommendation = PythonOperator(
        task_id="submit_for_recommendation",
        python_callable=submit_run_for_recommendation,
        op_kwargs={
            "task_to_submit": "dbx_operator",
        },
        # provide_context is not needed in Airflow 2.0+; the context is passed automatically
        retries=0,
    )

    # define dag dependency
    dbx_operator >> submit_for_recommendation
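
When a DAG has several tasks that each submit a run to Databricks, the same pattern can be repeated per task. The sketch below shows one way to generate the arguments for each recommendation task from a mapping of task ids to Gradient project ids. The helper recommendation_task_specs and the recommend_ prefix are hypothetical, and the sketch assumes submit_run_for_recommendation has been given the optional project_id parameter described earlier.

```python
def recommendation_task_specs(task_projects):
    """Build PythonOperator arguments for each Databricks task.

    task_projects maps the task_id of each DatabricksSubmitRunOperator
    to the Gradient project id that should track it. Hypothetical helper.
    """
    specs = []
    for dbx_task_id, project_id in task_projects.items():
        specs.append({
            "task_id": f"recommend_{dbx_task_id}",
            "op_kwargs": {
                "task_to_submit": dbx_task_id,
                "project_id": project_id,
            },
        })
    return specs

# Possible usage inside the `with DAG(...)` block:
# for spec in recommendation_task_specs({"dbx_operator": "Project_Id_Goes_Here"}):
#     PythonOperator(python_callable=submit_run_for_recommendation, retries=0, **spec)
# Each recommendation task still needs a dependency on its Databricks task.
```

Giving each Databricks task its own project keeps the recommendations for different workloads separate in Gradient.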

Viewing Your Recommendation

Once the code above has been added to your DAG, head over to the Projects dashboard in Gradient. There you’ll be able to review recommendations and make changes to the cluster configuration as needed.

Try integrating Gradient into Airflow yourself!