要将双栈 TCP 回声服务器从线程池模式改造为Reactor 模式,核心是基于I/O 多路复用(epoll) 实现 “事件驱动” 的高效 I/O 处理 —— 通过单线程(或主线程)监听多个 Socket 的 I/O 事件,事件触发时再分发到对应处理器,避免线程上下文切换开销,更适合高并发场景。

一、Reactor 模式核心原理与组件

Reactor 模式(反应器模式)是一种事件驱动架构,核心思想是 “将 I/O 事件从业务逻辑中分离”,通过以下组件实现:

组件 作用
Reactor(反应器) 核心调度器:运行事件循环,通过 I/O 多路复用(epoll)等待事件,分发事件到处理器
EventDemultiplexer 事件多路分离器:封装 epoll,负责注册 / 删除事件、等待事件触发(本文用 epoll)
EventHandler(处理器) 事件处理接口:定义handleRead/handleWrite/handleError等统一接口,子类实现具体逻辑(如 “新连接处理”“回声处理”)
Non-blocking Socket 所有 Socket 设为非阻塞:避免 I/O 操作阻塞线程,配合 epoll 边缘触发(ET)提升效率

二、关键技术选型

  1. I/O 多路复用:Linux 下选择epoll(高效支持海量连接,边缘触发 ET 模式);

  2. Socket 模式:非阻塞(O_NONBLOCK),确保recv/send不会阻塞线程;

  3. 事件触发:epoll 边缘触发(ET),仅在 Socket 状态变化时通知一次,需一次性处理完所有数据;

  4. 协议兼容:保留原 “4 字节长度头 + 数据” 自定义协议,解决 TCP 粘包;

  5. 双栈支持:维持AF_UNSPEC+AI_PASSIVE,兼容 IPv4/IPv6。

三、Reactor 模式完整实现(双栈 TCP 回声服务器)

1. 头文件与全局定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <netdb.h>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <cerrno>
#include <cstdlib>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unordered_map>
#include <vector>

using namespace std;

// 事件类型枚举
enum EventType {
EVENT_READ = EPOLLIN, // 可读事件
EVENT_WRITE = EPOLLOUT, // 可写事件
EVENT_ERROR = EPOLLERR // 错误事件
};

// 事件处理器基类(抽象接口)
class EventHandler {
public:
virtual ~EventHandler() = default;
// 处理可读事件
virtual void handleRead(int fd) = 0;
// 处理可写事件(可选,本文回声场景暂用不到)
virtual void handleWrite(int fd) {}
// 处理错误事件
virtual void handleError(int fd) {
cerr << "EventHandler: fd=" << fd << " 错误事件触发" << endl;
close(fd);
}
// 获取关联的Socket描述符
virtual int getFd() const = 0;
};

// Reactor核心类(事件循环+epoll管理)
class Reactor {
public:
Reactor() : epollFd(-1), isRunning(false) {
// 创建epoll实例(参数size>=1,现代Linux已忽略,仅需>0)
epollFd = epoll_create1(EPOLL_CLOEXEC); // EPOLL_CLOEXEC:进程退出时自动关闭
if (epollFd == -1) {
perror("epoll_create1 failed");
exit(EXIT_FAILURE);
}
}

~Reactor() {
stop();
if (epollFd != -1) {
close(epollFd);
}
}

// 启动事件循环
void start() {
if (isRunning) return;
isRunning = true;
const int MAX_EVENTS = 1024; // 一次最多处理1024个事件
struct epoll_event events[MAX_EVENTS];

cout << "Reactor事件循环启动..." << endl;
while (isRunning) {
// 等待事件触发(-1表示无限阻塞,直到有事件)
int nEvents = epoll_wait(epollFd, events, MAX_EVENTS, -1);
if (nEvents == -1) {
if (errno == EINTR) continue; // 被信号中断,继续循环
perror("epoll_wait failed");
break;
}

// 遍历触发的事件,分发处理
for (int i = 0; i < nEvents; ++i) {
int fd = events[i].data.fd;
auto it = handlerMap.find(fd);
if (it == handlerMap.end()) {
cerr << "Reactor: 未知fd=" << fd << "的事件" << endl;
continue;
}
EventHandler* handler = it->second;

// 处理可读事件
if (events[i].events & EVENT_READ) {
handler->handleRead(fd);
}
// 处理可写事件(本文暂不使用,若需发送大文件可启用)
if (events[i].events & EVENT_WRITE) {
handler->handleWrite(fd);
}
// 处理错误事件
if (events[i].events & EVENT_ERROR) {
handler->handleError(fd);
removeHandler(fd); // 错误后移除处理器
}
}
}
}

// 停止事件循环
void stop() {
isRunning = false;
}

// 注册事件处理器(关联fd和事件类型)
bool registerHandler(EventHandler* handler, EventType eventType) {
if (!handler) return false;
int fd = handler->getFd();
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));

