天天看點

從0到1寫一個Eventloop事件循環

作者:Red果果

什麼是事件循環?

Event Loop(事件循環)是一種用于處理和排程事件的機制,常用于異步程式設計模型中。它是一種循環結構,不斷地從事件隊列中取出事件并處理,直到事件隊列為空。

在事件循環中,有兩個主要的元件:事件隊列和事件處理器。事件隊列用于存儲待處理的事件,而事件處理器則負責從隊列中取出事件并執行相應的處理邏輯。

當有新的事件發生時,可以将其加入到事件隊列中,而不需要立即處理。事件循環會在合适的時機從隊列中取出事件,并按照先入先出的順序進行處理。這種機制使得程式可以同時處理多個事件,而不需要等待某個事件的完成。

Event Loop在異步程式設計中起到了關鍵的作用。它可以處理網絡請求、定時器、使用者輸入等各種類型的事件,并通過回調函數或者異步處理來處理這些事件。通過合理地利用Event Loop,可以實作高效的非阻塞式程式設計。

最簡單的Eventloop:

#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
 class EventLoop {
public:
    typedef std::function<void()> EventCallback;
     void addEvent(EventCallback callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        events_.push(callback);
        condition_.notify_one();
    }
     void run() {
        while (true) {
            std::unique_lock<std::mutex> lock(mutex_);
            condition_.wait(lock, [this] { return !events_.empty(); });
             EventCallback event = events_.front();
            events_.pop();
             lock.unlock();
            event();
        }
    }
 private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::queue<EventCallback> events_;
};
 int main() {
    EventLoop loop;
     // 添加事件
    loop.addEvent([]() {
        std::cout << "事件1" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件2" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件3" << std::endl;
    });
     // 運作事件循環
    std::thread loopThread([&]() {
        loop.run();
    });
     loopThread.join();
     return 0;
}           

有什麼局限?

這個示例中的Eventloop是單線程的,它在一個循環中依次執行事件。這意味着如果某個事件執行時間過長,會阻塞後續事件的執行。對于需要并發處理的場景,可能需要使用多線程或異步任務來處理事件。

添加多線程和異步任務的支援

#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <future>
 class EventLoop {
public:
    typedef std::function<void()> EventCallback;
     void addEvent(EventCallback callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        events_.push(callback);
        condition_.notify_one();
    }
     void run() {
        while (true) {
            std::unique_lock<std::mutex> lock(mutex_);
            condition_.wait(lock, [this] { return !events_.empty(); });
             EventCallback event = events_.front();
            events_.pop();
             lock.unlock();
             // 異步執行事件
            std::async(std::launch::async, event);
        }
    }
 private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::queue<EventCallback> events_;
};
 int main() {
    EventLoop loop;
     // 添加事件
    loop.addEvent([]() {
        std::cout << "事件1" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件2" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件3" << std::endl;
    });
     // 運作事件循環
    std::thread loopThread([&]() {
        loop.run();
    });
     loopThread.join();
     return 0;
}           

另外一種思路:使用Eventfd

#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <sys/eventfd.h>
#include <unistd.h>
#include <atomic>
#include <stdexcept>
#include <sstream>
 class EventLoop {
public:
    typedef std::function<void()> EventCallback;
     EventLoop() {
        event_fd_ = eventfd(0, EFD_NONBLOCK);
        if (event_fd_ == -1) {
            throw std::runtime_error("Failed to create eventfd");
        }
    }
     ~EventLoop() {
        close(event_fd_);
    }
     void addEvent(EventCallback callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        events_.push_back(std::move(callback));
        uint64_t value = 1;
        write(event_fd_, &value, sizeof(value));
    }
     void run() {
        while (running_) {
            std::unique_lock<std::mutex> lock(mutex_);
            condition_.wait(lock, [this] { return !events_.empty() || !running_; });
             if (!running_) {
                break;
            }
             // 讀取eventfd以清空計數器
            uint64_t value;
            read(event_fd_, &value, sizeof(value));
             EventCallback event = std::move(events_.front());
            events_.pop_front();
             lock.unlock();
            event();
        }
    }
     void stop() {
        running_ = false;
        condition_.notify_one();
    }
 private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<EventCallback> events_;
    int event_fd_;
    std::atomic<bool> running_{true};
};
 int main() {
    EventLoop loop;
     // 添加事件
    loop.addEvent([]() {
        std::cout << "事件1" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件2" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件3" << std::endl;
    });
     // 運作事件循環
    std::thread loopThread([&]() {
        try {
            loop.run();
        } catch (const std::exception& e) {
            std::ostringstream oss;
            oss << "Event loop error: " << e.what() << std::endl;
            std::cerr << oss.str();
        }
    });
     // 等待事件循環結束
    loopThread.join();
     return 0;
}
           

