天天看点

从0到1写一个Eventloop事件循环

作者:Red果果

什么是事件循环?

Event Loop(事件循环)是一种用于处理和调度事件的机制,常用于异步编程模型中。它是一种循环结构,不断地从事件队列中取出事件并处理,直到事件队列为空。

在事件循环中,有两个主要的组件:事件队列和事件处理器。事件队列用于存储待处理的事件,而事件处理器则负责从队列中取出事件并执行相应的处理逻辑。

当有新的事件发生时,可以将其加入到事件队列中,而不需要立即处理。事件循环会在合适的时机从队列中取出事件,并按照先入先出的顺序进行处理。这种机制使得程序可以同时处理多个事件,而不需要等待某个事件的完成。

Event Loop在异步编程中起到了关键的作用。它可以处理网络请求、定时器、用户输入等各种类型的事件,并通过回调函数或者异步处理来处理这些事件。通过合理地利用Event Loop,可以实现高效的非阻塞式编程。

最简单的Eventloop:

#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
 class EventLoop {
public:
    typedef std::function<void()> EventCallback;
     void addEvent(EventCallback callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        events_.push(callback);
        condition_.notify_one();
    }
     void run() {
        while (true) {
            std::unique_lock<std::mutex> lock(mutex_);
            condition_.wait(lock, [this] { return !events_.empty(); });
             EventCallback event = events_.front();
            events_.pop();
             lock.unlock();
            event();
        }
    }
 private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::queue<EventCallback> events_;
};
 int main() {
    EventLoop loop;
     // 添加事件
    loop.addEvent([]() {
        std::cout << "事件1" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件2" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件3" << std::endl;
    });
     // 运行事件循环
    std::thread loopThread([&]() {
        loop.run();
    });
     loopThread.join();
     return 0;
}           

有什么局限?

这个示例中的Eventloop是单线程的,它在一个循环中依次执行事件。这意味着如果某个事件执行时间过长,会阻塞后续事件的执行。对于需要并发处理的场景,可能需要使用多线程或异步任务来处理事件。

添加多线程和异步任务的支持

#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <future>
 class EventLoop {
public:
    typedef std::function<void()> EventCallback;
     void addEvent(EventCallback callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        events_.push(callback);
        condition_.notify_one();
    }
     void run() {
        while (true) {
            std::unique_lock<std::mutex> lock(mutex_);
            condition_.wait(lock, [this] { return !events_.empty(); });
             EventCallback event = events_.front();
            events_.pop();
             lock.unlock();
             // 异步执行事件
            std::async(std::launch::async, event);
        }
    }
 private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::queue<EventCallback> events_;
};
 int main() {
    EventLoop loop;
     // 添加事件
    loop.addEvent([]() {
        std::cout << "事件1" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件2" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件3" << std::endl;
    });
     // 运行事件循环
    std::thread loopThread([&]() {
        loop.run();
    });
     loopThread.join();
     return 0;
}           

另外一种思路:使用Eventfd

#include <iostream>
#include <functional>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <sys/eventfd.h>
#include <unistd.h>
#include <atomic>
#include <stdexcept>
#include <sstream>
 class EventLoop {
public:
    typedef std::function<void()> EventCallback;
     EventLoop() {
        event_fd_ = eventfd(0, EFD_NONBLOCK);
        if (event_fd_ == -1) {
            throw std::runtime_error("Failed to create eventfd");
        }
    }
     ~EventLoop() {
        close(event_fd_);
    }
     void addEvent(EventCallback callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        events_.push_back(std::move(callback));
        uint64_t value = 1;
        write(event_fd_, &value, sizeof(value));
    }
     void run() {
        while (running_) {
            std::unique_lock<std::mutex> lock(mutex_);
            condition_.wait(lock, [this] { return !events_.empty() || !running_; });
             if (!running_) {
                break;
            }
             // 读取eventfd以清空计数器
            uint64_t value;
            read(event_fd_, &value, sizeof(value));
             EventCallback event = std::move(events_.front());
            events_.pop_front();
             lock.unlock();
            event();
        }
    }
     void stop() {
        running_ = false;
        condition_.notify_one();
    }
 private:
    std::mutex mutex_;
    std::condition_variable condition_;
    std::deque<EventCallback> events_;
    int event_fd_;
    std::atomic<bool> running_{true};
};
 int main() {
    EventLoop loop;
     // 添加事件
    loop.addEvent([]() {
        std::cout << "事件1" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件2" << std::endl;
    });
     loop.addEvent([]() {
        std::cout << "事件3" << std::endl;
    });
     // 运行事件循环
    std::thread loopThread([&]() {
        try {
            loop.run();
        } catch (const std::exception& e) {
            std::ostringstream oss;
            oss << "Event loop error: " << e.what() << std::endl;
            std::cerr << oss.str();
        }
    });
     // 等待事件循环结束
    loopThread.join();
     return 0;
}
           

