一、引言

在网络通信领域,TCP 协议凭借其强大的可靠性与稳定性,成为文件传输系统的首选。本教程聚焦于基于 TCP 的多线程文件传输系统,深入剖析事件驱动线程池协同工作的架构设计。支持双向消息交互与文件传输,由九个核心文件构成,全面覆盖网络连接建立、事件监听、线程管理以及数据传输等关键环节。

二、代码结构分析

1. 文件概览

文件名 主要功能 核心函数列表
head.h 全局结构体定义与函数声明 结构体:Train(封装消息与文件传输数据)、Queue_t(定义客户端连接队列结构)、Res_t(线程池资源相关结构体);函数声明:Ready(服务器套接字初始化)、EpollAdd(将文件描述符添加到 epoll 监听列表)等
client.c 客户端主逻辑实现 main函数包含事件驱动循环,集成消息处理、连接状态管理及用户输入响应等模块,负责客户端的连接管理与数据通信。
Server.c 服务器核心逻辑 main函数基于 epoll 事件驱动机制,整合信号处理模块,实现客户端连接接收、消息转发及线程池任务调度,是服务器的核心控制中枢。
Ready.c 服务器套接字初始化配置 Ready函数承担套接字创建、地址绑定与监听操作,为服务器接收客户端连接做好前置准备。
Queue.c 客户端连接队列操作 QueueInit用于队列初始化,分配内存并设置初始状态;EnQueue将新连接加入队列;DeQueue从队列移除连接,实现连接队列的增删改查。
Send.c 消息与文件发送实现 SendMsg负责封装并发送普通消息;SendFile实现文件传输流程,包括元数据与内容传输,确保数据准确送达。
recv.c 文件接收处理逻辑 recvn按指定长度接收数据,保障数据完整性;Recvfile完整接收文件,并处理文件存储与校验,确保文件正确落地。
PthreadTool.c 线程池管理 PthreadInit创建线程池,初始化线程资源与任务队列;PthreadPool作为线程工作函数,执行文件发送等任务,实现高效线程调度。

2. 核心依赖关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Server.c → Ready.c → 初始化套接字
// Server.c调用Ready.c中的Ready函数,完成服务器监听套接字的创建、绑定和监听,为后续接收客户端连接奠定基础

Server.c → Queue.c → 管理客户端连接队列
// 当Server.c接收到新的客户端连接时,通过调用Queue.c中的EnQueue函数将连接加入队列,后续处理时从队列获取连接

Server.c → PthreadTool.c → 线程池处理文件发送
// Server.c在需要处理文件发送任务时,借助PthreadTool.c中的线程池,将任务分配给空闲线程执行

Server.c → Send.c → 消息与文件发送
// Server.c在处理客户端消息或需要向客户端发送文件时,调用Send.c中的函数完成数据发送操作

client.c → recv.c → 文件接收
// 客户端client.c在接收到服务器发送的文件数据时,通过recv.c中的函数进行接收和处理

Send.c ←→ head.h ←→ 所有文件(共享结构体与声明)
// head.h定义的结构体和函数声明为各文件提供统一接口,确保数据结构一致和函数调用规范

分析结论:系统采用模块化设计,各文件遵循单一职责原则,通过head.h实现接口标准化。核心业务逻辑集中在Server.cclient.c,借助事件驱动模型实现多客户端并发连接与高效数据交互。这种低耦合、层次分明的架构设计,使系统易于维护、扩展与调试,每个模块专注于特定功能,减少相互干扰。

三、函数设计模式