// 设置epoll事件:ET模式(EPOLLET)+ 非阻塞I/O + 事件类型
ev.events = eventType | EPOLLET | EPOLLONESHOT; // EPOLLONESHOT:事件触发后需重新注册
ev.data.fd = fd;

// 将fd和处理器加入映射表
handlerMap[fd] = handler;

// 注册事件到epoll
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &ev) == -1) {
perror("epoll_ctl ADD failed");
handlerMap.erase(fd);
return false;
}
return true;
}

// 移除事件处理器
void removeHandler(int fd) {
// 从epoll中删除fd
if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, nullptr) == -1) {
if (errno != EBADF) { // 忽略fd已关闭的错误
perror("epoll_ctl DEL failed");
}
}
// 从映射表中删除,并释放处理器内存
auto it = handlerMap.find(fd);
if (it != handlerMap.end()) {
delete it->second;
handlerMap.erase(it);
}
// 关闭fd(确保资源释放)
close(fd);
cout << "Reactor: 移除fd=" << fd << "的处理器" << endl;
}

// 重新注册可读事件(ET模式+EPOLLONESHOT需重新注册)
bool reregisterReadHandler(int fd) {
auto it = handlerMap.find(fd);
if (it == handlerMap.end()) return false;

struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EVENT_READ | EPOLLET | EPOLLONESHOT;
ev.data.fd = fd;

if (epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &ev) == -1) {
perror("epoll_ctl MOD failed");
return false;
}
return true;
}

private:
int epollFd; // epoll实例描述符
bool isRunning; // 事件循环运行状态
// fd到EventHandler的映射表(管理所有注册的处理器)
unordered_map<int, EventHandler*> handlerMap;
};

// 工具函数:设置Socket为非阻塞模式
bool setNonBlocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {
perror("fcntl F_GETFL failed");
return false;
}
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
perror("fcntl F_SETFL failed");
return false;
}
return true;
}

// 工具函数:创建双栈监听Socket(非阻塞)
int createNonBlockListener(const char* service = "9527") {
struct addrinfo hints, *result, *p;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC; // 双栈兼容
hints.ai_socktype = SOCK_STREAM; // TCP类型
hints.ai_flags = AI_PASSIVE; // 通配地址绑定

int err = getaddrinfo(nullptr, service, &hints, &result);
if (err) {
cerr << "getaddrinfo failed: " << gai_strerror(err) << endl;
return -1;
}

int listenFd = -1;
for (p = result; p != nullptr; p = p->ai_next) {
// 创建Socket
listenFd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (listenFd == -1) {
cerr << "socket failed: " << strerror(errno) << "(尝试下一个地址)" << endl;
continue;
}

// 设置地址重用(避免重启端口占用)
int opt = 1;
if (setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1) {
perror("setsockopt SO_REUSEADDR failed");
close(listenFd);
continue;
}

// 设置非阻塞模式
if (!setNonBlocking(listenFd)) {
close(listenFd);
continue;
}

// 绑定端口
if (bind(listenFd, p->ai_addr, p->ai_addrlen) == -1) {
cerr << "bind failed: " << strerror(errno) << "(尝试下一个地址)" << endl;
close(listenFd);
continue;
}

// 监听(backlog=10,支持10个等待连接)
if (listen(listenFd, 10) == -1) {
perror("listen failed");
close(listenFd);
continue;
}

break; // 成功创建监听Socket
}

freeaddrinfo(result);
if (p == nullptr || listenFd == -1) {
cerr << "创建双栈监听Socket失败" << endl;
return -1;
}

cout << "双栈监听Socket创建成功,端口:" << service << "(fd=" << listenFd << ")" << endl;
return listenFd;
}

2. 具体事件处理器实现

(1)新连接处理器(AcceptHandler)

