什麼是事件循環?
Event Loop(事件循環)是一種用于處理和排程事件的機制,常用于異步程式設計模型中。它是一種循環結構,不斷地從事件隊列中取出事件并處理,直到事件隊列為空。
在事件循環中,有兩個主要的元件:事件隊列和事件處理器。事件隊列用于存儲待處理的事件,而事件處理器則負責從隊列中取出事件并執行相應的處理邏輯。
當有新的事件發生時,可以将其加入到事件隊列中,而不需要立即處理。事件循環會在合适的時機從隊列中取出事件,并按照先入先出的順序進行處理。這種機制使得程式可以同時處理多個事件,而不需要等待某個事件的完成。
Event Loop在異步程式設計中起到了關鍵的作用。它可以處理網絡請求、定時器、使用者輸入等各種類型的事件,并通過回調函數或者異步處理來處理這些事件。通過合理地利用Event Loop,可以實作高效的非阻塞式程式設計。
最簡單的Eventloop:
#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
class EventLoop {
public:
typedef std::function<void()> EventCallback;
void addEvent(EventCallback callback) {
std::lock_guard<std::mutex> lock(mutex_);
events_.push(callback);
condition_.notify_one();
}
void run() {
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !events_.empty(); });
EventCallback event = events_.front();
events_.pop();
lock.unlock();
event();
}
}
private:
std::mutex mutex_;
std::condition_variable condition_;
std::queue<EventCallback> events_;
};
int main() {
EventLoop loop;
// 添加事件
loop.addEvent([]() {
std::cout << "事件1" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件2" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件3" << std::endl;
});
// 運作事件循環
std::thread loopThread([&]() {
loop.run();
});
loopThread.join();
return 0;
}
有什麼局限?
這個示例中的Eventloop是單線程的,它在一個循環中依次執行事件。這意味着如果某個事件執行時間過長,會阻塞後續事件的執行。對于需要并發處理的場景,可能需要使用多線程或異步任務來處理事件。
添加多線程和異步任務的支援
#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <future>
class EventLoop {
public:
typedef std::function<void()> EventCallback;
void addEvent(EventCallback callback) {
std::lock_guard<std::mutex> lock(mutex_);
events_.push(callback);
condition_.notify_one();
}
void run() {
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !events_.empty(); });
EventCallback event = events_.front();
events_.pop();
lock.unlock();
// 異步執行事件
std::async(std::launch::async, event);
}
}
private:
std::mutex mutex_;
std::condition_variable condition_;
std::queue<EventCallback> events_;
};
int main() {
EventLoop loop;
// 添加事件
loop.addEvent([]() {
std::cout << "事件1" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件2" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件3" << std::endl;
});
// 運作事件循環
std::thread loopThread([&]() {
loop.run();
});
loopThread.join();
return 0;
}
另外一種思路:使用Eventfd
#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <sys/eventfd.h>
#include <unistd.h>
#include <atomic>
#include <stdexcept>
#include <sstream>
class EventLoop {
public:
typedef std::function<void()> EventCallback;
EventLoop() {
event_fd_ = eventfd(0, EFD_NONBLOCK);
if (event_fd_ == -1) {
throw std::runtime_error("Failed to create eventfd");
}
}
~EventLoop() {
close(event_fd_);
}
void addEvent(EventCallback callback) {
std::lock_guard<std::mutex> lock(mutex_);
events_.push_back(std::move(callback));
uint64_t value = 1;
write(event_fd_, &value, sizeof(value));
}
void run() {
while (running_) {
std::unique_lock<std::mutex> lock(mutex_);
condition_.wait(lock, [this] { return !events_.empty() || !running_; });
if (!running_) {
break;
}
// 讀取eventfd以清空計數器
uint64_t value;
read(event_fd_, &value, sizeof(value));
EventCallback event = std::move(events_.front());
events_.pop_front();
lock.unlock();
event();
}
}
void stop() {
running_ = false;
condition_.notify_one();
}
private:
std::mutex mutex_;
std::condition_variable condition_;
std::deque<EventCallback> events_;
int event_fd_;
std::atomic<bool> running_{true};
};
int main() {
EventLoop loop;
// 添加事件
loop.addEvent([]() {
std::cout << "事件1" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件2" << std::endl;
});
loop.addEvent([]() {
std::cout << "事件3" << std::endl;
});
// 運作事件循環
std::thread loopThread([&]() {
try {
loop.run();
} catch (const std::exception& e) {
std::ostringstream oss;
oss << "Event loop error: " << e.what() << std::endl;
std::cerr << oss.str();
}
});
// 等待事件循環結束
loopThread.join();
return 0;
}
既然使用了fd了,那麼幹嘛不使用epoll?
epoll是Linux作業系統提供的一種I/O事件通知機制,可以用于高效地處理大量的并發連接配接。而epoll采用了一種基于事件驅動的方式,隻有當檔案描述符上有I/O事件發生時才會通知應用程式。這樣就避免了無效的周遊,提高了效率。 使用epoll進行優化時,通常的步驟如下:
- 建立一個epoll執行個體,通過調用epoll_create函數。
- 将需要監聽的檔案描述符添加到epoll執行個體中,通過調用epoll_ctl函數。
- 不斷地調用epoll_wait函數等待事件發生,一旦有事件發生就進行處理。
- epoll的優勢在于能夠處理大量并發連接配接,因為它采用了事件驅動的方式,隻有當有事件發生時才會通知應用程式。這樣就避免了不必要的周遊,提高了效率。
#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
class EventLoop {
public:
EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
epollFd_ = epoll_create1(0);
epoll_event event;
event.events = EPOLLIN;
event.data.fd = eventFd_;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
}
~EventLoop() {
close(eventFd_);
close(epollFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
uint64_t value = 1;
write(eventFd_, &value, sizeof(value));
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.fd == eventFd_) {
uint64_t value;
read(eventFd_, &value, sizeof(value));
handleEvents();
}
}
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
int eventFd_;
int epollFd_;
std::queue<std::function<void()>> events_;
};
int main() {
EventLoop eventLoop;
eventLoop.addEvent([]() {
std::cout << "Event 1" << std::endl;
});
eventLoop.addEvent([]() {
std::cout << "Event 2" << std::endl;
});
eventLoop.run();
return 0;
}
在上述代碼中,使用eventfd建立了一個eventFd,用于事件通知。在EventLoop的構造函數中,建立了一個epollFd,并将eventFd添加到epoll中進行監聽。當有事件到達時,epoll_wait會傳回,并通過讀取eventFd的值來處理事件。
在addEvent函數中,不僅将事件添加到隊列中,還通過write函數向eventFd_寫入一個值,以觸發epoll_wait傳回。這樣可以確定在有事件到達時立即處理,而不需要等待epoll_wait逾時。
在run函數中,使用epoll_wait等待事件到達,并通過handleEvents函數處理隊列中的事件。
添加對其他fd的事件監聽
#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
class EventLoop {
public:
EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
epollFd_ = epoll_create1(0);
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = eventFd_;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
}
~EventLoop() {
close(eventFd_);
close(epollFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
uint64_t value = 1;
write(eventFd_, &value, sizeof(value));
}
void addFdEvent(int fd, std::function<void()> event) {
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = fd;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
fdEvents_[fd] = event;
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.fd == eventFd_) {
handleEvents();
} else {
handleFdEvent(events[i].data.fd);
}
}
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
void handleFdEvent(int fd) {
if (fdEvents_.count(fd) > 0) {
auto event = fdEvents_[fd];
event();
}
}
int eventFd_;
int epollFd_;
std::queue<std::function<void()>> events_;
std::unordered_map<int, epoll_event> fdEvents_;
};
int main() {
EventLoop eventLoop;
eventLoop.addEvent([]() {
std::cout << "Event 1" << std::endl;
});
eventLoop.addEvent([]() {
std::cout << "Event 2" << std::endl;
});
int fd = open("test.txt", O_RDONLY);
eventLoop.addFdEvent(fd, []() {
std::cout << "File Descriptor Event" << std::endl;
});
eventLoop.run();
return 0;
}
在上述代碼中,添加了一個addFdEvent函數,用于添加對檔案描述符的事件監聽。在該函數中,通過epoll_ctl将檔案描述符添加到epoll中進行監聽,并将對應的事件處理函數儲存到fdEvents_中。
在run函數中,對每個事件進行處理時,先判斷事件是來自eventFd還是其他檔案描述符。如果是eventFd的事件,則調用handleEvents函數處理事件隊列中的事件。如果是其他檔案描述符的事件,則調用handleFdEvent函數處理對應的事件。
在main函數中,通過open函數打開了一個檔案,并将其檔案描述符添加到EventLoop中進行監聽。當該檔案可讀時,會觸發對應的事件處理函數。
繼續添加定時器事件
#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <chrono>
#include <thread>
class EventLoop {
public:
EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
epollFd_ = epoll_create1(0);
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = eventFd_;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
}
~EventLoop() {
close(eventFd_);
close(epollFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
uint64_t value = 1;
write(eventFd_, &value, sizeof(value));
}
void addFdEvent(int fd, std::function<void()> event) {
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = fd;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
fdEvents_[fd] = event;
}
void addTimerEvent(int milliseconds, std::function<void()> event) {
auto startTime = std::chrono::steady_clock::now();
auto endTime = startTime + std::chrono::milliseconds(milliseconds);
timerEvents_.push({endTime, event});
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.fd == eventFd_) {
handleEvents();
} else {
handleFdEvent(events[i].data.fd);
}
}
handleTimerEvents();
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
void handleFdEvent(int fd) {
if (fdEvents_.count(fd) > 0) {
auto event = fdEvents_[fd];
event();
}
}
void handleTimerEvents() {
auto now = std::chrono::steady_clock::now();
while (!timerEvents_.empty() && timerEvents_.top().first <= now) {
auto event = timerEvents_.top().second;
timerEvents_.pop();
event();
}
}
int eventFd_;
int epollFd_;
std::queue<std::function<void()>> events_;
std::unordered_map<int, epoll_event> fdEvents_;
std::priority_queue<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>,
std::vector<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>>,
std::greater<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>>> timerEvents_;
};
int main() {
EventLoop eventLoop;
eventLoop.addEvent([]() {
std::cout << "Event 1" << std::endl;
});
eventLoop.addEvent([]() {
std::cout << "Event 2" << std::endl;
});
int fd = open("test.txt", O_RDONLY);
eventLoop.addFdEvent(fd, []() {
std::cout << "File Descriptor Event" << std::endl;
});
eventLoop.addTimerEvent(1000, []() {
std::cout << "Timer Event" << std::endl;
});
eventLoop.run();
return 0;
}
在上述代碼中,添加了一個addTimerEvent函數,用于添加定時器事件。在該函數中,将事件的觸發時間和事件處理函數儲存到timerEvents_中。
在run函數中,添加了一個新的handleTimerEvents函數,用于處理定時器事件。在該函數中,擷取目前時間now,并循環處理timerEvents_中的事件,直到找到第一個觸發時間大于now的事件為止。
在main函數中,通過調用addTimerEvent函數添加了一個定時器事件。該定時器事件将在1000毫秒後觸發,并執行對應的事件處理函數。
Linux一切皆檔案:使用timerfd 簡化定時器事件
使用timerfd來改寫定時器事件可以更簡潔和高效。timerfd是一個檔案描述符,可以用來建立定時器。當定時器到期時,檔案描述符上就會産生可讀事件,可以通過epoll來監聽該事件,并在事件發生時執行相應的處理函數。
#include <iostream>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
class EventLoop {
public:
EventLoop() : epollFd_(epoll_create1(0)) {}
~EventLoop() {
close(epollFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
uint64_t value = 1;
write(eventFd_, &value, sizeof(value));
}
void addFdEvent(int fd, std::function<void()> event) {
epoll_event event;
event.events = EPOLLIN | EPOLLET;
event.data.fd = fd;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
fdEvents_[fd] = event;
}
void addTimerEvent(int milliseconds, std::function<void()> event) {
int timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
struct itimerspec timerSpec;
memset(&timerSpec, 0, sizeof(timerSpec));
timerSpec.it_value.tv_sec = milliseconds / 1000;
timerSpec.it_value.tv_nsec = (milliseconds % 1000) * 1000000;
timerfd_settime(timerFd, 0, &timerSpec, nullptr);
addFdEvent(timerFd, [this, timerFd, event]() {
uint64_t expirations;
read(timerFd, &expirations, sizeof(expirations));
event();
close(timerFd);
});
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
if (events[i].data.fd == eventFd_) {
handleEvents();
} else {
handleFdEvent(events[i].data.fd);
}
}
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
void handleFdEvent(int fd) {
if (fdEvents_.count(fd) > 0) {
auto event = fdEvents_[fd];
event();
}
}
int epollFd_;
int eventFd_;
std::queue<std::function<void()>> events_;
std::unordered_map<int, epoll_event> fdEvents_;
};
int main() {
EventLoop eventLoop;
eventLoop.addEvent([]() {
std::cout << "Event 1" << std::endl;
});
eventLoop.addEvent([]() {
std::cout << "Event 2" << std::endl;
});
eventLoop.addTimerEvent(1000, []() {
std::cout << "Timer Event" << std::endl;
});
eventLoop.run();
return 0;
}
使用signalfd事件,并将所有事件統一為fd事件
信号( signal )本質是 Linux 程序間通信的一種機制,也叫軟中斷信号。既然是通信機制,那麼就是傳遞資訊用的,信号傳遞的資訊很簡單,就是一個整數,一般用于配合系統管理任務,比如程序的終結、恢複、熱加載等。
信号都用整數常量表示,命名以 SIG 未字首,比如 SIGINT( ctrl-c 觸發),SIGKILL( kill -9 觸發 )。
#include <iostream>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
#include <csignal>
#include <functional>
#include <queue>
#include <unordered_map>
#include <sys/signalfd.h>
class EventLoop {
public:
EventLoop() : epollFd_(epoll_create1(0)) {
// 建立信号集
sigset_t sigset;
sigemptyset(&sigset);
sigaddset(&sigset, SIGINT);
// 将信号集添加到信号檔案描述符中
signalFd_ = signalfd(-1, &sigset, 0);
// 将信号檔案描述符添加到epoll中監聽
addFdEvent(signalFd_, [this]() {
handleSignal();
});
}
~EventLoop() {
close(epollFd_);
close(signalFd_);
}
void addEvent(std::function<void()> event) {
events_.push(event);
}
void addFdEvent(int fd, std::function<void()> event) {
epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev);
fdEvents_[fd] = event;
}
void addTimerEvent(int milliseconds, std::function<void()> event) {
int timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
struct itimerspec timerSpec;
memset(&timerSpec, 0, sizeof(timerSpec));
timerSpec.it_value.tv_sec = milliseconds / 1000;
timerSpec.it_value.tv_nsec = (milliseconds % 1000) * 1000000;
timerfd_settime(timerFd, 0, &timerSpec, nullptr);
addFdEvent(timerFd, [this, timerFd, event]() {
uint64_t expirations;
read(timerFd, &expirations, sizeof(expirations));
event();
close(timerFd);
});
}
void run() {
while (true) {
epoll_event events[10];
int numEvents = epoll_wait(epollFd_, events, 10, -1);
for (int i = 0; i < numEvents; i++) {
handleFdEvent(events[i].data.fd);
}
}
}
private:
void handleEvents() {
while (!events_.empty()) {
auto event = events_.front();
events_.pop();
event();
}
}
void handleFdEvent(int fd) {
if (fdEvents_.count(fd) > 0) {
auto event = fdEvents_[fd];
event();
}
}
void handleSignal() {
struct signalfd_siginfo siginfo;
read(signalFd_, &siginfo, sizeof(siginfo));
std::cout << "Received signal: " << siginfo.ssi_signo << std::endl;
// 執行自定義操作
// ...
}
private:
int epollFd_;
int signalFd_;
std::queue<std::function<void()>> events_;
std::unordered_map<int, std::function<void()>> fdEvents_;
};
int main() {
EventLoop eventLoop;
// 添加其他事件和定時器事件
// ...
eventLoop.run();
return 0;
}
上面代碼中,使用signalfd來建立一個信号檔案描述符,并将其添加到EventLoop類的epoll中進行監聽。在handleSignal()函數中,使用read()函數從信号檔案描述符中讀取信号資訊,并處理接收到的信号。
可以根據需要在handleSignal()函數中執行自定義操作,例如記錄日志、執行清理操作等。同時,可以根據需要在EventLoop類中添加其他信号處理函數,隻需在構造函數中使用signalfd()函數建立相應的信号檔案描述符,并将其添加到epoll中監聽即可。
上述代碼删除了 eventFd_ ,因為 handleFdEvent 函數已經可以處理所有的檔案描述符事件,包括信号檔案描述符的事件。