Hello everyone!
I'm currently using kedro-airflow to generate my Airflow DAGs from my Kedro project. I followed the recommendation in the documentation and used a custom template to adapt the DAG for execution on Cloud Composer.
According to the documentation, it is possible to create TaskGroups if needed: Kedro-Airflow Documentation.
I'd like to group multiple nodes into TaskGroups, but I can't find any parameters that are automatically passed to the Jinja2 template to enable this grouping.
Has anyone done this before? Or does anyone know exactly what the documentation is referring to?
Thanks in advance!
Hi Mohammed,
Currently, you can only use the --group-in-memory option with the kedro airflow create command to group in-memory nodes together. However, we are actively working on a more advanced approach that will allow nodes to be grouped automatically based on their namespace. We expect to deliver this feature in February.
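To make that concrete, here is a rough sketch of what grouping is meant to give a custom template to work with. Treat the exact shape as my assumption rather than a spec: the idea is that nodes maps each group name to the list of Kedro nodes in that group, so a single Airflow task can cover several nodes.

{# Sketch only: assumes `nodes` maps a group name to the list of its member
   Kedro nodes once the DAG is generated with `kedro airflow create --group-in-memory`. #}
{% for group_name, node_list in nodes.items() %}
# Airflow task "{{ group_name }}" would run these Kedro nodes together:
#   {{ node_list | map(attribute='name') | join(', ') }}
{% endfor %}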
Hope this helps!
Hi @Dmitry Sorokin!
Thanks for your response! However, I'm not sure these features will help in my case, given the way my DAG is structured.
Here's my current Jinja2 template for the DAG:
from datetime import timedelta

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.timetables.interval import CronDataIntervalTimetable

# NOTE: dag_id, project_id, cluster_name, region, cluster_config, whl_file,
# script_path and config_archive are expected to be defined earlier in the template.

# Default args for the DAG
default_args = dict(
    owner="{{ owner | default('airflow') }}",
    depends_on_past={{ depends_on_past | default(False) }},
    email_on_failure={{ email_on_failure | default(False) }},
    email_on_retry={{ email_on_retry | default(False) }},
    retries={{ retries | default(1) }},
    retry_delay=timedelta(minutes={{ retry_delay | default(5) }}),
)

with DAG(
    dag_id=dag_id,
    schedule=CronDataIntervalTimetable("0 1 * * *", timezone="{{ timezone }}"),
    default_args=default_args,
) as dag:
    # Create cluster
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=project_id,
        cluster_name=cluster_name,
        region=region,
        cluster_config=cluster_config,
        gcp_conn_id=".....",
    )

    # One Dataproc job per Kedro node
    tasks = {
        {% for node_name, node_list in nodes.items() %}
        "{{ node_name | safe }}": DataprocSubmitJobOperator(
            task_id="{{ node_name | safe }}",
            project_id=project_id,
            region=region,
            job={
                "reference": {
                    "job_id": "{{ node_name | safe | slugify }}-job-{{ '{{' }} ts_nodash {{ '}}' }}",
                },
                "placement": {"cluster_name": cluster_name},
                "pyspark_job": {
                    "python_file_uris": [whl_file],
                    "main_python_file_uri": script_path,
                    "args": [
                        "--pipeline={{ pipeline_name }}",
                        "--from-nodes={{ node_name | safe }}",
                        "--to-nodes={{ node_name | safe }}",
                        "--params process_date={{ '{{' }} data_interval_end | ds {{ '}}' }}",
                        "--async",
                    ],
                    "archive_uris": [f"{config_archive}#config"],
                },
            },
            gcp_conn_id="{{ .... }}",
        ),
        {% endfor %}
    }

    # Delete cluster
    delete_cluster = DataprocDeleteClusterOperator(
        task_id="delete_cluster",
        project_id=project_id,
        cluster_name=cluster_name,
        region=region,
        trigger_rule="all_done",
        gcp_conn_id="{{ .... }}",
    )

    # Dependencies
    create_cluster >> list(tasks.values()) >> delete_cluster

    {% for parent_node, child_nodes in dependencies.items() -%}
    {% for child in child_nodes %}
    tasks["{{ parent_node | safe }}"] >> tasks["{{ child | safe }}"]
    {% endfor %}
    {%- endfor %}
I would like to execute a single job for a group of nodes so that the nodes in a group can share intermediate memory datasets. The goal is for one Airflow step to correspond to one or more logically grouped nodes, whether through tags, namespaces, or any other mechanism. The key requirement is that this grouping happens automatically, without manual intervention.
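For example, since the template already receives node_list for each entry of nodes, I imagine something along these lines. This is only a sketch: it assumes the items in node_list are Kedro nodes exposing a name attribute, and that kedro run --nodes accepts a comma-separated list of node names.

# Inside the loop over nodes.items(), replace the --from-nodes/--to-nodes pair
# with a single --nodes argument that lists every node of the group, so the
# intermediate datasets stay in memory within one Dataproc job:
"args": [
    "--pipeline={{ pipeline_name }}",
    "--nodes={{ node_list | map(attribute='name') | join(',') }}",
    "--params process_date={{ '{{' }} data_interval_end | ds {{ '}}' }}",
    "--async",
],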
Alternatively, instead of using --from-nodes and --to-nodes, I believe the solution would be to run kedro run -ns <namespace> for each task, which should resolve my problem.
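In template terms that would just mean swapping the node-range arguments for a namespace one, roughly like this (again only a sketch, using --namespace as the long form of -ns and assuming each key in nodes ends up corresponding to a namespace once the namespace-based grouping lands):

"args": [
    "--pipeline={{ pipeline_name }}",
    # Run the whole namespace in one job; Kedro resolves its member nodes.
    "--namespace={{ node_name | safe }}",
    "--params process_date={{ '{{' }} data_interval_end | ds {{ '}}' }}",
    "--async",
],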
@Dmitry Sorokin I believe that using tags associated with nodes would make it much simpler to meet my needs. Will they also be included in the February update?
Nice, thanks @Dmitry Sorokin!
Starting with namespaces makes sense!
And great that tags are also on the radar.