既然使用了fd了,那么干嘛不使用epoll?

epoll是Linux操作系统提供的一种I/O事件通知机制,可以用于高效地处理大量的并发连接。而epoll采用了一种基于事件驱动的方式,只有当文件描述符上有I/O事件发生时才会通知应用程序。这样就避免了无效的遍历,提高了效率。 使用epoll进行优化时,通常的步骤如下:

  1. 创建一个epoll实例,通过调用epoll_create函数。
  2. 将需要监听的文件描述符添加到epoll实例中,通过调用epoll_ctl函数。
  3. 不断地调用epoll_wait函数等待事件发生,一旦有事件发生就进行处理。
  4. epoll的优势在于能够处理大量并发连接,因为它采用了事件驱动的方式,只有当有事件发生时才会通知应用程序。这样就避免了不必要的遍历,提高了效率。
#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

class EventLoop {
public:
    EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
        epollFd_ = epoll_create1(0);
        epoll_event event;
        event.events = EPOLLIN;
        event.data.fd = eventFd_;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
    }

    ~EventLoop() {
        close(eventFd_);
        close(epollFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
        uint64_t value = 1;
        write(eventFd_, &value, sizeof(value));
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                if (events[i].data.fd == eventFd_) {
                    uint64_t value;
                    read(eventFd_, &value, sizeof(value));
                    handleEvents();
                }
            }
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    int eventFd_;
    int epollFd_;
    std::queue<std::function<void()>> events_;
};

int main() {
    EventLoop eventLoop;

    eventLoop.addEvent([]() {
        std::cout << "Event 1" << std::endl;
    });

    eventLoop.addEvent([]() {
        std::cout << "Event 2" << std::endl;
    });

    eventLoop.run();

    return 0;
}           

在上述代码中,使用eventfd创建了一个eventFd,用于事件通知。在EventLoop的构造函数中,创建了一个epollFd,并将eventFd添加到epoll中进行监听。当有事件到达时,epoll_wait会返回,并通过读取eventFd的值来处理事件。

在addEvent函数中,不仅将事件添加到队列中,还通过write函数向eventFd_写入一个值,以触发epoll_wait返回。这样可以确保在有事件到达时立即处理,而不需要等待epoll_wait超时。

在run函数中,使用epoll_wait等待事件到达,并通过handleEvents函数处理队列中的事件。

添加对其他fd的事件监听

#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>

class EventLoop {
public:
    EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
        epollFd_ = epoll_create1(0);
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = eventFd_;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
    }

    ~EventLoop() {
        close(eventFd_);
        close(epollFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
        uint64_t value = 1;
        write(eventFd_, &value, sizeof(value));
    }

    void addFdEvent(int fd, std::function<void()> event) {
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = fd;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
        fdEvents_[fd] = event;
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                if (events[i].data.fd == eventFd_) {
                    handleEvents();
                } else {
                    handleFdEvent(events[i].data.fd);
                }
            }
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    void handleFdEvent(int fd) {
        if (fdEvents_.count(fd) > 0) {
            auto event = fdEvents_[fd];
            event();
        }
    }

    int eventFd_;
    int epollFd_;
    std::queue<std::function<void()>> events_;
    std::unordered_map<int, epoll_event> fdEvents_;
};

int main() {
    EventLoop eventLoop;

    eventLoop.addEvent([]() {
        std::cout << "Event 1" << std::endl;
    });

    eventLoop.addEvent([]() {
        std::cout << "Event 2" << std::endl;
    });

    int fd = open("test.txt", O_RDONLY);
    eventLoop.addFdEvent(fd, []() {
        std::cout << "File Descriptor Event" << std::endl;
    });

    eventLoop.run();

    return 0;
}           

在上述代码中,添加了一个addFdEvent函数,用于添加对文件描述符的事件监听。在该函数中,通过epoll_ctl将文件描述符添加到epoll中进行监听,并将对应的事件处理函数保存到fdEvents_中。

在run函数中,对每个事件进行处理时,先判断事件是来自eventFd还是其他文件描述符。如果是eventFd的事件,则调用handleEvents函数处理事件队列中的事件。如果是其他文件描述符的事件,则调用handleFdEvent函数处理对应的事件。

