引言

本文将详细解析一个基于多进程模型的文件传输系统,该系统包含服务器端和客户端两部分。服务器端采用进程池设计模式,通过预先创建多个工作进程来处理客户端的文件请求,提高系统的并发处理能力。客户端则负责接收服务器传输的文件并显示传输进度。

一、系统整体架构

该系统主要由以下几个部分组成:

服务器端

  • 主进程:负责监听客户端连接、管理工作进程池

  • 工作进程:实际处理文件传输任务

  • 进程间通信:通过 UNIX 域套接字传递文件描述符

客户端

  • 连接服务器

  • 接收文件数据

  • 显示传输进度

二、核心数据结构

1. Train 结构体

1
2
3
4
typedef struct Train{
int size;
char data[1024];
}Train;

功能描述:用于文件数据的传输封装

结构说明

  • size:表示data数组中有效数据的长度

  • data:存储实际的文件数据,最大为 1024 字节

2. WorkerData 结构体

1
2
3
4
5
typedef struct WorkerData{
pid_t pid;
int status; // 1 忙 0 闲
int pipefd; //进程通信管道
}WorkerData;

功能描述:用于主进程管理工作进程的信息

结构说明

  • pid:工作进程的进程 ID

  • status:工作进程状态,1 表示忙,0 表示闲

  • pipefd:与工作进程通信的管道文件描述符

三、服务器端核心函数解析

1. sendfd 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int sendfd(int sockfd, int flag, int fdtosend){
struct msghdr hdr;
bzero(&hdr,sizeof(hdr));
struct iovec iov[1];
iov[0].iov_base = &flag;
iov[0].iov_len = sizeof(flag);
hdr.msg_iov = iov;
hdr.msg_iovlen = 1;
struct cmsghdr *pcmsg = (struct cmsghdr *)malloc(CMSG_LEN(sizeof(int)));
pcmsg->cmsg_len = CMSG_LEN(sizeof(int));
pcmsg->cmsg_level = SOL_SOCKET;
pcmsg->cmsg_type = SCM_RIGHTS;
*(int *)CMSG_DATA(pcmsg) = fdtosend;
hdr.msg_control = pcmsg;
hdr.msg_controllen = CMSG_LEN(sizeof(int));
int ret = sendmsg(sockfd, &hdr, 0);
ERROR_CHECK(ret,-1,"sendmsg");
free(pcmsg);
return 0;
}

功能描述:在进程间传递文件描述符

参数说明

  • sockfd:用于传输的套接字文件描述符

  • flag:控制标志,1 表示退出,0 表示正常传输

  • fdtosend:待发送的文件描述符

实现逻辑

  1. 初始化消息头结构msghdr
  2. 设置消息正文部分,包含控制标志flag
  3. 分配控制消息缓冲区,用于存储文件描述符
  4. 设置控制消息的长度、级别和类型(SCM_RIGHTS表示传递权限)
  5. 将文件描述符存入控制消息数据部分
  6. 调用sendmsg发送消息
  7. 释放动态分配的内存

2. recvfd 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int recvfd(int sockfd, int *pflag, int *pfdtorecv){
struct msghdr hdr;
bzero(&hdr, sizeof(hdr));
struct iovec iov[1];
iov[0].iov_base = pflag;
iov[0].iov_len = sizeof(int);
hdr.msg_iov = iov;
hdr.msg_iovlen = 1;
struct cmsghdr *pcmsg = (struct cmsghdr *)malloc(CMSG_LEN(sizeof(int)));
pcmsg->cmsg_len = CMSG_LEN(sizeof(int));
pcmsg->cmsg_level = SOL_SOCKET;
pcmsg->cmsg_type = SCM_RIGHTS;
hdr.msg_control = pcmsg;
hdr.msg_controllen = CMSG_LEN(sizeof(int));
int ret = recvmsg(sockfd, &hdr, 0);
ERROR_CHECK(ret,-1,"recvmsg");
*pfdtorecv = *(int *)CMSG_DATA(pcmsg);
printf("flag = %d, fdtorecv = %d\n", *pflag, *pfdtorecv);
free(pcmsg);
return 0;
}

功能描述:接收其他进程发送的文件描述符

参数说明

  • sockfd:用于接收的套接字文件描述符

  • pflag:接收控制标志的指针

  • pfdtorecv:接收文件描述符的指针

实现逻辑

  1. 初始化消息头结构msghdr
  2. 设置消息正文缓冲区,用于接收控制标志
  3. 分配控制消息缓冲区,用于接收文件描述符
  4. 调用recvmsg接收消息
  5. 从控制消息中提取文件描述符
  6. 释放动态分配的内存

