I have the following Airflow DAG.
I can define the dependencies correctly in the DAG below because I have hardcoded the `elements` list (shown in the code snippet). But suppose I want the `elements` list to be taken from the DAG run configuration (`dag_run.conf`) at runtime, and the tasks to be generated from it. How would I achieve that? I have gone through many articles but still cannot find a way to drive the `elements` list from `dag_run.conf`. Can somebody please help? I am blocked and don't see a way around this.
```python
elements = [
    [["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
    [["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
    [["s", "t"], ["u", "v"], ["w", "x"]],
    [["y", "z"]]
]
```
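If `dag_run.conf` is to carry this same nested structure, it helps to pin down the expected shape first. Below is a minimal sketch in plain Python (no Airflow needed), assuming the run is triggered with a JSON payload such as `{"elements": [...]}`. The `"elements"` key and the `parse_elements` helper are hypothetical, not an Airflow convention. It checks that the payload is a list of chains, each a non-empty list of stages, each a non-empty list of unique string names (uniqueness matters because, in the DAG below, each name becomes part of a `task_id`).

```python
from typing import Any, List

# Hypothetical type alias: chains -> stages -> element names.
Elements = List[List[List[str]]]


def parse_elements(conf: Any, key: str = "elements") -> Elements:
    """Validate a dag_run.conf-style dict and return the nested elements list.

    Hypothetical helper: the "elements" key is an assumption about how the
    DAG run would be triggered, not something Airflow defines.
    """
    if not isinstance(conf, dict) or key not in conf:
        raise ValueError(f"conf must be a dict containing {key!r}")
    chains = conf[key]
    if not isinstance(chains, list):
        raise ValueError("elements must be a list of chains")
    seen = set()
    for chain in chains:
        if not isinstance(chain, list) or not chain:
            raise ValueError("each chain must be a non-empty list of stages")
        for stage in chain:
            if not isinstance(stage, list) or not stage:
                raise ValueError("each stage must be a non-empty list of names")
            for name in stage:
                if not isinstance(name, str):
                    raise ValueError(f"element names must be strings, got {name!r}")
                # Names end up in task ids, so duplicates would collide.
                if name in seen:
                    raise ValueError(f"duplicate element name {name!r}")
                seen.add(name)
    return chains


if __name__ == "__main__":
    payload = {"elements": [[["a", "b"], ["c", "d"]], [["y", "z"]]]}
    print(parse_elements(payload))
```

Rejecting malformed payloads early keeps a bad trigger from failing halfway through a run.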
Corresponding code:
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def process_element(element: str) -> str:
    """
    Task to process an individual element.
    """
    print(f"Processing element {element}")
    return f"Processed {element}"


dag = DAG(
    'process_elements_dag',
    description='DAG for processing elements with dynamic dependencies',
    schedule_interval=None,
    start_date=datetime(2025, 1, 28),
)

# Each top-level entry is an independent chain; each chain is a list of
# stages; each stage is a list of element names that run in parallel.
elements = [
    [["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
    [["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
    [["s", "t"], ["u", "v"], ["w", "x"]],
    [["y", "z"]]
]

with dag:
    for chain in elements:
        # One task per element name; task_ids must be unique across the DAG.
        task_groups = [
            [process_element.override(task_id=f"process_element_{element}")(element) for element in group]
            for group in chain
        ]

        # Set dependencies sequentially between groups: every task in stage i
        # must finish before any task in stage i + 1 starts.
        for i in range(len(task_groups) - 1):
            for upstream_task in task_groups[i]:
                for downstream_task in task_groups[i + 1]:
                    downstream_task.set_upstream(upstream_task)
```
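To see exactly what the nested loops wire up, the same logic can be expressed without Airflow. This is a minimal sketch; the function name `dependency_edges` is hypothetical. Within each chain, every element in stage `i` becomes upstream of every element in stage `i + 1`, and separate chains are never connected to each other. With the hardcoded `elements`, that gives 26 tasks and 36 edges (16 + 12 + 8 + 0 across the four chains).

```python
from typing import List, Tuple

Elements = List[List[List[str]]]


def dependency_edges(elements: Elements) -> List[Tuple[str, str]]:
    """Return (upstream, downstream) task_id pairs matching the DAG's loops."""
    edges = []
    for chain in elements:
        # Same naming scheme as the DAG: one task per element name.
        stages = [[f"process_element_{name}" for name in stage] for stage in chain]
        # Fully connect each stage to the next one within the same chain.
        for i in range(len(stages) - 1):
            for upstream in stages[i]:
                for downstream in stages[i + 1]:
                    edges.append((upstream, downstream))
    return edges


if __name__ == "__main__":
    elements = [
        [["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
        [["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
        [["s", "t"], ["u", "v"], ["w", "x"]],
        [["y", "z"]],
    ]
    edges = dependency_edges(elements)
    print(len(edges))  # → 36
    print(edges[:2])
```

A chain with a single stage, such as `[["y", "z"]]`, contributes tasks but no edges.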