负责处理监听 Socket 的 “可读事件”(新客户端连接请求),创建新的连接 Socket 和回声处理器(EchoHandler),并注册到 Reactor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 新连接处理器(处理监听Socket的可读事件)
class AcceptHandler : public EventHandler {
public:
AcceptHandler(Reactor* reactor, int listenFd)
: reactor(reactor), listenFd(listenFd) {}

~AcceptHandler() override {
cout << "AcceptHandler销毁(listenFd=" << listenFd << ")" << endl;
}

// 处理可读事件:接受新连接
void handleRead(int fd) override {
if (fd != listenFd) return; // 只处理监听Socket的事件

struct sockaddr_storage clientAddr;
socklen_t clientAddrLen = sizeof(clientAddr);
char ipStr[INET6_ADDRSTRLEN] = {0};

// 循环接受所有新连接(ET模式需一次性处理完)
while (true) {
// 接受新连接(非阻塞,无连接时返回EAGAIN)
int connFd = accept4(listenFd, (struct sockaddr*)&clientAddr,
&clientAddrLen, SOCK_NONBLOCK); // 直接创建非阻塞Socket
if (connFd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; // 没有更多连接,退出循环
}
perror("accept4 failed");
return;
}

// 解析客户端IP
void* addr = (clientAddr.ss_family == AF_INET) ?
&((struct sockaddr_in*)&clientAddr)->sin_addr :
&((struct sockaddr_in6*)&clientAddr)->sin6_addr;
inet_ntop(clientAddr.ss_family, addr, ipStr, sizeof(ipStr));
cout << "新客户端连接:" << ipStr << "(connFd=" << connFd << ")" << endl;

// 创建回声处理器,注册到Reactor(监听可读事件)
EventHandler* echoHandler = new EchoHandler(reactor, connFd, ipStr);
if (!reactor->registerHandler(echoHandler, EVENT_READ)) {
delete echoHandler;
close(connFd);
cerr << "注册EchoHandler失败(connFd=" << connFd << ")" << endl;
}
}

// 重新注册监听Socket的可读事件(EPOLLONESHOT需重新注册)
reactor->reregisterReadHandler(listenFd);
}

// 获取监听Socket的fd
int getFd() const override {
return listenFd;
}

private:
Reactor* reactor; // 关联的Reactor实例
int listenFd; // 监听Socket的fd
};

(2)回声处理器(EchoHandler)

负责处理连接 Socket 的 “可读事件”(客户端发送数据),按 “4 字节长度头 + 数据” 协议解析数据,原样回声,并重新注册事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// 回声处理器(处理连接Socket的可读事件)
class EchoHandler : public EventHandler {
public:
EchoHandler(Reactor* reactor, int connFd, const string& clientIp)
: reactor(reactor), connFd(connFd), clientIp(clientIp),
recvBufLen(0), sendBufLen(0) {
// 初始化接收/发送缓冲区
memset(recvBuf, 0, sizeof(recvBuf));
memset(sendBuf, 0, sizeof(sendBuf));
}

~EchoHandler() override {
cout << "EchoHandler销毁:客户端" << clientIp << "(connFd=" << connFd << ")" << endl;
}

// 处理可读事件:读取客户端数据,按协议解析并回声
void handleRead(int fd) override {
if (fd != connFd) return;

// 循环读取所有数据(ET模式需一次性读完)
while (true) {
ssize_t n = recv(connFd, recvBuf + recvBufLen, sizeof(recvBuf) - recvBufLen, 0);
if (n == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
break; // 数据已读完,退出循环
}
perror("recv failed");
reactor->removeHandler(connFd); // 错误,移除处理器
return;
} else if (n == 0) {
cout << "客户端" << clientIp << "主动关闭连接(connFd=" << connFd << ")" << endl;
reactor->removeHandler(connFd); // 客户端关闭,移除处理器
return;
}

recvBufLen += n;
cout << "接收客户端" << clientIp << "数据:" << string(recvBuf, recvBufLen)
<< "(长度:" << recvBufLen << ")" << endl;

// 按自定义协议解析:先读4字节长度头,再读对应长度的数据
while (true) {
// 步骤1:读取4字节长度头(未读满则退出,等待下一次事件)
if (recvBufLen < sizeof(uint32_t)) {
break;
}

// 步骤2:解析长度头(网络字节序转主机字节序)
uint32_t dataLen = ntohl(*(uint32_t*)recvBuf);
// 检查数据长度是否合法(避免缓冲区溢出)
if (dataLen > sizeof(recvBuf) - sizeof(uint32_t)) {
cerr << "客户端" << clientIp << "数据长度非法:" << dataLen << "(connFd=" << connFd << ")" << endl;
reactor->removeHandler(connFd);
return;
}

// 步骤3:读取完整数据(长度头+数据未读满则退出)
if (recvBufLen < sizeof(uint32_t) + dataLen) {
break;
}

// 步骤4:提取数据,准备回声(复制到发送缓冲区)
memcpy(sendBuf, recvBuf, sizeof(uint32_t) + dataLen);
sendBufLen = sizeof(uint32_t) + dataLen;

// 步骤5:发送回声数据(非阻塞,一次性发送所有数据)
sendEchoData();

// 步骤6:移动剩余数据到缓冲区头部(处理粘包)
recvBufLen -= sizeof(uint32_t) + dataLen;
if (recvBufLen > 0) {
memmove(recvBuf, recvBuf + sizeof(uint32_t) + dataLen, recvBufLen);
}
memset(recvBuf + recvBufLen, 0, sizeof(recvBuf) - recvBufLen);
}
}

// 重新注册可读事件(EPOLLONESHOT需重新注册)
reactor->reregisterReadHandler(connFd);
}

