Python workflow engine
At some point in a data project, you will end up with multiple (dependent or independent) data processing steps that you need to somehow connect, schedule for periodic execution, and monitor for errors in production. This is usually the time you either hack together some unreliable bash script to orchestrate these steps or start looking for a dedicated workflow engine. In this article, I want to give you some insights into one of the newer tools in this area, called Prefect.
The space of workflow orchestration tooling is quite big, and besides many commercial options there are some well-known and often-used engines such as Apache Airflow, Luigi, or Oozie. Nevertheless, the length of this list could mean that many of these tools are not perfect and don’t fulfill all user requirements. When the need for such a tool presented itself in a previous project, I began looking for new, promising options without defaulting to the hyped Apache Airflow. One of the reasons Prefect was chosen is that its open-sourced Prefect Core engine allowed us to start easily without the need for a central server. Since we started using Prefect at the end of 2019, many things have changed, such as the release of Prefect Cloud and Prefect Server, which are in large parts open-source and accompanied by great managed service offerings that are especially useful for smaller teams. If you want to compare some of the details, e.g. with Apache Airflow, I can recommend the following blog article: https://medium.com/the-prefect-blog/why-not-airflow-4cfa423299c4
Typical example DAG

An additional motivation to use Prefect is the handling of task and data dependencies as a Directed Acyclic Graph (DAG), which is a great way to model this problem. If you look at other tools in that area, like AWS Step Functions, which uses state machines for its workflows, you can see that there are multiple ways to address this problem. You can see an example DAG on the left. The basic idea is that you model the graph with concrete tasks which depend on other tasks and, optionally, on the data they produce as input for downstream tasks. In Prefect this is modeled quite naturally in plain Python, with the normal function arguments and return values all programmers are used to.
Prefect consists of several components, many of which are optional. The central component is Prefect Core, which is the basic framework to define flows and tasks, including the possibility to run them locally. It is available on PyPI as a pip package and can be installed as follows, either into a virtual environment or globally.
```
pip install prefect
```

Workflows built with Prefect Core can also potentially be used in production setups if you schedule, log, and monitor the execution yourself. This is possible, for example, with the cron functionality of the AWS ECS Fargate and AWS CloudWatch services. Nevertheless, you would miss out on many things that Prefect can offer you as an integrated workflow platform, like the great and modern UI it provides. If you want to use this, you have the choice to use Prefect Server, hosting and securing it yourself, or to use the managed Prefect Cloud offering.
In the following example (https://github.com/ludwigm/prefect-demo-flow), I want to walk you through what a potential Prefect flow may look like. This is a simplified example; in reality, you will probably have more complex flows with more and bigger tasks. It should still give you an idea of what you can do with this tool. The example contains the classical parts you see in many ETL flows: gathering data (Extract), preparing and aggregating data (Transform), and providing it somewhere as an output (Load).
DAG of the flow created with flow.visualize()

This example takes daily worldwide COVID data, filters it by the requested country, aggregates it up to a monthly level, and finally uploads it as a human-readable CSV to AWS S3. On the left, you can see a DAG representation of the flow as it is created by Prefect.
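If you want to render such a DAG picture yourself, a minimal sketch looks like the following; it assumes the optional graphviz dependency is installed and uses the create_flow function shown further below:

```python
# Build the flow object and render its DAG; requires the graphviz package.
flow = create_flow()
flow.visualize()
```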
The input data is provided as JSON files via an HTTP endpoint in the following format:
{ "records" : [ { "dateRep" : "07/09/2020", "day" : "07", "month" : "09", "year" : "2020", "cases" : 74, "deaths" : 2, "countriesAndTerritories" : "Afghanistan", "geoId" : "AF", "countryterritoryCode" : "AFG", "popData2019" : 38041757, "continentExp" : "Asia", "Cumulative_number_for_14_days_of_COVID-19_cases_per_100000" : "1.04884745" }, ... many more records ]}The final output in CSV format is looking like this:
```
year_month,cases,deaths
2019_12,0,0
2020_01,5,0
2020_02,52,0
2020_03,61856,583
2020_04,97206,5705
2020_05,22363,2212
2020_06,12777,473
2020_07,14439,168
2020_08,33683,157
```

In Prefect, a flow is defined in plain Python code. Below you can see how the functions are wired together to produce the DAG:
```python
def create_flow():
    local_parallelizing_environment = LocalEnvironment(executor=LocalDaskExecutor())
    with Flow(FLOW_NAME, environment=local_parallelizing_environment) as flow:
        country = Parameter("country", default=DEFAULT_COUNTRY)
        bucket = Parameter("bucket", default=DEFAULT_BUCKET)
        covid_df = download_data()
        filtered_covid_df = filter_data(covid_df, country)
        prepared_df = enrich_data(filtered_covid_df)
        aggregated_df = aggregate_data(prepared_df)
        print_data(aggregated_df)
        csv_results = prepare_data_for_upload(aggregated_df)
        upload_to_s3(csv_results["csv"], csv_results["filename"], bucket=bucket)
    return flow
```

The context manager (with statement) for Flow creates the space to wire tasks together. Such a task can be an arbitrary Python function that is annotated with @task or inherits from Task. Both of these constructs come from Prefect. A special kind of task is a Parameter, which acts as an input to your flow. In this case, the parameters select which country you are interested in for the analysis and the name of the AWS S3 bucket to which you want to export your data. In the following examples, we will go into detail on how these task functions are built.
Let’s start with the gathering of the data and the function download_data. This is the “heaviest” task as it needs to download a comparably large chunk of data (around 25 MB), and we don’t want to repeat this over and over again, e.g. when re-executing the flow. For this reason, some special Prefect settings are set to store the result with a daily cache key.
```python
@task(checkpoint=True, result=LocalResult(), target="{task_name}-{today}")
def download_data() -> pd.DataFrame:
    with urllib.request.urlopen(COVID_DATA_URL) as url:
        covid_data = json.loads(url.read().decode())["records"]
    covid_df = pd.DataFrame(covid_data)
    return covid_df
```

download_data looks like an arbitrary Python function you would find in many data projects, but it has the @task annotation so that Prefect knows that this is a unit of work to place in the DAG. Prefect uses Dask as a framework to execute and distribute work, which means that these tasks themselves can also do quite some heavy lifting. You could also imagine doing the heavy work outside of Prefect, like executing SQL on the data warehouse or submitting a Spark job to an external Big Data cluster, instead of passing the work to Dask.
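As a side note on execution: the demo flow uses the LocalDaskExecutor, which parallelizes tasks on a single machine. If you already operate a Dask cluster, a hedged sketch of pointing the flow at it could look like the following; the scheduler address is a placeholder, and the exact import paths vary between Prefect versions:

```python
from prefect.engine.executors import DaskExecutor
from prefect.environments import LocalEnvironment

# Hypothetical: connect to an existing Dask scheduler instead of spawning a local one.
distributed_environment = LocalEnvironment(
    executor=DaskExecutor(address="tcp://dask-scheduler:8786")
)

with Flow(FLOW_NAME, environment=distributed_environment) as flow:
    ...
```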
In this case, we want the data to be cached, so we specify that we want to checkpoint the data and that we are going to use a LocalResult, which stores the data on the local disk with cloudpickle. It is also possible to specify different serializers in case you want to cache the data as JSON or in your own format. The target is the cache key that should be used and, in this case, uses some templating variables provided by Prefect. The resulting cached file on disk will be saved under the following name:
```
download_data-2020-09-08
```

In a more production-like setup, you can also use S3 to store the data and would therefore exchange LocalResult for S3Result. You can use any Python datatype that cloudpickle can serialize, and instead of passing the real data, it is also an option to only forward references to the data, e.g. paths in S3.
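A minimal sketch of that swap could look like the following; the bucket name is a placeholder and not part of the demo repository:

```python
from prefect.engine.results import S3Result

# Hypothetical production variant of the caching setup shown above:
# persist the daily checkpoint to S3 instead of the local disk.
@task(
    checkpoint=True,
    result=S3Result(bucket="my-prefect-results"),  # placeholder bucket name
    target="{task_name}-{today}",
)
def download_data() -> pd.DataFrame:
    ...
```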
The rest of the task functions look like the following:
```python
@task
def filter_data(covid_df: pd.DataFrame, country: str) -> pd.DataFrame:
    logger = prefect.context.get("logger")
    logger.info(f"Filtering data for country: {country}")
    return covid_df[covid_df.countriesAndTerritories == country].copy()


@task
def enrich_data(covid_df: pd.DataFrame) -> pd.DataFrame:
    enriched_df = covid_df.copy()
    enriched_df["year_month"] = enriched_df["year"] + "_" + enriched_df["month"]
    return enriched_df


@task
def prepare_data_for_upload(covid_df: pd.DataFrame) -> Dict[str, str]:
    csv_string = io.StringIO()
    covid_df.to_csv(csv_string)
    filename = f"covid-monthly-{datetime.now().isoformat()}.csv"
    return {"csv": csv_string.getvalue(), "filename": filename}


@task
def aggregate_data(covid_df: pd.DataFrame) -> pd.DataFrame:
    return (
        covid_df.groupby("year_month")
        .agg({"cases": "sum", "deaths": "sum"})
        .sort_index()
    )


@task
def print_data(data: Any) -> None:
    # Only prints locally and does not log to cloud
    print(data)


upload_to_s3 = S3Upload()
```

If you know Pandas, this code probably looks very familiar. The only important part here is that all of these functions have the @task decorator.
The unique thing here is upload_to_s3: it does not use the decorator but is a pre-defined task from the Prefect task library called S3Upload, which uploads data to an S3 bucket. There are many other pre-defined tasks, e.g. for submitting a Databricks job or a dbt job, or for executing an AWS Lambda function. It is also possible to write your own reusable and generic tasks. We did this in the past for submitting tasks to AWS ECS Fargate, to do heavy lifting like machine learning outside of Prefect/Dask.
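To give an idea of what such a generic task can look like, here is a minimal sketch of a custom task built by subclassing Task; the class name, URL, and payload handling are hypothetical and not part of the demo repository:

```python
import urllib.request

from prefect import Task


class HttpPost(Task):
    """Reusable task that POSTs a payload to a configurable HTTP endpoint."""

    def __init__(self, url: str, **kwargs):
        self.url = url
        super().__init__(**kwargs)

    def run(self, payload: bytes) -> int:
        # The arguments of run() become the task's inputs inside a flow.
        request = urllib.request.Request(self.url, data=payload, method="POST")
        with urllib.request.urlopen(request) as response:
            return response.status


# Instantiate once, then call it inside a `with Flow(...)` block like any other task.
post_results = HttpPost(url="https://example.com/ingest")
```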
If you want to run this flow, you can run it with Prefect Core locally on your computer, or with Prefect Cloud to get a nice UI for your flow and to inspect logs and task failures much more conveniently. In this example project, I showcase the two different options. In both cases, my flow is executed locally, but in the latter case it is also registered in the UI of Prefect Cloud. The decorators on the functions below are not required by Prefect; in this case they come from the click framework, which makes it easy to build a nice CLI for your Python applications.
```python
@main.command()
@bucket_option
@country_filter
def run_local(bucket: str, country: str):
    flow = create_flow()
    flow.run(parameters={"bucket": bucket, "country": country})


@main.command()
@bucket_option
@country_filter
def run_with_cloud(bucket: str, country: str):
    flow = create_flow()
    flow_id = flow.register(project_name=PROJECT_NAME)
    client = Client()
    flow_run_id = client.create_flow_run(
        flow_id=flow_id, parameters={"bucket": bucket, "country": country}
    )
    print(f"Flow Run id: {flow_run_id}")
    flow.run_agent(show_flow_logs=True)
```

I will go into detail about deployment and how your flows are run in the next section.
After running this flow, a CSV file with the results should be visible in the specified S3 bucket:
```
aws s3 ls s3://ludwigm-bucket
2020-09-08 16:57:05        187 covid-monthly-2020-09-08T16:57:00.282974.csv
2020-09-08 18:03:31        187 covid-monthly-2020-09-08T18:03:26.231616.csv
```

To get a better understanding of the different moving parts, I will go into more detail on how you would deploy such flows with Prefect Cloud.
If you check out the repository for this blog article, you can register the flow in Prefect Cloud and run it with a local in-process agent as in the following snippet. I use poetry as the Python package manager, as an alternative to pip.
```
poetry install
prefect auth login -t <personal-access-token>
export PREFECT__CLOUD__AGENT__AUTH_TOKEN=<runner-token>
export PREFECT__FLOWS__CHECKPOINTING=true
poetry run demo-flow run-with-cloud --bucket <s3-bucket>
```

Keep in mind that a few things need to be set up in Prefect Cloud beforehand to make this work:
Project: Home → Team → Projects → Add project (“Demo”)
Agent runner token: Home → Team → API tokens → Create token (“RUNNER”)
Personal access token: Home → User → Personal Access token → Create token
What is noteworthy is that your flow is monitored and displayed in the UI, but it is still running locally on your computer (LocalAgent). This is the hybrid execution model of Prefect, which means that you can use their cloud offering while the real execution happens securely in your own infrastructure, without your data being transferred to the Prefect cloud. How this looks from an architecture standpoint can be seen in the next picture. For a more production-like setup, your flows are usually dockerized, loaded from a private Docker registry like AWS ECR, and executed for example with the FargateAgent. Other options like loading flows from GitHub or S3 are also possible.
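As a rough sketch of such a dockerized setup (the registry URL and image name are placeholders, and the import path of the storage classes has moved between Prefect releases), the flow's storage could be configured like this before registering it:

```python
from prefect.environments.storage import Docker

# Hypothetical: package the flow into a Docker image pushed to a private registry
# (e.g. AWS ECR), so a remote agent such as the FargateAgent can pull and execute it.
flow.storage = Docker(
    registry_url="123456789012.dkr.ecr.eu-central-1.amazonaws.com",  # placeholder
    image_name="prefect-demo-flow",                                  # placeholder
    image_tag="latest",
)
flow.register(project_name=PROJECT_NAME)
```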
Simplified architecture with multiple environments and hybrid execution model

In the following screenshots, you can see what this example flow looks like in the UI. You can drill down into the DAG, run your flow with different input parameters, look at a Gantt chart to see what executes when and how parallel it runs, or investigate task failures to see the specific logs for that issue.
Screenshots: All Projects dashboard · Flow group overview · Flow schematic for DAG · Task detail view · Logs for a single task · Gantt visualization for timings

Other interesting features are the possibility to define schedules for periodic execution of your flow, or the configuration of CloudHooks to enable, for example, alerting to Slack for failed flows. Prefect also brings functionality to use and manage secrets for your flow, e.g. for database or API access.
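A short sketch of how a secret can be consumed inside a flow (the secret name DB_PASSWORD is hypothetical and would need to be configured in Prefect Cloud or your local config first):

```python
from prefect.tasks.secrets import PrefectSecret

with Flow("flow-with-secret") as flow:
    # Resolved at runtime, e.g. from the secrets configured in Prefect Cloud.
    db_password = PrefectSecret("DB_PASSWORD")
    # ...pass db_password into downstream tasks like any other task output
```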
To make your flow run daily at a certain time, it is as simple as adding the following code snippet to your flow creation.
```python
flow.schedule = CronSchedule("00 13 * * *")
```

This article is only a glimpse of the possibilities you have, but it should give you a bit of insight into what a full flow can look like. Overall, I am very happy with Prefect. Since I started using it in October 2019, there have been many releases and bugs have been fixed quickly, so I see a lot of good momentum here. It fulfilled our requirement to start easily without a server and big maintenance effort, and it allowed us to shift to a managed service offering with the Cloud version when our requirements grew. As it is just plain Python, the hurdle for writing flows is very low, and it is easy even for a data scientist to write pipelines that can go to production. Finally, the documentation contains a lot of information, but it can sometimes be a bit overwhelming to figure out all the moving parts for a more production-like setup. There is a great Slack community with over 1,400 people in it and a great, responsive development team. So in case you are looking for a workflow orchestration system, I can highly recommend giving Prefect a try.
Repository: https://github.com/ludwigm/prefect-demo-flow
Translated from: https://makeitnew.io/prefect-a-modern-python-native-data-workflow-engine-7ece02ceb396