Dagster — 把流水线想成数据资产图,不是任务序列
是什么
Dagster 是一套数据流水线编排系统。和 Airflow / Prefect 同类,但心智模型完全不同:它让你写”我要哪些数据资产”,不写”我要按顺序跑哪些任务”。
日常类比:传统编排像菜谱(“先洗菜、再切菜、再下锅”——按步骤),Dagster 像菜单(“我要一盘宫保鸡丁”——告诉系统结果,它自己倒推该做什么)。
你写:
@dg.assetdef daily_sales() -> None: # 拉今日订单写到表里 ...
@dg.asset(deps=[daily_sales])def weekly_sales() -> None: # 基于 daily_sales 聚合到周 ...Dagster 自动得出:“weekly_sales 依赖 daily_sales”,画一张资产图,按图调度。
为什么重要
数据团队三大老问题,资产模型一次性框住:
- 血缘看不见:表 A 来自表 B 来自表 C——Airflow 不知道,Dagster 把 deps 写在代码里,UI 直接画图
- 重跑成本高:哪个资产坏了?只重跑下游就行,Dagster 知道边界;Airflow 要人手算
- 测试难:Airflow 任务跟调度耦合,本地难跑;Dagster 资产函数就是普通 Python,本地直接调 + 单测
现代数据栈(dbt + 仓库 + ML 流水线)需要一个统一调度层——dbt 模型、Python 训练脚本、SQL 表都能当资产挂在同一张图上。
核心要点
四个概念吃透就能上手:
-
Software-Defined Asset(SDA,软件定义资产):一个资产 = 一段 Python 函数 + 一个唯一 key + 上游依赖。函数运行结果就是这个资产的”内容”。
-
Materialization(物化):跑一次资产函数,结果写到存储里。Dagster 记录每次物化的元数据(code_version、时间、输入哈希),下次想重跑能溯源。
-
IO Manager:资产存在哪里(S3、Snowflake、本地文件)由 IO Manager 决定,业务函数不关心存储细节。换存储只换配置,不改业务代码。
-
Asset Graph:所有资产组成一张有向图。UI 里看一眼就知道:“这张表坏了,下游 5 个报表都受影响”。
辅助概念:
- Partitions:按时间/地区切片同一个资产(每天的 daily_sales 是独立分区),支持回填
- Asset Checks:物化时跑数据质量校验(行数 / null 比例 / 唯一性)
- Sensors / Schedules:定时或事件触发资产刷新
实践案例
案例 1:从 task 思维切到 asset 思维
Airflow 写法(任务):
def extract(): ...def transform(): ...def load(): ...
dag = DAG(...)extract_task >> transform_task >> load_task你定义的是步骤的顺序。系统不知道每一步产生了什么。
Dagster 写法(资产):
@dg.assetdef raw_orders(): return fetch_orders()
@dg.assetdef cleaned_orders(raw_orders): return clean(raw_orders)
@dg.assetdef daily_revenue(cleaned_orders): return aggregate(cleaned_orders)你定义的是三张表,依赖是函数参数自动推出来的。系统知道每张表的内容、版本、来源。
案例 2:Partitions 让回填变简单
每日订单表按 date 分区:
@dg.asset(partitions_def=DailyPartitionsDefinition(start_date='2024-01-01'))def daily_orders(context): date = context.partition_key return fetch_orders_for(date)UI 里可以选 “2025-03-01 到 2025-03-15” 一键回填这 15 天。不用写循环、不用手算日期。
案例 3:dbt + Python ML 同图编排
dbt 模型可以批量导入成资产:
@dbt_assets(manifest=Path('target/manifest.json'))def my_dbt_assets(...): ...
@dg.asset(deps=[my_dbt_assets])def churn_model(): # 基于 dbt 产出的特征表训练模型 ...SQL 转换 + Python 训练在同一张资产图上调度,血缘连通。
踩过的坑
-
心智模型转换有成本:Airflow 用户头三天会想”我的 task 在哪”——Dagster 没有 task 概念(有 op,但平时不用)。要硬切到”我有什么数据”。
-
IO Manager 第一次写容易踩坑:
@asset函数return df之后,df 存哪里?默认是本地 pickle。生产环境要换成 S3 或 Snowflake,不配 IO Manager 就静默写到/tmp。 -
资产数量爆炸 UI 卡:超过 1000 个资产时 UI 加载慢,需要拆 Asset Group 或多 Code Location(多个 Python 进程加载不同的资产集)。
-
多维 Partitions 容易跑错:(date, region) 二维分区有 365×50 = 18250 个组合,回填范围算错就跑爆机器。先用小范围试。
-
OSS 和 Cloud 不完全一致:Dagster Cloud(托管)有些功能 OSS 没有(branch deployment、insights),文档偶尔混着写,看的时候要分清版本。
适用 vs 不适用场景
适用:
- 数据仓库 ETL/ELT,需要血缘和回填
- ML 流水线(特征 → 训练 → 评估 → 上线),各阶段产出物需要追溯
- dbt 编排 + 数据质量监控
- 需要把 SQL / Python / Spark 任务统一调度
不适用:
- 纯任务编排,没有”数据产物”概念(如部署流程、CI 任务)→ 用 Airflow / GitHub Actions 更轻
- 极简场景一两个 cron 脚本 → 直接 cron 别上重型框架
- 实时流处理(毫秒级)→ Dagster 是批/微批,要实时用 Flink / Kafka Streams
历史小故事(可跳过)
- 2018 年:Nick Schrock(前 Facebook 工程总监,GraphQL 联合创造者)创立 Elementl,开源 Dagster
- 2022-08:v1.0 发布,正式稳定 API
- 2024 年起:CEO 由 Pete Hunt(早期 React 团队成员)接任,公司更名 Dagster Labs
学到什么
- 抽象层级影响一切:把”步骤”换成”产物”,整个调度、血缘、测试、回填的体验都变了——这就是好抽象的力量
- 声明式 > 命令式:声明”我要什么”让系统帮你倒推;命令式”按这个顺序跑”把决策推给人
- 跟 dbt 互补:dbt 是 SQL 资产层,Dagster 是统一编排层,两者一起组成现代数据栈
- 可测试性来自解耦:业务函数不依赖调度器,本地能直接跑,这是 Airflow 时代的痛点
延伸阅读
- 官方文档:docs.dagster.io
- 入门:What & Why Dagster
- 源码:dagster-io/dagster
- airflow —— 任务编排前辈,对照看清楚资产模型的差异
- dbt —— SQL 资产层,常和 Dagster 一起用