基于 epoll 的简易多人聊天服务器与客户端实现

导言

今天是聊天室,改了又改版本,目前更改部分包括:放弃了好用但是实现复杂的链表,采用数组来实现信息存储;时间结构采用ctime函数;用epoll代替select;生成历史记录文件等。

本代码实现的简易多人聊天系统主要包含两个部分:

  • 服务器端:支持多客户端连接,具备消息广播、私聊以及超时检测功能。

  • 客户端:负责与服务器建立连接,实现消息的发送与接收。

系统采用 TCP 协议进行通信,并运用 epoll 实现 I/O 多路复用。相较于传统的 select/poll 模型,在高并发场景下,epoll 展现出更出色的性能。

一、核心技术点解析

1.1 epoll I/O 多路复用

epoll 是 Linux 系统下高效的 I/O 事件通知机制,本项目主要使用了以下函数:

  • epoll_create():用于创建 epoll 实例。

  • epoll_ctl():可添加、删除或修改被监控的文件描述符。

  • epoll_wait():用于等待事件发生。

服务器和客户端均通过 epoll 同时监控 socket 和标准输入,实现了非阻塞的 I/O 处理。

1.2 服务器端核心实现

1.2.1 初始化与配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);

// 配置服务器地址信息
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(argv[2]));
server_addr.sin_addr.s_addr = inet_addr(argv[1]);

// 设置地址重用
int reuse = 1;
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));

// 绑定与监听
bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
listen(sockfd, 50);

上述代码完成了服务器 socket 的创建、配置、绑定和监听操作,其中SO_REUSEADDR选项可使服务器在重启后快速重用端口。

1.2.2 客户端管理结构

1
2
3
4
5
typedef struct List{
int readyfd[1024]; // 已就绪的文件描述符
int fdtime[1024]; // 记录活动时间
int chatfd[1024]; // 私聊目标fd
}List;

该结构体用于管理所有连接的客户端,包含客户端的文件描述符、最后活动时间和私聊目标等信息。

1.2.3 事件处理循环

服务器的主循环是程序的核心部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
while(1){
int readynum = epoll_wait(epfd, readyset, 1024, 1000);

// 超时检测
// ...

// 处理就绪事件
for(int i=0; i < readynum; ++i){
// 处理新连接
if(readyset[i].data.fd == sockfd){
// 接受新客户端连接
// ...
}
// 处理服务器输入
else if(readyset[i].data.fd == STDIN_FILENO){
// 读取并广播服务器消息
// ...
}
// 处理客户端消息
else{
// 接收客户端消息并转发
// ...
}
}
}

此循环持续等待并处理三类事件:新客户端连接、服务器端输入以及客户端发送的消息。

1.3 客户端核心实现

客户端的实现相对简洁,主要功能包括:

  • 与服务器建立连接。

  • 通过 epoll 同时监听服务器消息和用户输入。

  • 向服务器发送用户输入。

  • 显示从服务器接收到的消息。

此外,客户端还实现了重连机制,当与服务器的连接断开时,会自动尝试重新连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 连接断开后的重连逻辑
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
while(1){
close(sockfd);
sleep(1);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(connect (sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) != -1){
printf("New server has been connected\n");
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
break;
}
}

二、特色功能详解

2.1 消息广播

当服务器接收到某一客户端的消息时,会将该消息广播给所有其他连接的客户端:

1
2
3
4
5
6
// 广播消息给所有客户端
for(int j = 0; j < num; ++j){
if(list->readyfd[j] != -1 && list->readyfd[j] != clientfd){
send(list->readyfd[j], msg, strlen(msg), 0);
}
}

2.2 私聊功能

客户端可通过特定格式的消息发起私聊:

1
2
3
4
5
6
7
else if(strncmp(buf, "*chat ", 6) == 0){
int chatfd_n = atoi(buf + 6);
// 验证目标客户端是否存在
// ...
list->chatfd[index] = chatfd_n;
// ...
}

发送格式为*chat 目标客户端ID,此后发送的消息将仅发送至指定客户端。

2.3 超时检测

服务器会定期检查客户端的活动时间,若客户端超过 100 秒未活动,将被强制断开连接:

1
2
3
4
5
6
7
for(int i=0; i < num; ++i){
if(list->readyfd[i] != -1 && time(NULL)-list->fdtime[i] > 100){
// 处理超时客户端
// ...
CloseFd(&count, i, epfd, list);
}
}

2.4 历史记录

服务器会创建一个包含时间戳的日志文件,用于记录所有聊天消息:

1
2
3
char path_n [128];
sprintf(path_n,"History %s.txt",time_now);
int openfd = open(path_n, O_RDWR|O_CREAT|O_TRUNC, 0775);

所有消息都会写入该文件,便于后续查看聊天历史。

