1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
| #include <queue> #include <vector> #include <pthread.h> #include <sys/socket.h> #include <arpa/inet.h>
// 任务结构体(存储客户端连接信息) struct Task { int connfd; // 客户端连接套接字 struct sockaddr_storage clientAddr; // 客户端地址(兼容IPv4/IPv6) };
class ThreadPool { public: // 初始化线程池(参数:线程数量) ThreadPool(int threadNum) : threadCount(threadNum), isRunning(false) { // 初始化互斥锁和条件变量 pthread_mutex_init(&queueMutex, nullptr); pthread_cond_init(&queueCond, nullptr); }
~ThreadPool() { stop(); // 停止线程池 // 释放资源 pthread_mutex_destroy(&queueMutex); pthread_cond_destroy(&queueCond); }
// 启动线程池 void start() { if (isRunning) return; isRunning = true; // 创建指定数量的线程 threads.resize(threadCount); for (int i = 0; i < threadCount; ++i) { if (pthread_create(&threads[i], nullptr, threadFunc, this) != 0) { LOG_ERROR("创建线程%d失败:%s", i, strerror(errno)); isRunning = false; break; } } if (isRunning) { LOG_INFO("线程池启动成功,线程数量:%d", threadCount); } }
// 停止线程池(等待所有线程退出) void stop() { if (!isRunning) return; isRunning = false; pthread_cond_broadcast(&queueCond); // 唤醒所有等待的线程
// 等待所有线程退出 for (pthread_t& tid : threads) { pthread_join(tid, nullptr); } threads.clear(); LOG_INFO("线程池已停止"); }
// 添加任务到队列(外部调用,如accept后添加客户端任务) bool addTask(const Task& task) { pthread_mutex_lock(&queueMutex); // 任务队列满(简单限制队列最大长度为1024) if (taskQueue.size() >= 1024) { pthread_mutex_unlock(&queueMutex); LOG_ERROR("任务队列已满,拒绝新连接"); return false; } taskQueue.push(task); pthread_mutex_unlock(&queueMutex); pthread_cond_signal(&queueCond); // 唤醒一个等待的线程 return true; }
private: // 线程入口函数(静态函数,通过this指针访问成员) static void* threadFunc(void* arg) { ThreadPool* pool = static_cast<ThreadPool*>(arg); pool->run(); // 线程循环执行任务 return nullptr; }
// 线程循环逻辑(取任务、执行任务) void run() { while (isRunning) { Task task; pthread_mutex_lock(&queueMutex);
// 等待任务(队列为空且线程池运行中) while (isRunning && taskQueue.empty()) { pthread_cond_wait(&queueCond, &queueMutex); }
// 线程池已停止,退出循环 if (!isRunning) { pthread_mutex_unlock(&queueMutex); break; }
// 取出任务 task = taskQueue.front(); taskQueue.pop(); pthread_mutex_unlock(&queueMutex);
// 执行任务(处理客户端回声请求) handleClientTask(task); } }
// 处理单个客户端任务(核心逻辑,替代原fork子进程) void handleClientTask(const Task& task) { int connfd = task.connfd; struct sockaddr_storage clientAddr = task.clientAddr;
// 1. 打印并记录客户端IP char ipstr[INET6_ADDRSTRLEN] = {0}; void* addr = (clientAddr.ss_family == AF_INET) ? &((struct sockaddr_in*)&clientAddr)->sin_addr : &((struct sockaddr_in6*)&clientAddr)->sin6_addr; inet_ntop(clientAddr.ss_family, addr, ipstr, sizeof(ipstr)); LOG_INFO("新客户端连接:%s,connfd:%d", ipstr, connfd);
// 2. 设置套接字超时(recv/send超时5秒) struct timeval timeout = {5, 0}; // 5秒超时 // 设置接收超时 if (setsockopt(connfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) == -1) { LOG_ERROR("connfd=%d 设置接收超时失败:%s", connfd, strerror(errno)); close(connfd); return; } // 设置发送超时 if (setsockopt(connfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) == -1) { LOG_ERROR("connfd=%d 设置发送超时失败:%s", connfd, strerror(errno)); close(connfd); return; }
// 3. 处理回声请求(基于自定义协议) handleEchoWithProtocol(connfd, ipstr);
// 4. 释放连接资源 close(connfd); LOG_INFO("客户端%s 连接关闭,connfd:%d", ipstr, connfd); }
// 基于自定义协议的回声处理(解决粘包) void handleEchoWithProtocol(int connfd, const char* clientIP) { char recvBuf[1024] = {0}; // 接收缓冲区 char sendBuf[1024] = {0}; // 发送缓冲区 ssize_t n;
while (true) { // -------------------------- 步骤1:接收协议头(4字节长度) -------------------------- uint32_t dataLen = 0; // 存储数据长度(网络字节序转主机字节序后) // 接收4字节长度头(循环接收,确保完整) n = recvN(connfd, (char*)&dataLen, sizeof(dataLen)); if (n == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) { LOG_ERROR("connfd=%d 接收超时(客户端%s无数据发送)", connfd, clientIP); } else { LOG_ERROR("connfd=%d 接收长度头失败:%s", connfd, strerror(errno)); } break; } else if (n == 0) { LOG_INFO("客户端%s 主动关闭连接,connfd:%d", clientIP, connfd); break; }
// 网络字节序转主机字节序(大端转小端) dataLen = ntohl(dataLen); if (dataLen > sizeof(recvBuf)) { // 限制最大数据长度,避免缓冲区溢出 LOG_ERROR("connfd=%d 数据长度超出限制(%d > %lu)", connfd, dataLen, sizeof(recvBuf)); break; }
// -------------------------- 步骤2:接收数据内容 -------------------------- n = recvN(connfd, recvBuf, dataLen); if (n == -1) { LOG_ERROR("connfd=%d 接收数据失败:%s", connfd, strerror(errno)); break; } else if (n == 0) { LOG_INFO("客户端%s 主动关闭连接,connfd:%d", clientIP, connfd); break; }
// 记录接收的数据 LOG_INFO("connfd=%d 接收客户端%s 数据:%s(长度:%d)", connfd, clientIP, recvBuf, dataLen);
// -------------------------- 步骤3:发送回声数据(按协议打包) -------------------------- // 数据内容复制到发送缓冲区 strncpy(sendBuf, recvBuf, dataLen); // 打包协议头(主机字节序转网络字节序) uint32_t sendLen = htonl(dataLen); // 先发送长度头 if (sendN(connfd, (char*)&sendLen, sizeof(sendLen)) == -1) { LOG_ERROR("connfd=%d 发送长度头失败:%s", connfd, strerror(errno)); break; } // 再发送数据内容 if (sendN(connfd, sendBuf, dataLen) == -1) { LOG_ERROR("connfd=%d 发送数据失败:%s", connfd, strerror(errno)); break; }
LOG_INFO("connfd=%d 回声客户端%s 数据:%s(长度:%d)", connfd, clientIP, sendBuf, dataLen); memset(recvBuf, 0, sizeof(recvBuf)); // 清空缓冲区 } }
// 封装recv:确保接收指定长度的数据(解决部分接收问题) ssize_t recvN(int fd, char* buf, size_t len) { size_t total = 0; while (total < len) { ssize_t n = recv(fd, buf + total, len - total, 0); if (n == -1) { return -1; // 错误(超时或其他错误) } else if (n == 0) { return total; // 客户端关闭,返回已接收长度 } total += n; } return total; // 接收完成,返回总长度 }
// 封装send:确保发送指定长度的数据(解决部分发送问题) ssize_t sendN(int fd, const char* buf, size_t len) { size_t total = 0; while (total < len) { ssize_t n = send(fd, buf + total, len - total, 0); if (n == -1) { return -1; // 错误(超时或其他错误) } total += n; } return total; // 发送完成,返回总长度 }
private: std::vector<pthread_t> threads; // 线程列表 std::queue<Task> taskQueue; // 任务队列 pthread_mutex_t queueMutex; // 任务队列互斥锁 pthread_cond_t queueCond; // 任务队列条件变量 int threadCount; // 线程数量 bool isRunning; // 线程池运行状态 };
|