Generating Airflow DAGs from Kedro Projects Using Kedro-Airflow

Hello everyone πŸ˜„ !

I'm currently using kedro-airflow to generate my Airflow DAGs from my Kedro project. I followed the recommendation in the documentation and used a custom template to adapt the DAG for execution on Cloud Composer.
According to the documentation, it is possible to create TaskGroups if needed: Kedro-Airflow Documentation.

I’d like to group multiple nodes into TaskGroups, but I can't find any parameters that are automatically passed to the Jinja2 template to enable this grouping.

Has anyone done this before? Or does anyone know exactly what the documentation is referring to?

Thanks in advance!


Hi Mohammed,
Currently, you can only use the --group-in-memory option with the kedro airflow create command to group in-memory nodes together. However, we are actively working on a more advanced approach that will allow nodes to be grouped automatically based on their namespace. We expect to deliver this feature in February.
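For example, combined with a custom template (the template filename here is just illustrative):

kedro airflow create --group-in-memory --jinja-file=airflow_dag_template.j2

This merges nodes that exchange data through MemoryDatasets into a single Airflow task, so those intermediate datasets never need to be persisted between tasks.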
Hope this helps!

Hi @Dmitry Sorokin! πŸ˜„
Thanks for your response! πŸ™‚ However, I’m not sure if these features will help in my case due to the way my DAG is structured.

Here’s my current Jinja2 template for the DAG:

from datetime import timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.timetables.interval import CronDataIntervalTimetable

# dag_id, project_id, region, cluster_name, cluster_config, whl_file,
# script_path and config_archive are defined in a part of the template
# omitted here.

# Default DAG arguments
default_args = dict(
    owner="{{ owner | default('airflow') }}",
    depends_on_past={{ depends_on_past | default(False) }},
    email_on_failure={{ email_on_failure | default(False) }},
    email_on_retry={{ email_on_retry | default(False) }},
    retries={{ retries | default(1) }},
    retry_delay=timedelta(minutes={{ retry_delay | default(5) }}),
)

with DAG(
    dag_id=dag_id,
    schedule=CronDataIntervalTimetable("0 1 * * *", timezone="{{ timezone }}"),
    default_args=default_args,
) as dag:
    # Create cluster
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=project_id,
        cluster_name=cluster_name,
        region=region,
        cluster_config=cluster_config,
        gcp_conn_id=".....",
    )

    tasks = {
    {% for node_name, node_list in nodes.items() %}
        "{{ node_name | safe}}": DataprocSubmitJobOperator(
            task_id="{{ node_name | safe}}",
            project_id=project_id,
            region=region,
            job={
                "reference": {
                    "job_id": "{{ node_name | safe | slugify }}-job-{{ '{{' }} ts_nodash {{ '}}' }}",
                },
                "placement": {"cluster_name": cluster_name},
                "pyspark_job": {
                    "python_file_uris": [whl_file],
                    "main_python_file_uri": script_path,
                    "args": [
                        "--pipeline={{ pipeline_name }}",
                        "--from-nodes={{ node_name | safe }}",
                        "--to-nodes={{ node_name | safe }}",
                        "--params process_date={{ '{{' }} data_interval_end | ds {{ '}}' }}",
                        "--async",
                    ],
                    "archive_uris": [f"{config_archive}#config"],
                },
            },
            gcp_conn_id="{{ .... }}",
        ),
    {% endfor %}
    }

    # Delete cluster
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=project_id,
        cluster_name=cluster_name,
        region=region,
        trigger_rule="all_done",
        gcp_conn_id="{{ .... }}",
    )

    # Dependencies
    create_cluster >> list(tasks.values()) >> delete_cluster

    {% for parent_node, child_nodes in dependencies.items() -%}
    {% for child in child_nodes %}    tasks["{{ parent_node | safe }}"] >> tasks["{{ child | safe }}"]
    {% endfor %}
    {%- endfor %}

As you can see, I define my tasks dynamically within a dictionary, and my dependencies are managed at the end of the template. My goal is to group certain nodes into TaskGroups, but I don't see any parameters automatically passed to the template that would allow me to do this.

I structured my DAG this way because I didn’t know how to do it differently...

Would the upcoming feature you're working on (grouping nodes based on namespace) allow this kind of grouping within the Jinja2 template? Or do you have any suggestions on how I could implement it manually in the meantime?
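One workaround I've sketched in the meantime (untested) is to derive a group key from the node-name prefix, assuming names follow a "<namespace>.<node>" convention, and attach each task to a TaskGroup through the task_group argument that BaseOperator accepts. The group_for helper and the "ungrouped" fallback are my own inventions; kedro-airflow doesn't pass any group mapping to the template today:

from airflow.utils.task_group import TaskGroup

# One TaskGroup per node-name prefix; nodes without a "." fall into "ungrouped".
# This sits near the top of the `with DAG(...)` block, so each TaskGroup binds
# to the active DAG when it is first created.
task_groups = {}

def group_for(name):
    key = name.split(".")[0] if "." in name else "ungrouped"
    if key not in task_groups:
        task_groups[key] = TaskGroup(group_id=key)
    return task_groups[key]

Each DataprocSubmitJobOperator in the loop would then take one extra argument:

            task_group=group_for("{{ node_name | safe }}"),

Keep in mind that a TaskGroup prefixes its tasks' task_ids with the group_id by default (prefix_group_id=True), so the rendered task_ids change even though the keys of the tasks dict stay the same.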

Thanks again for your help! 😊

I would like to execute a single job for a group of nodes so that they can share in-memory (MemoryDataset) outputs. The goal is for one Airflow task to correspond to one or more logically grouped nodes, whether grouped through tags, namespaces, or any other mechanism. The key requirement is that this grouping happens automatically, without manual intervention.

Instead of using --from-nodes and --to-nodes, I believe the solution would be to run kedro run -ns <namespace> for each task, which should solve my problem.

I just need to find how to retrieve the namespace in the template. πŸ˜„
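For reference, if the plugin exposed a group's namespace in the template (a hypothetical {{ group_name }} variable; nothing like it is passed today), the pyspark_job args would reduce to something like:

                    "args": [
                        "--pipeline={{ pipeline_name }}",
                        "--namespace={{ group_name }}",
                        "--params process_date={{ '{{' }} data_interval_end | ds {{ '}}' }}",
                        "--async",
                    ],

In the meantime, if the node names already follow a "<namespace>.<node>" convention, the namespace can be derived directly in Jinja2 with {{ node_name.split('.')[0] }}.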

@Dmitry Sorokin I believe that using tags associated with nodes will make things much simpler to meet my needs. Will they also be included in the February update? πŸ˜„

We decided to start with namespaces, and then we'll consider tags as well.

Nice, thanks @Dmitry Sorokin!

Starting with namespaces makes sense! πŸ™‚
And great that tags are also on the radar. πŸš€