既然使用了fd了,那麼幹嘛不使用epoll?

epoll是Linux作業系統提供的一種I/O事件通知機制,可以用于高效地處理大量的并發連接配接。而epoll采用了一種基于事件驅動的方式,隻有當檔案描述符上有I/O事件發生時才會通知應用程式。這樣就避免了無效的周遊,提高了效率。 使用epoll進行優化時,通常的步驟如下:

  1. 建立一個epoll執行個體,通過調用epoll_create函數。
  2. 将需要監聽的檔案描述符添加到epoll執行個體中,通過調用epoll_ctl函數。
  3. 不斷地調用epoll_wait函數等待事件發生,一旦有事件發生就進行處理。
  4. epoll的優勢在于能夠處理大量并發連接配接,因為它采用了事件驅動的方式,隻有當有事件發生時才會通知應用程式。這樣就避免了不必要的周遊,提高了效率。
#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

class EventLoop {
public:
    EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
        epollFd_ = epoll_create1(0);
        epoll_event event;
        event.events = EPOLLIN;
        event.data.fd = eventFd_;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
    }

    ~EventLoop() {
        close(eventFd_);
        close(epollFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
        uint64_t value = 1;
        write(eventFd_, &value, sizeof(value));
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                if (events[i].data.fd == eventFd_) {
                    uint64_t value;
                    read(eventFd_, &value, sizeof(value));
                    handleEvents();
                }
            }
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    int eventFd_;
    int epollFd_;
    std::queue<std::function<void()>> events_;
};

int main() {
    EventLoop eventLoop;

    eventLoop.addEvent([]() {
        std::cout << "Event 1" << std::endl;
    });

    eventLoop.addEvent([]() {
        std::cout << "Event 2" << std::endl;
    });

    eventLoop.run();

    return 0;
}           

在上述代碼中,使用eventfd建立了一個eventFd,用于事件通知。在EventLoop的構造函數中,建立了一個epollFd,并将eventFd添加到epoll中進行監聽。當有事件到達時,epoll_wait會傳回,并通過讀取eventFd的值來處理事件。

在addEvent函數中,不僅将事件添加到隊列中,還通過write函數向eventFd_寫入一個值,以觸發epoll_wait傳回。這樣可以確定在有事件到達時立即處理,而不需要等待epoll_wait逾時。

在run函數中,使用epoll_wait等待事件到達,并通過handleEvents函數處理隊列中的事件。

添加對其他fd的事件監聽

#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

class EventLoop {
public:
    EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
        epollFd_ = epoll_create1(0);
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = eventFd_;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
    }

    ~EventLoop() {
        close(eventFd_);
        close(epollFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
        uint64_t value = 1;
        write(eventFd_, &value, sizeof(value));
    }

    void addFdEvent(int fd, std::function<void()> event) {
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = fd;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
        fdEvents_[fd] = event;
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                if (events[i].data.fd == eventFd_) {
                    handleEvents();
                } else {
                    handleFdEvent(events[i].data.fd);
                }
            }
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    void handleFdEvent(int fd) {
        if (fdEvents_.count(fd) > 0) {
            auto event = fdEvents_[fd];
            event();
        }
    }

    int eventFd_;
    int epollFd_;
    std::queue<std::function<void()>> events_;
    std::unordered_map<int, epoll_event> fdEvents_;
};

int main() {
    EventLoop eventLoop;

    eventLoop.addEvent([]() {
        std::cout << "Event 1" << std::endl;
    });

    eventLoop.addEvent([]() {
        std::cout << "Event 2" << std::endl;
    });

    int fd = open("test.txt", O_RDONLY);
    eventLoop.addFdEvent(fd, []() {
        std::cout << "File Descriptor Event" << std::endl;
    });

    eventLoop.run();

    return 0;
}           

在上述代碼中,添加了一個addFdEvent函數,用于添加對檔案描述符的事件監聽。在該函數中,通過epoll_ctl将檔案描述符添加到epoll中進行監聽,并将對應的事件處理函數儲存到fdEvents_中。