// 处理可写事件(本文暂不使用,若需发送大文件可扩展)
void handleWrite(int fd) override {
// 若发送缓冲区有剩余数据,可在此处理(如大文件分片发送)
}

// 获取连接Socket的fd
int getFd() const override {
return connFd;
}

private:
// 发送回声数据(非阻塞)
void sendEchoData() {
if (sendBufLen == 0) return;

ssize_t totalSent = 0;
while (totalSent < sendBufLen) {
ssize_t n = send(connFd, sendBuf + totalSent, sendBufLen - totalSent, 0);
if (n == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// 暂时无法发送,可注册可写事件后续处理(本文简化,直接重试)
cout << "客户端" << clientIp << "发送暂时阻塞,等待下一次事件(connFd=" << connFd << ")" << endl;
break;
}
perror("send failed");
reactor->removeHandler(connFd);
return;
}
totalSent += n;
}

// 数据发送完成
if (totalSent == sendBufLen) {
cout << "回声客户端" << clientIp << "数据:" << string(sendBuf + sizeof(uint32_t), sendBufLen - sizeof(uint32_t))
<< "(长度:" << sendBufLen - sizeof(uint32_t) << ")" << endl;
sendBufLen = 0;
memset(sendBuf, 0, sizeof(sendBuf));
} else {
// 剩余数据下次发送(可注册可写事件)
memmove(sendBuf, sendBuf + totalSent, sendBufLen - totalSent);
sendBufLen -= totalSent;
}
}

private:
Reactor* reactor; // 关联的Reactor实例
int connFd; // 连接Socket的fd
string clientIp; // 客户端IP
static const int BUF_SIZE = 4096; // 缓冲区大小(适配自定义协议)
char recvBuf[BUF_SIZE]; // 接收缓冲区
size_t recvBufLen; // 接收缓冲区已用长度
char sendBuf[BUF_SIZE]; // 发送缓冲区
size_t sendBufLen; // 发送缓冲区已用长度
};

3. 主函数(程序入口)

初始化 Reactor、创建双栈监听 Socket、注册 AcceptHandler、启动事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int main(int argc, char* argv[]) {
// 支持命令行参数指定端口(默认9527)
const char* service = (argc > 1) ? argv[1] : "9527";

// 1. 创建非阻塞双栈监听Socket
int listenFd = createNonBlockListener(service);
if (listenFd == -1) {
exit(EXIT_FAILURE);
}

// 2. 初始化Reactor
Reactor reactor;

// 3. 创建AcceptHandler,注册到Reactor(监听可读事件)
EventHandler* acceptHandler = new AcceptHandler(&reactor, listenFd);
if (!reactor.registerHandler(acceptHandler, EVENT_READ)) {
delete acceptHandler;
close(listenFd);
cerr << "注册AcceptHandler失败" << endl;
exit(EXIT_FAILURE);
}

// 4. 启动Reactor事件循环(阻塞,直到stop()被调用)
reactor.start();

// 5. 清理资源(理论上不会执行到这里,除非stop()被调用)
reactor.removeHandler(listenFd);
return 0;
}

四、客户端代码(复用原自定义协议)

客户端无需修改核心逻辑,只需保持 “4 字节长度头 + 数据” 的发送格式,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <netdb.h>
#include <cstring>
#include <unistd.h>
#include <arpa/inet.h>
#include <cerrno>
#include <cstdlib>
#include <cstdio>

using namespace std;

ssize_t sendN(int fd, const char* buf, size_t len) {
size_t total = 0;
while (total < len) {
ssize_t n = send(fd, buf + total, len - total, 0);
if (n == -1) {
cerr << "send failed: " << strerror(errno) << endl;
return -1;
}
total += n;
}
return total;
}