3. TansFile 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void TansFile(int netfd){
char name[] = "text.txt";
int size = strlen(name);
int fd = open(name, O_RDWR);
ERROR_CHECK(fd, -1,"open");
struct stat st;
fstat(fd, &st);
int filesize = st.st_size;
send(netfd, &size, sizeof(int), MSG_NOSIGNAL);
send(netfd, name, strlen(name), MSG_NOSIGNAL);
send(netfd, &filesize, sizeof(int), MSG_NOSIGNAL);
int num = 0;
while(num < filesize){
char buf[1024] = {0};
int ret = read(fd, buf, sizeof(buf));
send(netfd, &ret, sizeof(int), MSG_NOSIGNAL);
send(netfd, buf, ret, MSG_NOSIGNAL);
num += ret;
sleep(1);
}
close(fd);
return;
}

功能描述:向客户端传输文件

参数说明

  • netfd:与客户端连接的套接字文件描述符

实现逻辑

  1. 定义要传输的文件名 "text.txt"

  2. 获取文件名长度并打开文件

  3. 获取文件大小

  4. 向客户端发送文件名长度、文件名和文件大小

  5. 循环读取文件内容并发送:

    • 每次读取最多 1024 字节
    • 先发送本次读取的字节数
    • 再发送实际数据
    • 累加已发送字节数
    • 休眠 1 秒模拟传输延迟
  6. 关闭文件描述符

4. MakeWorker 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void MakeWorker(int workernum, WorkerData * workerArr){
for(int i = 0; i < workernum; ++i){
int pipe[2];
socketpair(AF_LOCAL, SOCK_STREAM, 0, pipe);
pid_t pid = fork();
if(pid == 0){
while(1){
close(pipe[0]);
int netfd;
int flag;
recvfd(pipe[1], &flag, &netfd);
if(flag == 1){
printf("I am going to exit!\n");
exit(0);
}
TansFile(netfd);
printf("netfd = %d finish send\n", netfd);
close(netfd);
pid_t pid = getpid();
write(pipe[1], &pid, sizeof(pid));
}
}
close(pipe[1]);
workerArr[i].pid = pid;
workerArr[i].status = 0;
workerArr[i].pipefd = pipe[0];
printf("i = %d, pid = %d, pipefd = %d\n", i, pid, pipe[0]);
}
return;
}

功能描述:创建指定数量的工作进程

参数说明

  • workernum:工作进程数量

  • workerArr:存储工作进程信息的数组指针

实现逻辑

  1. 循环创建指定数量的工作进程

  2. 为每个工作进程创建一个socketpair用于进程间通信

  3. 子进程逻辑:

    • 关闭不需要的管道端

    • 循环接收主进程发送的文件描述符和标志

    • 如果标志为 1,则退出

    • 否则调用TansFile处理文件传输

    • 完成后向主进程发送自己的 PID 表示已空闲

  4. 父进程逻辑:

    • 关闭不需要的管道端

    • 记录工作进程的 PID、初始状态 (闲) 和管道描述符

5. main 函数(服务器端)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
int main(int argc, char *argv[]){
ARGS_CHECK(argc, 4);
int workernum = atoi(argv[3]);
WorkerData *worker = (WorkerData *)calloc(workernum, sizeof(WorkerData));
pipe(exitPipe);
signal(SIGUSR1, handler);
MakeWorker(workernum,worker);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ERROR_CHECK(sockfd, -1, "error socket");
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(atoi(argv[2]));
addr.sin_addr.s_addr = inet_addr(argv[1]);
int opt = 1;
ERROR_CHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), -1, "setsockopt");
int bret = bind(sockfd, (struct sockaddr *)&addr, sizeof(addr));
ERROR_CHECK(bret, -1, "bind");
listen(sockfd, 50);
int epfd = epoll_create(1);
ERROR_CHECK(epfd, -1, "epoll_create");
struct epoll_event event;
struct epoll_event readyset[1024];
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
event.data.fd = exitPipe[0];
epoll_ctl(epfd, EPOLL_CTL_ADD, exitPipe[0], &event);
for(int i = 0; i < workernum; ++i){
event.data.fd = worker[i].pipefd;
epoll_ctl(epfd, EPOLL_CTL_ADD, worker[i].pipefd, &event);
}
while(1){
int readynum = epoll_wait(epfd, readyset, 1024, -1);
for(int i = 0; i < readynum; ++i){
if(readyset[i].data.fd == sockfd){
int netfd = accept(sockfd, NULL, NULL);
printf("1 client connect, netfd = %d\n", netfd);
for(int j = 0; j < workernum; ++j){
if(worker[j].status == 0){
int flag = 0;
sendfd(worker[j].pipefd,flag,netfd);
worker[j].status = 1;
break;
}
}
close(netfd);
}else if(readyset[i].data.fd == exitPipe[0]){
printf("Process pool is going to exit!\n");
for(int j = 0; j < workernum; ++j){
int flag = 1;
sendfd(worker[j].pipefd, flag, 0);
}
for(int j = 0; j < workernum; ++j){
wait(NULL);
}
printf("All worker has been killed!\n");
free(worker);
exit(0);
}else{
for(int j = 0; j < workernum; ++j){
if(readyset[i].data.fd == worker[j].pipefd){
pid_t pid;
read(readyset[i].data.fd, &pid, sizeof(pid));
printf("worker %d is finished!\n", pid);
worker[j].status = 0;
break;
}
}
}
}
}
close(sockfd);
close(epfd);
return 0;
}