在run函數中,對每個事件進行處理時,先判斷事件是來自eventFd還是其他檔案描述符。如果是eventFd的事件,則調用handleEvents函數處理事件隊列中的事件。如果是其他檔案描述符的事件,則調用handleFdEvent函數處理對應的事件。

在main函數中,通過open函數打開了一個檔案,并将其檔案描述符添加到EventLoop中進行監聽。當該檔案可讀時,會觸發對應的事件處理函數。

繼續添加定時器事件

#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <chrono>
#include <thread>

class EventLoop {
public:
    EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
        epollFd_ = epoll_create1(0);
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = eventFd_;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
    }

    ~EventLoop() {
        close(eventFd_);
        close(epollFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
        uint64_t value = 1;
        write(eventFd_, &value, sizeof(value));
    }

    void addFdEvent(int fd, std::function<void()> event) {
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = fd;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
        fdEvents_[fd] = event;
    }

    void addTimerEvent(int milliseconds, std::function<void()> event) {
        auto startTime = std::chrono::steady_clock::now();
        auto endTime = startTime + std::chrono::milliseconds(milliseconds);
        timerEvents_.push({endTime, event});
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                if (events[i].data.fd == eventFd_) {
                    handleEvents();
                } else {
                    handleFdEvent(events[i].data.fd);
                }
            }
            handleTimerEvents();
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    void handleFdEvent(int fd) {
        if (fdEvents_.count(fd) > 0) {
            auto event = fdEvents_[fd];
            event();
        }
    }

    void handleTimerEvents() {
        auto now = std::chrono::steady_clock::now();
        while (!timerEvents_.empty() && timerEvents_.top().first <= now) {
            auto event = timerEvents_.top().second;
            timerEvents_.pop();
            event();
        }
    }

    int eventFd_;
    int epollFd_;
    std::queue<std::function<void()>> events_;
    std::unordered_map<int, epoll_event> fdEvents_;
    std::priority_queue<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>,
                        std::vector<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>>,
                        std::greater<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>>> timerEvents_;
};

int main() {
    EventLoop eventLoop;

    eventLoop.addEvent([]() {
        std::cout << "Event 1" << std::endl;
    });

    eventLoop.addEvent([]() {
        std::cout << "Event 2" << std::endl;
    });

    int fd = open("test.txt", O_RDONLY);
    eventLoop.addFdEvent(fd, []() {
        std::cout << "File Descriptor Event" << std::endl;
    });

    eventLoop.addTimerEvent(1000, []() {
        std::cout << "Timer Event" << std::endl;
    });

    eventLoop.run();

    return 0;
}           

在上述代碼中,添加了一個addTimerEvent函數,用于添加定時器事件。在該函數中,将事件的觸發時間和事件處理函數儲存到timerEvents_中。

在run函數中,添加了一個新的handleTimerEvents函數,用于處理定時器事件。在該函數中,擷取目前時間now,并循環處理timerEvents_中的事件,直到找到第一個觸發時間大于now的事件為止。

在main函數中,通過調用addTimerEvent函數添加了一個定時器事件。該定時器事件将在1000毫秒後觸發,并執行對應的事件處理函數。

Linux一切皆檔案:使用timerfd 簡化定時器事件

使用timerfd來改寫定時器事件可以更簡潔和高效。timerfd是一個檔案描述符,可以用來建立定時器。當定時器到期時,檔案描述符上就會産生可讀事件,可以通過epoll來監聽該事件,并在事件發生時執行相應的處理函數。

#include <iostream>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>

class EventLoop {
public:
    EventLoop() : epollFd_(epoll_create1(0)) {}

