news 2026/4/23 12:56:12

自 2014 年以来数据工程是如何演变的

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
自 2014 年以来数据工程是如何演变的

原文:towardsdatascience.com/how-data-engineering-evolved-since-2014-9cc85f37fea6

在这次讨论中,我的目标是探讨数据编排和数据建模的演变趋势,突出工具的进步及其对数据工程师的核心益处。虽然 Airflow 自 2014 年以来一直是主导者,但数据工程领域已经发生了显著变化,现在它解决了更复杂的用例和需求,包括支持多种编程语言、集成和增强的可扩展性。我将检查当代和可能不太常规的工具,这些工具简化了我的数据工程流程,使我能够轻松创建、管理和编排强大、耐用和可扩展的数据管道。


在过去十年中,我们见证了各种 ETL(提取、转换和编排)框架的“寒武纪大爆发”。许多都是开源的,基于 Python,这并不奇怪。

最受欢迎的:

  • Airflow, 2014

  • Luigi, 2014

  • Prefect, 2018

  • Temporal, 2019

  • Flyte, 2020

  • Dagster, 2020

  • Mage, 2021

  • Orchestra, 2023

Apache Airflow, 2014

Apache Airflow 是由 Airbnb 于 2014 年创建的,于 2016 年开源,并于 2018 年加入 Apache 软件基金会。这是一个可以编程创建、调度和监控工作流的平台。他们是第一个引入 DAG 概念的人。工程师使用 Python 定义 DAG(有向无环图)来组织和可视化任务。现在几乎每个编排工具都使用了这个概念。例如,“Prefect DAG”看起来几乎一模一样。界面简单,可以轻松监控数据管道:

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/eea6661465a7df25fd096adf0a7cdab8.png

作者提供的图片

DAGs 易于理解,并且是用 Python 编写的:

@dag(default_args=default_args,tags=['etl'])defetl_pipeline():@task()defextract():returnjson.loads(data_string)@task(multiple_outputs=True)deftransform(order_data_dict:dict):return{"total_count":len(order_data_dict)}@task()defload(total_order_value:float):print(f"Total order value is:{total_count}")extracted=extract()transformed=transform(extracted)load(transformed["total_count"])

在我之前的一篇文章中,我写了一个更高级的 Airflow 示例:

如何成为一名数据工程师

优点:

  • 简单且强大的 UI:DAGs、重试、任务持续时间和失败的详细视图——在所有这些年之后,一切看起来都如此熟悉。

  • 社区支持:经过这么多年,开源项目拥有庞大的追随者群体。它非常受欢迎,并为各种数据源和目的地提供了大量的插件。结合定期更新,它成为许多数据开发者的明显选择。

  • Python:Python 等于定制和脚本化。在这里,一切看起来都如此灵活。在我多年前的一篇文章中,我成功定制了 ML 连接器,而且做起来非常容易 [1]。

使用 Google AI 平台和自定义环境容器训练您的 ML 模型

  • Jinja 和参数:这个特性对许多 DBT 用户来说应该很熟悉,它允许我们以创建模板化模型在 DBT 中的类似方式创建动态管道。

Luigi,2014

另一个用于我们的 ETL 管道的编排器。由 Spotify 开发,用于处理大量数据处理工作负载,Luigi 具有命令行界面和出色的可视化能力。然而,即使是简单的 ETL 管道也需要一些 Python 编程技能。尽管如此,Luigi 在许多情况下表现良好。

优势

  • HDFS 支持:Luigi 为团队提供了一个方便的工具箱,包括任务模板,包括 HDFS 和本地文件的文件系统抽象。这有助于保持操作原子性和数据管道的一致性,使一切运行顺畅。

  • 易于使用:拥有强大的追随者和贡献者社区。项目得到维护。

  • 优秀的用户界面:Web 界面允许搜索、过滤和优先处理 DAGs,这使得处理管道依赖关系变得容易。

  • 强大的基础设施:Luidgi 支持使用多个工具、A/B 测试、推荐和外部报告的复杂管道。

根据我的经验,它非常适合严格和直接的管道,尽管实现复杂的分支逻辑可能具有挑战性。即使是 Spotify 本身也在 2019 年从它转向。他们说维护起来不容易,并且需要多语言支持[2]。

为什么我们切换了我们的数据编排服务 - Spotify 工程

他们转向了 Flyte(更多内容见下文)。

Prefect,2018