三、使用方法与扩展建议

3.1 使用方法

  1. 编译服务器和客户端代码。

  2. 启动服务器:./server 服务器IP 端口号。

  3. 启动客户端:./client 服务器IP 端口号。

  4. 在客户端输入消息发送,输入exit退出。

  5. 发送*chat 目标ID开始私聊。

3.2 扩展建议

本代码作为基础框架,可从以下方面进行扩展:

  • 增加用户认证机制,实现用户名密码登录。

  • 优化私聊功能,支持用户名而非文件描述符。

  • 增加文件传输功能。

  • 实现更完善的错误处理和日志系统。

  • 支持更复杂的聊天模式,如聊天室功能。

3.3 代码

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <arpa/inet.h>
#include <my_header.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
/* Usage: */
int main(int argc, char *argv[]){
//检查输入数据的正确性
ARGS_CHECK(argc, 3);
//创建SOCKET
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ERROR_CHECK(sockfd, -1, "socket");
//确定服务端网络地址
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(argv[2]));
server_addr.sin_addr.s_addr = inet_addr(argv[1]);
//链接服务端
int cret = connect (sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
ERROR_CHECK(cret, -1, "connect");
printf("Server %d is connected, stdin 'exit' to getout\n",ntohs(server_addr.sin_port));
//创建epoll的位图,结构体的缘故union 导致event和后面readset指向同一个区域
struct epoll_event event;
struct epoll_event readyset[1024];
int epfd = epoll_create(1);
ERROR_CHECK(epfd, -1, "epoll_create");
//监听输入和sockfd
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);

event.events = EPOLLIN;
event.data.fd = STDIN_FILENO;
epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &event);
//接收信息
char buf[1024]={0};
while(1){
int readynum = epoll_wait(epfd, readyset, 2, -1);
//就绪集合的长度
memset(buf, 0, sizeof(buf));
//清空信息
for(int i = 0; i < readynum; ++i){
//监听输入
if(readyset[i].data.fd == STDIN_FILENO){
int ret = read(STDIN_FILENO, buf, sizeof(buf));
ERROR_CHECK(ret, -1 ,"read");
//输入exit退出
if(ret == 0 || strcmp(buf,"exit\n") == 0){
printf("Good bye\n");
send(sockfd, "exit\n", 5, 0);
close(sockfd);
return 0;
}
//发送信息
send(sockfd, buf, strlen(buf), 0);
}else{
//监听收信
int ret = recv(sockfd, buf, sizeof(buf), 0);
if(ret <= 0){
printf("Something error, waiting for new connect\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
while(1){
//服务端退出后,等待链接
close(sockfd);
sleep(1);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(connect (sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) != -1){
printf("New server has been connected\n");
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
break;
}
}
continue;
}
printf("%s\n", buf);
}
}
}
close(sockfd);
return 0;
}

