python - Dynamic Task Sequencing in Airflow via Dag Run Conf - Stack Overflow

admin2025-04-18  4

I have the following airflow dag

I am able to define the dependencies correctly in the dag above because I have hardcoded the elements list (mentioned in the code snippet). But suppose I want the elements list to be taken from airflow dag run configuration (at runtime), and then the dag be generated, how would I acheive that? I have gone through n number of articles but still am not able to find a way to power the elements list from dag_run.conf. Can somebody please help here as I am blocked and don't see a way to get out of this

elements = [
    [["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
    [["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
    [["s", "t"], ["u", "v"], ["w", "x"]],
    [["y", "z"]]
]

Corresponding code

from airflow import DAG
from airflow.decorators import task
from datetime import datetime


@task
def process_element(element: str) -> str:
    """
    Task to process an individual element.
    """
    print(f"Processing element {element}")
    return f"Processed {element}"


dag = DAG(
    'process_elements_dag',
    description='DAG for processing elements with dynamic dependencies',
    schedule_interval=None,
    start_date=datetime(2025, 1, 28),
)

elements = [
    [["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
    [["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
    [["s", "t"], ["u", "v"], ["w", "x"]],
    [["y", "z"]]
]

with dag:
    for element_idx, element in enumerate(elements):
        task_groups = [
            [process_element.override(task_id=f"process_element_{element}")(element) for element in group]
            for group in element
        ]

        # Set dependencies sequentially between groups
        for i in range(len(task_groups) - 1):
            for upstream_task in task_groups[i]:
                for downstream_task in task_groups[i + 1]:
                    downstream_task.set_upstream(upstream_task)
转载请注明原文地址:http://www.anycun.com/QandA/1744947471a89876.html