Databricks (dagster-databricks)

The dagster_databricks package provides two main pieces of functionality:

  • A resource, databricks_pyspark_step_launcher, which will execute an op within a Databricks context on a cluster, such that the pyspark resource uses the cluster’s Spark instance.

  • A function, create_databricks_job_op, which creates an op that submits an external configurable job to Databricks using the ‘Run Now’ API.

Note that, for the databricks_pyspark_step_launcher, either S3 or Azure Data Lake Storage config must be specified for ops to succeed. The credentials for this storage must also be stored as a Databricks Secret and referenced in the resource config so that the Databricks cluster can access storage.

APIs

dagster_databricks.create_databricks_job_op(name='databricks_job', num_inputs=1, description=None, required_resource_keys=frozenset({'databricks_client'}))

Creates an op that launches a Databricks job (not to be confused with Dagster’s job API).

As config, the op accepts a blob of the form described in Databricks’ job API: https://docs.databricks.com/dev-tools/api/latest/jobs.html.

Returns

An op definition.

Return type

OpDefinition

Example

from dagster import graph
from dagster_databricks import create_databricks_job_op, databricks_client

sparkpi = create_databricks_job_op().configured(
    {
        "job": {
            "name": "SparkPi Python job",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
            "spark_python_task": {"python_file": "dbfs:/docs/pi.py", "parameters": ["10"]},
        }
    },
    name="sparkpi",
)

@graph
def my_spark():
    sparkpi()

my_spark.to_job(
    resource_defs={
        "databricks_client": databricks_client.configured(
            {"host": "my.workspace.url", "token": "my.access.token"}
        )
    }
)

dagster_databricks.databricks_pyspark_step_launcher ResourceDefinition

Config Schema:
run_config (strict dict)

Databricks job run configuration

Config Schema:
cluster (selector)
Config Schema:
new (strict dict)
Config Schema:
size (selector)
Config Schema:
autoscale (strict dict)
Config Schema:
min_workers (Int)

The minimum number of workers to which the cluster can scale down when underutilized. It is also the initial number of workers the cluster will have after creation.

max_workers (Int)

The maximum number of workers to which the cluster can scale up when overloaded. max_workers must be strictly greater than min_workers.

num_workers (Int)

If num_workers is set, the number of worker nodes that this cluster should have. A cluster has one Spark Driver and num_workers Executors for a total of num_workers + 1 Spark nodes.
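
The size selector takes one of two forms, autoscale or num_workers. As a minimal sketch, both forms can be written as plain Python dicts. The worker counts are illustrative, and the two helper functions are hypothetical; they only restate the rules given in the field descriptions above and are not part of dagster_databricks.

```python
# Two alternative values for the "size" selector; the numbers are illustrative.
autoscale_size = {"autoscale": {"min_workers": 1, "max_workers": 4}}
fixed_size = {"num_workers": 2}


def total_spark_nodes(size):
    """Node count for a fixed-size cluster: one driver plus num_workers executors."""
    return size["num_workers"] + 1


def check_autoscale(size):
    """Raise ValueError unless max_workers is strictly greater than min_workers."""
    bounds = size["autoscale"]
    if bounds["max_workers"] <= bounds["min_workers"]:
        raise ValueError("max_workers must be strictly greater than min_workers")
    return bounds
```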

spark_version (String)

The Spark version of the cluster. A list of available Spark versions can be retrieved by using the Runtime versions API call. This field is required.

spark_conf (permissive dict, optional)

An object containing a set of optional, user-specified Spark configuration key-value pairs. You can also pass in a string of extra JVM options to the driver and the executors via spark.driver.extraJavaOptions and spark.executor.extraJavaOptions respectively. Example Spark confs: {"spark.speculation": true, "spark.streaming.ui.retainedBatches": 5} or {"spark.driver.extraJavaOptions": "-verbose:gc -XX:+PrintGCDetails"}

nodes (selector)

The nodes used in the cluster. Either the node types or an instance pool can be specified.

Config Schema:
node_types (strict dict)
Config Schema:
node_type_id (String)

This field encodes, through a single value, the resources available to each of the Spark nodes in this cluster. For example, the Spark nodes can be provisioned and optimized for memory or compute intensive workloads. A list of available node types can be retrieved by using the List node types API call. This field is required.

driver_node_type_id (String, optional)

The node type of the Spark driver. This field is optional; if unset, the driver node type is set as the same value as node_type_id defined above.

instance_pool_id (String, optional)

The optional ID of the instance pool to which the cluster belongs. Refer to the Instance Pools API for details.
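
Because nodes is a selector, a config value picks either node_types or instance_pool_id. The sketch below shows both options as plain dicts. The pool ID is a placeholder, the nesting is read off the schema listing above, and selected_option is a hypothetical helper used only to make the one-option rule concrete.

```python
# Option 1: name the node types directly (driver_node_type_id is optional).
nodes_by_type = {"node_types": {"node_type_id": "i3.xlarge"}}

# Option 2: draw nodes from an instance pool; the pool ID is a placeholder.
nodes_by_pool = {"instance_pool_id": "my-instance-pool-id"}


def selected_option(selector_value):
    """Return the single key chosen in a selector-style dict, or raise ValueError."""
    if len(selector_value) != 1:
        raise ValueError("a selector takes exactly one option")
    return next(iter(selector_value))
```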

ssh_public_keys (List[String], optional)

SSH public key contents that will be added to each Spark node in this cluster. The corresponding private keys can be used to log in with the user name ubuntu on port 2200. Up to 10 keys can be specified.

custom_tags (List[strict dict], optional)

Additional tags for cluster resources. Databricks tags all cluster resources (e.g., AWS instances and EBS volumes) with these tags in addition to default_tags. Note:

  • Tags are not supported on legacy node types such as compute-optimized and memory-optimized.

  • Databricks allows at most 45 custom tags.

More restrictions may apply if using Azure Databricks; refer to the official docs for further details.

cluster_log_conf (selector, optional)

Recommended! The configuration for delivering Spark logs to a long-term storage destination. Only one destination can be specified for one cluster. If the conf is given, the logs will be delivered to the destination every 5 mins. The destination of driver logs is <destination>/<cluster-id>/driver, while the destination of executor logs is <destination>/<cluster-id>/executor.

Config Schema:
dbfs (strict dict)

DBFS storage information

Config Schema:
destination (String)

DBFS destination, e.g. dbfs:/my/path

s3 (strict dict)

S3 storage information

Config Schema:
destination (String)

S3 destination, e.g. s3://my-bucket/some-prefix. You must configure the cluster with an instance profile and the instance profile must have write access to the destination. You cannot use AWS keys.

region (String)

S3 region, e.g. us-west-2. Either region or endpoint must be set. If both are set, endpoint is used.

endpoint (String)

S3 endpoint, e.g. https://s3-us-west-2.amazonaws.com. Either region or endpoint must be set. If both are set, endpoint is used.

enable_encryption (Bool, optional)

(Optional) Enable server side encryption, false by default.

encryption_type (String, optional)

(Optional) The encryption type, either sse-s3 or sse-kms. It is used only when encryption is enabled; the default type is sse-s3.

kms_key (String, optional)

(Optional) KMS key used if encryption is enabled and encryption type is set to sse-kms.

canned_acl (String, optional)

(Optional) Set canned access control list, e.g. bucket-owner-full-control. If canned_acl is set, the cluster instance profile must have s3:PutObjectAcl permission on the destination bucket and prefix. The full list of possible canned ACLs can be found at https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl. By default only the object owner gets full control. If you are using a cross-account role for writing data, you may want to set bucket-owner-full-control so that the bucket owner can read the logs.
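
To make the S3 log delivery fields concrete, here is a sketch of a cluster_log_conf value as a plain dict, together with a small helper that builds the driver and executor log locations described under cluster_log_conf. The bucket, prefix, and cluster ID are placeholders, and log_locations is a hypothetical helper, not a library function. Both region and endpoint are included because the listing marks neither as optional; per the descriptions, endpoint takes precedence when both are set.

```python
# Hypothetical bucket and prefix; the cluster's instance profile needs write access.
cluster_log_conf = {
    "s3": {
        "destination": "s3://my-bucket/some-prefix",
        "region": "us-west-2",
        "endpoint": "https://s3-us-west-2.amazonaws.com",
        "enable_encryption": True,
        "encryption_type": "sse-s3",
        "canned_acl": "bucket-owner-full-control",
    }
}


def log_locations(destination, cluster_id):
    """Build <destination>/<cluster-id>/driver and .../executor paths."""
    base = f"{destination.rstrip('/')}/{cluster_id}"
    return {"driver": f"{base}/driver", "executor": f"{base}/executor"}
```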

init_scripts (List[selector], optional)

The configuration for storing init scripts. Any number of scripts can be specified. The scripts are executed sequentially in the order provided. If cluster_log_conf is specified, init script logs are sent to <destination>/<cluster-id>/init_scripts.

spark_env_vars (permissive dict, optional)

An object containing a set of optional, user-specified environment variable key-value pairs. Key-value pairs of the form (X,Y) are exported as is (i.e., export X="Y") while launching the driver and workers. To specify an additional set of SPARK_DAEMON_JAVA_OPTS, we recommend appending them to $SPARK_DAEMON_JAVA_OPTS as shown in the example below. This ensures that all default Databricks managed environmental variables are included as well. Example Spark environment variables: {"SPARK_WORKER_MEMORY": "28000m", "SPARK_LOCAL_DIRS": "/local_disk0"} or {"SPARK_DAEMON_JAVA_OPTS": "$SPARK_DAEMON_JAVA_OPTS -Dspark.shuffle.service.enabled=true"}

enable_elastic_disk (Bool, optional)

Autoscaling Local Storage: when enabled, this cluster dynamically acquires additional disk space when its Spark workers are running low on disk space. This feature requires specific AWS permissions to function correctly; refer to https://docs.databricks.com/clusters/configure.html#autoscaling-local-storage for details.

existing (String)

The ID of an existing cluster that will be used for all runs of this job. When running jobs on an existing cluster, you may need to manually restart the cluster if it stops responding. Databricks suggests running jobs on new clusters for greater reliability.

run_name (String, optional)

An optional name for the run. The default value is Untitled.

libraries (List[selector], optional)

An optional list of libraries to be installed on the cluster that will execute the job. By default dagster, dagster-databricks and dagster-pyspark libraries will be included.

timeout_seconds (Int, optional)

An optional timeout applied to each run of this job. The default behavior is to have no timeout.

idempotency_token (String, optional)

An optional token that can be used to guarantee the idempotency of job run requests. If an active run with the provided token already exists, the request will not create a new run, but will return the ID of the existing run instead. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run will be launched with that idempotency token. This token should have at most 64 characters.
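
One possible way to produce a token that stays the same across retries and fits the 64-character limit is to hash identifiers that do not change between attempts. The sketch below is an assumption about how a caller might do this, not something dagster_databricks does for you; run_id and step_key are hypothetical inputs.

```python
import hashlib


def make_idempotency_token(run_id, step_key):
    """Derive a deterministic token from identifiers that stay fixed across retries."""
    raw = f"{run_id}:{step_key}".encode("utf-8")
    # A SHA-256 hex digest is always 64 characters, which fits the documented limit.
    return hashlib.sha256(raw).hexdigest()
```

Retrying a request with the same run_id and step_key yields the same token, so the retried request would map to the existing run rather than start a second one.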

databricks_host (dagster.StringSource)

Databricks host, e.g. uksouth.azuredatabricks.com

databricks_token (dagster.StringSource)

Databricks access token

secrets_to_env_variables (List[strict dict], optional)

Databricks secrets to be exported as environment variables. Since runs will execute in the Databricks runtime environment, environment variables (such as those required for a StringSource config variable) will not be accessible to Dagster. These variables must be stored as Databricks secrets and specified here, which will ensure they are re-exported as environment variables accessible to Dagster upon execution.

storage (selector, optional)

Databricks storage configuration for either S3 or ADLS2. If access credentials for your Databricks storage are stored in Databricks secrets, this config indicates the secret scope and the secret keys used to access either S3 or ADLS2.

Config Schema:
s3 (strict dict)

S3 storage secret configuration

Config Schema:
secret_scope (String)

The Databricks secret scope containing the storage secrets.

access_key_key (String)

The key of a Databricks secret containing the S3 access key ID.

secret_key_key (String)

The key of a Databricks secret containing the S3 secret access key.

adls2 (strict dict)

ADLS2 storage secret configuration

Config Schema:
secret_scope (String)

The Databricks secret scope containing the storage secrets.

storage_account_name (String)

The name of the storage account used to access data.

storage_account_key_key (String)

The key of a Databricks secret containing the storage account secret key.
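
As an illustration, the two storage options can be written as plain dicts. Every scope, key, and account name below is a placeholder. The values are the names of Databricks secrets, not the credentials themselves.

```python
# S3 option: all three values are placeholder names of Databricks secrets.
s3_storage = {
    "s3": {
        "secret_scope": "my-secret-scope",
        "access_key_key": "aws-access-key-id",
        "secret_key_key": "aws-secret-access-key",
    }
}

# ADLS2 option: the storage account name is given directly; its key is a secret.
adls2_storage = {
    "adls2": {
        "secret_scope": "my-secret-scope",
        "storage_account_name": "mystorageaccount",
        "storage_account_key_key": "storage-account-key",
    }
}
```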

local_pipeline_package_path (dagster.StringSource, optional)

Absolute path to the package that contains the pipeline definition(s) whose steps will execute remotely on Databricks. This is a path on the local filesystem of the process executing the pipeline. Before every step run, the launcher will zip up the code in this path, upload it to DBFS, and unzip it into the Python path of the remote Spark process. This gives the remote process access to up-to-date user code.

local_dagster_job_package_path (dagster.StringSource, optional)

Absolute path to the package that contains the dagster job definition(s) whose steps will execute remotely on Databricks. This is a path on the local filesystem of the process executing the dagster job. Before every step run, the launcher will zip up the code in this path, upload it to DBFS, and unzip it into the Python path of the remote Spark process. This gives the remote process access to up-to-date user code.

staging_prefix (dagster.StringSource, optional)

Directory in DBFS to use for uploaded job code. Must be absolute.

Default Value: ‘/dagster_staging’

wait_for_logs (Bool, optional)

If set, and if the specified cluster is configured to export logs, the system will wait after job completion for the logs to appear in the configured location. Note that logs are copied every 5 minutes, so enabling this will add several minutes to the job runtime. NOTE: this integration will export stdout/stderr from the remote Databricks process automatically, so this option is not generally necessary.

Default Value: False

max_completion_wait_time_seconds (dagster.IntSource, optional)

If the Databricks job run takes more than this many seconds, then consider it failed and terminate the step.

Default Value: 86400

poll_interval_sec (Float, optional)

How frequently Dagster will poll Databricks to determine the state of the job.

Default Value: 5.0
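
The interaction between poll_interval_sec and max_completion_wait_time_seconds can be pictured as a polling loop with a deadline. The following is a generic sketch under stated assumptions, not the launcher's actual implementation: get_state is a hypothetical callable, the state names are made up, and sleep and clock are injectable only to keep the example testable.

```python
import time


def wait_for_completion(get_state, poll_interval_sec=5.0, max_wait_seconds=86400,
                        sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until it returns a terminal state or the deadline passes."""
    terminal_states = {"SUCCESS", "FAILED"}  # hypothetical state names
    deadline = clock() + max_wait_seconds
    while True:
        state = get_state()
        if state in terminal_states:
            return state
        if clock() >= deadline:
            # Mirrors the documented behavior: treat an overlong run as failed.
            raise TimeoutError(f"run did not finish within {max_wait_seconds} seconds")
        sleep(poll_interval_sec)
```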

Resource for running ops as a Databricks Job.

When this resource is used, the op will be executed in Databricks using the ‘Run Submit’ API. Pipeline code will be zipped up and copied to a directory in DBFS along with the op’s execution context.

Use the ‘run_config’ configuration to specify the details of the Databricks cluster used, and the ‘storage’ key to configure persistent storage on that cluster. Storage is accessed by setting the credentials in the Spark context, as described in the Databricks documentation for S3 and for ADLS.
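
Putting the pieces together, a resource config for the step launcher might look like the dict below. This is a sketch under stated assumptions: the nesting is inferred from the schema listing above, all hosts, paths, scopes, and secret names are placeholders, and DATABRICKS_TOKEN is a hypothetical environment variable read through the StringSource {"env": ...} form.

```python
step_launcher_config = {
    "run_config": {
        "run_name": "dagster step",
        "cluster": {
            # "cluster" is a selector: either "new" (as here) or "existing".
            "new": {
                "size": {"num_workers": 2},
                "spark_version": "7.3.x-scala2.12",
                "nodes": {"node_types": {"node_type_id": "i3.xlarge"}},
            }
        },
    },
    "databricks_host": "uksouth.azuredatabricks.com",
    # Read the token from an environment variable instead of hard-coding it.
    "databricks_token": {"env": "DATABRICKS_TOKEN"},
    "storage": {
        "s3": {
            "secret_scope": "my-secret-scope",
            "access_key_key": "aws-access-key-id",
            "secret_key_key": "aws-secret-access-key",
        }
    },
    "local_dagster_job_package_path": "/absolute/path/to/my_package",
    "staging_prefix": "/dagster_staging",
}
```

A dict of this shape would presumably be passed to databricks_pyspark_step_launcher.configured(...) and supplied in a job's resource_defs, in the same way databricks_client is configured in the create_databricks_job_op example.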

class dagster_databricks.DatabricksError

Legacy APIs

dagster_databricks.create_databricks_job_solid(name='databricks_job', num_inputs=1, description=None, required_resource_keys=frozenset({'databricks_client'}))

Creates a solid that launches a Databricks job.

As config, the solid accepts a blob of the form described in Databricks’ job API: https://docs.databricks.com/dev-tools/api/latest/jobs.html.

Returns

A solid definition.

Return type

SolidDefinition

Example

from dagster import ModeDefinition, pipeline
from dagster_databricks import create_databricks_job_solid, databricks_client

sparkpi = create_databricks_job_solid().configured(
    {
        "job": {
            "name": "SparkPi Python job",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
            "spark_python_task": {"python_file": "dbfs:/docs/pi.py", "parameters": ["10"]},
        }
    },
    name="sparkpi",
)


@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "databricks_client": databricks_client.configured(
                    {"host": "my.workspace.url", "token": "my.access.token"}
                )
            }
        )
    ]
)
def my_pipeline():
    sparkpi()