    ~EventLoop() {
        close(epollFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
        uint64_t value = 1;
        write(eventFd_, &value, sizeof(value));
    }

    void addFdEvent(int fd, std::function<void()> event) {
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = fd;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
        fdEvents_[fd] = event;
    }

    void addTimerEvent(int milliseconds, std::function<void()> event) {
        int timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
        struct itimerspec timerSpec;
        memset(&timerSpec, 0, sizeof(timerSpec));
        timerSpec.it_value.tv_sec = milliseconds / 1000;
        timerSpec.it_value.tv_nsec = (milliseconds % 1000) * 1000000;
        timerfd_settime(timerFd, 0, &timerSpec, nullptr);
        addFdEvent(timerFd, [this, timerFd, event]() {
            uint64_t expirations;
            read(timerFd, &expirations, sizeof(expirations));
            event();
            close(timerFd);
        });
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                if (events[i].data.fd == eventFd_) {
                    handleEvents();
                } else {
                    handleFdEvent(events[i].data.fd);
                }
            }
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    void handleFdEvent(int fd) {
        if (fdEvents_.count(fd) > 0) {
            auto event = fdEvents_[fd];
            event();
        }
    }

    int epollFd_;
    int eventFd_;
    std::queue<std::function<void()>> events_;
    std::unordered_map<int, epoll_event> fdEvents_;
};

int main() {
    EventLoop eventLoop;

    eventLoop.addEvent([]() {
        std::cout << "Event 1" << std::endl;
    });

    eventLoop.addEvent([]() {
        std::cout << "Event 2" << std::endl;
    });

    eventLoop.addTimerEvent(1000, []() {
        std::cout << "Timer Event" << std::endl;
    });

    eventLoop.run();

    return 0;
}           

使用signalfd事件,并将所有事件統一為fd事件

信号( signal )本質是 Linux 程序間通信的一種機制,也叫軟中斷信号。既然是通信機制,那麼就是傳遞資訊用的,信号傳遞的資訊很簡單,就是一個整數,一般用于配合系統管理任務,比如程序的終結、恢複、熱加載等。

信号都用整數常量表示,命名以 SIG 未字首,比如 SIGINT( ctrl-c 觸發),SIGKILL( kill -9 觸發 )。

#include <iostream>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
#include <csignal>
#include <functional>
#include <queue>
#include <unordered_map>
#include <sys/signalfd.h>

class EventLoop {
public:
    EventLoop() : epollFd_(epoll_create1(0)) {
        // 建立信号集
        sigset_t sigset;
        sigemptyset(&sigset);
        sigaddset(&sigset, SIGINT);

        // 将信号集添加到信号檔案描述符中
        signalFd_ = signalfd(-1, &sigset, 0);

        // 将信号檔案描述符添加到epoll中監聽
        addFdEvent(signalFd_, [this]() {
            handleSignal();
        });
    }

    ~EventLoop() {
        close(epollFd_);
        close(signalFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
    }

    void addFdEvent(int fd, std::function<void()> event) {
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = fd;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev);
        fdEvents_[fd] = event;
    }

    void addTimerEvent(int milliseconds, std::function<void()> event) {
        int timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
        struct itimerspec timerSpec;
        memset(&timerSpec, 0, sizeof(timerSpec));
        timerSpec.it_value.tv_sec = milliseconds / 1000;
        timerSpec.it_value.tv_nsec = (milliseconds % 1000) * 1000000;
        timerfd_settime(timerFd, 0, &timerSpec, nullptr);
        addFdEvent(timerFd, [this, timerFd, event]() {
            uint64_t expirations;
            read(timerFd, &expirations, sizeof(expirations));
            event();
            close(timerFd);
        });
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                handleFdEvent(events[i].data.fd);
            }
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    void handleFdEvent(int fd) {
        if (fdEvents_.count(fd) > 0) {
            auto event = fdEvents_[fd];
            event();
        }
    }

    void handleSignal() {
        struct signalfd_siginfo siginfo;
        read(signalFd_, &siginfo, sizeof(siginfo));
        std::cout << "Received signal: " << siginfo.ssi_signo << std::endl;

        // 執行自定義操作
        // ...
    }

private:
    int epollFd_;
    int signalFd_;
    std::queue<std::function<void()>> events_;
    std::unordered_map<int, std::function<void()>> fdEvents_;
};

int main() {
    EventLoop eventLoop;

    // 添加其他事件和定時器事件
    // ...

    eventLoop.run();

    return 0;
}           

上面代碼中,使用signalfd來建立一個信号檔案描述符,并将其添加到EventLoop類的epoll中進行監聽。在handleSignal()函數中,使用read()函數從信号檔案描述符中讀取信号資訊,并處理接收到的信号。

可以根據需要在handleSignal()函數中執行自定義操作,例如記錄日志、執行清理操作等。同時,可以根據需要在EventLoop類中添加其他信号處理函數,隻需在構造函數中使用signalfd()函數建立相應的信号檔案描述符,并将其添加到epoll中監聽即可。

上述代碼删除了 eventFd_ ,因為 handleFdEvent 函數已經可以處理所有的檔案描述符事件,包括信号檔案描述符的事件。

繼續閱讀