如果你曾写过这样的代码——

1
2
3
4
5
6
7
8
9
10
Result process(Request req) {
req = validate(req);
if (!req.valid) return error;
req = enrich(req);
req = transform(req);
req = filterFields(req);
saveToDB(req);
sendNotification(req);
return success;
}

——你一定感受过它的问题:函数体越来越长、每个步骤紧耦合、加一步就要改主流程、单元测试只能测整体。

Pipeline 模式就是为这种"多步骤顺序处理"场景而生的。它把每一步封装成独立的阶段(Stage),数据像流水线一样在阶段之间传递——每个阶段只做一件事,且只关心自己的输入和输出。

一、什么是 Pipeline 模式

1.1 核心思想

Pipeline 模式将复杂处理流程拆解为一系列独立的阶段(Stage),每个阶段接收数据、加工数据、输出数据,阶段之间通过一个统一的**管道(Pipeline)**串联。

1
输入 → [Stage 1] → [Stage 2] → [Stage 3] → ... → [Stage N] → 输出

四个关键特征

特征 说明
单向流动 数据从第一个阶段流入,依次经过每个阶段,最终流出
阶段独立 每个阶段不依赖其他阶段的内部实现,只依赖接口约定
可组合 阶段可以自由排列组合,形成不同的处理链
可复用 同一个阶段可以在多条 Pipeline 中复用

1.2 与责任链模式的本质区别

Pipeline 和责任链(Chain of Responsibility)是两种容易混淆的模式,但它们的意图截然不同:

维度 Pipeline 责任链
数据流 每个阶段都必须处理数据 处理者可以选择不处理,跳过
终止条件 数据走完所有阶段 任一处理者处理后即可终止
数据变换 每个阶段修改数据并传递给下一阶段 通常不修改数据,只做判断
典型场景 ETL、编译管道、HTTP 中间件 审批流、事件冒泡、异常处理
阶段感知 阶段知道自己的位置(有时) 处理者不知道自己在链中的位置

一句话区分:Pipeline 是在"加工数据",责任链是在"寻找谁来处理"。

二、基础实现

2.1 最小可用 Pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// pipeline.h
#include <vector>
#include <memory>
#include <functional>

template<typename T>
class Pipeline {
public:
using StageFunc = std::function<T(const T&)>;

Pipeline& addStage(StageFunc stage) {
stages.push_back(std::move(stage));
return *this; // 支持链式调用
}

T execute(T input) const {
T current = std::move(input);
for (const auto& stage : stages) {
current = stage(current);
}
return current;
}

private:
std::vector<StageFunc> stages;
};

使用示例:一个最简单的字符串处理管道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Pipeline<std::string> textPipeline;
textPipeline
.addStage([](const std::string& s) {
// 阶段1:移除首尾空白
auto start = s.find_first_not_of(" \t\n\r");
auto end = s.find_last_not_of(" \t\n\r");
return (start == std::string::npos) ? "" : s.substr(start, end - start + 1);
})
.addStage([](const std::string& s) {
// 阶段2:全部转大写
std::string result = s;
std::transform(result.begin(), result.end(), result.begin(), ::toupper);
return result;
})
.addStage([](const std::string& s) {
// 阶段3:将多个空格合并为一个
std::string result;
bool lastWasSpace = false;
for (char c : s) {
if (std::isspace(c)) {
if (!lastWasSpace) result += ' ';
lastWasSpace = true;
} else {
result += c;
lastWasSpace = false;
}
}
return result;
});

std::string result = textPipeline.execute(" hello world ");
// 结果:"HELLO WORLD"

2.2 带错误的 Pipeline

实际项目中,处理过程可能失败。这时需要对结果类型建模:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
template<typename T>
class Result {
public:
static Result ok(T value) { return Result(true, std::move(value), ""); }
static Result error(std::string msg) { return Result(false, T{}, std::move(msg)); }

bool isOk() const { return success; }
const T& value() const { return data; }
const std::string& errorMsg() const { return errMsg; }

private:
bool success;
T data;
std::string errMsg;
Result(bool ok, T val, std::string err)
: success(ok), data(std::move(val)), errMsg(std::move(err)) {}
};

