什么是事件循环?
Event Loop(事件循环)是一种用于处理和调度事件的机制,常用于异步编程模型中。它是一种循环结构,不断地从事件队列中取出事件并处理,直到事件队列为空。
在事件循环中,有两个主要的组件:事件队列和事件处理器。事件队列用于存储待处理的事件,而事件处理器则负责从队列中取出事件并执行相应的处理逻辑。
当有新的事件发生时,可以将其加入到事件队列中,而不需要立即处理。事件循环会在合适的时机从队列中取出事件,并按照先入先出的顺序进行处理。这种机制使得程序可以同时处理多个事件,而不需要等待某个事件的完成。
Event Loop在异步编程中起到了关键的作用。它可以处理网络请求、定时器、用户输入等各种类型的事件,并通过回调函数或者异步处理来处理这些事件。通过合理地利用Event Loop,可以实现高效的非阻塞式编程。
最简单的Eventloop:
#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
class EventLoop {
public:
typedef std::function<void()> EventCallback;
void addEvent(EventCallback callback) {
std::lock_guard<std::mutex> lock(mutex_);
events_.push(callback);
condition_.notify_one();
}
void run() {
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !events_.empty(); });
EventCallback event = events_.front();
events_.pop();
lock.unlock();
event();
}
}
private:
std::mutex mutex_;
std::condition_variable condition_;
std::queue<EventCallback> events_;
};
int main() {
EventLoop loop;
// 添加事件
loop.addEvent([]() {
std::cout << "事件1" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件2" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件3" << std::endl;
});
// 运行事件循环
std::thread loopThread([&]() {
loop.run();
});
loopThread.join();
return 0;
}
有什么局限?
这个示例中的Eventloop是单线程的,它在一个循环中依次执行事件。这意味着如果某个事件执行时间过长,会阻塞后续事件的执行。对于需要并发处理的场景,可能需要使用多线程或异步任务来处理事件。
添加多线程和异步任务的支持
#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <future>
class EventLoop {
public:
typedef std::function<void()> EventCallback;
void addEvent(EventCallback callback) {
std::lock_guard<std::mutex> lock(mutex_);
events_.push(callback);
condition_.notify_one();
}
void run() {
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !events_.empty(); });
EventCallback event = events_.front();
events_.pop();
lock.unlock();
// 异步执行事件
std::async(std::launch::async, event);
}
}
private:
std::mutex mutex_;
std::condition_variable condition_;
std::queue<EventCallback> events_;
};
int main() {
EventLoop loop;
// 添加事件
loop.addEvent([]() {
std::cout << "事件1" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件2" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件3" << std::endl;
});
// 运行事件循环
std::thread loopThread([&]() {
loop.run();
});
loopThread.join();
return 0;
}
另外一种思路:使用Eventfd
#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <sys/eventfd.h>
#include <unistd.h>
#include <atomic>
#include <stdexcept>
#include <sstream>
class EventLoop {
public:
typedef std::function<void()> EventCallback;
EventLoop() {
event_fd_ = eventfd(0, EFD_NONBLOCK);
if (event_fd_ == -1) {
throw std::runtime_error("Failed to create eventfd");
}
}
~EventLoop() {
close(event_fd_);
}
void addEvent(EventCallback callback) {
std::lock_guard<std::mutex> lock(mutex_);
events_.push_back(std::move(callback));
uint64_t value = 1;
write(event_fd_, &value, sizeof(value));
}
void run() {
while (running_) {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !events_.empty() || !running_; });
if (!running_) {
break;
}
// 读取eventfd以清空计数器
uint64_t value;
read(event_fd_, &value, sizeof(value));
EventCallback event = std::move(events_.front());
events_.pop_front();
lock.unlock();
event();
}
}
void stop() {
running_ = false;
condition_.notify_one();
}
private:
std::mutex mutex_;
std::condition_variable condition_;
std::deque<EventCallback> events_;
int event_fd_;
std::atomic<bool> running_{true};
};
int main() {
EventLoop loop;
// 添加事件
loop.addEvent([]() {
std::cout << "事件1" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件2" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件3" << std::endl;
});
// 运行事件循环
std::thread loopThread([&]() {
try {
loop.run();
} catch (const std::exception& e) {
std::ostringstream oss;
oss << "Event loop error: " << e.what() << std::endl;
std::cerr << oss.str();
}
});
// 等待事件循环结束
loopThread.join();
return 0;
}
既然使用了fd了,那么干嘛不使用epoll?
epoll是Linux操作系统提供的一种I/O事件通知机制,可以用于高效地处理大量的并发连接。而epoll采用了一种基于事件驱动的方式,只有当文件描述符上有I/O事件发生时才会通知应用程序。这样就避免了无效的遍历,提高了效率。 使用epoll进行优化时,通常的步骤如下:
- 创建一个epoll实例,通过调用epoll_create函数。
- 将需要监听的文件描述符添加到epoll实例中,通过调用epoll_ctl函数。
- 不断地调用epoll_wait函数等待事件发生,一旦有事件发生就进行处理。
- epoll的优势在于能够处理大量并发连接,因为它采用了事件驱动的方式,只有当有事件发生时才会通知应用程序。这样就避免了不必要的遍历,提高了效率。
#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
class EventLoop {
public:
EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
epollFd_ = epoll_create1(0);
epoll_event event;
event.events = EPOLLIN;
event.data.fd = eventFd_;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
}
~EventLoop() {
close(eventFd_);
close(epollFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
uint64_t value = 1;
write(eventFd_, &value, sizeof(value));
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.fd == eventFd_) {
uint64_t value;
read(eventFd_, &value, sizeof(value));
handleEvents();
}
}
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
int eventFd_;
int epollFd_;
std::queue<std::function<void()>> events_;
};
int main() {
EventLoop eventLoop;
eventLoop.addEvent([]() {
std::cout << "Event 1" << std::endl;
});
eventLoop.addEvent([]() {
std::cout << "Event 2" << std::endl;
});
eventLoop.run();
return 0;
}
在上述代码中,使用eventfd创建了一个eventFd,用于事件通知。在EventLoop的构造函数中,创建了一个epollFd,并将eventFd添加到epoll中进行监听。当有事件到达时,epoll_wait会返回,并通过读取eventFd的值来处理事件。
在addEvent函数中,不仅将事件添加到队列中,还通过write函数向eventFd_写入一个值,以触发epoll_wait返回。这样可以确保在有事件到达时立即处理,而不需要等待epoll_wait超时。
在run函数中,使用epoll_wait等待事件到达,并通过handleEvents函数处理队列中的事件。
添加对其他fd的事件监听
#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
class EventLoop {
public:
EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
epollFd_ = epoll_create1(0);
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = eventFd_;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
}
~EventLoop() {
close(eventFd_);
close(epollFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
uint64_t value = 1;
write(eventFd_, &value, sizeof(value));
}
void addFdEvent(int fd, std::function<void()> event) {
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = fd;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
fdEvents_[fd] = event;
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.fd == eventFd_) {
handleEvents();
} else {
handleFdEvent(events[i].data.fd);
}
}
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
void handleFdEvent(int fd) {
if (fdEvents_.count(fd) > 0) {
auto event = fdEvents_[fd];
event();
}
}
int eventFd_;
int epollFd_;
std::queue<std::function<void()>> events_;
std::unordered_map<int, epoll_event> fdEvents_;
};
int main() {
EventLoop eventLoop;
eventLoop.addEvent([]() {
std::cout << "Event 1" << std::endl;
});
eventLoop.addEvent([]() {
std::cout << "Event 2" << std::endl;
});
int fd = open("test.txt", O_RDONLY);
eventLoop.addFdEvent(fd, []() {
std::cout << "File Descriptor Event" << std::endl;
});
eventLoop.run();
return 0;
}
在上述代码中,添加了一个addFdEvent函数,用于添加对文件描述符的事件监听。在该函数中,通过epoll_ctl将文件描述符添加到epoll中进行监听,并将对应的事件处理函数保存到fdEvents_中。
在run函数中,对每个事件进行处理时,先判断事件是来自eventFd还是其他文件描述符。如果是eventFd的事件,则调用handleEvents函数处理事件队列中的事件。如果是其他文件描述符的事件,则调用handleFdEvent函数处理对应的事件。
在main函数中,通过open函数打开了一个文件,并将其文件描述符添加到EventLoop中进行监听。当该文件可读时,会触发对应的事件处理函数。
继续添加定时器事件
#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <chrono>
#include <thread>
class EventLoop {
public:
EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
epollFd_ = epoll_create1(0);
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = eventFd_;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
}
~EventLoop() {
close(eventFd_);
close(epollFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
uint64_t value = 1;
write(eventFd_, &value, sizeof(value));
}
void addFdEvent(int fd, std::function<void()> event) {
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = fd;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
fdEvents_[fd] = event;
}
void addTimerEvent(int milliseconds, std::function<void()> event) {
auto startTime = std::chrono::steady_clock::now();
auto endTime = startTime + std::chrono::milliseconds(milliseconds);
timerEvents_.push({endTime, event});
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.fd == eventFd_) {
handleEvents();
} else {
handleFdEvent(events[i].data.fd);
}
}
handleTimerEvents();
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
void handleFdEvent(int fd) {
if (fdEvents_.count(fd) > 0) {
auto event = fdEvents_[fd];
event();
}
}
void handleTimerEvents() {
auto now = std::chrono::steady_clock::now();
while (!timerEvents_.empty() && timerEvents_.top().first <= now) {
auto event = timerEvents_.top().second;
timerEvents_.pop();
event();
}
}
int eventFd_;
int epollFd_;
std::queue<std::function<void()>> events_;
std::unordered_map<int, epoll_event> fdEvents_;
std::priority_queue<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>,
std::vector<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>>,
std::greater<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>>> timerEvents_;
};
int main() {
EventLoop eventLoop;
eventLoop.addEvent([]() {
std::cout << "Event 1" << std::endl;
});
eventLoop.addEvent([]() {
std::cout << "Event 2" << std::endl;
});
int fd = open("test.txt", O_RDONLY);
eventLoop.addFdEvent(fd, []() {
std::cout << "File Descriptor Event" << std::endl;
});
eventLoop.addTimerEvent(1000, []() {
std::cout << "Timer Event" << std::endl;
});
eventLoop.run();
return 0;
}
在上述代码中,添加了一个addTimerEvent函数,用于添加定时器事件。在该函数中,将事件的触发时间和事件处理函数保存到timerEvents_中。
在run函数中,添加了一个新的handleTimerEvents函数,用于处理定时器事件。在该函数中,获取当前时间now,并循环处理timerEvents_中的事件,直到找到第一个触发时间大于now的事件为止。
在main函数中,通过调用addTimerEvent函数添加了一个定时器事件。该定时器事件将在1000毫秒后触发,并执行对应的事件处理函数。
Linux一切皆文件:使用timerfd 简化定时器事件
使用timerfd来改写定时器事件可以更简洁和高效。timerfd是一个文件描述符,可以用来创建定时器。当定时器到期时,文件描述符上就会产生可读事件,可以通过epoll来监听该事件,并在事件发生时执行相应的处理函数。
#include <iostream>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
class EventLoop {
public:
EventLoop() : epollFd_(epoll_create1(0)) {}
~EventLoop() {
close(epollFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
uint64_t value = 1;
write(eventFd_, &value, sizeof(value));
}
void addFdEvent(int fd, std::function<void()> event) {
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = fd;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
fdEvents_[fd] = event;
}
void addTimerEvent(int milliseconds, std::function<void()> event) {
int timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
struct itimerspec timerSpec;
memset(&timerSpec, 0, sizeof(timerSpec));
timerSpec.it_value.tv_sec = milliseconds / 1000;
timerSpec.it_value.tv_nsec = (milliseconds % 1000) * 1000000;
timerfd_settime(timerFd, 0, &timerSpec, nullptr);
addFdEvent(timerFd, [this, timerFd, event]() {
uint64_t expirations;
read(timerFd, &expirations, sizeof(expirations));
event();
close(timerFd);
});
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.fd == eventFd_) {
handleEvents();
} else {
handleFdEvent(events[i].data.fd);
}
}
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
void handleFdEvent(int fd) {
if (fdEvents_.count(fd) > 0) {
auto event = fdEvents_[fd];
event();
}
}
int epollFd_;
int eventFd_;
std::queue<std::function<void()>> events_;
std::unordered_map<int, epoll_event> fdEvents_;
};
int main() {
EventLoop eventLoop;
eventLoop.addEvent([]() {
std::cout << "Event 1" << std::endl;
});
eventLoop.addEvent([]() {
std::cout << "Event 2" << std::endl;
});
eventLoop.addTimerEvent(1000, []() {
std::cout << "Timer Event" << std::endl;
});
eventLoop.run();
return 0;
}
使用signalfd事件,并将所有事件统一为fd事件
信号( signal )本质是 Linux 进程间通信的一种机制,也叫软中断信号。既然是通信机制,那么就是传递信息用的,信号传递的信息很简单,就是一个整数,一般用于配合系统管理任务,比如进程的终结、恢复、热加载等。
信号都用整数常量表示,命名以 SIG 未前缀,比如 SIGINT( ctrl-c 触发),SIGKILL( kill -9 触发 )。
#include <iostream>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
#include <csignal>
#include <functional>
#include <queue>
#include <unordered_map>
#include <sys/signalfd.h>
class EventLoop {
public:
EventLoop() : epollFd_(epoll_create1(0)) {
// 创建信号集
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
// 将信号集添加到信号文件描述符中
signalFd_ = signalfd(-1, &sigset, 0);
// 将信号文件描述符添加到epoll中监听
addFdEvent(signalFd_, [this]() {
handleSignal();
});
}
~EventLoop() {
close(epollFd_);
close(signalFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
}
void addFdEvent(int fd, std::function<void()> event) {
epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev);
fdEvents_[fd] = event;
}
void addTimerEvent(int milliseconds, std::function<void()> event) {
int timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
struct itimerspec timerSpec;
memset(&timerSpec, 0, sizeof(timerSpec));
timerSpec.it_value.tv_sec = milliseconds / 1000;
timerSpec.it_value.tv_nsec = (milliseconds % 1000) * 1000000;
timerfd_settime(timerFd, 0, &timerSpec, nullptr);
addFdEvent(timerFd, [this, timerFd, event]() {
uint64_t expirations;
read(timerFd, &expirations, sizeof(expirations));
event();
close(timerFd);
});
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
handleFdEvent(events[i].data.fd);
}
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
void handleFdEvent(int fd) {
if (fdEvents_.count(fd) > 0) {
auto event = fdEvents_[fd];
event();
}
}
void handleSignal() {
struct signalfd_siginfo siginfo;
read(signalFd_, &siginfo, sizeof(siginfo));
std::cout << "Received signal: " << siginfo.ssi_signo << std::endl;
// 执行自定义操作
// ...
}
private:
int epollFd_;
int signalFd_;
std::queue<std::function<void()>> events_;
std::unordered_map<int, std::function<void()>> fdEvents_;
};
int main() {
EventLoop eventLoop;
// 添加其他事件和定时器事件
// ...
eventLoop.run();
return 0;
}
上面代码中,使用signalfd来创建一个信号文件描述符,并将其添加到EventLoop类的epoll中进行监听。在handleSignal()函数中,使用read()函数从信号文件描述符中读取信号信息,并处理接收到的信号。
可以根据需要在handleSignal()函数中执行自定义操作,例如记录日志、执行清理操作等。同时,可以根据需要在EventLoop类中添加其他信号处理函数,只需在构造函数中使用signalfd()函数创建相应的信号文件描述符,并将其添加到epoll中监听即可。
上述代码删除了 eventFd_ ,因为 handleFdEvent 函数已经可以处理所有的文件描述符事件,包括信号文件描述符的事件。