一、一个问题

你的服务有没有这种情况:跑起来一切正常,Ctrl+C 一按就段错误?偶尔能干净退出,偶尔 core dump?日志里最后几行是乱序的,有时候甚至看不到 fatal 信息?

如果有,恭喜你——你的 stop() 顺序大概率是错的。

二、一个真实案例

一个基于 MQTT 的无人机控制服务(C++、Linux、多线程)。运行态一切完美:消息收发正常,日志正常,CPU 和内存正常。但只要退出程序,Valgrind 就报 112 个 use-after-free 错误。

调用链长这样:

1
2
3
Logger::workerThread()          // 后台线程,处理日志队列
→ LogPublisher::publish() // 封装日志为 protobuf,调用 MQTT 发送
→ MqttClient::publish() // 实际发送 MQTT 消息

原始的 stop()

1
2
3
4
5
6
7
void SystemFactory::stop()
{
EmergencyFactory::getInstance().stop();
DeviceManager::getInstance().stop();
MqttClient::getInstance().Stop(); // MQTT 停了
// Logger 去哪了???
}

时序是这样的:

1
2
3
4
5
6
7
T1: MqttClient::Stop() 执行完毕。MQTT 连接关闭,mosquitto 实例销毁。
T2: Logger 的 worker 线程依然在运行——没人通知它停下来。
T3: worker 线程取出下一条日志,调用 LogPublisher::publish()
T4: LogPublisher 调用 MqttClient::publish() → Client not connected(还好,只是报错)
T5: 程序退出,C++ 开始析构静态对象
T6: LogPublisher 先析构(topic 字符串释放)
T7: Logger 线程还在跑,继续访问 LogPublisher → use-after-free → 崩溃

MqttClient 不是最底层依赖,Logger 才是。但 Logger 的 stop() 根本没被调用。

三、依赖链法则

一个服务的组件之间一定存在依赖关系。依赖是有方向的——A 依赖 B 意味着:A 工作的时候,B 必须在运行

1
2
3
4
Logger 依赖 MqttClient
⇒ Logger 工作时,MqttClient 必须运行
⇒ MqttClient 不能在 Logger 之前停止
⇒ 停止顺序必须是:先停 Logger,后停 MqttClient

但别急——这只是两层的例子。真实服务的依赖链通常更长:

1
2
3
4
5
6
7
数据源(Ticker、Device、Emergency)
↓ 产生消息
消息队列(Logger、EventBus)
↓ 消费、转发
传输层(MqttClient、gRPC Channel)
↓ 发送到外部
外部系统(Broker、数据库)

停止顺序 = 依赖顺序的反向。

  • 数据源在最上层(只产出,不消费外部服务)
  • 消息队列在中间(消费数据源,依赖传输层)
  • 传输层在最下层(被所有人依赖)

所以 stop() 必须:

1
2
3
1. 停止数据源    → 不再有新消息产生
2. 停止消息队列 → 排空已有消息(此时传输层还活着)
3. 停止传输层 → 安全关闭(队列已空,不会有新消息到来)

翻译成代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void SystemFactory::stop()
{
stopTickerLocked(); // 1. 停止定时器(数据源之一)
EmergencyFactory::getInstance().stop(); // 2. 停止紧急事件(数据源之二)
DeviceManager::getInstance().stop(); // 3. 停止设备(数据源之三)

csLog::Logger::getInstance().stop(); // 4. 停止日志队列
// stop() 内部:
// ① 设置 exitFlag
// ② worker 排空队列
// ③ join() 等待线程结束
// 排空期间 MqttClient 还活着

MqttClient::getInstance().Stop(); // 5. 最后停止传输层
}

四、为什么要显式写 stop() 而不是依赖析构函数?

有人会说:C++ 有 RAII 啊,析构函数自动清理资源。为什么还要手写 stop()

因为析构函数解决的是单对象清理问题:一个对象销毁时,释放自己持有的资源。但它解决不了跨对象依赖顺序问题。

看这个例子:

1
2
3
4
5
6
7
8
9
10
class A {
~A() { /* A 的清理 */ }
};

class B {
std::thread _worker;
~B() {
_worker.join(); // B 知道要 join 自己的线程
}
};

看起来没问题。但如果 B 的 worker 线程使用了 A 的对象呢?

1
2
3
4
5
void B::workerLoop() {
while (!_exit) {
A::getInstance().doSomething(); // B 依赖 A
}
}

B 的析构函数要 join worker。但 C++ 静态对象的析构顺序不确定——A 可能已经在 B 之前析构了

  • 如果 A 先析构、B 后析构:B 的 ~B() 尝试 join worker → worker 还在访问已析构的 A → use-after-free
  • 如果 B 先析构、A 后析构:B 的 ~B() join worker 成功,A 还没析构 → 一切正常

你无法控制这个顺序。依赖析构函数 = 依赖编译器掷骰子。

所以规则很简单:

析构函数负责释放资源,但不负责编排释放顺序。顺序由显式的 stop() 函数控制。