ssize_t recvN(int fd, char* buf, size_t len) {
size_t total = 0;
while (total < len) {
ssize_t n = recv(fd, buf + total, len - total, 0);
if (n == -1) {
cerr << "recv failed: " << strerror(errno) << endl;
return -1;
} else if (n == 0) {
cerr << "服务器关闭连接" << endl;
return total;
}
total += n;
}
return total;
}

int connectToServer(const char* node = "127.0.0.1", const char* service = "9527") {
struct addrinfo hints, *result, *p;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;

int err = getaddrinfo(node, service, &hints, &result);
if (err) {
cerr << "getaddrinfo failed: " << gai_strerror(err) << endl;
return -1;
}

int connFd = -1;
for (p = result; p != nullptr; p = p->ai_next) {
connFd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (connFd == -1) {
cerr << "socket failed: " << strerror(errno) << "(尝试下一个地址)" << endl;
continue;
}

if (connect(connFd, p->ai_addr, p->ai_addrlen) == -1) {
cerr << "connect failed: " << strerror(errno) << "(尝试下一个地址)" << endl;
close(connFd);
continue;
}

break;
}

freeaddrinfo(result);
if (p == nullptr || connFd == -1) {
cerr << "无法连接到服务器 " << node << ":" << service << endl;
return -1;
}

cout << "成功连接到 " << node << ":" << service << "(connFd=" << connFd << ")" << endl;
return connFd;
}

void handleEchoInteraction(int connFd) {
char inputBuf[1024] = {0};
char recvBuf[1024] = {0};

cout << "请输入要发送的内容(输入quit退出):" << endl;
while (true) {
if (!fgets(inputBuf, sizeof(inputBuf), stdin)) {
cerr << "\n输入错误" << endl;
break;
}

size_t inputLen = strlen(inputBuf);
if (inputLen > 0 && inputBuf[inputLen - 1] == '\n') {
inputBuf[--inputLen] = '\0';
}
if (strcmp(inputBuf, "quit") == 0) {
cout << "正在退出..." << endl;
break;
}

// 按协议打包:4字节长度头 + 数据
uint32_t sendLen = htonl(inputLen);
if (sendN(connFd, (char*)&sendLen, sizeof(sendLen)) == -1) {
close(connFd);
return;
}
if (sendN(connFd, inputBuf, inputLen) == -1) {
close(connFd);
return;
}
cout << "已发送:" << inputBuf << "(长度:" << inputLen << ")" << endl;

// 接收响应:4字节长度头 + 数据
uint32_t recvLen = 0;
if (recvN(connFd, (char*)&recvLen, sizeof(recvLen)) == -1) {
close(connFd);
return;
}
recvLen = ntohl(recvLen);
if (recvN(connFd, recvBuf, recvLen) == -1) {
close(connFd);
return;
}
recvBuf[recvLen] = '\0';
cout << "服务器响应:" << recvBuf << "(长度:" << recvLen << ")" << endl;

memset(inputBuf, 0, sizeof(inputBuf));
memset(recvBuf, 0, sizeof(recvBuf));
}

close(connFd);
cout << "已断开连接" << endl;
}

int main(int argc, char* argv[]) {
const char* node = (argc > 1) ? argv[1] : "127.0.0.1";
const char* service = (argc > 2) ? argv[2] : "9527";

int connFd = connectToServer(node, service);
if (connFd == -1) {
return 1;
}

handleEchoInteraction(connFd);
return 0;
}

五、Reactor 模式关键特性与优势

  1. 高效 I/O 处理:基于 epoll ET 模式 + 非阻塞 I/O,单线程可处理上万连接,避免线程上下文切换开销(线程池模式下多线程竞争和切换开销较大)。

  2. 事件驱动:仅在 Socket 有 I/O 事件时才处理,无轮询开销,CPU 利用率高。

  3. 组件解耦:Reactor 负责事件调度,EventHandler 负责业务逻辑,新增功能只需实现新的 EventHandler,扩展性强(如添加 “文件传输处理器”“加密处理器”)。

  4. 双栈兼容:保留原双栈逻辑,同时支持 IPv4 和 IPv6 客户端连接。

六、编译与运行说明

1. 运行步骤

1
2
3
4
5
# 启动Reactor服务器(默认端口9527,可指定端口:./reactor_server 8888)
./reactor_server
# 启动客户端(可多开,测试高并发)
./echo_client 127.0.0.1 9527 # IPv4连接
./echo_client ::1 9527 # IPv6连接

2. 测试示例

1
2
3
4
5
6
7
8
# 客户端输入
请输入要发送的内容(输入quit退出):
hello reactor
已发送:hello reactor(长度:13)
服务器响应:hello reactor(长度:13)
quit
正在退出...
已断开连接