如果你曾写过这样的代码——
1 2 3 4 5 6 7 8 9 10
| Result process(Request req) { req = validate(req); if (!req.valid) return error; req = enrich(req); req = transform(req); req = filterFields(req); saveToDB(req); sendNotification(req); return success; }
|
——你一定感受过它的问题:函数体越来越长、每个步骤紧耦合、加一步就要改主流程、单元测试只能测整体。
Pipeline 模式就是为这种"多步骤顺序处理"场景而生的。它把每一步封装成独立的阶段(Stage),数据像流水线一样在阶段之间传递——每个阶段只做一件事,且只关心自己的输入和输出。
一、什么是 Pipeline 模式
1.1 核心思想
Pipeline 模式将复杂处理流程拆解为一系列独立的阶段(Stage),每个阶段接收数据、加工数据、输出数据,阶段之间通过一个统一的**管道(Pipeline)**串联。
1
| 输入 → [Stage 1] → [Stage 2] → [Stage 3] → ... → [Stage N] → 输出
|
四个关键特征:
| 特征 |
说明 |
| 单向流动 |
数据从第一个阶段流入,依次经过每个阶段,最终流出 |
| 阶段独立 |
每个阶段不依赖其他阶段的内部实现,只依赖接口约定 |
| 可组合 |
阶段可以自由排列组合,形成不同的处理链 |
| 可复用 |
同一个阶段可以在多条 Pipeline 中复用 |
1.2 与责任链模式的本质区别
Pipeline 和责任链(Chain of Responsibility)是两种容易混淆的模式,但它们的意图截然不同:
| 维度 |
Pipeline |
责任链 |
| 数据流 |
每个阶段都必须处理数据 |
处理者可以选择不处理,跳过 |
| 终止条件 |
数据走完所有阶段 |
任一处理者处理后即可终止 |
| 数据变换 |
每个阶段修改数据并传递给下一阶段 |
通常不修改数据,只做判断 |
| 典型场景 |
ETL、编译管道、HTTP 中间件 |
审批流、事件冒泡、异常处理 |
| 阶段感知 |
阶段知道自己的位置(有时) |
处理者不知道自己在链中的位置 |
一句话区分:Pipeline 是在"加工数据",责任链是在"寻找谁来处理"。
二、基础实现
2.1 最小可用 Pipeline
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| #include <vector> #include <memory> #include <functional>
template<typename T> class Pipeline { public: using StageFunc = std::function<T(const T&)>; Pipeline& addStage(StageFunc stage) { stages.push_back(std::move(stage)); return *this; } T execute(T input) const { T current = std::move(input); for (const auto& stage : stages) { current = stage(current); } return current; } private: std::vector<StageFunc> stages; };
|
使用示例:一个最简单的字符串处理管道
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| Pipeline<std::string> textPipeline; textPipeline .addStage([](const std::string& s) { auto start = s.find_first_not_of(" \t\n\r"); auto end = s.find_last_not_of(" \t\n\r"); return (start == std::string::npos) ? "" : s.substr(start, end - start + 1); }) .addStage([](const std::string& s) { std::string result = s; std::transform(result.begin(), result.end(), result.begin(), ::toupper); return result; }) .addStage([](const std::string& s) { std::string result; bool lastWasSpace = false; for (char c : s) { if (std::isspace(c)) { if (!lastWasSpace) result += ' '; lastWasSpace = true; } else { result += c; lastWasSpace = false; } } return result; });
std::string result = textPipeline.execute(" hello world ");
|
2.2 带错误的 Pipeline
实际项目中,处理过程可能失败。这时需要对结果类型建模:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| template<typename T> class Result { public: static Result ok(T value) { return Result(true, std::move(value), ""); } static Result error(std::string msg) { return Result(false, T{}, std::move(msg)); } bool isOk() const { return success; } const T& value() const { return data; } const std::string& errorMsg() const { return errMsg; } private: bool success; T data; std::string errMsg; Result(bool ok, T val, std::string err) : success(ok), data(std::move(val)), errMsg(std::move(err)) {} };
template<typename T> class FalliblePipeline { public: using StageFunc = std::function<Result<T>(const T&)>; FalliblePipeline& addStage(StageFunc stage) { stages.push_back(std::move(stage)); return *this; } Result<T> execute(T input) const { T current = std::move(input); for (const auto& stage : stages) { auto result = stage(current); if (!result.isOk()) { return result; } current = result.value(); } return Result<T>::ok(current); } private: std::vector<StageFunc> stages; };
|
短路行为是 Pipeline 的关键设计决策之一。上面选择"遇错即停",但也可以设计为"收集所有错误继续执行",取决于业务场景。
三、进阶:并行 Pipeline
当 Pipeline 中某几个阶段可以并行执行时(阶段之间无数据依赖),可以进一步榨干多核性能。
3.1 并行阶段示意
1 2 3
| ┌→ [Stage 2a: OCR识别] ──┐ 输入 → [Stage 1] ─┼→ [Stage 2b: 人脸检测] ─┼→ [Stage 3: 结果聚合] → 输出 └→ [Stage 2c: 关键词提取] ┘
|
Stage 2a、2b、2c 互不依赖,可以并行。Stage 3 等它们全部完成后再聚合。
3.2 C++ 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| #include <future> #include <thread>
template<typename T> class ParallelPipeline { public: struct StageGroup { std::vector<std::function<T(T)>> stages; }; ParallelPipeline& addSequential(std::function<T(T)> stage) { groups.push_back({{std::move(stage)}}); return *this; } ParallelPipeline& addParallel(std::vector<std::function<T(T)>> stages) { groups.push_back({std::move(stages)}); return *this; } T execute(T input) const { T current = std::move(input); for (const auto& group : groups) { if (group.stages.size() == 1) { current = group.stages[0](current); } else { auto results = runParallel(current, group.stages); current = mergeResults(std::move(results)); } } return current; } private: std::vector<StageGroup> groups; std::vector<T> runParallel(const T& input, const std::vector<std::function<T(T)>>& stages) const { std::vector<std::future<T>> futures; for (const auto& stage : stages) { futures.push_back(std::async(std::launch::async, [&stage, &input]() { return stage(input); })); } std::vector<T> results; for (auto& f : futures) { results.push_back(f.get()); } return results; } T mergeResults(std::vector<T> results) const { if constexpr (std::is_same_v<T, std::string>) { std::string merged; for (const auto& r : results) { merged += r + "\n"; } return merged; } return results.empty() ? T{} : results[0]; } };
|
3.3 何时用并行 Pipeline
并行 Pipeline 不是银弹。判断标准:
| 条件 |
适合并行? |
| 阶段之间无数据依赖 |
✅ |
| 每个阶段的耗时较均匀 |
✅ |
| 阶段数量超过 CPU 核心数 |
❌ 反而引入调度开销 |
| 数据量小、阶段计算轻 |
❌ 线程创建开销大于计算本身 |
| 阶段有共享状态 |
❌ 需要同步,抵消并行收益 |
经验法则:单个阶段耗时 > 100μs 且阶段之间无依赖时,并行才有正向收益。
四、典型场景
4.1 编译管道
这是 Pipeline 模式最经典的实现。从源码到可执行文件,数据(源代码文本)依次经过:
1
| 源码 → [词法分析] → [语法分析] → [语义分析] → [中间代码生成] → [优化] → [目标代码生成] → 可执行文件
|
LLVM 和 GCC 的内部架构都遵循这个 Pipeline 模型。每个 Pass 是一个阶段,Pass 之间通过 IR(中间表示)传递数据。
4.2 HTTP 中间件
Web 框架的中间件机制本质上是 Pipeline:
1
| 请求 → [日志] → [鉴权] → [限流] → [参数校验] → [业务处理] → 响应
|
Go 的 net/http 中间件、Express/Koa 的中间件、ASP.NET Core 的 Middleware Pipeline,都是同一模式的不同实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class HttpPipeline { std::vector<Middleware> middlewares; Response handle(Request req) { auto handler = wrapHandler(businessLogic); for (auto it = middlewares.rbegin(); it != middlewares.rend(); ++it) { handler = it->wrap(handler); } return handler(req); } };
|
4.3 ETL 数据处理
在数据工程中,ETL(Extract-Transform-Load)流程天然适合 Pipeline:
1
| 数据源 → [抽取] → [清洗] → [转换] → [校验] → [聚合] → [加载到数据仓库]
|
4.4 图像/视频处理
1 2
| 原始帧 → [解码] → [缩放] → [降噪] → [色彩校正] → [人脸检测] → [编码输出] ↘ [OCR识别] ↗
|
两个下游阶段(人脸检测、OCR)可以并行。
五、设计考量
5.1 阶段粒度
太粗:一个阶段做太多事,丧失灵活性和可复用性
太细:阶段数量爆炸,Pipeline 的执行开销超过业务逻辑
经验法则:一个阶段 = 一个明确的、可独立命名的职责。如果命名时不得不使用"和"字("解析和校验"),就该拆成两个。
5.2 阶段间数据格式
是所有阶段共享同一数据类型,还是每个阶段有不同的输入/输出类型?
1 2 3 4 5 6 7 8 9
| 方案 A(统一类型): Pipeline<Request> pipeline; 优点:简单 缺点:Request 会成为"上帝对象",携带所有阶段可能需要的字段
方案 B(类型转换): Pipeline<RawData, CleanedData, ProcessedData, ...> pipeline; 优点:类型安全,每个阶段明确表达自己的输入输出 缺点:实现复杂,需要类型列表或 variant
|
建议:初期用方案 A(统一类型),当 Request 膨胀到不可维护时再迁移到方案 B。
5.3 错误处理策略
| 策略 |
行为 |
适用场景 |
| 短路停止 |
遇错立即返回,跳过后续阶段 |
多数业务场景 |
| 收集继续 |
记录错误仍继续执行,最后汇总 |
批量处理、数据校验 |
| 降级跳过 |
遇错跳过当前阶段,继续后续 |
非关键步骤(如日志、分析) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| enum class ErrorPolicy { FailFast, CollectAndContinue, SkipOnError };
template<typename T> class ConfigurablePipeline { FalliblePipeline<T> mainPipeline; FalliblePipeline<T> fallbackPipeline; public: Result<T> execute(const T& input, ErrorPolicy policy) { switch (policy) { case ErrorPolicy::FailFast: return mainPipeline.execute(input); case ErrorPolicy::SkipOnError: auto result = mainPipeline.execute(input); return result.isOk() ? result : fallbackPipeline.execute(input); } } };
|
5.4 生命周期管理
Pipeline 对象本身应该无状态还是有状态?
- 无状态 Pipeline:阶段本身不保存状态,每次 execute 独立。线程安全,可复用。推荐。
- 有状态 Pipeline:阶段内部有缓存或计数器。需要关注线程安全和重置逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| class TrimStage { public: std::string operator()(const std::string& input) const { auto start = input.find_first_not_of(" \t\n\r"); auto end = input.find_last_not_of(" \t\n\r"); return (start == std::string::npos) ? "" : input.substr(start, end - start + 1); } };
class CounterStage { std::atomic<uint64_t> count{0}; public: Request operator()(const Request& input) { count.fetch_add(1); return input; } uint64_t getCount() const { return count.load(); } };
|
六、实战:日志处理 Pipeline
用一个完整的例子来串联所有概念——一个服务端日志处理 Pipeline。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| struct LogEntry { std::string timestamp; std::string level; std::string service; std::string message; bool isValid = true; };
Result<LogEntry> parseStage(const std::string& raw) { std::istringstream ss(raw); std::string ts, level, service, msg; if (!std::getline(ss, ts, '|') || !std::getline(ss, level, '|') || !std::getline(ss, service, '|') || !std::getline(ss, msg)) { return Result<LogEntry>::error("Parse failed: " + raw); } return Result<LogEntry>::ok({ts, level, service, msg, true}); }
Result<LogEntry> validateStage(const LogEntry& entry) { static const std::set<std::string> validLevels = {"INFO", "WARN", "ERROR", "FATAL"}; if (entry.timestamp.empty() || entry.service.empty()) { return Result<LogEntry>::error("Missing required fields"); } if (!validLevels.count(entry.level)) { return Result<LogEntry>::error("Invalid level: " + entry.level); } LogEntry validated = entry; validated.isValid = true; return Result<LogEntry>::ok(validated); }
Result<LogEntry> maskStage(const LogEntry& entry) { LogEntry masked = entry; static std::regex phoneRegex(R"(\b1[3-9]\d{9}\b)"); masked.message = std::regex_replace(entry.message, phoneRegex, "1**********"); return Result<LogEntry>::ok(masked); }
Result<LogEntry> routeStage(const LogEntry& entry) { if (entry.level == "ERROR" || entry.level == "FATAL") { sendAlert(entry); } writeToStorage(entry); return Result<LogEntry>::ok(entry); }
int main() { FalliblePipeline<LogEntry> logPipeline; logPipeline .addStage(validateStage) .addStage(maskStage) .addStage(routeStage); std::string rawLine; while (std::getline(std::cin, rawLine)) { auto parsed = parseStage(rawLine); if (!parsed.isOk()) { std::cerr << "[SKIP] " << parsed.errorMsg() << std::endl; continue; } auto result = logPipeline.execute(parsed.value()); if (!result.isOk()) { std::cerr << "[FAIL] " << result.errorMsg() << std::endl; } } }
|
这个例子展示了 Pipeline 的核心价值:
- 每个阶段独立:你可以单独测试
parseStage、validateStage、maskStage
- 易于扩展:想加一个"采样"阶段?
addStage(samplingStage) 一行搞定
- 错误短路:解析失败的日志直接跳过,不会进入校验和脱敏
- 可替换:生产环境和测试环境可以用不同的
routeStage
七、总结
Pipeline 模式的本质是用组合替代过程。它不是技术上的创新,而是组织上的优化——把一个大函数拆成一系列小阶段,然后声明式地组合它们。
三个最重要的原则:
- 每个阶段做且只做一件事——如果一个阶段既校验又脱敏,拆成两个
- 阶段之间通过数据耦合,不通过控制流耦合——阶段不调用其他阶段,只返回结果
- Pipeline 本身是可配置、可替换的——不同环境、不同场景可以组装不同的 Pipeline
当你下次面对一个超过 50 行的 process() 函数时,可以问自己一个问题:这里面的步骤,哪些可以独立成一个阶段? 答案通常决定了 Pipeline 的边界。
延伸阅读
- 责任链模式:Pipeline 的"近亲",适用于"谁处理不确定"的场景
- 本系列《Runtime Architecture》中的其他运行时架构模式