析构函数是你的安全网——在 stop() 没被调用或者异常退出时兜底。但正常流程下,stop() 必须按你定义的顺序执行。

五、线程池的 stop() 陷阱

上面的案例里,Logger 有一个后台 worker 线程。但还有更复杂的场景——线程池。

MqttClient 内部有一个线程池处理收到的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 多个 worker 线程,从 _task_queue 中取任务执行
void MqttClient::workerFunction() {
while (!_stop_threads) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(_task_mutex);
_task_cond.wait(lock, [this]() {
return _stop_threads || !_task_queue.empty();
});
if (_stop_threads && _task_queue.empty()) {
return; // 只有队列空才能退出
}
task = std::move(_task_queue.front());
_task_queue.pop();
}
task(); // 执行任务
}
}

这里有两个坑:

坑 1:队列无限增长。

_task_queue 是一个 std::queue<std::function<void()>>,没有大小限制。如果 MQTT 消息到达的速度超过线程池处理的速度,队列无限增长,最终 OOM。

修复:硬上限 + 满时丢弃最旧任务。

1
2
3
4
5
6
7
if (client->_task_queue.size() >= MQTT_MAX_TASK_QUEUE_SIZE) {
client->_task_queue.pop(); // 丢弃最旧任务
LOG_WARN << "MQTT task queue full, dropping oldest task";
}
client->_task_queue.push([cb, topic, payload]() {
cb(topic, payload);
});

坑 2:Stop 后队列中的任务没有被释放。

原来的 Stop() join 完所有 worker 线程后,_task_queue 里可能还残留着任务。这些任务里捕获的 topicpayload 字符串仍然占用内存,直到 MqttClient 单例最终析构。

修复:

1
2
3
4
5
// 清空任务队列,释放其中捕获的回调和字符串
{
std::unique_lock<std::mutex> lock(_task_mutex);
std::queue<std::function<void()>>().swap(_task_queue);
}

六、容器清理:不止是队列

类似的问题也出现在业务逻辑的数据结构里:

容器 问题 修复
_ais_trackers (unordered_map) 每个见过的 MMSI 都插入,永不删除 新增 pruneExpiredAisTrackers(),60s 无更新即清理
_track_associations locked 条目被 purge 逻辑无条件跳过,永久滞留 locked 条目加超时兜底(60s 强制解锁)
_fused_targets 120s 超时窗口内可能因繁忙海域增长到数千 硬上限 500,优先淘汰纯 AIS 条目(无云台数据)

这些问题的共同模式:创建(插入)逻辑完善,清理(删除)逻辑要么缺失、要么有盲区。

你写 map[key] = value 的时候,有没有想过这个 key 什么时候被 erase?

七、回调的清理

还有一个更深层的问题——回调的注册和注销。

1
2
3
4
5
6
7
8
9
// ai_node.cpp — init() 中注册回调
_channel.registerTopic("/reply/pose", [this, topic](const std::string& t, const std::string& p){
this->_channel.sendToUav(topic, p);
});

// ai_node.cpp — 析构函数
AiNode::~AiNode() {
// 空的!回调没有注销!
}

Lambda 捕获了 this,注册到 MqttClient 单例的回调映射中。如果 AiNode 析构了但 MqttClient 还活着,下次收到 /reply/pose 消息时,回调就会通过悬空的 this 访问已析构的 AiNode。

修复:让 TopicChannel 的析构负责清理:

1
2
3
4
5
TopicChannel::~TopicChannel() {
for (const auto& topic : _registeredTopics) {
MqttClient::getInstance().unregisterCallback(topic);
}
}

这又呼应了同一个原则:每一个 register 都要有一个对应的 unregister。 如果不显式做,析构的时候就没人替你兜底。

八、关机顺序检查清单

如果你正在维护一个 C++ 多线程服务,拿出你的 stop() 函数,逐条检查:

  1. 数据源是否最早停? Ticker、设备管理器、事件生产者——它们停了,才不会在关机过程中继续产生新数据。

  2. 队列/缓冲区是否在消费者之前排空? stop() 里的 join() 之前,有没有设置退出标志 + 唤醒等待线程?

  3. 传输层是否最后停? MQTT、gRPC、数据库连接——它们被所有人依赖,必须最后关闭。

  4. 每个后台线程都 join 了吗? 有没有遗漏的 std::threadstd::async 的 future?

  5. 容器有上限吗? 每一个 push/insert 的位置,有没有考虑过"如果容器无限增长会怎样"?

  6. 每个 register/addListener 对应的 unregister/removeListener 在哪里? 析构函数里有吗?还是依赖静态对象析构顺序赌运气?

  7. 静态对象之间有没有依赖关系? 如果有,你的 stop() 函数有没有显式控制析构顺序?

九、总结

stop() 不是写完 start() 之后随便补的那几行代码。

它是你服务的降落伞。平时不打开,打开的时候必须没问题。

如果你从来没关注过 stop()——现在去看一眼。很可能已经有 112 个 Valgrind 错误在等着你了。