Airflow integration guide#

Metadata logged with Airflow displayed in the Neptune web app

Apache Airflow is an open-source platform for batch-oriented workflows. With Neptune, you can log metadata generated by the different tasks of Airflow DAG runs.

You can do the following with the Neptune-Airflow integration:

  • Compare model results
  • Track the workflow/DAG config
  • Log metadata from different tasks in one place

Before you start#

Setting Neptune credentials#

Save your Neptune credentials as Airflow Variables to ensure that they work for all tasks.

  • NEPTUNE_API_TOKEN - Neptune API token of the account that is doing the logging. (How to find)
  • NEPTUNE_PROJECT - Full name of the Neptune project where the metadata should be logged (workspace-name/project-name).
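As a minimal sketch, the two Variables could be created from Python with Airflow's Variable.set(). The helper name store_neptune_credentials and its set_variable parameter are hypothetical; they exist only so that the logic can be tried without an Airflow installation. Creating the Variables in the Airflow UI works just as well.

```python
from typing import Callable, Optional


def store_neptune_credentials(
    api_token: str,
    project: str,
    set_variable: Optional[Callable[[str, str], None]] = None,
) -> None:
    """Save the Neptune credentials under the two Variable names used above.

    `set_variable` is a hypothetical hook for illustration; when omitted,
    Airflow's own Variable.set() is used.
    """
    if set_variable is None:
        # Imported lazily so the function can be defined without Airflow installed
        from airflow.models import Variable

        set_variable = Variable.set

    set_variable("NEPTUNE_API_TOKEN", api_token)
    set_variable("NEPTUNE_PROJECT", project)
```

In an environment where Airflow is installed and configured, calling store_neptune_credentials("<your API token>", "ml-team/classification") once should be enough for the Variables to be available to every task.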

You can also set your credentials as regular environment variables, but this needs to be done on all machines. Airflow Variables will take precedence.

Passing your Neptune credentials

Once you've registered and created a project, set your Neptune API token and full project name to the NEPTUNE_API_TOKEN and NEPTUNE_PROJECT environment variables, respectively.

export NEPTUNE_API_TOKEN="h0dHBzOi8aHR0cHM.4kl0jvYh3Kb8...6Lc"

To find your API token: In the bottom-left corner of the Neptune app, expand the user menu and select Get my API token.

export NEPTUNE_PROJECT="ml-team/classification"

Your full project name has the form workspace-name/project-name. You can copy it from the project settings: Click the menu in the top-right → Edit project details.

On Windows, navigate to Settings → Edit the system environment variables, or enter the following in Command Prompt: setx SOME_NEPTUNE_VARIABLE "some-value"
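Before starting a DAG, it can help to confirm that both environment variables are actually visible to the Python process. The following is a small sketch using only the standard library; the function name missing_neptune_variables is hypothetical and not part of Neptune or the integration.

```python
import os
from typing import List, Mapping

# The two variable names described above
REQUIRED_VARIABLES = ("NEPTUNE_API_TOKEN", "NEPTUNE_PROJECT")


def missing_neptune_variables(environ: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARIABLES if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_neptune_variables()
    if missing:
        print("Missing Neptune credentials:", ", ".join(missing))
    else:
        print("Both Neptune environment variables are set.")
```

Note that this checks only regular environment variables, not Airflow Variables.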


Although it's not recommended, especially for the API token, you can also pass your credentials in the code when initializing Neptune.

import neptune

run = neptune.init_run(
    project="ml-team/classification",  # your full project name here
    api_token="h0dHBzOi8aHR0cHM6Lkc78ghs74kl0jvYh...3Kb8",  # your API token here
)

For more help, see Set Neptune credentials.

Installing the integration#

To use your preinstalled version of Neptune together with the integration:

pip
pip install -U neptune-airflow
conda
conda install -c conda-forge neptune-airflow

To install both Neptune and the integration:

pip
pip install -U "neptune[airflow]"
conda
conda install -c conda-forge neptune neptune-airflow

Basic logging example#

To enable Neptune logging, create a NeptuneLogger instance and use it in your task:

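from airflow import DAG
from neptune_airflow import NeptuneLogger

with DAG(
    ...
) as dag:

    def your_task(**context):
        # Create the Neptune logger inside the task
        logger = NeptuneLogger()
        # task_results() stands in for your own task logic
        return task_results(logger, **context)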

More options#

Getting the task run and logging metadata#

You can get the Neptune run of a context with the get_run_from_context() method, then log whatever metadata you want to the returned Run object.

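from airflow import DAG
from neptune_airflow import NeptuneLogger

with DAG(
    ...
) as dag:

    def task(**context):
        logger = NeptuneLogger()
        ...
        # Get the Neptune run that belongs to this task's Airflow context
        with logger.get_run_from_context(
            context=context, log_context=True
        ) as run:
            ...
            # Upload a file (here, a saved model) to the run
            run["model_checkpoint/checkpoint"].upload_files("my_model.h5")
            # Synchronize with the Neptune servers, then stop the run
            run.sync()
            run.stop()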

Logging to a run namespace based on task ID#

You can get a namespace handler based on the task ID with the get_task_handler_from_context() method.

This lets you organize metadata from different tasks to separate namespaces within the same Neptune Run object.

from neptune.types import File

with DAG(
    ...
) as dag:

    def task(**context):
        logger = NeptuneLogger()
        ...
        with logger.get_task_handler_from_context(
            context=context, log_context=True
        ) as handler:
            ...
            for image, label in zip(x_test[:10], y_test[:10]):
                prediction = model.predict(image[None], verbose=0)
                predicted = prediction.argmax()
                desc = f"label : {label} | predicted : {predicted}"
                handler["visualization/test_prediction"].append(
                    File.as_image(image), description=desc
                )

The above example would log the predictions under <task_id>/visualization/test_prediction inside the Neptune run.

Finding the Neptune run associated with a DAG run#

To ensure that the custom ID of the Neptune run is no longer than 36 characters, it is generated as the MD5 hex digest (32 characters) of the DAG run ID:

custom_run_id = md5(dag_run_id.encode()).hexdigest()

You can use this to find the custom Neptune ID of any DAG run based on its ID.

The custom run ID is stored in the sys/custom_run_id field.
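The hashing step shown above can be reproduced locally to look up the run for a given DAG run. This is a minimal sketch using only the standard library; the function name neptune_custom_run_id and the example DAG run ID are hypothetical.

```python
from hashlib import md5


def neptune_custom_run_id(dag_run_id: str) -> str:
    """Hash an Airflow DAG run ID the same way as shown above."""
    return md5(dag_run_id.encode()).hexdigest()


# Hypothetical DAG run ID, for illustration only
example_id = neptune_custom_run_id("manual__2024-01-01T00:00:00+00:00")

# An MD5 hex digest always has 32 characters, which is within the 36-character limit
print(len(example_id))  # → 32
```

The resulting string can then be compared against the sys/custom_run_id field of the runs in your project.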