服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#include <arpa/inet.h>
#include <fcntl.h>
#include <my_header.h>
#include <netinet/in.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <time.h>
/* Usage: */
typedef struct List{
int readyfd[1024];
int fdtime[1024];
int chatfd[1024];
}List;
void InitArr(int *arr){
for(int i=0; i < 1024; ++i){
arr[i] = -1;
}
}
//设置时间参数
void Time(char *set_time){
time_t now_t;
time(&now_t);
sprintf(set_time, "%s", ctime(&now_t));
}
//加入监控
void FdInSet(int fd, int epfd, struct epoll_event *event){
event->events = EPOLLIN;
event->data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, event);
}
void SendMes(){
//广播,转发

}
void CloseFd(int *count, int index, int epfd, List *list){
epoll_ctl(epfd, EPOLL_CTL_DEL, list->readyfd[index], NULL);
close(list->readyfd[index]);
(*count) --;
//将对应位置
list->fdtime[index] = -1;
list->chatfd[index] = -1;
list->readyfd[index] = -1;
}
int main(int argc, char *argv[]){
ARGS_CHECK(argc, 3);
//设置socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ERROR_CHECK(sockfd, -1, "socket");
//配置服务端基本信息
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(argv[2]));
server_addr.sin_addr.s_addr = inet_addr(argv[1]);
//地址重用
int reuse = 1; // 申请了一个整数,数值是1
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));
int bret = bind (sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
ERROR_CHECK(bret, -1, "bind");
int lret = listen(sockfd, 50);
ERROR_CHECK(lret, -1, "listen");
printf("Server is listening \n");

//创建epoll
int epfd = epoll_create(1);
ERROR_CHECK(epfd, -1, "epoll_create");

struct epoll_event event;
struct epoll_event readyset[1024];
FdInSet(sockfd, epfd, &event);
FdInSet(STDIN_FILENO, epfd, &event);

//用来存储接到的文件描述符
List *list = (List*)malloc(sizeof(List));


int num = 0;
int count = 0;

InitArr(list->chatfd);
InitArr(list->fdtime);
InitArr(list->readyfd);


char buf[1024] = {0};
char msg[2048] = {0};
char time_now[64] = {0};

Time(time_now);
char path_n [128];
sprintf(path_n,"History %s.txt",time_now);
int openfd = open(path_n, O_RDWR|O_CREAT|O_TRUNC, 0775);
ERROR_CHECK(openfd, -1, "open");
//文件描述符
int maxedfd = (sockfd > openfd ? sockfd : openfd) + 1;

while(1){

int readynum = epoll_wait(epfd, readyset, 1024, 1000);

//超时退出
for(int i=0; i < num; ++i){
if(list->readyfd[i] != -1 && time(NULL)-list->fdtime[i] > 100){
memset(msg, 0, sizeof(msg));
Time(time_now);
sprintf(msg, "%sClinet %d sleep too long\n", time_now, list->readyfd[i]);
printf("%s\n", msg);
CloseFd(&count, i, epfd, list);
}
}

for(int i=0; i < readynum; ++i){
memset(buf, 0, sizeof(buf));
memset(msg, 0, sizeof(msg));
memset(time_now, 0, sizeof(time_now));

// 检测到接收情况
if(readyset[i].data.fd == sockfd){
//将accept获得的clientfd放入readyfd中,第几个就放在下标处
int clientfd = accept(sockfd, NULL, NULL);
ERROR_CHECK(clientfd, -1 ,"accept");

if(num >= 1024){
Time(time_now);
printf("%sToo many clients\n", time_now);
close(clientfd);
continue;
}

//根据 fd 内容返回下标,通过一个数组记录
list->readyfd[clientfd - maxedfd] = clientfd;
//同一个 fd 放入同一个位置,下标是fd - 5;
list->fdtime[clientfd - maxedfd] = time(NULL);
FdInSet(clientfd, epfd, &event);
num = num > (clientfd - maxedfd + 1) ? num : (clientfd - maxedfd + 1);
Time(time_now);
++ count;
printf("%sClient %d has been accepted, the num of them is %d\n", time_now, clientfd, count);
//如果只想两人聊天,此处
//epoll_ctl(epfd,EPOLL_CTL_DEL,sockfd,NULL);
}else if(readyset[i].data.fd == STDIN_FILENO){
//检测到输入情况
int ret = read(STDIN_FILENO, buf, sizeof(buf));
ERROR_CHECK(ret, -1 ,"read");
if(ret == 0 || strcmp(buf, "exit\n") == 0){
goto end;
}
Time(time_now);
sprintf(msg, "%sServer message: %s", time_now, buf);
write(openfd, msg, strlen(msg));

for(int j = 0; j < num; ++j){
//这个下标的节点未被删除
if(list->readyfd[j] != -1){
send(list->readyfd[j], msg, strlen(msg), 0);
}
}
}else{
//接到消息并群发
int clientfd = readyset[i].data.fd;
int index = clientfd - maxedfd;
list->fdtime[index] = time(NULL);
int ret = recv(clientfd, buf, sizeof(buf), 0);

if(ret <= 0 || strcmp(buf, "exit\n") == 0){
CloseFd(&count, index, epfd, list);
Time(time_now);
printf("%sClient %d exit, there are %d Clients\n", time_now, clientfd, count);
continue;

} else if(strncmp(buf, "*chat ", 6) == 0){

int chatfd_n = atoi(buf + 6);
if(chatfd_n > num || chatfd_n == clientfd || chatfd_n - maxedfd < 0){
send(clientfd, "ERROR", 5, 0);
continue;
}
list->chatfd[index] = chatfd_n;
sprintf(msg, "You can send one mes to Client %d\n", chatfd_n);
send(clientfd, msg, strlen(msg), 0);
}else{

Time(time_now);
//私聊
if(list->chatfd[index] != -1){
sprintf(msg, "%s[*chat] Client %d message to %d: %s", time_now, clientfd, list->chatfd[index], buf);
write(openfd, msg, strlen(msg));
printf("%s\n", msg);
send(list->chatfd[index], msg, strlen(msg), 0);
list->chatfd[index] = -1;
}else{
sprintf(msg, "%sClient %d message: %s", time_now, clientfd, buf);
write(openfd, msg, strlen(msg));
printf("%s\n", msg);
for(int j = 0; j < num; ++j){
//广播
if(list->readyfd[j] != -1 && list->readyfd[j] != clientfd){
send(list->readyfd[j], msg, strlen(msg), 0);
}

}
}
}
} //if
} //for
} //while
end:
close(openfd);
close(sockfd);
return 0;
}