Redis介绍
官网的Redis介绍:
Redis是基于内存的存储系统,可以用作数据库、缓存、消息中间件。Redis提供了strings, hashes, lists, sets, sorted sets、bitmaps, hyperloglogs, geo等多种对象和数据结构。
Redis内置了复制、lua脚本、LRU驱动、事务和不同级别的磁盘持久化功能,并且通过哨兵Sentinel和Redis Cluster集群保证Redis高可用。
Redis线程模型
Redis服务采用类似Reactor的方式实现,每一个网络请求对应一个文件描述符FD。I/O多路复用模块同时监听多个FD,当有accept、read、write、close等文件事件产生时,IO多路复用程序会先激活FD对应事件放在事件队列中,由文件事件处理器调用对应回调函数进行处理,最后返回结果。

- 采用I/O多路复用同时监听多个socket,根据socket当前执行的事件为socket选择对应的事件处理器;
- 当被监听的socket准备执行
、accept
、read
、write
等操作时,I/O多路复用程序会将所有产生事件的socket放入队列,并且以有序的方式向文件事件分派器传送socket;close
- 文件事件处理器根据I/O多路复用传来的scoket事件类型调用对应处理器处理事件;
- 文件事件处理器有命令请求处理器、命令回复处理器、连接应答处理器等,处理完成之后将结果返回。
所以用Redis的单线程模型主要是指网络IO事件处理和键值对读写是由一个线程完成,其他像持久化、异步删除等操作都是额外线程完成的。
Redis为什么使用单线程模型还这么快?
1、为什么采用单线程模型
- 多线程 != 快。当系统中存在多线程访问资源时,就需要类似加锁机制保证共享资源正确性,这就带来了额外开销;
- 多线程频繁的上下文切换和竞争会带来额外的资源消耗;
- Redis是基于内存操作,CPU不是主要问题,主要是在网络IO会比较耗费时间。所以使用单线程足够处理百万命令。
2、为什么单线程模型这么快
- 纯内存操作;Redis是基于内存操作的,响应时间在ns内,QPS可以达到10W+;
- 高效的数据结构:提供了哈希表、跳表等高效操作的数据结构;
- 单线程避免了多线程在资源争抢和线程上下文频繁切换的问题;
- IO多路复用机制:使其在网络IO操作中能够并发处理大量客户端请求,实现高吞吐;
Redis 6多线程模型
Redis 6提供了多线程操作,但是注意Redis仍然是单线程模型。
Redis的多线程模式主要在网络请求的接收、解析命令以及输出命令执行结果可以配置成多线程执行,Redis官方考虑到这些是主要耗时点。但是每条命令的执行依然是单线程运行的。
(图片来源:《和杠精 聊Redis多线程》)
Linux多路复用
Linux多路复用技术是指建立socket连接之后,后续监视socket产生的多个文件描述符FD,当某个文件描述符读/写就绪时,就将对应数据拷贝到用户空间中,再进行读写操作。在处理网络请求时,调用select函数的过程是阻塞的。
Linux提供了select、poll和epoll等实现方式,本质都是同步IO的方式,都需要在读写事件就绪后自己负责进行读写,这个读写过程是阻塞的。
select调用过程如下:
- 首先将需要监听的fd_set从用户空间拷贝到内核,并注册回调函数
- 遍历所有fd,调用poll方法(如socket_poll),如果有可读写mask掩码,就将这个mask掩码赋值给fd_set
- 最后把fd_set从内核空间拷贝到用户空间
【Redis 01】Redis线程模型
select与epoll区别:
- select一个进程能够打开的最大连接数有限(32位机器大小是3232),但是epoll 1G内存机器可以打开10万左右连接
- select需要把FD集合从用户态拷贝到内核态,因为select每次调用都是对所有连接进行线性遍历,当FD集合很大时,遍历速度很慢;而epoll在FD上注册回调函数,这种机制机制可以带来效率提升,即FD数目增加不会降低效率,因为只有活跃可用的FD才会调用回调函数。
(关于select、epoll区别,可以看下这篇文章:深入理解select、poll和epoll及区别)
Redis I/O多路复用源码分析
Redis IO多路复用模型(epoll)
Redis采用的是事件驱动机制,提供了evport、epoll、kqueue和select4种事件驱动模型。
下面看看epoll模型实现源码(ae_epoll.c文件中)。
1、创建epoll实例:
static int aeApiCreate(aeEventLoop *eventLoop) {
// 省略部分代码
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
// 调用
eventLoop->apidata = state;
return 0;
}
2、添加监控事件,将文件描述符和对应事件添加到对应IO多路复用中
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
// 省略部分代码
mask |= eventLoop->events[fd].mask; /* Merge old events */
// 添加AE读写事件
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
3、等待事件触发,接收到事件之后将事件保存
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
// 省略部分代码
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
// 将epoll事件转换成AE事件
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
// 将事件保存在fired数组中,后续处理会用到该数组
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
return numevents;
}
4、事件处理
主函数中通过调用aeMain() 函数进行监听:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
// 循环监听
while (!eventLoop->stop) {
// 调用IO多路复用,如果返回事件,就激活事件处理器进行处理
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
aeProcessEvents函数中调用IO多路复用API进行监听;当IO多路复用返回事件后,aeProcessEvents执行每个激活事件的回调函数进行处理。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// 省略部分代码
// 调用IO多路复用 API,获取激活事件。事件保存在eventLoop->fired[]数组中
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
int invert = fe->mask & AE_BARRIER;
// 先调用回调函数执行可读事件
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
// 再调用回调函数执行可写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
// 处理超时事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}