成立于 2018 年,Prefect 旨在克服工程师在使用其他编排工具(如 Airflow)时面临的挑战。Prefect 采用了一个比 Airflow 更简单的系统。您可以使用Python 装饰器轻松地将代码转换为“Prefect DAG”,而不是使用 DAG 结构。

它提供了一个混合执行模型,允许任务在本地或任何云平台上执行。

Prefect 将自己定位为“数据流自动化的新标准”。

优势

  1. 开源:灵活性和多种部署选项。

  2. 轻量级:只需一个命令,我们就可以设置编排的开发环境。

  3. 动态映射:允许根据另一个任务的输出动态执行任务。

  4. Python 优先:凭借 Python 优先的经验,Prefect 提供了更干净的抽象,类似于 Airflow。

  5. 监控:使用 Prefect 的 Cloud UI 监控关键操作指标并运行失败。它看起来直观且美观。

  6. 警报和通知:Discord、电子邮件、Slack 以及更多渠道用于我们的管道警报。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/3ec19f57d7cad2cc9afa52247fe00041.png

作者图片

开源版本提供了我们在 Airflow 中通常使用的一切,包括诸如调度、本地密钥、重试、并发管理、扩展功能以及出色的日志记录等功能。

使用Prefect Cloud,我们得到一些额外的功能,例如自动化、Webhooks、事件源和针对托管环境可能需要的 Workspace 和组织。

关键组件将是我们要构建的任务和流程。我们可以用 Prefect 的@task装饰器来描述管道中的每一步——流程的基本单元。对于每个任务,我们可以通过参数提供设置。可以是任何东西——标签、描述、重试、缓存设置等。以下是一个示例@task

@taskdefextract_data(source,pipe):...returnresult@taskdefload_data_to_dwh(data,database:bigquery):...returntable_name@taskdefrun_data_checks(dataset,row_conditions:qa_dataset):...

在下面的示例中,我们使用@flow装饰器创建一个流程。该流程将按顺序执行所有任务,生成输入和输出,并将它们从一个任务传递到另一个任务。

@flowdefetl_workflow():s3_data=extract_data(google_spreadsheet,pipe_object_with_settings)dataset=load_data_to_dwh(s3_data,database)run_data_checks(table_name,rules)if__name__=="__main__":etl_workflow()

Prefect 使用工作池来有效地管理工作分配,并在所需环境中(测试、开发、生产)优先处理任务,以实现最佳性能和自动化测试。我们可以在本地或云中创建工作者(代理)。

Prefect 可以使用pip安装:

pip install-U"prefect==2.17.1"# or get the latest version from prefectpip install-U git+https://github.com/PrefectHQ/prefect

这里是一个简单的脚本来从 NASA API 提取数据:

# ./asteroids.pyimportrequests API_KEY="fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW"ASTEROIDS_API_URL="https://api.nasa.gov/neo/rest/v1/feed"defget_asteroids_data():print('Fetching data from NASA Asteroids API...')session=requests.Session()url=ASTEROIDS_API_URL apiKey=API_KEY requestParams={'api_key':apiKey,'start_date':'2023-04-20','end_date':'2023-04-21'}response=requests.get(url,params=requestParams)print(response.status_code)near_earth_objects=(response.json())['near_earth_objects']returnnear_earth_objectsif__name__=="__main__":get_asteroids_data()

我们可以将其转换为以下流程:

# my_nasa_pipeline.pyimportrequests# an HTTP client library and dependency of Prefectfromprefectimportflow,task API_KEY="fsMlsu69Y7KdMNB4P2m9sqIpw5TGuF9IuYkhURzW"ASTEROIDS_API_URL="https://api.nasa.gov/neo/rest/v1/feed"@task(retries=2)defget_asteroids_data(api_key:str,url:str):"""Get asteroids close to Earth for specific datess - will retry twice after failing"""print('Fetching data from NASA Asteroids API...')session=requests.Session()url=ASTEROIDS_API_URL apiKey=API_KEY requestParams={'api_key':apiKey,'start_date':'2023-04-20','end_date':'2023-04-21'}response=session.get(url,params=requestParams)print(response.status_code)near_earth_objects=(response.json())['near_earth_objects']returnnear_earth_objects@taskdefsave_to_s3(data):"""Save data to S3 storage"""# Do some ETL hereresult=print(data)returnresult@flow(log_prints=True)defasteroids_info(date:str="2023-04-21"):""" Given a date, saves data to S3 storage """asteroids_data=get_asteroids_data(API_KEY,ASTEROIDS_API_URL)print(f"Close to Eart asteroids:{asteroids_data}")s3_location=save_to_s3(asteroids_data)print(f"Saved to:{s3_location}")if__name__=="__main__":asteroids_info("2023-04-21")