1. 事件驱动设计(Server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
int main(int argc, char *argv[]){
// 初始化套接字与epoll实例
// 根据命令行参数调用Ready函数创建并配置服务器监听套接字
int sockfd = Ready(argv[1], argv[2]);
// 创建epoll实例,返回的文件描述符用于后续管理事件
int epfd = epoll_create(1);
// 将服务器监听套接字添加到epoll监听列表,关注其可读事件
EpollAdd(epfd, sockfd);
// 将标准输入文件描述符添加到epoll监听列表,以便接收服务器端用户输入
EpollAdd(epfd, STDIN_FILENO);
// 将退出管道的读端文件描述符添加到epoll监听列表,用于接收退出信号
EpollAdd(epfd, ExitPipe[0]);
while(1){
// 阻塞等待事件发生,最多返回2个就绪事件,-1表示无限等待
int readynum = epoll_wait(epfd, readyset, 2, -1);
for(int i = 0; i < readynum; ++i){
// 基于文件描述符的事件分发机制
if(readyset[i].data.fd == sockfd){
// 处理新客户端连接请求
// 接收客户端连接,返回新的连接套接字
int netfd = accept(sockfd, NULL, NULL);
// 将新连接加入客户端连接队列
EnQueue(queue, netfd);
// 将新连接套接字添加到epoll监听列表,关注其后续事件
EpollAdd(epfd, netfd);
} else if(readyset[i].data.fd == STDIN_FILENO){
// 处理服务器标准输入消息
// 从标准输入读取消息,发送给客户端连接队列中的第一个客户端
SendMsg(queue, 0, msg);
} else if(readyset[i].data.fd == ExitPipe[0]){
// 捕获退出信号并执行清理流程
// 接收到退出信号,跳转到资源释放代码段
goto end;
} else {
// 处理客户端发送的消息
// 从客户端连接套接字接收消息
recv(p->netfd, msg, sizeof(Train), 0);
// 将接收到的消息转发给客户端连接队列中的其他客户端
SendMsg(queue, p->netfd, msg);
}
}
}
end:
// 集中式资源释放逻辑
// 关闭相关文件描述符,释放内存等资源,确保系统正常退出
}

设计特征详解

  • I/O 多路复用机制:通过epoll_wait函数,系统可同时监听套接字、标准输入流、退出管道等多个事件源。相比传统阻塞式 I/O,epoll 能在单线程内高效处理大量文件描述符的事件,避免线程频繁切换开销,尤其适用于高并发场景。
  • 事件驱动架构:代码基于文件描述符判断事件类型,每个事件处理逻辑独立封装。这种设计符合高内聚低耦合原则,特定事件触发时才执行对应代码,减少模块间依赖,便于维护和扩展。
  • 资源管理策略:采用集中式资源释放方案,通过goto语句在异常情况下统一回收资源。系统退出时,所有已分配资源将被集中释放,有效避免内存泄漏,保障系统稳定性。

2. 线程池设计(PthreadTool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void *PthreadPool(void *arg){
Res_t *res = (Res_t *)arg;
while(1){
pthread_mutex_lock(&res->mutex);
// 基于条件变量的任务等待机制
// 当任务队列中没有任务时,线程进入等待状态
while(res->fdcount <= 0){
if(res->flag == 1){
pthread_mutex_unlock(&res->mutex);
pthread_exit(NULL);
}
pthread_cond_wait(&res->cond, &res->mutex);
}
// 任务消费流程
// 从任务计数中减去1,表示有一个任务正在被处理
--res->fdcount;
// 获取任务队列中待处理的连接套接字
int netfd = res->pqueue->pRear->netfd;
// 唤醒所有等待在条件变量上的线程,通知有任务已处理
pthread_cond_broadcast(&res->cond);
pthread_mutex_unlock(&res->mutex);
// 执行文件发送任务,将文件数据发送给对应的客户端
SendFile(netfd, res->pathname);
}
}

设计特征详解

  • 线程同步机制:综合运用pthread_mutex互斥锁与pthread_cond条件变量,解决多线程环境下的资源竞争问题。互斥锁保证同一时刻只有一个线程访问共享资源,条件变量实现线程间通信与同步,使线程在资源不足时等待,资源可用时被唤醒。
  • 任务调度策略:通过while循环检测任务队列状态,避免因虚假唤醒导致线程异常。线程在等待条件变量时,持续检查任务队列,确保仅在有任务时才进行处理,提升任务处理可靠性。
  • 数据封装模式:使用Res_t结构体封装任务参数(如连接套接字、文件路径等),并传递给线程函数。这种方式明确数据来源,保障线程间数据交互的安全性与清晰性,避免数据混乱。

3. 文件传输设计(Send.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int SendFile(const int netfd, const char* pathname){
Train *msg = (Train *)malloc(sizeof(Train));
// 定义文件传输协议标识,通过num字段区分文件传输与普通消息
msg->num = -1;
// 将文件路径复制到消息结构体的数据字段
strcpy(msg->data, pathname);
// 传输文件元数据(包含文件路径标识)给客户端
send(netfd, msg, sizeof(Train), 0);
// 以只读方式打开文件
int fd = open(pathname, O_RDONLY);
struct stat st;
// 获取文件状态信息,包括文件大小等
fstat(fd, &st);
int num = st.st_size;
// 传输文件大小信息给客户端
send(netfd, &num, sizeof(int), MSG_NOSIGNAL);
// 分配内存用于存储文件数据
char *buf = (char*)malloc(num);
int total = 0;
while(1){
// 从文件中读取数据到缓冲区
int ret = read(fd, buf + total, num - total);
total += ret;
// 将缓冲区中的数据分块发送给客户端
send(netfd, buf, num, MSG_NOSIGNAL);
if(total >= num) break;
}
// 释放缓冲区内存
free(buf);
// 关闭文件
close(fd);
return 1;
}

设计特征详解

  • 协议分层设计:通过Train.num字段标识区分文件传输与普通消息,接收端据此选择不同处理逻辑。这种设计简化数据解析流程,提高系统通用性与扩展性,适应多样化数据传输需求。
  • 数据传输策略:采用分阶段传输模式,先发送文件路径,再传输文件大小,最后发送文件内容。接收端按序接收并解析,确保文件完整、准确地被处理,降低传输错误风险。
  • 数据完整性保障:通过循环读取与发送机制,突破单次 I/O 操作字节限制,实现大文件可靠传输。即使文件超大,也能通过多次读写操作完成传输,保证数据不丢失、不损坏。

四、常见问题

1. 多线程同步问题

问题描述:线程池共享资源Res_t在多线程并发访问时,可能因同时读写共享变量(如fdcount)引发数据竞争,导致数据不一致或计算错误。

解决方案

  • 临界区保护:对Res_t中所有共享变量的访问,均置于互斥锁保护下。线程在访问前获取锁,访问结束后释放锁,确保同一时刻仅一个线程操作共享资源。
  • 避免线程饥饿:使用pthread_cond_broadcast替代pthread_cond_signal。前者可唤醒所有等待线程,避免部分线程因长时间无法获取资源而 “饥饿”,提升线程调度公平性。

2. 文件传输完整性问题

问题描述:大文件传输过程中,若网络中断,已发送但未被确认的数据可能丢失,导致接收端文件不完整。

解决方案

  • 确认应答机制:构建可靠传输协议,接收端对每个数据块返回确认信息。发送端在未收到确认时重发数据块,确保数据准确接收。
  • 断点续传:采用分块编号传输,接收端记录已接收数据块编号。传输中断恢复后,发送端从断点继续传输,避免重复发送,提升传输效率。

3. epoll 事件处理问题

问题描述Server.cepoll_waitmaxevents参数硬编码为 2,当同时发生多个事件且超过该值时,可能导致部分事件漏检,影响系统响应速度与稳定性。

解决方案

  • 动态调整参数:根据当前监听的文件描述符数量,动态调整maxevents值。在添加或移除文件描述符时,重新计算并设置合适参数,确保所有事件被及时处理。
  • 防止重复处理:为关键事件注册EPOLLONESHOT标志,事件处理完成后移除监听,避免多线程重复处理同一事件,保证事件处理的准确性与一致性。
1
2
3
4
5
6
7
8
9
10
11
12
13
#include "head.h"
/* Usage:增减监听 */
int EpollAdd(int epfd, int fd){
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
return 0;
}
int EpollDel(int epfd, int fd){
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include "head.h"
void *PthreadPool(void *arg){
Res_t *res = (Res_t *)arg;
while(1){
pthread_mutex_lock(&res->mutex);
while(res->fdcount <= 0){
if(res->flag == 1){
printf("exit\n");
pthread_mutex_unlock(&res->mutex);
pthread_exit(NULL);
}
pthread_cond_wait(&res->cond, &res->mutex);
}
-- res->fdcount;
int netfd = res->pqueue->pRear->netfd;
pthread_cond_broadcast(&res->cond);
pthread_mutex_unlock(&res->mutex);
SendFile(netfd, res->pathname);
printf("Is sending file to client %d\n", netfd);
//sleep(1);
}
pthread_exit(NULL);
}
int PthreadInit(int pidnum, pthread_t *pid, Res_t *res){
pthread_cond_init(&res->cond, NULL);
pthread_mutex_init(&res->mutex, NULL);
res->flag = 0;
res->fdcount = 0;
for(int i = 0; i < pidnum; ++i){
pthread_create(&pid[i], NULL, PthreadPool, res);
printf("pid == %ld is ready\n", pid[i]);
}
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include "head.h"

/* Usage: 实现入队与出队 */
int QueueInit(Queue_t *queue){
bzero(queue,sizeof(Queue_t));
return 0;
}
int EnQueue(Queue_t *queue, int netfd){
Node_t *pNew = (Node_t *)calloc(1,sizeof(Node_t));
pNew->netfd = netfd;
if(queue->queuesize == 0){
queue->pFront = pNew;
queue->pRear = pNew;
}
else{
queue->pRear->next = pNew;
queue->pRear = pNew;
}
++queue->queuesize;
return 0;
}
int DeleNodeQueue(Queue_t *queue, const int netfd){
Node_t *pcur = queue->pFront;
Node_t *prev = pcur;
while(pcur != NULL){
if(pcur->netfd == netfd){
//判断特殊位置 头尾节点 以及 如果只剩头的情况
if(queue->queuesize > 1){
if(pcur == queue->pFront){
queue->pFront = queue->pFront->next;
}else if(pcur == queue->pRear){
queue->pRear = prev;
}
prev->next = pcur->next;
}
free(pcur);
--queue->queuesize;
break;
}
prev = pcur;
pcur = pcur->next;
}
return 0;
}
int DeQueue(Queue_t *queue){
Node_t *pcur = queue->pFront;
queue->pFront = pcur->next;
if(queue->queuesize == 1){
queue->pRear = NULL;
}
free(pcur);
--queue->queuesize;
return 0;
}
int VisitQueue(Queue_t *queue){
Node_t *pcur = queue->pFront;
while(pcur){
printf("%3d", pcur->netfd);
pcur = pcur->next;
}
printf("\n");
return 0;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include "head.h"

/* Usage: */
int Ready(char *ip,char *port){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ERROR_CHECK(sockfd, -1, "socket");
int flag = 1;
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(atoi(port));
addr.sin_addr.s_addr = inet_addr (ip);
int bret = bind(sockfd, (struct sockaddr*)&addr, sizeof(addr));
ERROR_CHECK(bret, -1, "bind");
listen(sockfd, 10);
printf("Server is listening\n");
return sockfd;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include "head.h"
int SendMsg(Queue_t *queue, int netfd, Train *msg){
Node_t *p = queue->pFront;
while(p != NULL){
if(p->netfd != netfd){
send(p->netfd, msg, sizeof(Train), MSG_NOSIGNAL);
}
p = p->next;
}
return 1;
}
int SendFile(const int netfd, const char* pathname){
Train *msg = (Train *)malloc(sizeof(Train));
memset(msg->data, 0, sizeof(msg->data));
msg->num = -1;
strcpy(msg->data, pathname);
send(netfd, msg, sizeof(Train), 0);
printf("pathname == %s\n",msg->data);
int fd = open(pathname, O_RDONLY);
struct stat st;
fstat(fd, &st);

int num = st.st_size;
send(netfd, &num, sizeof(int), MSG_NOSIGNAL);
char *buf = (char*)malloc(num);
printf("size==%d\n",num);
int total = 0;
while(1){
int ret = read(fd, buf + total, num - total);
total += ret;
send(netfd, buf, num, MSG_NOSIGNAL);
if(total >= num){
break;
}
}
printf("SendFile finished\n");
free(buf);
free(msg);
close(fd);
return 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#include "head.h"
int ExitPipe[2];
void InitMsg(Train *msg){
msg->num = 0;
memset(msg->data, 0, 1024);
}
void handle(int sig){
write(ExitPipe[1], "1", 1);
}
int main(int argc, char *argv[]){
//server 127.0.0.1 1234 pathname worker
ARGS_CHECK(argc, 5);
int sockfd = Ready(argv[1], argv[2]);
signal(SIGUSR1, handle);
signal(SIGINT, handle);
//将文件描述符放入监听
pipe(ExitPipe);
if(fork()){
close(ExitPipe[0]);
wait(NULL);
return 0;
} //父进程负责监听信号

close(ExitPipe[1]);

//下面部分由子进程执行
int epfd = epoll_create(1);
struct epoll_event readyset[1024];
EpollAdd(epfd, sockfd);
EpollAdd(epfd, STDIN_FILENO);
EpollAdd(epfd, ExitPipe[0]);

Train *msg = (Train*)malloc(sizeof(Train));
Queue_t *queue = (Queue_t*)malloc(sizeof(Queue_t));
int qret = QueueInit(queue);
ERROR_CHECK(qret, -1, "QueueInit");

int pidnum = atoi(argv[4]);
pthread_t pid[pidnum];
Res_t *res = (Res_t *)malloc(sizeof(Res_t));
res->pqueue = queue;
memset(res->pathname, 0, PATHLEN);
memcpy(res->pathname, argv[3], strlen(argv[3]));
PthreadInit(pidnum, pid, res);

while(1){
int readynum = epoll_wait(epfd, readyset, 1024, -1);
for(int i =0; i < readynum; ++i){
//只存在收发消息两种情况
InitMsg(msg);

if(readyset[i].data.fd == sockfd){
//接收客户端连接
pthread_mutex_lock(&res->mutex);
while(res->fdcount > pidnum){
pthread_cond_wait(&res->cond, &res->mutex);
}
++ res->fdcount;
int netfd = accept(sockfd, NULL, NULL);
ERROR_CHECK(netfd, -1, "accept");
EnQueue(queue, netfd);
EpollAdd(epfd, netfd);
printf("Clientfd == %d accept\n",netfd);
pthread_cond_broadcast(&res->cond);
pthread_mutex_unlock(&res->mutex);

}else if(readyset[i].data.fd == STDIN_FILENO){
//输入信息
msg->num = read(STDIN_FILENO, &msg->data, sizeof(msg->data));
if(msg->num <= 0){
strcpy(msg->data, "exit\n");
}
SendMsg(queue, 0, msg);
if(strcmp(msg->data,"exit\n") == 0){
res->flag = 1;
goto end;
}
}else if(readyset[i].data.fd == ExitPipe[0]){
strcpy(msg->data, "exit\n");
msg->num = strlen(msg->data);
SendMsg(queue, 0, msg);
res->flag = 1;
pthread_cond_broadcast(&res->cond);
goto end;
}else{
Node_t *p = queue->pFront;
while(p != NULL){
InitMsg(msg);
if(readyset[i].data.fd == p->netfd){
int ret = recv(p->netfd, msg, sizeof(Train), 0);
if(ret <= 0 || strcmp(msg->data, "exit\n") == 0){
EpollDel(epfd, p->netfd);
int netfd = p->netfd;
close(p->netfd);
p = p->next;
DeleNodeQueue(queue, netfd);
printf("Client %d exit\n", netfd);
continue;
}
printf("Client %d send %s\n", p->netfd, msg->data);
SendMsg(queue, p->netfd, msg);
}
p = p->next;
}
}
}
}
end:
for(int i = 0; i < pidnum; ++i){
pthread_join(pid[i], NULL);
printf("pid == %ld exit\n", pid[i]);
}
close(sockfd);
close(epfd);
free(res);
DeQueue(queue);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/* Usage: 服务端,主线程发送请求,子线程负责接收文件,采用一次性接收的方式 */
#ifndef CLIENT
#define CLIENT

#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <wait.h>
#include <my_header.h>
#define PATHLEN 20
typedef struct Train{
int num; //接收信息大小
char data[1024]; //接受的信息
}Train;
typedef struct Node{
int netfd;
struct Node *next;
}Node_t;
typedef struct Queue{
Node_t *pFront;
Node_t *pRear;
int queuesize;
}Queue_t;
typedef struct Resource{
Queue_t *pqueue;
int fdcount;
int flag;
char pathname[PATHLEN]; //路径
Train *msg;
pthread_cond_t cond; //信号
pthread_mutex_t mutex; //锁
}Res_t;
int Ready(char *ip, char *port);
int EpollAdd(int epfd, int netfd);
int EpollDel(int epfd, int netfd);
int QueueInit(Queue_t *queue);
int EnQueue(Queue_t *queue, int netfd);
int DeQueue(Queue_t *queue);
int VisitQueue(Queue_t *queue);
int SendMsg(Queue_t *queue, int netfd, Train *msg);
int SendFile(const int netfd, const char *pathname);
int DeleNodeQueue(Queue_t *queue, const int netfd);
int PthreadInit(int pidnum, pthread_t *pid, Res_t *res);
#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
clean: Server
rm -rf *.o
Server: Server.o Queue.o Epoll.o Send.o Ready.o PthreadTool.o
gcc Server.o Queue.o Epoll.o Send.o Ready.o PthreadTool.o -lpthread -o Server
Epoll.o: Epoll.c
gcc -c Epoll.c -g -Wall -o Epoll.o
PthreadTool.o: PthreadTool.c
gcc -c PthreadTool.c -g -Wall -o PthreadTool.o
Queue.o: Queue.c
gcc -c Queue.c -g -Wall -o Queue.o
Ready.o: Ready.c
gcc -c Ready.c -g -Wall -o Ready.o
Send.o: Send.c
gcc -c Send.c -g -Wall -o Send.o
Server.o: Server.c
gcc -c Server.c -g -Wall -o Server.o
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include "head.h"

int main(int argc, char *argv[]){
ARGS_CHECK(argc, 3);
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ERROR_CHECK(sockfd, -1, "socket");
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(atoi(argv[2]));
addr.sin_addr.s_addr = inet_addr (argv[1]);
int cret = connect(sockfd, (struct sockaddr*)&addr, sizeof(addr));
ERROR_CHECK(cret, -1, "connect");
printf("Server has been connected;\n");

//将文件描述符放入监听
int epfd = epoll_create(1);
struct epoll_event event;
struct epoll_event readyset[2];
event.events = EPOLLIN;
event.data.fd = sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
event.data.fd = STDIN_FILENO;
epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &event);

Train *msg = (Train*)malloc(sizeof(Train));

while(1){
int readynum = epoll_wait(epfd, readyset, 2, -1);
for(int i =0; i < readynum; ++i){
//只存在收发消息两种情况
msg->num = 0;
memset(msg->date, 0, 1024);
if(readyset[i].data.fd == sockfd){
//接收到消息 约定接到正数就是消息,-1就是文件
int ret = recv(sockfd, msg, sizeof(Train), 0);
ERROR_CHECK(ret, -1, "recv");
//以结构体形式接发消息
if(ret <= 0 || strcmp(msg->date, "exit\n") == 0){
printf("Server disconnect\n");
goto end;
}
if(msg->num > 0){
printf("%s\n", msg->date);
}else{
//接文件
if(Recvfile(sockfd, msg)){
printf("Download file finished\n");
}else{
printf("error download file");
}
}
}else{
msg->num = read(STDIN_FILENO, &msg->date, sizeof(msg->date));
if(msg->num >= 0){
send(sockfd, msg, sizeof(Train), 0);
}
if(strcmp(msg->date, "exit\n") == 0 || msg->num <= 0){
printf("I will exit\n");
goto end;
}
}
}
}
end:
free(msg);
close(sockfd);
close(epfd);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include "head.h"
int recvn(const int sockfd, char *buf, size_t bufsize, int flag, const int len){
int total = 0;
while(total < len){
ssize_t ret = recv(sockfd, buf + total, len - total, flag);
total += ret;
}
return 0;
}
int Recvfile(int sockfd, Train *msg){
//获得文件名和大小,准备写入
int fd = open(msg->date, O_WRONLY|O_CREAT|O_TRUNC, 0775);

int num;
recv(sockfd, &num, sizeof(int), 0);
//根据大小创建空间
char *buf = (char*)malloc(num);
recvn(sockfd, buf, num, 0, num);

//写入文件
int ret = write(fd, buf, num);
ERROR_CHECK(ret, -1, "write");
free(buf);
close(fd);
return 1;
}