template<typename T>
class FalliblePipeline {
public:
using StageFunc = std::function<Result<T>(const T&)>;

FalliblePipeline& addStage(StageFunc stage) {
stages.push_back(std::move(stage));
return *this;
}

// 一旦失败就短路,跳过后续阶段
Result<T> execute(T input) const {
T current = std::move(input);
for (const auto& stage : stages) {
auto result = stage(current);
if (!result.isOk()) {
return result; // 短路:立即返回错误
}
current = result.value();
}
return Result<T>::ok(current);
}

private:
std::vector<StageFunc> stages;
};

短路行为是 Pipeline 的关键设计决策之一。上面选择"遇错即停",但也可以设计为"收集所有错误继续执行",取决于业务场景。

三、进阶:并行 Pipeline

当 Pipeline 中某几个阶段可以并行执行时(阶段之间无数据依赖),可以进一步榨干多核性能。

3.1 并行阶段示意

1
2
3
         ┌→ [Stage 2a: OCR识别] ──┐
输入 → [Stage 1] ─┼→ [Stage 2b: 人脸检测] ─┼→ [Stage 3: 结果聚合] → 输出
└→ [Stage 2c: 关键词提取] ┘

Stage 2a、2b、2c 互不依赖,可以并行。Stage 3 等它们全部完成后再聚合。

3.2 C++ 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <future>
#include <thread>

template<typename T>
class ParallelPipeline {
public:
struct StageGroup {
// 同组内的阶段可以并行执行
std::vector<std::function<T(T)>> stages;
};

ParallelPipeline& addSequential(std::function<T(T)> stage) {
// 单阶段 = 只含一个阶段的组
groups.push_back({{std::move(stage)}});
return *this;
}

ParallelPipeline& addParallel(std::vector<std::function<T(T)>> stages) {
groups.push_back({std::move(stages)});
return *this;
}

T execute(T input) const {
T current = std::move(input);

for (const auto& group : groups) {
if (group.stages.size() == 1) {
// 顺序阶段
current = group.stages[0](current);
} else {
// 并行阶段:每个 stage 拿到的是 same 输入,输出需要合并
auto results = runParallel(current, group.stages);
current = mergeResults(std::move(results));
}
}

return current;
}

private:
std::vector<StageGroup> groups;

std::vector<T> runParallel(const T& input,
const std::vector<std::function<T(T)>>& stages) const {
std::vector<std::future<T>> futures;
for (const auto& stage : stages) {
futures.push_back(std::async(std::launch::async,
[&stage, &input]() { return stage(input); }));
}

std::vector<T> results;
for (auto& f : futures) {
results.push_back(f.get());
}
return results;
}

T mergeResults(std::vector<T> results) const {
// 默认:合并所有字符串
if constexpr (std::is_same_v<T, std::string>) {
std::string merged;
for (const auto& r : results) {
merged += r + "\n";
}
return merged;
}
// 其他类型:取第一个结果(可根据业务自定义)
return results.empty() ? T{} : results[0];
}
};

3.3 何时用并行 Pipeline

并行 Pipeline 不是银弹。判断标准:

条件 适合并行?
阶段之间无数据依赖
每个阶段的耗时较均匀
阶段数量超过 CPU 核心数 ❌ 反而引入调度开销
数据量小、阶段计算轻 ❌ 线程创建开销大于计算本身
阶段有共享状态 ❌ 需要同步,抵消并行收益

经验法则:单个阶段耗时 > 100μs 且阶段之间无依赖时,并行才有正向收益。

四、典型场景

4.1 编译管道

这是 Pipeline 模式最经典的实现。从源码到可执行文件,数据(源代码文本)依次经过:

1
源码 → [词法分析] → [语法分析] → [语义分析] → [中间代码生成] → [优化] → [目标代码生成] → 可执行文件

LLVM 和 GCC 的内部架构都遵循这个 Pipeline 模型。每个 Pass 是一个阶段,Pass 之间通过 IR(中间表示)传递数据。

4.2 HTTP 中间件

Web 框架的中间件机制本质上是 Pipeline:

