
    Technology, 2025-02-25


    Apache Airflow is a platform for scalable workflow scheduling and execution, with detailed monitoring and management. In this article I describe one specific use case: scheduling API requests to enrich your data.


    Issue

    We need to update economic event data in our database as soon as it is released. No subscription (push) API is available; we only have a REST API, and we know the date on which each event's data is due to be released.


    We store upcoming economic events in our database and we want to dynamically schedule workflows to pull additional data at specified dates. The dates are stored in a table (actually a MongoDB collection).
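
    To make the setup concrete, here is a minimal sketch of selecting the events whose release date is still ahead. The field names (`event_id`, `release_date`), the sample events, and the in-memory list standing in for the MongoDB collection are all assumptions for illustration; the article does not show the actual schema.

```python
from datetime import datetime, timezone

# Hypothetical documents; a real deployment would read these from MongoDB.
EVENTS = [
    {"event_id": "cpi-us-2020-05",
     "release_date": datetime(2020, 6, 10, 12, 30, tzinfo=timezone.utc)},
    {"event_id": "gdp-de-2020-q1",
     "release_date": datetime(2020, 5, 15, 6, 0, tzinfo=timezone.utc)},
    {"event_id": "nfp-us-2020-05",
     "release_date": datetime(2020, 6, 5, 12, 30, tzinfo=timezone.utc)},
]


def upcoming_events(events, now):
    """Return the events released strictly after `now`, soonest first."""
    pending = [e for e in events if e["release_date"] > now]
    return sorted(pending, key=lambda e: e["release_date"])


if __name__ == "__main__":
    now = datetime(2020, 6, 1, tzinfo=timezone.utc)
    for event in upcoming_events(EVENTS, now):
        print(event["event_id"], event["release_date"].isoformat())
```

    Each event returned by this kind of query is a candidate for its own scheduled workflow.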


    We also want to be able to monitor workflow execution and easily retry failed runs.


    Airflow to the rescue

    Now let’s discuss how Airflow works and how it can help us achieve what we want.


    Airflow consists of 3 separate processes:


    Web server: serves the user interface to create, manage and monitor workflows

    Scheduler: schedules workflows

    Worker: runs workflow tasks


    The processes communicate with each other via a metadata database.
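
    The idea of separate processes coordinating through shared state in a database can be illustrated with a toy model. This is a deliberately simplified sketch, not Airflow's actual schema or dispatch mechanism: the table layout, the function names, and the use of SQLite are assumptions made for illustration.

```python
import sqlite3


def make_metadata_db():
    """Create an in-memory stand-in for the shared metadata database."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT NOT NULL)"
    )
    return conn


def scheduler_queue(conn, task_id):
    """Scheduler role: record that a task is ready to run."""
    conn.execute(
        "INSERT INTO task_instance (task_id, state) VALUES (?, 'queued')",
        (task_id,),
    )
    conn.commit()


def worker_run_next(conn):
    """Worker role: pick one queued task and record it as finished."""
    row = conn.execute(
        "SELECT task_id FROM task_instance WHERE state = 'queued' "
        "ORDER BY rowid LIMIT 1"
    ).fetchone()
    if row is None:
        return None  # nothing to do
    conn.execute(
        "UPDATE task_instance SET state = 'success' WHERE task_id = ?", (row[0],)
    )
    conn.commit()
    return row[0]


def webserver_status(conn):
    """Web server role: read the task states for display."""
    return dict(conn.execute("SELECT task_id, state FROM task_instance"))
```

    None of the three roles calls another directly; each one only reads or writes rows, which is what lets them run as independent processes.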


    How do we add a workflow?


    Workflows are generated from Python source files that contain a DAG definition. DAG stands for Directed Acyclic Graph: tasks are nodes, dependencies are directed edges, and no task can end up depending on itself through a cycle. The name describes well how tasks are connected to each other in a workflow.
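
    As a small illustration of the graph idea, independent of Airflow itself, the sketch below uses Python's standard `graphlib` module (Python 3.9 or later) to derive a valid execution order from task dependencies and to detect a cycle. The three task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps a task to the set of tasks that must finish before it.
ETL_DEPENDENCIES = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(dependencies):
    """Return the tasks in an order that respects every dependency."""
    return list(TopologicalSorter(dependencies).static_order())


def is_acyclic(dependencies):
    """Return True if the dependency graph contains no cycle."""
    try:
        execution_order(dependencies)
    except CycleError:
        return False
    return True


if __name__ == "__main__":
    print(execution_order(ETL_DEPENDENCIES))
    print(is_acyclic({"a": {"b"}, "b": {"a"}}))
```

    A graph with a cycle has no valid execution order, which is why workflow definitions must be acyclic.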


    Implementation

    First, we created a workflow that pulls upcoming events and stores them in an Airflow variable. In another Python file, one DAG is created per economic event and scheduled to run once at the desired start_date, which is the economic data release date.
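
    A minimal sketch of the per-event DAG generation could look like the following. It is not the author's code: the JSON shape, the `economic_event_` prefix, and the helper names are assumptions, and `register_dags` assumes Apache Airflow 2.4 or a later 2.x release (where the `schedule` argument is available). The DAGs here contain no tasks yet.

```python
import json
from datetime import datetime

# Assumed shape of the JSON that the first workflow stores in the Airflow variable.
UPCOMING_EVENTS_JSON = """
[
  {"event_id": "cpi-us-2020-05", "release_date": "2020-06-10T12:30:00+00:00"},
  {"event_id": "nfp-us-2020-05", "release_date": "2020-06-05T12:30:00+00:00"}
]
"""


def build_dag_specs(events_json):
    """Turn the stored event list into one DAG spec per event."""
    specs = []
    for event in json.loads(events_json):
        specs.append({
            "dag_id": f"economic_event_{event['event_id']}",
            "start_date": datetime.fromisoformat(event["release_date"]),
            "schedule": "@once",  # a single run, at start_date
        })
    return specs


def register_dags(specs, namespace):
    """Create one Airflow DAG per spec and expose it in `namespace`.

    Assumes Apache Airflow 2.4+ (2.x) is installed. The import is local so
    that build_dag_specs stays usable without Airflow.
    """
    from airflow import DAG

    for spec in specs:
        namespace[spec["dag_id"]] = DAG(
            dag_id=spec["dag_id"],
            start_date=spec["start_date"],
            schedule=spec["schedule"],
            catchup=False,
        )
```

    In a real DAG file this might be wired up as `register_dags(build_dag_specs(Variable.get("upcoming_events")), globals())`, where `Variable` comes from `airflow.models` and the variable name `upcoming_events` is hypothetical. Airflow picks up DAG objects found in a module's global namespace, which is why the DAGs are written into `globals()`.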


    Each DAG consists of 3 tasks, each representing one part of the ETL process:


    Extraction of event data from the REST API endpoint

    Transformation of the response

    Loading the enriched event object into the database

    [Figure: DAG for getting economic data (DAG view in Airflow UI)]
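
    The three steps above can be sketched as plain Python functions. This is an illustrative outline under stated assumptions, not the author's implementation: the endpoint URL, the response fields (`id`, `actual`, `unit`), and the dict standing in for the database are hypothetical.

```python
import json
from urllib.request import urlopen

API_URL = "https://api.example.com/events/{event_id}"  # hypothetical endpoint


def extract(event_id, fetch=None):
    """Extract: request the raw event payload from the REST API."""
    if fetch is None:
        def fetch(url):
            with urlopen(url, timeout=10) as response:
                return response.read().decode("utf-8")
    return json.loads(fetch(API_URL.format(event_id=event_id)))


def transform(raw):
    """Transform: keep only the fields we store, with normalized types."""
    return {
        "event_id": raw["id"],
        "actual": float(raw["actual"]),
        "unit": raw.get("unit", ""),
    }


def load(event, store):
    """Load: write the enriched event into the store, keyed by its ID."""
    store[event["event_id"]] = event
    return event["event_id"]
```

    The `fetch` parameter exists only so that the HTTP call can be replaced in tests. In an Airflow DAG, each function could be wrapped in its own task (for example with `PythonOperator`) and chained as extract, then transform, then load.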

    The full DAG definition file code is available in a Gist below.


    Conclusion

    Now we have an efficient mechanism for updating economic data. If there is an issue with a specific event, we can easily find the related DAG by event ID, check what went wrong, and then re-trigger the workflow. Because the workflow is idempotent, re-running it is safe.
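
    One common way to keep the load step idempotent is to write each event under its ID, so that a re-run overwrites the same record instead of adding a second one. The sketch below shows the idea with SQLite as a stand-in; the table and column names are hypothetical, and the article's actual store is MongoDB, where an upsert (for example `update_one` with `upsert=True` in pymongo) would play the same role.

```python
import sqlite3


def make_store():
    """Create an in-memory table keyed by event ID."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_id TEXT PRIMARY KEY, actual REAL)")
    return conn


def load_event(conn, event_id, actual):
    """Insert or overwrite the row for event_id, so reruns leave one row."""
    conn.execute(
        "INSERT OR REPLACE INTO events (event_id, actual) VALUES (?, ?)",
        (event_id, actual),
    )
    conn.commit()


def all_events(conn):
    """Return every stored event as (event_id, actual) tuples."""
    return conn.execute(
        "SELECT event_id, actual FROM events ORDER BY event_id"
    ).fetchall()
```

    Running `load_event` twice with the same arguments leaves the table in the same state as running it once, which is the property that makes re-triggering a failed workflow safe.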


    Sources

    Translated from: https://medium.com/@alucarded/using-airflow-to-dynamically-schedule-workflows-14e3000e7d8c

