Run Attribution (Experimental)

You can find the code for this example on GitHub.

Run Coordinator

The Run Coordinator controls the policy that Dagster uses to manage the set of active runs on your deployment.

Because the Run Coordinator is invoked whenever a run is submitted to the instance (e.g. via the GraphQL API), it can also be used to dynamically attach tags to submitted runs.

In this example, we'll perform run attribution: attaching the submitting user's email as a tag to each submitted run.

To accomplish this, we'll write a custom Run Coordinator that reads the Flask HTTP headers of the incoming request (from Dagster's GraphQL server), parses them to extract an email, and attaches that email as a tag.

Custom Run Coordinator

In this use case, we'd like to add a hook that customizes submitted runs while still using a queue to submit runs to the Dagster Daemon. To accomplish this, we can subclass the Queued Run Coordinator and override submit_run, as in the example below. The context object passed to submit_run has a get_request_header method that we can use to read HTTP headers:

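from dagster.core.run_coordinator import QueuedRunCoordinator, SubmitRunContext
from dagster.core.storage.pipeline_run import PipelineRun

# Name of the HTTP header to read; replace with the header your deployment sets.
CUSTOM_HEADER_NAME = "X-Amzn-Oidc-Data"


class CustomRunCoordinator(QueuedRunCoordinator):
    def submit_run(self, context: SubmitRunContext) -> PipelineRun:
        # Read the raw header value from the incoming HTTP request.
        desired_header = context.get_request_header(CUSTOM_HEADER_NAME)
        # Delegate to the queued coordinator so the run is still enqueued.
        return super().submit_run(context)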

Next, we can parse the relevant header (passed here as jwt_claims_header) with a custom hook. In the following example, the hook decodes a JWT header that contains the user's email.

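from base64 import urlsafe_b64decode
from json import loads
from typing import Optional


def get_email(self, jwt_claims_header: Optional[str]) -> Optional[str]:
    if not jwt_claims_header:
        return None

    # A JWT has the form <header>.<payload>.<signature>; the claims are in the payload.
    split_header_tokens = jwt_claims_header.split(".")
    if len(split_header_tokens) < 2:
        return None

    # JWT segments are base64url-encoded, possibly without padding, so restore it first.
    payload = split_header_tokens[1]
    payload += "=" * (-len(payload) % 4)
    try:
        claims_json = loads(urlsafe_b64decode(payload))
        return claims_json.get("email")
    except ValueError:
        # Covers malformed base64 (binascii.Error) and malformed JSON (JSONDecodeError).
        return None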

The above is just an example; you can write whatever hook is useful to you. Note that this hook only decodes the claims and does not verify the JWT's signature.
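To try the decoding logic without a running deployment, the sketch below builds a JWT-shaped string and passes it through a standalone variant of the hook. The make_unsigned_jwt helper and the module-level get_email (without self) are hypothetical names introduced only for this illustration, and the sketch assumes the header's segments are base64url-encoded, as is standard for JWTs. Nothing here is signed or verified.

```python
import json
from base64 import urlsafe_b64decode, urlsafe_b64encode
from typing import Optional


def make_unsigned_jwt(claims: dict) -> str:
    """Hypothetical test helper: build a header.payload.signature string."""

    def encode(obj: dict) -> str:
        return urlsafe_b64encode(json.dumps(obj).encode("utf-8")).decode("ascii")

    # The third segment is a placeholder; this token is NOT cryptographically signed.
    return ".".join([encode({"alg": "none"}), encode(claims), "placeholder-signature"])


def get_email(jwt_claims_header: Optional[str]) -> Optional[str]:
    """Standalone variant of the hook (no `self`) so it can run outside a coordinator."""
    if not jwt_claims_header:
        return None
    tokens = jwt_claims_header.split(".")
    if len(tokens) < 2:
        return None
    try:
        # Restore any stripped base64 padding before decoding the payload segment.
        payload = tokens[1] + "=" * (-len(tokens[1]) % 4)
        claims = json.loads(urlsafe_b64decode(payload))
    except ValueError:
        # Malformed base64 or malformed JSON.
        return None
    return claims.get("email") if isinstance(claims, dict) else None


header = make_unsigned_jwt({"email": "user@example.com"})
print(get_email(header))       # user@example.com
print(get_email("not-a-jwt"))  # None
print(get_email(None))         # None
```

Keeping the parsing logic free of Dagster imports, as in this variant, makes it easy to unit test separately from the coordinator itself.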

Putting this all together, we can use these hooks to dynamically attach tags to submitted pipeline runs. In the following example, we read the user's email from the X-Amzn-Oidc-Data header using the get_email hook defined above, and then attach the email as a tag to the pipeline run.

import warnings


def submit_run(self, context: SubmitRunContext) -> PipelineRun:
    pipeline_run = context.pipeline_run
    jwt_claims_header = context.get_request_header("X-Amzn-Oidc-Data")
    email = self.get_email(jwt_claims_header)
    if email:
        # Attach the submitting user's email to the run as a "user" tag.
        self._instance.add_run_tags(pipeline_run.run_id, {"user": email})
    else:
        warnings.warn(f"Couldn't decode JWT header {jwt_claims_header}")
    # Enqueue the run as usual, whether or not a tag was attached.
    return super().submit_run(context)
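The tagging flow above can be exercised without Dagster by substituting stand-in objects. In the sketch below, tag_run_with_user is a hypothetical helper that mirrors the body of submit_run. The fake_instance and fake_context objects are plain mocks, not real Dagster classes; they only imitate the add_run_tags, get_request_header, and pipeline_run.run_id members used in the example.

```python
import warnings
from types import SimpleNamespace
from typing import Callable, Optional
from unittest.mock import MagicMock


def tag_run_with_user(
    instance,
    context,
    get_email: Callable[[Optional[str]], Optional[str]],
) -> bool:
    """Hypothetical helper mirroring the tagging logic of submit_run.

    Returns True if a "user" tag was attached, False otherwise.
    """
    jwt_claims_header = context.get_request_header("X-Amzn-Oidc-Data")
    email = get_email(jwt_claims_header)
    if email:
        instance.add_run_tags(context.pipeline_run.run_id, {"user": email})
        return True
    warnings.warn("Couldn't extract an email from the X-Amzn-Oidc-Data header")
    return False


# Stand-ins for the Dagster instance and the submit context (assumptions, not Dagster objects).
fake_instance = MagicMock()
fake_context = SimpleNamespace(
    pipeline_run=SimpleNamespace(run_id="run-123"),
    get_request_header=lambda name: "fake-header" if name == "X-Amzn-Oidc-Data" else None,
)

# Use a stub hook that always resolves to a fixed email.
tagged = tag_run_with_user(fake_instance, fake_context, get_email=lambda header: "user@example.com")

# The mock records the call, so we can check the tag that would have been written.
fake_instance.add_run_tags.assert_called_once_with("run-123", {"user": "user@example.com"})
```

Factoring the logic this way is a design choice rather than a requirement; the same checks could be written against the coordinator class directly in an environment where Dagster is installed.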

Deploying

dagster.yaml

To specify the custom Run Coordinator to be used on the instance, add the following snippet to an instance's dagster.yaml:

run_coordinator:
  module: run_attribution_example
  class: CustomRunCoordinator

Helm

If you're using Helm to deploy instead, you can specify the custom run coordinator in the Helm chart's values.yaml:

dagsterDaemon:
  runCoordinator:
    enabled: true
    type: CustomRunCoordinator
    config:
      customRunCoordinator:
        module: run_attribution_example
        class: CustomRunCoordinator
        config: {}

Because module and class are specified explicitly, any custom Run Coordinator can be used, as long as its module is installed in the image that the Dagster instance is running on.
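One way to catch a missing module before deploying is to check, inside the image, that the configured module and class can be imported. The sketch below is an illustrative pre-deploy check, not Dagster's own loading code: load_configured_class is a hypothetical helper, and it assumes only that the module/class pair is resolved by a normal Python import.

```python
import importlib
import inspect


def load_configured_class(module_name: str, class_name: str) -> type:
    """Hypothetical check: import `module_name` and return its `class_name` attribute."""
    # Raises ModuleNotFoundError if the module is not installed in this environment.
    module = importlib.import_module(module_name)
    try:
        cls = getattr(module, class_name)
    except AttributeError:
        raise ImportError(f"Module {module_name!r} has no attribute {class_name!r}") from None
    if not inspect.isclass(cls):
        raise TypeError(f"{module_name}.{class_name} is not a class")
    return cls


# Demonstrated with a standard-library module so the sketch runs anywhere.
# In the image, the call would use the values from dagster.yaml or values.yaml, e.g.
# load_configured_class("run_attribution_example", "CustomRunCoordinator").
print(load_configured_class("json", "JSONDecoder").__name__)  # JSONDecoder
```

Running a check like this as part of the image build surfaces packaging mistakes earlier than a failed run submission would.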