1
请求 → [日志] → [鉴权] → [限流] → [参数校验] → [业务处理] → 响应

Go 的 net/http 中间件、Express/Koa 的中间件、ASP.NET Core 的 Middleware Pipeline,都是同一模式的不同实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 伪代码:HTTP 中间件 Pipeline
class HttpPipeline {
std::vector<Middleware> middlewares;

Response handle(Request req) {
auto handler = wrapHandler(businessLogic);
for (auto it = middlewares.rbegin(); it != middlewares.rend(); ++it) {
handler = it->wrap(handler); // 洋葱模型包装
}
return handler(req);
}
};
// 执行顺序:日志 → 鉴权 → 限流 → 业务 → 限流 → 鉴权 → 日志
// ────── 请求方向 → ← 响应方向 ──────

4.3 ETL 数据处理

在数据工程中,ETL(Extract-Transform-Load)流程天然适合 Pipeline:

1
数据源 → [抽取] → [清洗] → [转换] → [校验] → [聚合] → [加载到数据仓库]

4.4 图像/视频处理

1
2
原始帧 → [解码] → [缩放] → [降噪] → [色彩校正] → [人脸检测] → [编码输出]
↘ [OCR识别] ↗

两个下游阶段(人脸检测、OCR)可以并行。

五、设计考量

5.1 阶段粒度

太粗:一个阶段做太多事,丧失灵活性和可复用性
太细:阶段数量爆炸,Pipeline 的执行开销超过业务逻辑

经验法则:一个阶段 = 一个明确的、可独立命名的职责。如果命名时不得不使用"和"字("解析和校验"),就该拆成两个。

5.2 阶段间数据格式

是所有阶段共享同一数据类型,还是每个阶段有不同的输入/输出类型?

1
2
3
4
5
6
7
8
9
方案 A(统一类型):
Pipeline<Request> pipeline;
优点:简单
缺点:Request 会成为"上帝对象",携带所有阶段可能需要的字段

方案 B(类型转换):
Pipeline<RawData, CleanedData, ProcessedData, ...> pipeline;
优点:类型安全,每个阶段明确表达自己的输入输出
缺点:实现复杂,需要类型列表或 variant

建议:初期用方案 A(统一类型),当 Request 膨胀到不可维护时再迁移到方案 B。

5.3 错误处理策略

策略 行为 适用场景
短路停止 遇错立即返回,跳过后续阶段 多数业务场景
收集继续 记录错误仍继续执行,最后汇总 批量处理、数据校验
降级跳过 遇错跳过当前阶段,继续后续 非关键步骤(如日志、分析)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
enum class ErrorPolicy { FailFast, CollectAndContinue, SkipOnError };

template<typename T>
class ConfigurablePipeline {
FalliblePipeline<T> mainPipeline;
FalliblePipeline<T> fallbackPipeline; // 降级 Pipeline

public:
Result<T> execute(const T& input, ErrorPolicy policy) {
switch (policy) {
case ErrorPolicy::FailFast:
return mainPipeline.execute(input);
case ErrorPolicy::SkipOnError:
// 主 Pipeline 失败时走降级 Pipeline
auto result = mainPipeline.execute(input);
return result.isOk() ? result : fallbackPipeline.execute(input);
}
}
};

5.4 生命周期管理

Pipeline 对象本身应该无状态还是有状态

  • 无状态 Pipeline:阶段本身不保存状态,每次 execute 独立。线程安全,可复用。推荐。
  • 有状态 Pipeline:阶段内部有缓存或计数器。需要关注线程安全和重置逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 无状态阶段(推荐)
class TrimStage {
public:
std::string operator()(const std::string& input) const {
// const 成员函数,无副作用
auto start = input.find_first_not_of(" \t\n\r");
auto end = input.find_last_not_of(" \t\n\r");
return (start == std::string::npos) ? "" : input.substr(start, end - start + 1);
}
};

// 有状态阶段(需要谨慎)
class CounterStage {
std::atomic<uint64_t> count{0}; // 线程安全
public:
Request operator()(const Request& input) {
count.fetch_add(1);
// 利用计数做某些处理...
return input;
}
uint64_t getCount() const { return count.load(); }
};

