前言
- 同步I/O模型通常用于实现Reactor模式
- 异步I/O模型则用于实现Proactor模式
- 最后我们会使用同步I/O方式模拟出Proactor模式
- Reactor 释义“反应堆”,是一种事件驱动机制
- Reactor的回调函数:和普通函数调用的不同之处在于,应用程序不是主动的调用某个 API 完成处理,而是恰恰 相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上,如果相应的时间发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”
文章插图
- Reactor 模式是处理并发I/O比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的I/O 事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上; 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中 。
- Reactor 模型有三个重要的组件:多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用 。事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中事件处理器:负责处理特定事件的处理函数
- 具体流程如下:注册读就绪事件和相应的事件处理器事件分离器等待事件事件到来,激活分离器,分离器调用事件对应的处理器事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制 权
文章插图
文章插图
【Linux两种处理模式reactor模式proactor模式】
需要C/C++ Linux服务器架构师学习资料后台私信“1”免费获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
多线程Reactor模式多线程Reactor模式特点:它要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将时间通知工作线程(逻辑单元) 。除此之外,主线程不做任何其他实质性的工作读写数据,接受新的连接,以及处理客户请求均在工作线程中完成工作流程:①主线程往epoll内核事件表中注册socket上有数据可读②主线程调用epoll_wait等待socket上有数据可读③当socket上有数据可读时,epoll_wait通知主线程 。主线程则将socket可读事件放入请求队列④睡眠在请求请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪时间⑤主线程调用epoll_wait等到socket可写⑥当socket可写时,epoll_wait通知主线程 。主线程将socket可写事件放入请求队列⑦睡眠在请求队列上的某个工作线程被唤醒,它向socket上写入服务器处理客户请求的结果
单线程Reactor模式单线程Reactor模式与多线程Reactor模式原理相同 。但是工作都是在同一个线程中完成的单线程优缺点:优点:Reactor模型开发效率上比起直接使用IO复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源 。优点为每个事件处理中很多时候可以 不考虑共享资源的互斥访问缺点:可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定 律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力单线程Reactor使用多核:如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开启多个反应堆(Reactor),每个反应堆对应一颗CPU核心这些反应堆上跑的请求互不相关,这是完全可以利用多核的 。例如Nginx这样的http静态服务器下面是单线程Reactor模式的实现代码,下载下来之后可以直接编译运行:
// reactor.c// 源码链接: https://github.com/dongyusheng/csdn-code/blob/master/server-client/reactor.c// gcc -o reactor reactor.c#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <string.h>#include <arpa/inet.h>#include <netinet/in.h>#include <sys/socket.h>#include <sys/types.h>#include <sys/epoll.h>#include <errno.h>#include <time.h>#include <libgen.h>#include <fcntl.h> #define MAX_EPOLL_EVENTS1024#define MAX_BUFFER_SIZE4096 typedef int NCALLBACK(int, int, void*); // 事件结构体, 每个套接字都会被封装为一个事件struct ntyevent { int fd;// 事件对应的fd int events;// 事件类型(本代码中我们只处理EPOLL_IN和EPOLL_OUT)void *arg;// 事件回调函数的参数3, 实际传入的是一个struct ntyreactor结构体指针 int (*callback)(int fd, int events, void *arg); //事件回调函数int status;// 当前事件是否位于epoll集合中: 1表示在, 0表示不在char buffer[MAX_BUFFER_SIZE]; // 读写缓冲区 int length;//缓冲区数据的长度long last_active; // 最后一次活跃的时间};// Reactor主体struct ntyreactor { int epoll_fd;// epoll套接字 struct ntyevent *events; // reactor当前处理的事件集}; // 创建一个Tcp Serverint init_server(char *ip, short port);// 向reactor中添加一个服务器监听事件int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback);/***下面这3个函数是用来对reactor操作的***/// 初始化reactorstruct ntyreactor *ntyreactor_init();// 销毁reactorint ntyreactor_destroy(struct ntyreactor *reactor);// reactor运行函数int ntyreactor_run(struct ntyreactor *reactor);/***下面这3个函数是用来对ntyevent事件结构操作的***/// 将一个fd封装到事件结构中int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg);// 将一个事件添加/更新到epoll的事件表中int nty_event_add(int epoll_fd, struct ntyevent* ev);// 将一个事件移出epoll事件表int nty_event_del(int epoll_fd, struct ntyevent* event);/***下面这3个函数是ntyevent事件可以使用的回调函数***/int accept_callback(int fd, int events, void *arg);int recv_callback(int fd, int events, void *arg);int send_callback(int fd, int events, void *arg);int main(int argc, char *argv[]){ if(argc != 3){ printf("usage: ./%s [ip] [port]n", basename(argv[0])); exit(EXIT_FAILURE);}char *ip = argv[1]; short port = atoi(argv[2]);int sock_fd;// 1.初始化一个Tcp Serversock_fd = init_server(ip, port);// 2.初始化reactor struct ntyreactor *reactor = ntyreactor_init(); if( reactor == NULL){ printf("Error in %s(), ntyreactor_init: create reactor errorn", __func__); exit(EXIT_FAILURE);}// 3.将Tcp Server添加到reactor事件集中ntyreactor_addlistener(reactor, sock_fd, accept_callback);// 4.运行reactorntyreactor_run(reactor);// 5.销毁ntyreactor_destroy(reactor);close(sock_fd);return 0;} int init_server(char *ip, short port){ // 1.创建套接字 int sock_fd = socket(AF_INET, SOCK_STREAM, 0); if(sock_fd == -1){ printf("Error in %s(), socket: %sn", __func__, strerror(errno)); return -1;}// 2.初始化服务器地址 struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET; if(inet_pton(AF_INET, ip, (void*)&server_addr.sin_addr.s_addr) == -1){ printf("Error in %s(), inet_pton: %sn", __func__, strerror(errno)); return -1;}server_addr.sin_port = htons(port);// 3.绑定服务器地址 if(bind(sock_fd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) == -1){ printf("Error in %s(), bind: %sn", __func__, strerror(errno)); return -1;}// 3.监听 if(listen(sock_fd, 20) == -1){ printf("Error in %s(), listen: %sn", __func__, strerror(errno)); return -1;}printf("Listen start [%s:%d]...n", inet_ntoa(server_addr.sin_addr), ntohs(server_addr.sin_port));return sock_fd;} struct ntyreactor *ntyreactor_init(){ // 1.创建一个reactor struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor)); if(reactor == NULL) return NULL; memset(reactor, 0, sizeof(struct ntyreactor));// 2.创建reacotr的epoll_fdreactor->epoll_fd = epoll_create(1); if(reactor->epoll_fd == -1){ printf("Error in %s(), epoll_create: %sn", __func__, strerror(errno)); free(reactor); return NULL;}// 3.创建reactor的事件集reactor->events = (struct ntyevent*)malloc(sizeof(struct ntyevent) * MAX_EPOLL_EVENTS); if(reactor->events == NULL){ printf("Error in %s(), malloc: %sn", __func__, strerror(errno));close(reactor->epoll_fd); free(reactor); return NULL;}return reactor;} int ntyreactor_destroy(struct ntyreactor *reactor){ if(reactor == NULL){ printf("Error in %s(): %sn", __func__, "reactor arg is NULL"); return -1;}// 关闭epoll_fd、销毁事件集、释放结构close(reactor->epoll_fd); free(reactor->events);free(reactor);return 0;} int ntyreactor_run(struct ntyreactor *reactor){ // 1.判断参数 if(reactor == NULL || reactor->epoll_fd < 0 || reactor->events == NULL){ printf("Error in %s(): %sn", __func__, "reactor arg is error"); return -1;}struct epoll_event ep_events[MAX_EPOLL_EVENTS + 1];// 2.进行epoll_wait() int nready; while(1){ // 超时检测 /*int checkpos = 0, i;long now = time(NULL);for (i = 0; i < MAX_EPOLL_EVENTS; i++, checkpos ++) {if (checkpos == MAX_EPOLL_EVENTS) {checkpos = 0;}// 如果当前索引处的事件status为0, 则不检测, 进行下一个if (reactor->events[checkpos].status != 1) {continue;}// 如果超过60秒, 那么就认定为超时, 超时后关闭移除long duration = now - reactor->events[checkpos].last_active;if (duration >= 60) {close(reactor->events[checkpos].fd);printf("[fd=%d] timeoutn", reactor->events[checkpos].fd);nty_event_del(reactor->epfd, &reactor->events[checkpos]);}}*/nready = epoll_wait(reactor->epoll_fd, ep_events, MAX_EPOLL_EVENTS, 1000); // 3.函数出错 if(nready == -1){ // 如果函数在阻塞过程中接收到信号, 那么继续进行epoll_wait() if(errno == EAGAIN || errno == EWOULDBLOCK) continue; printf("Error in %s(), epoll_wait: %sn", __func__, strerror(errno)); return -1;} // 4.函数超时 else if(nready == 0) continue; // 5.有事件准备好 else{ // 遍历处理已就绪的事件 int i; for(i = 0; i < nready; ++i){ // 获取事件结构体, 保存在struct epoll_event结构的data.ptr中 struct ntyevent* ev = (struct ntyevent*)ep_events[i].data.ptr;// 如果事件可读 if((ep_events[i].events & EPOLLIN) && (ev->events & EPOLLIN))ev->callback(ev->fd, ev->events, ev->arg);// 如果事件可写 if((ep_events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))ev->callback(ev->fd, ev->events, ev->arg);}}}return 0;} int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback){ if(reactor == NULL || fd <0 || callback == NULL){ printf("Error in %s(): %sn", __func__, "arg error"); return -1;}// 初始化ntyevent事件结构, 然后添加到reactor的epoll事件表中即可nty_event_set(&reactor->events[fd], fd, EPOLLIN, 0, 0, callback, reactor);nty_event_add(reactor->epoll_fd, &reactor->events[fd]);return 0;} int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg){ if(ev == NULL || fd <0 || event <0 || length < 0 || callback == NULL || arg == NULL || status < 0){ printf("Error in %s(): %sn", __func__, "arg error"); return -1;}// 初始化ntyevent结构的相关内容即可ev->fd = fd;ev->events = event;ev->arg = arg;ev->callback = callback;ev->status = status;ev->length = length;ev->last_active = time(NULL);return 0;} int nty_event_add(int epoll_fd, struct ntyevent* ev){ if(epoll_fd <0 || ev == NULL){ printf("Error in %s(): %sn", __func__, "arg error"); return -1;}// 1.创建一个epoll事件结构 struct epoll_event ep_event; memset(&ep_event, 0, sizeof(ep_event));ep_event.events = ev->events;ep_event.data.ptr = ev; //ep_event.data.fd = ev->fd; data成员是一个联合体, 不能同时使用fd和ptr成员// 2.如果当前ev已经在epoll事件表中, 那么就修改; 否则就把ev加入到epoll事件表中 int op; if(ev->status == 0){op = EPOLL_CTL_ADD;ev->status = 1;}elseop = EPOLL_CTL_MOD;// 3.添加/更新if(epoll_ctl(epoll_fd, op, ev->fd, &ep_event) == -1){ printf("Error in %s(), epoll_ctl: %sn", __func__, strerror(errno)); return -1;}return 0;} int nty_event_del(int epoll_fd, struct ntyevent* ev){ if(epoll_fd < 0 || ev == NULL || ev->status != 1){ printf("Error in %s(): %sn", __func__, "ev arg is error"); return -1;}// 初始要删除的epoll事件结构 struct epoll_event ep_event; memset(&ep_event, 0, sizeof(ep_event));ep_event.data.ptr = ev; //ep_event.data.fd = ev->fd; data成员是一个枚举, 不能同时使用ptr和fd成员ev->status = 0;// 从epoll事件表中删除epoll事件 if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev->fd, &ep_event) == -1){ printf("Error in %s(), epoll_ctl: %sn", __func__, strerror(errno)); return -1;}return 0;} int accept_callback(int fd, int events, void *arg){ // 1.获得reactor结构 struct ntyreactor *reactor = (struct ntyreactor*)arg; // 2.获取该fd对应的事件结构 struct ntyevent *ev = reactor->events + fd;// 3.初始化客户端地址结构 struct sockaddr_in cli_addr; memset(&cli_addr, 0 , sizeof(cli_addr)); socklen_t len = sizeof(cli_addr);// 4.接收客户端 int cli_fd;cli_fd = accept(ev->fd, (struct sockaddr*)&cli_addr, &len); if(cli_fd == -1){ printf("Error in %s(), accept: %sn", __func__, strerror(errno)); return -1;}int i; do { // 5.在reactor事件表中找到第一个空位置, 用i表示新事件存放的位置, 也是其套接字的值 // reactor->events的0、1、2、3、4都被占用了, 客户端第一个可以使用的套接字为5, 因此此处从5开始遍历 for(i = 5; i< MAX_EPOLL_EVENTS; ++i){ if(reactor->events[i].status == 0) break;}// 6.如果满了, 就退出 if(i == MAX_EPOLL_EVENTS){ printf("Error in %s(): max connect limit[%d]n", __func__, MAX_EPOLL_EVENTS); return -1;}// 7.将套接字设置为非阻塞 int flag = 0; if ((flag = fcntl(cli_fd, F_SETFL, O_NONBLOCK)) < 0) { printf("Error in %s(), fcntl: %sn", __func__, strerror(errno)); return -1;}// 8.将新事件添加到reactor事件表中 // 此处我们将新客户端的回调函数首先设置为recv_callback, 事件类型为EPOLLIN, 因为一般都是客户端向服务器发送数据的nty_event_set(&reactor->events[cli_fd], cli_fd, EPOLLIN, 0, 0, recv_callback, reactor);nty_event_add(reactor->epoll_fd, &reactor->events[cli_fd]);} while(0);printf("New connect: [%s:%d], [time:%ld], pos[%d]n",inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port), reactor->events[cli_fd].last_active, i);return 0;} int recv_callback(int fd, int events, void *arg){ // 1.获得reactor结构 struct ntyreactor *reactor =(struct ntyreactor*)arg; // 2.获取该fd对应的事件结构 struct ntyevent *ev = reactor->events + fd;// 3.先将事件从epoll事件集移除nty_event_del(reactor->epoll_fd, ev);// 3.接收数据 int rc = recv(ev->fd, ev->buffer, MAX_BUFFER_SIZE, 0); if(rc < 0)//recv出错{ //if(errno == EAGAIN || errno == EWOULDBLOCK) //return rc;printf("Error in %s(), recv: %sn", __func__, strerror(errno));// 此处我们不再需要将该nty_event从epoll事件集移除, 因为上面我们已经移除了close(ev->fd);} else if(rc == 0)//对方关闭了{ printf("Client closed the connection, fd = %dn", ev->fd);// 此处我们也当做错误处理 // 此处我们不再需要将该nty_event从epoll事件集移除, 因为上面我们已经移除了close(ev->fd);}else //接收到数据{ev->buffer[rc] = ''; printf("Recv[fd = %d]: %sn", ev->fd, ev->buffer);// 将事件变为可读, 然后加入到epoll事件表中nty_event_set(ev, ev->fd, EPOLLOUT, rc, 0, send_callback, reactor);nty_event_add(reactor->epoll_fd, ev);}return rc;} int send_callback(int fd, int events, void *arg){ // 1.获得reactor结构 struct ntyreactor *reactor =(struct ntyreactor*)arg; // 2.获取该fd对应的事件结构 struct ntyevent *ev = reactor->events + fd;// 3.此处我们把接收的内容再回送给对象, 因此使用的是ev->buffer int rc = send(ev->fd, ev->buffer, ev->length, 0); if(rc > 0) //send成功{ printf("Send[fd = %d]: %sn", ev->fd, ev->buffer);// 移除、添加: 将其变为可读nty_event_del(reactor->epoll_fd, ev);nty_event_set(ev, ev->fd, EPOLLIN, 0, 0, recv_callback, reactor);nty_event_add(reactor->epoll_fd, ev);} else //send失败{ printf("Error in %s(), send: %sn", __func__, strerror(errno));// 关闭、移除close(ev->fd);nty_event_del(reactor->epoll_fd, ev);}return rc;}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 如何在 ASP.NET Core 中处理 404 错误
- 桌面系统linux Mint 20安装全过程,强烈推荐使用
- 两种普洱养生法,两种祁门红茶泡茶的方法
- 记一次阿里云服务器被中病毒处理过程
- 如何在 Linux 终端上漂亮地打印 JSON 文件
- 使用Python+Fabric实现Linux自动化操作
- 几种常用Linux系统的软件镜像源配置
- 面试不懂 Linux 内存管理?我用 20 张图给你讲明白
- 冰箱结冰如何处理调到几档最好 冰箱保鲜室结冰怎么调档位
- 翡翠|购买翡翠原石被忽悠了,应该怎么处理?