本来 , 我的代码使用的是进程间传递文件描述符的方式 , 也就是经典著作里面讲述的那种方式 , 但是我觉得它太UNIX了 , 就没有用 , 于是使用了fork这种更加UNIX的方式!以上的代码写的不好 , 但是那是可以体现UDP多进程负载均衡的最好方式了 , 什么东西都在代码本身!不过这代码可真够烂的...
4.问题1:中间有个socket死了会弄乱其它活着socket的位置我们知道 , SO_REUSEPORT选项设定了以后 , 会根据数据包的源地址和源端口来计算一个值 , 该值对应所有相同目标IP地址相同目标端口的socket中的一个socket的位置 , 内核便把这个位置的socket作为接收数据的socket , 然而 , 如果这个socket死掉了的话 , 即使重新创建了一个新的socket凑足了数 , 也会使整个socket链表重新排序 , 除非死掉的是最后一个 。由于socket查找算法是不变的 , 计算位置的数据也不变 , 诸多socket的顺序改变意味着数据源和目标处理进程的对应关系会发生改变 , 从而本应该处理该数据的socket没有收到数据 , 反而被别的socket接收 , 这也是上述多进程方案中a方案的唯一弊端 , 你必须在外部监控哪个进程挂掉了 , 进程挂掉意味着socket被关闭 , 你又没有办法(没有这样的API)再创建一个socket插入到挂掉进程使用的那个socket所在的位置 , 唯一的方法就是全部杀掉 , 重新开始 , 但是这显然不是最好的解决方案 , 最好的方案...
解决:保证UDP不死 , 即它不被关闭
怎么保证呢?很简单 , 那就是socket的创建和关闭全部由主进程来统一管理 , 工作子进程们只处理网络IO , 不关闭socket , 这就要求socket在创建的时候要带有CLOEXEC标志 。这也是方案b和方案c被提出的原因(同时也是结果) 。
5.UDP失败的accept模型-按需建立UDP处理进程多么想为UDP建立一个accept模型 , 有了SO_REUSEPORT选项以后 , 貌似有了一点希望 , 那就着手写代码了 。代码是写出来了 , 也能用 , 看样子还不错 , 可是总是有解决不了的小尾巴 , 最终由于UDP和4元组没有必然的对应关系(这也是UDP的本质 , 否则它就成了有连接协议了 , 详见问题3)推论出UDP的accept几乎是没有希望的 , 之所以还可以用SO_REUSEPORT选项来做UDP的多进程负载均衡 , 其实完全依赖了Linux内核处理UDP socket查找时的一个算法 , 该算法只是一种实现而已 , 并不能保证其它的系统或者未来的Linux内核不会改变算法的行为 , 因此这种UDP的多进程负载均衡并非标准方案 , 它的使用有赖于你对系统内核行为的熟悉以及特定版本特定算法的副作用的理解 。
不管怎么样 , 还是给出一个很XX的实现吧 , 毕竟目标不是为了展示“瞧啊 , 我实现了一个UDP的accept” , 这无异于对别人说 , 我有更麻烦的解法 。醉翁之意不在酒 , 在于OpenVPN?NO , 是为了引出一个排他唤醒的问题 。看代码:
#define _GNU_SOURCE#include <sys/types.h>#include <string.h>#include <netinet/in.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <sys/socket.h>#include <unistd.h>#include <sys/wait.h>#include <fcntl.h>#include <sys/uio.h>#include <poll.h>#include <pthread.h>#include "list.h"#define NUM_PROCESS10#define SO_REUSEPORT15#define SERV_PORT12345#define MAXSIZE1024struct client_socket {struct list_head list;pid_t pid;int sock_fd;int process_num;struct sockaddr *src_addr;char init_data[MAXSIZE]; };void DEBUG(char *argv) {}void client_echo(int sock, int process_num){int n;char buff[MAXSIZE];struct sockaddr clientfrom;socklen_t addrlen = sizeof(clientfrom);struct pollfd pfds[1];int pfd[1] = {-1};pfds[0].fd = sock;pfds[0].events = POLLIN|POLLOUT;while (1) {int efds = -1;//以下注释掉的代码如果不放开 , 就会有问题!具体还请看我下面的关于”排他唤醒“的分析//if ((efds = poll(pfds,1,-1)) < 0) {//return -1;//}//if(pfds[0].revents & POLLIN) {memset(buff, 0, MAXSIZE);n = recvfrom(sock, buff, MAXSIZE, 0, &clientfrom, &addrlen);printf("recv data:%ssize:%dnum:%dpid:%dn", buff, n, process_num, getpid());n = sendto(sock, buff, n, 0, &clientfrom, addrlen);//}}}int create_udp_socks(const char *str_addr){int sockfd;int optval = 1;int fdval = 0;struct sockaddr_in servaddr, cliaddr;sockfd = socket(AF_INET, SOCK_DGRAM|SOCK_CLOEXEC, 0); /* create a socket */setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof(optval));bzero(&servaddr, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = inet_addr(str_addr);servaddr.sin_port = htons(SERV_PORT);if(bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {perror("bind error");exit(1);}return sockfd;}void woker_process(int sock, int num, char *init_data, pid_t pid){int woker_sock = sock;client_echo(woker_sock, num);}pid_t create_worker(void *arg){struct client_socket *pcs = (struct client_socket*)arg;pid_t pid = fork();if (pid > 0) {pcs->pid = pid;return pid;} else if (pid == 0){woker_process(pcs->sock_fd, pcs->process_num, pcs->init_data, pcs->pid);// 不可能运行到此} else {exit(1);}// 不可能运行到此return pid;}struct client_socket* udp_accept(struct pollfd *pfds){int ret_fd = -1;int efds = -1;static LIST_HEAD(worker_list);int i;struct client_socket *pcs = NULL;if ((efds = poll(pfds, NUM_PROCESS, -1)) < 0) {printf("poll errorn");return NULL;}for (i = 0; i < NUM_PROCESS; i++) {// 此处的循环只会处理没有hash到既有worker的UDP套接字if(pfds[i].revents & POLLIN) {struct sockaddr_in *pclientfrom = (struct sockaddr_in *)calloc(1, sizeof(struct sockaddr_in));socklen_t addrlen = sizeof(struct sockaddr_in);char pesud_buf[MAXSIZE];int n = 0;struct list_head *pos;struct client_socket *ocs;n = recvfrom(pfds[i].fd, pesud_buf, MAXSIZE, MSG_PEEK, pclientfrom, &addrlen);printf("accept n:%dport:%dn", n, ntohs(pclientfrom->sin_port));int flag = 0;// 此处的循环为了防止为同一个“连接”(即相同的五元组)创建两个或者多个worker进程list_for_each(pos, &worker_list) {ocs = list_entry(pos, struct client_socket, list);if (!memcmp(pclientfrom, ocs->src_addr, sizeof(struct sockaddr_in))) {flag = 1;break;}}// 如果该“连接”已经被hash到一个既有的worker , 忽略它 , 该worker自会处理!if (flag) {free (pclientfrom);continue;}ret_fd = pfds[i].fd;pcs = (struct client_socket*)calloc(1, sizeof(struct client_socket));pcs->sock_fd = ret_fd;pcs->process_num = i;pcs->src_addr = pclientfrom;INIT_LIST_HEAD(&pcs->list);list_add(&pcs->list, &worker_list);break;}}return pcs;}void schedule_process(int socks[2], char *str_addr, struct pollfd *pfds){pid_t pids[NUM_PROCESS] = {0};int udps[NUM_PROCESS] = {0};int i = 0;for (i = 0; i < NUM_PROCESS; i++) {udps[i] = create_udp_socks(str_addr);pfds[i].fd = udps[i];pfds[i].events = POLLIN;}while (1) {pid_t pid;struct client_sock *pcs = udp_accept(&pfds[0]);printf("accept :%pn", pcs);if (pcs != NULL) {pid = create_worker(pcs);}}}void *wait_thread(void *arg){struct pollfd *pfds = (struct pollfd*)arg;while (1) {int stat;int i;pid_t pid = waitpid(-1, &stat, 0);// 一直以来 , 我总觉得应该wait一点什么 , 但是...}}int main(int argc, char** argv){int unix_socks[2];char *str_addr = argv[1];struct pollfd pfds[NUM_PROCESS] = {0};pthread_t tid;int ret = -1;if(socketpair(AF_UNIX, SOCK_STREAM, 0, unix_socks) == -1){exit(1);}ret = pthread_create(&tid, NULL, wait_thread, &pfds[0]);if (ret) {exit(1);}schedule_process(unix_socks, str_addr, &pfds[0]);return 0;}
推荐阅读
- 用 Python 开发一个 「聊天室」
- Linux两种处理模式reactor模式proactor模式
- 彩农茶友天下,彩农茶友天下
- 翡翠|单调的翡翠无事牌,为何会受到这么多人的喜爱?我帮你分析一下
- 右眼下眼皮痉挛
- 上眼皮过敏红肿痒
- OPPO|高通骁龙888旗舰芯下放!OPPO K10 Pro明天预售
- 浏览器到底哪个好用?浅谈各大浏览器使用心得
- Windows AD域下批量自动配置客户端有线网络认证功能
- 威武雄壮万贵妃万贞儿死后下 万贞儿万贵妃