六、实战:日志处理 Pipeline

用一个完整的例子来串联所有概念——一个服务端日志处理 Pipeline。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
struct LogEntry {
std::string timestamp;
std::string level; // INFO, WARN, ERROR
std::string service;
std::string message;
bool isValid = true;
};

// 阶段1:解析原始文本
Result<LogEntry> parseStage(const std::string& raw) {
// 格式:"2026-05-06T20:11:00Z|ERROR|PaymentService|timeout:3000ms"
std::istringstream ss(raw);
std::string ts, level, service, msg;

if (!std::getline(ss, ts, '|') ||
!std::getline(ss, level, '|') ||
!std::getline(ss, service, '|') ||
!std::getline(ss, msg)) {
return Result<LogEntry>::error("Parse failed: " + raw);
}

return Result<LogEntry>::ok({ts, level, service, msg, true});
}

// 阶段2:校验
Result<LogEntry> validateStage(const LogEntry& entry) {
static const std::set<std::string> validLevels = {"INFO", "WARN", "ERROR", "FATAL"};

if (entry.timestamp.empty() || entry.service.empty()) {
return Result<LogEntry>::error("Missing required fields");
}
if (!validLevels.count(entry.level)) {
return Result<LogEntry>::error("Invalid level: " + entry.level);
}

LogEntry validated = entry;
validated.isValid = true;
return Result<LogEntry>::ok(validated);
}

// 阶段3:脱敏
Result<LogEntry> maskStage(const LogEntry& entry) {
LogEntry masked = entry;
// 脱敏手机号
static std::regex phoneRegex(R"(\b1[3-9]\d{9}\b)");
masked.message = std::regex_replace(entry.message, phoneRegex, "1**********");
return Result<LogEntry>::ok(masked);
}

// 阶段4:分级存储(ERROR 走告警通道)
Result<LogEntry> routeStage(const LogEntry& entry) {
if (entry.level == "ERROR" || entry.level == "FATAL") {
sendAlert(entry); // 发送告警
}
writeToStorage(entry); // 持久化
return Result<LogEntry>::ok(entry);
}

// 组装 Pipeline
int main() {
FalliblePipeline<LogEntry> logPipeline;
logPipeline
.addStage(validateStage)
.addStage(maskStage)
.addStage(routeStage);

// 处理日志流
std::string rawLine;
while (std::getline(std::cin, rawLine)) {
auto parsed = parseStage(rawLine);
if (!parsed.isOk()) {
std::cerr << "[SKIP] " << parsed.errorMsg() << std::endl;
continue;
}

auto result = logPipeline.execute(parsed.value());
if (!result.isOk()) {
std::cerr << "[FAIL] " << result.errorMsg() << std::endl;
}
}
}

这个例子展示了 Pipeline 的核心价值:

  1. 每个阶段独立:你可以单独测试 parseStagevalidateStagemaskStage
  2. 易于扩展:想加一个"采样"阶段?addStage(samplingStage) 一行搞定
  3. 错误短路:解析失败的日志直接跳过,不会进入校验和脱敏
  4. 可替换:生产环境和测试环境可以用不同的 routeStage

七、总结

Pipeline 模式的本质是用组合替代过程。它不是技术上的创新,而是组织上的优化——把一个大函数拆成一系列小阶段,然后声明式地组合它们。

三个最重要的原则:

  1. 每个阶段做且只做一件事——如果一个阶段既校验又脱敏,拆成两个
  2. 阶段之间通过数据耦合,不通过控制流耦合——阶段不调用其他阶段,只返回结果
  3. Pipeline 本身是可配置、可替换的——不同环境、不同场景可以组装不同的 Pipeline

当你下次面对一个超过 50 行的 process() 函数时,可以问自己一个问题:这里面的步骤,哪些可以独立成一个阶段? 答案通常决定了 Pipeline 的边界。


延伸阅读

  • 责任链模式:Pipeline 的"近亲",适用于"谁处理不确定"的场景
  • 本系列《Runtime Architecture》中的其他运行时架构模式