运行流程

python my_nasa_pipeline.py

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/3699e987960ede79ef4cc5e812c7f49d.png

作者图片

创建 Prefect 服务器

现在我们想要创建一个部署来安排我们的流程运行:

# create_deployment.pyfromprefectimportflowif__name__=="__main__":flow.from_source(# source="https://github.com/your_repo/prefect_nasa.git",source="./",entrypoint="my_nasa_pipeline.py:asteroids_info",).deploy(name="my-first-deployment",work_pool_name="test-pool",cron="0 1 * * *",)

在你的命令行中运行:

python create_deployment.py# Run the workflow manually# prefect deployment run 'repo-info/my-first-deployment'

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/5500a21d38c212e24dffb83722aacac8.png

作者图片

为了运行我们的计划工作流程,我们想要创建一个工作池:

prefect work-pool create test-pool prefect worker start--pool'test-pool'prefect server start

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/12c8e67d5c7c72d3ae8037eed045e751.png

作者图片

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/9278b2493385c6c37e30edb7038f42ea.png

作者图片

或者,我们可以使用此命令并使用提示来创建部署:

prefect deploy./my_nasa_pipeline.py:asteroids_info-n my-first-deployment

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/f82043c0563d964828ac5966f0629c41.png

作者图片

Prefect 集成

Prefect 通过其强大的集成集变得更加强大,完整的列表可以在这里找到 [3]。

例如,我们可以使用 Prefect 与 DBT 结合,这使得我们的数据建模更加强大。在你的命令行中运行:

pip install-U prefect-dbt# register blocks to start using tghem in Prefectprefect block register-m prefect_dbt

现在我们可以使用 Prefect 与dbt-core结合:

fromprefectimportflowfromprefect_dbt.cli.commandsimportDbtCoreOperation@flowdeftrigger_dbt_flow()->str:result=DbtCoreOperation(commands=["pwd","dbt debug","dbt run"],project_dir="PROJECT-DIRECTORY-PLACEHOLDER",profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER").run()returnresultif__name__=="__main__":trigger_dbt_flow()

在我的一个故事中,我写了如何配置 DBT 以便从任何环境中方便地执行:

数据库数据转换数据工程师指南

时间,2019

Temporal 支持通过 API 触发工作流程,并允许多个并发工作流程执行。

优点:

  • 容错和重试:对于任何失败的任务,具有自动重试的能力。管理可能持续几天甚至几个月的长时间运行工作流程的能力。

  • 可扩展性:适应高吞吐量工作负载的能力。

  • 增强监控:对工作流程执行和历史数据的洞察。

  • 支持时间查询和事件驱动的工作流程。

  • 多样性:支持多种编程语言。

Temporal 可以编排复杂的数据处理工作流程,在保持整个过程中数据一致性和可靠性的同时,有效地管理失败。它管理分布式系统状态的能力使 Temporal 能够提高资源分配。

Temporal 可以适应持续运行的工作流程,使各种实体的生命周期建模成为可能。Temporal 工作流程本质上是动态的,能够执行多步核心业务逻辑。它们可以向外部进程发送或等待信号,从而促进对人类的提醒或启动干预过程。

Flyte,2020

Flyte 是一个基于 Kubernetes 构建的开源编排工具,用于管理机器学习和 AI 工作流程。由于数据对业务至关重要,运行大规模计算作业是必要的,但在操作上具有挑战性。扩展、监控和管理集群可能会给产品团队带来负担,减缓创新。这些工作流程还具有复杂的数据依赖关系,没有平台抽象,协作将变得困难。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/ed3d6b5d188022f9c9b5ef91c7c619d6.png

作者图片

Flyte 旨在通过简化这些任务来提高机器学习和数据处理的发展速度。它确保了可靠的、可扩展的计算,让团队能够专注于业务逻辑。此外,它促进了团队间的共享和重用,一次解决问题并增强协作,因为数据和机器学习角色正在合并。

优势

  • 端到端数据溯源:这允许你在每个执行阶段监控你的数据和 ML 工作流程的健康状况。轻松分析数据流以确定任何错误的来源。

  • 参数和缓存:考虑到机器学习的设计,它允许动态管道和缓存预计算工件的使用。例如,在超参数优化期间,你可以轻松地为每次运行应用不同的参数。如果一个任务在之前的执行中已经被计算,Flyte 将有效地使用缓存的输出,节省时间和资源。

  • 多租户和无服务器:Flyte 消除了管理基础设施的需求,让你能够专注于业务挑战而不是机器。作为一个多租户服务,它为你的工作提供了一个隔离的存储库,允许你独立部署和扩展。你的代码带有版本号,与其依赖项一起容器化,并且每次执行都是可重现的。

  • 可扩展:Flyte 任务可能非常复杂。它可能是一个简单的 ETL 作业,或是对远程 Hive 集群或分布式 Spark 执行的调用。最佳解决方案可能托管在其他地方,因此任务的可扩展性允许你将外部解决方案集成到 Flyte 和你的基础设施中。

  • 异构和多语言支持:数据管道可能很复杂。每一步都可以用不同的语言编写并使用各种框架。一步可能使用 Spark 来准备数据,而下一步则训练深度学习模型。

Dagster,2020

Dagster 是一个开源的数据编排器,它促进了数据管道的开发、管理和监控。它支持作业监控、调试、数据资产检查和回填执行。

本质上,Dagster 充当构建数据管道的框架,在您的数据生态系统中作为抽象层。

异构一切

优势:

  • 异构:Dagster 提供了一个全面的接口,使用户能够在一个地方构建、测试、部署、运行和优化他们的数据管道。这种统一的方法简化了数据工程师的工作流程,使他们更容易管理数据处理的整个生命周期。

  • 改进的抽象:它通过软件定义资产(SDAs)采用声明式编程,这增强了抽象并简化了管道设计。用户从共享、可重用和可配置的组件中受益,这些组件促进了高效的数据处理。此外,Dagster 还包括声明式调度功能,使用户能够实施新鲜度策略,确保数据是最新的。

  • 测试能力:为确保数据的完整性,Dagster 通过定义输入和输出可接受值的类型来整合质量检查。它还支持代码和数据的资产版本控制,以及增强性能的缓存机制。

  • 优秀的监控功能:Dagster 配备了内置的可观察性仪表板,提供对管道性能和健康状况的实时监控。其内置的可测试性功能允许用户无缝验证其管道的功能。

  • 强大的集成:Dagster 与您数据生态系统中各种工具提供强大的集成,包括 Airflow、dbt、Databricks、Snowflake、Fivetran、Great Expectations、Spark 和 Pandas。

Mage,2021

看起来 Mage 是针对速度和可扩展性而创建的,专门针对 Kubernetes 等容器化环境。2021 年创建的 Mage 旨在满足微服务架构中对实时数据处理日益增长的需求。

使用 Mage,我们可以使用多种编程语言,如 R、Python 和 SQL,结合强大的模板功能。Mage 无疑在编排空间引入了一些新事物。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/a87824d42aa9d6ba5be1af99750ba383.png

Mage 的 DBT 支持。图片由作者提供。

优势:

  • DRY 组件和 DBT 支持:采用模块化设计构建,例如,我们可以轻松集成不同的仓库、包和组件。轻松构建、运行和管理您的 dbt 模型。对于数据网格平台非常有用。

  • 成本效益:声称通过明显的成本效益优化资源分配和消耗。

  • 原生 Kubernetes:为现代数据平台架构提供简易部署。

  • 实时数据管道:与实时数据完美配合。对于许多公司来说,摄取和转换流数据是一个真正的痛点。

  • 内置集成:支持数十个源和目标,例如 Amazon S3、BigQuery、Redshift、PowerBI、Tableau、Salesforce、Snowflake 等。

魔法师的用户界面还提供了在部署前预览您创建的管道的能力。

此功能允许用户可视化和检查其管道的结构和功能,确保在上线前一切设置正确。通过提供此预览选项,Mage 帮助用户识别任何可能需要的问题或调整,从而最终导致更顺畅的部署过程,并提高整体工作流程效率。

Mage 的关键概念与 Prefect 几乎相同:项目、块、管道、数据产品、回填、触发器和运行。

允许我预览代码输出的交互式笔记本 UI 是我最喜欢的功能之一。

Orchestra,2024

由 Hugo Lu 于 2023 年创建。Orchestra 是新一代编排解决方案,专注于无服务器架构,以在一个平台上统一所有内容。

优势:

  • 无服务器模块化架构:您不再需要 Kubernetes 集群。

  • 快速交付:在几分钟内构建企业级编排。

  • 加强监控:不要浪费时间寻找错误。

  • 最先进的集成:数百种开箱即用的集成。

结论

这项研究强调,在数据管道编排中,强调异构性、支持多种编程语言、有效使用元数据和采用数据网格架构是构建现代、健壮和可扩展数据平台的关键趋势。

例如,Apache Airflow 提供了各种预构建的数据连接器,这些连接器可以促进跨各种云供应商(包括 AWS、GCP 和 Azure)的无缝 ETL 任务。然而,它缺少诸如任务内检查点和缓存等特性,并且没有针对机器学习管道进行优化。

预计在未来几年中,支持多种编程语言将成为一个重要趋势。例如,Temporal 兼容各种语言和运行时,而 Airflow 主要强调 Python。

数据网格时代的协作是数据空间成功的关键。您的组织可能有专门负责数据管理、分类模型和预测模型的独立团队,所有这些团队都可以协作使用同一个平台——例如 Mage 或 Flyte。这允许他们在同一个工作空间内操作,而不会相互干扰。

尽管工具的演变确实令人着迷,但数据工程的基本原则保持不变:

数据工程师的 Python

我们可以见证数据提取和转换的各种 ETL 框架的“寒武纪大爆发”。许多都是开源的,并且基于 Python,这并不令人惊讶。

现代数据工程

推荐阅读

[1]www.datacamp.com/tutorial/ml-workflow-orchestration-with-prefect

[2]docs.prefect.io/latest/concepts/work-pools/

[3]docs.prefect.io/latest/integrations/prefect-dbt/

[4]docs.prefect.io/latest/integrations/prefect-dbt/

[5]engineering.atspotify.com/2022/03/why-we-switched-our-data-orchestration-service/

[6]thenewstack.io/flyte-an-open-source-orchestrator-for-ml-ai-workflows/?utm_referrer=https%3A%2F%2Fwww.google.com%2F

[7]atlan.com/dagster-data-orchestration/

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/23 11:22:30

我们如何知道人工智能是否只是烟雾弹?

原文:towardsdatascience.com/how-do-we-know-if-ai-is-smoke-and-mirrors-16ed5b6877aa?sourcecollection_archive---------4-----------------------#2024-04-17 思考“人工智能革命”更像是印刷术还是加密货币。(剧透:两者都不是。&#…

作者头像 李华
网站建设 2026/4/23 11:20:37

金融合规Agent日志深度剖析:如何用日志数据应对SOX、GDPR双重挑战?

第一章:金融合规 Agent 的审计日志在金融行业,系统操作的可追溯性与安全性至关重要。审计日志作为合规性保障的核心组件,能够记录所有关键操作的时间、主体、行为和上下文信息,为监管审查、异常检测和责任追溯提供数据支撑。审计日…

作者头像 李华
网站建设 2026/4/23 11:21:50

LeetCode 451 - 根据字符出现频率排序

文章目录 摘要描述题解答案(整体思路)第一步:统计字符频率第二步:按频率排序第三步:按排序结果拼接字符串 题解代码(Swift 可运行 Demo)题解代码分析1. 为什么用 Dictionary 统计?2.…

作者头像 李华
网站建设 2026/4/23 11:21:54

工业机器人精度检测困局突破:基于激光跟踪仪的4维评估体系构建

第一章:工业机器人Agent的精度定义与挑战工业机器人Agent在现代智能制造中承担着装配、焊接、搬运等关键任务,其操作精度直接影响产品质量与生产效率。精度通常分为**绝对精度**和**重复精度**两类:前者指机器人末端执行器到达指定目标点的实…

作者头像 李华
网站建设 2026/4/23 11:22:02

2025年中南大学计算机考研复试机试真题

2025年中南大学计算机考研复试机试真题 2025年中南大学计算机考研复试上机真题 历年中南大学计算机考研复试上机真题 历年中南大学计算机考研复试机试真题 更多学校题目开源地址:https://gitcode.com/verticallimit1/noobdream N 诺 DreamJudge 题库&#xff1…

作者头像 李华
网站建设 2026/4/23 11:21:59

如何用AI新技术绕过老牌业务,比如微信或搜索等?靠范式转移(Paradigm Shift) 去App化,接管入口 合成社交,提供超级情绪价值 生成式媒体,无限个性化 第二大脑,极致隐私与记忆

绕过微信(或类似的垄断级Super App)的核心逻辑,绝对不是“做一个更好的微信”,而是让“发消息”这个动作本身变得过时或次要。 老牌业务的护城河在于网络效应(所有人都在这里)和路径依赖(习惯&a…

作者头像 李华