在main函数中,通过open函数打开了一个文件,并将其文件描述符添加到EventLoop中进行监听。当该文件可读时,会触发对应的事件处理函数。

继续添加定时器事件

#include <iostream>
#include <functional>
#include <queue>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <chrono>
#include <thread>

class EventLoop {
public:
    EventLoop() : eventFd_(eventfd(0, EFD_NONBLOCK)) {
        epollFd_ = epoll_create1(0);
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = eventFd_;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, eventFd_, &event);
    }

    ~EventLoop() {
        close(eventFd_);
        close(epollFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
        uint64_t value = 1;
        write(eventFd_, &value, sizeof(value));
    }

    void addFdEvent(int fd, std::function<void()> event) {
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = fd;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
        fdEvents_[fd] = event;
    }

    void addTimerEvent(int milliseconds, std::function<void()> event) {
        auto startTime = std::chrono::steady_clock::now();
        auto endTime = startTime + std::chrono::milliseconds(milliseconds);
        timerEvents_.push({endTime, event});
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                if (events[i].data.fd == eventFd_) {
                    handleEvents();
                } else {
                    handleFdEvent(events[i].data.fd);
                }
            }
            handleTimerEvents();
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    void handleFdEvent(int fd) {
        if (fdEvents_.count(fd) > 0) {
            auto event = fdEvents_[fd];
            event();
        }
    }

    void handleTimerEvents() {
        auto now = std::chrono::steady_clock::now();
        while (!timerEvents_.empty() && timerEvents_.top().first <= now) {
            auto event = timerEvents_.top().second;
            timerEvents_.pop();
            event();
        }
    }

    int eventFd_;
    int epollFd_;
    std::queue<std::function<void()>> events_;
    std::unordered_map<int, epoll_event> fdEvents_;
    std::priority_queue<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>,
                        std::vector<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>>,
                        std::greater<std::pair<std::chrono::steady_clock::time_point, std::function<void()>>>> timerEvents_;
};

int main() {
    EventLoop eventLoop;

    eventLoop.addEvent([]() {
        std::cout << "Event 1" << std::endl;
    });

    eventLoop.addEvent([]() {
        std::cout << "Event 2" << std::endl;
    });

    int fd = open("test.txt", O_RDONLY);
    eventLoop.addFdEvent(fd, []() {
        std::cout << "File Descriptor Event" << std::endl;
    });

    eventLoop.addTimerEvent(1000, []() {
        std::cout << "Timer Event" << std::endl;
    });

    eventLoop.run();

    return 0;
}           

在上述代码中,添加了一个addTimerEvent函数,用于添加定时器事件。在该函数中,将事件的触发时间和事件处理函数保存到timerEvents_中。

在run函数中,添加了一个新的handleTimerEvents函数,用于处理定时器事件。在该函数中,获取当前时间now,并循环处理timerEvents_中的事件,直到找到第一个触发时间大于now的事件为止。

在main函数中,通过调用addTimerEvent函数添加了一个定时器事件。该定时器事件将在1000毫秒后触发,并执行对应的事件处理函数。

Linux一切皆文件:使用timerfd 简化定时器事件

使用timerfd来改写定时器事件可以更简洁和高效。timerfd是一个文件描述符,可以用来创建定时器。当定时器到期时,文件描述符上就会产生可读事件,可以通过epoll来监听该事件,并在事件发生时执行相应的处理函数。

#include <iostream>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>

class EventLoop {
public:
    EventLoop() : epollFd_(epoll_create1(0)) {}

    ~EventLoop() {
        close(epollFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
        uint64_t value = 1;
        write(eventFd_, &value, sizeof(value));
    }

    void addFdEvent(int fd, std::function<void()> event) {
        epoll_event event;
        event.events = EPOLLIN | EPOLLET;
        event.data.fd = fd;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event);
        fdEvents_[fd] = event;
    }

    void addTimerEvent(int milliseconds, std::function<void()> event) {
        int timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
        struct itimerspec timerSpec;
        memset(&timerSpec, 0, sizeof(timerSpec));
        timerSpec.it_value.tv_sec = milliseconds / 1000;
        timerSpec.it_value.tv_nsec = (milliseconds % 1000) * 1000000;
        timerfd_settime(timerFd, 0, &timerSpec, nullptr);
        addFdEvent(timerFd, [this, timerFd, event]() {
            uint64_t expirations;
            read(timerFd, &expirations, sizeof(expirations));
            event();
            close(timerFd);
        });
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                if (events[i].data.fd == eventFd_) {
                    handleEvents();
                } else {
                    handleFdEvent(events[i].data.fd);
                }
            }
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    void handleFdEvent(int fd) {
        if (fdEvents_.count(fd) > 0) {
            auto event = fdEvents_[fd];
            event();
        }
    }