功能描述:服务器主函数,负责初始化并运行服务器

参数说明

  • argc:命令行参数数量

  • argv:命令行参数数组,包含 IP 地址、端口号和工作进程数量

实现逻辑

  1. 检查命令行参数,解析工作进程数量

  2. 创建工作进程数组并初始化

  3. 创建退出管道,注册信号处理函数

  4. 调用MakeWorker创建工作进程

  5. 创建并配置服务器套接字:

    • 设置地址重用选项

    • 绑定到指定 IP 和端口

    • 开始监听连接

  6. 初始化 epoll 用于 I/O 多路复用:

    • 添加服务器套接字到 epoll 监控

    • 添加退出管道到 epoll 监控

    • 添加所有工作进程管道到 epoll 监控

  7. 进入事件循环:

    • 等待 epoll 事件

    • 处理客户端连接事件:

      • 接受连接

      • 寻找空闲工作进程

      • 向工作进程发送客户端连接的文件描述符

      • 标记工作进程为忙

    • 处理退出事件:

      • 向所有工作进程发送退出标志

      • 等待所有工作进程退出

      • 释放资源并退出

    • 处理工作进程完成事件:

      • 接收工作进程 PID

      • 标记工作进程为闲

四、客户端核心函数解析

1. main 函数(客户端)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
int main(int argc, char *argv[]){
ARGS_CHECK(argc, 3);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ERROR_CHECK(sockfd, -1, "error socket");
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(atoi(argv[2]));
addr.sin_addr.s_addr = inet_addr(argv[1]);
int cret = connect(sockfd, (struct sockaddr *)&addr, sizeof(addr));
ERROR_CHECK(cret, -1, "connect");
printf("Server is connecting\n");
Train *train = (Train*)malloc(sizeof(Train));
int ret = recv(sockfd, &train->size, sizeof(int), 0);
ERROR_CHECK(ret, -1, "recv");
char *name = (char*)malloc(train->size);
memset(name, 0, train->size);
recv(sockfd, name, train->size, 0);
fflush(stdout);
ret = recv(sockfd, &train->size, sizeof(int), 0);
int size = train->size;
ERROR_CHECK(ret, -1, "recv");
printf("file name == %s\t, filesize == %d\n", name, size);
fflush(stdout);
int fd = open(name, O_WRONLY|O_CREAT|O_TRUNC, 0666);
ERROR_CHECK(fd, -1, "OPEN");
int num = 0;
while(num < size){
recv(sockfd, &train->size, sizeof(int), 0);
ret = recv(sockfd, &train->data, train->size, 0);
write(fd, train->data, ret);
while(ret < train->size){
ret += recv(sockfd, &train->data, sizeof(train->data)-ret, 0);
write(fd, train->data, strlen(train->data));
}
num += ret;
double result = (double) num / size * 100;
for(int i = 0; i < result / 10; ++i){
printf("-");
}
printf("%5.02lf%%\r", result);
fflush(stdout);
}
printf("\n");
free(train);
close(fd);
close(sockfd);
return 0;
}

功能描述:客户端主函数,负责连接服务器并接收文件

实现逻辑

  1. 检查命令行参数

    • 验证参数完整性(服务器 IP、端口号)、校验 IP 地址格式(IPv4/IPv6)、转换并验证端口号范围(0-65535)、确认传输协议为 tcp 或 udp
  2. 创建客户端套接字并连接服务器

    • 根据协议创建套接字(SOCK_STREAM 或 SOCK_DGRAM)、填充 sockaddr_in 结构体、TCP:connect()连接,失败指数退避、UDP:sendto()发送,实现丢包重传
  3. 接收服务器文件信息

    • 接收文件名长度(4 字节)、动态分配内存获取文件名、解析文件元数据、异常时断开并重发请求