    int epollFd_;
    int eventFd_;
    std::queue<std::function<void()>> events_;
    std::unordered_map<int, epoll_event> fdEvents_;
};

int main() {
    EventLoop eventLoop;

    eventLoop.addEvent([]() {
        std::cout << "Event 1" << std::endl;
    });

    eventLoop.addEvent([]() {
        std::cout << "Event 2" << std::endl;
    });

    eventLoop.addTimerEvent(1000, []() {
        std::cout << "Timer Event" << std::endl;
    });

    eventLoop.run();

    return 0;
}           

使用signalfd事件,并将所有事件统一为fd事件

信号( signal )本质是 Linux 进程间通信的一种机制,也叫软中断信号。既然是通信机制,那么就是传递信息用的,信号传递的信息很简单,就是一个整数,一般用于配合系统管理任务,比如进程的终结、恢复、热加载等。

信号都用整数常量表示,命名以 SIG 未前缀,比如 SIGINT( ctrl-c 触发),SIGKILL( kill -9 触发 )。

#include <iostream>
#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
#include <csignal>
#include <functional>
#include <queue>
#include <unordered_map>
#include <sys/signalfd.h>

class EventLoop {
public:
    EventLoop() : epollFd_(epoll_create1(0)) {
        // 创建信号集
        sigset_t sigset;
        sigemptyset(&sigset);
        sigaddset(&sigset, SIGINT);

        // 将信号集添加到信号文件描述符中
        signalFd_ = signalfd(-1, &sigset, 0);

        // 将信号文件描述符添加到epoll中监听
        addFdEvent(signalFd_, [this]() {
            handleSignal();
        });
    }

    ~EventLoop() {
        close(epollFd_);
        close(signalFd_);
    }

    void addEvent(std::function<void()> event) {
        events_.push(event);
    }

    void addFdEvent(int fd, std::function<void()> event) {
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLET;
        ev.data.fd = fd;
        epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &ev);
        fdEvents_[fd] = event;
    }

    void addTimerEvent(int milliseconds, std::function<void()> event) {
        int timerFd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
        struct itimerspec timerSpec;
        memset(&timerSpec, 0, sizeof(timerSpec));
        timerSpec.it_value.tv_sec = milliseconds / 1000;
        timerSpec.it_value.tv_nsec = (milliseconds % 1000) * 1000000;
        timerfd_settime(timerFd, 0, &timerSpec, nullptr);
        addFdEvent(timerFd, [this, timerFd, event]() {
            uint64_t expirations;
            read(timerFd, &expirations, sizeof(expirations));
            event();
            close(timerFd);
        });
    }

    void run() {
        while (true) {
            epoll_event events[10];
            int numEvents = epoll_wait(epollFd_, events, 10, -1);
            for (int i = 0; i < numEvents; i++) {
                handleFdEvent(events[i].data.fd);
            }
        }
    }

private:
    void handleEvents() {
        while (!events_.empty()) {
            auto event = events_.front();
            events_.pop();
            event();
        }
    }

    void handleFdEvent(int fd) {
        if (fdEvents_.count(fd) > 0) {
            auto event = fdEvents_[fd];
            event();
        }
    }

    void handleSignal() {
        struct signalfd_siginfo siginfo;
        read(signalFd_, &siginfo, sizeof(siginfo));
        std::cout << "Received signal: " << siginfo.ssi_signo << std::endl;

        // 执行自定义操作
        // ...
    }

private:
    int epollFd_;
    int signalFd_;
    std::queue<std::function<void()>> events_;
    std::unordered_map<int, std::function<void()>> fdEvents_;
};

int main() {
    EventLoop eventLoop;

    // 添加其他事件和定时器事件
    // ...

    eventLoop.run();

    return 0;
}           

上面代码中,使用signalfd来创建一个信号文件描述符,并将其添加到EventLoop类的epoll中进行监听。在handleSignal()函数中,使用read()函数从信号文件描述符中读取信号信息,并处理接收到的信号。

可以根据需要在handleSignal()函数中执行自定义操作,例如记录日志、执行清理操作等。同时,可以根据需要在EventLoop类中添加其他信号处理函数,只需在构造函数中使用signalfd()函数创建相应的信号文件描述符,并将其添加到epoll中监听即可。

上述代码删除了 eventFd_ ,因为 handleFdEvent 函数已经可以处理所有的文件描述符事件,包括信号文件描述符的事件。

继续阅读