libevent 快速上手

Posted by Kalos Aner on April 14, 2026

简介

libevent 是一个用C语言编写的、轻量级的开源高性能事件驱动网络库。它为开发高性能、可扩展的网络应用程序提供了跨平台的抽象接口,使开发者能够使用事件回调机制来管理网络连接、定时器和信号等事件。libevent 底层采用 Reactor 模型实现,但是从编程应用视角上看它是异步事件驱动。

安装libevent

下面是一些安装方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Ubuntu/Debian
sudo apt-get install libevent-dev

# CentOS/RHEL
sudo yum install libevent-devel

# macOS (using Homebrew)
brew install libevent

# Build and install from source
wget https://github.com/libevent/libevent/releases/download/release-2.1.12-stable/libevent-2.1.12-stable.tar.gz
tar -xzf libevent-2.1.12-stable.tar.gz
cd libevent-2.1.12-stable
./configure
make && sudo make install
# Linux only: refresh the dynamic linker cache so the freshly installed
# shared library is found at runtime.
sudo ldconfig

使用 libevent 的项目在编译时需要链接该库,最简单的命令形如 gcc -o example example.c -levent。中大型项目一般使用 make 或 cmake 构建,并在构建脚本中链接所需的库。

快速入门

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PORT 8888

/*
 * Read callback: invoked when new data has arrived from the client.
 *
 * Consumes every complete LF-terminated line currently held in the input
 * buffer and echoes each one back prefixed with "Server echo: ". A partial
 * line (no '\n' yet) is left in the buffer until the rest arrives.
 *
 * bev: the connection's bufferevent.
 * ctx: user pointer registered with bufferevent_setcb(); unused.
 */
void read_callback(struct bufferevent *bev, void *ctx) {
    struct evbuffer *input = bufferevent_get_input(bev);
    char *data;

    (void)ctx;

    /* Loop: one read event may deliver several lines at once. */
    while ((data = evbuffer_readln(input, NULL, EVBUFFER_EOL_LF)) != NULL) {
        char reply[256];
        int len;

        printf("Received: %s\n", data);

        /* Reply to the client; long lines are truncated to fit `reply`. */
        len = snprintf(reply, sizeof(reply), "Server echo: %s", data);
        if (len < 0) {
            len = 0;
        } else if ((size_t)len >= sizeof(reply)) {
            len = (int)sizeof(reply) - 1;
        }
        bufferevent_write(bev, reply, (size_t)len);

        /* evbuffer_readln() returns heap memory owned by the caller. */
        free(data);
    }
}

/*
 * Event callback: reports socket errors and tears the connection down on
 * EOF or error.
 *
 * bev:    the connection's bufferevent.
 * events: BEV_EVENT_* flags describing what happened.
 * ctx:    user pointer registered with bufferevent_setcb(); unused.
 */
void event_callback(struct bufferevent *bev, short events, void *ctx) {
    const short closing = BEV_EVENT_EOF | BEV_EVENT_ERROR;

    if ((events & BEV_EVENT_ERROR) != 0) {
        perror("Error from bufferevent");
    }

    /* Anything other than EOF/error leaves the connection open. */
    if ((events & closing) == 0) {
        return;
    }

    printf("Connection closed\n");
    bufferevent_free(bev);
}

/*
 * Accept callback: invoked by the listener for every new client connection.
 *
 * Wraps the accepted socket in a bufferevent, installs the read/event
 * callbacks and enables I/O. If the bufferevent cannot be created the
 * socket is closed so the descriptor does not leak.
 *
 * listener: listener that accepted the connection.
 * fd:       accepted client socket.
 * address:  peer address (unused).
 * socklen:  length of `address` (unused).
 * ctx:      user pointer given to evconnlistener_new_bind(); unused.
 */
void accept_conn_callback(struct evconnlistener *listener,
                          evutil_socket_t fd,
                          struct sockaddr *address,
                          int socklen,
                          void *ctx) {
    struct event_base *base = evconnlistener_get_base(listener);
    struct bufferevent *bev;

    (void)address;
    (void)socklen;
    (void)ctx;

    /* BEV_OPT_CLOSE_ON_FREE: bufferevent_free() also closes the socket. */
    bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (!bev) {
        fprintf(stderr, "Could not create bufferevent for new connection\n");
        evutil_closesocket(fd);
        return;
    }

    /* Install callbacks (no write callback is needed). */
    bufferevent_setcb(bev, read_callback, NULL, event_callback, NULL);

    /* Enable both reading and writing on the connection. */
    bufferevent_enable(bev, EV_READ | EV_WRITE);

    printf("New connection accepted\n");
}

/*
 * Listener error callback: logs the socket error and asks the event loop
 * that owns the listener to exit.
 *
 * listener: the listener that reported the error.
 * ctx:      user pointer given to evconnlistener_new_bind(); unused.
 */
void accept_error_callback(struct evconnlistener *listener, void *ctx) {
    /* Capture the error code first, before any other call can change it. */
    int err = EVUTIL_SOCKET_ERROR();
    const char *reason = evutil_socket_error_to_string(err);

    fprintf(stderr,
            "Got an error %d (%s) on the listener. Shutting down.\n",
            err, reason);

    event_base_loopexit(evconnlistener_get_base(listener), NULL);
}

int main() {
    struct event_base *base;
    struct evconnlistener *listener;
    struct sockaddr_in sin;
    
    // 1. 创建事件循环
    base = event_base_new();
    if (!base) {
        fprintf(stderr, "Could not initialize libevent!\n");
        return 1;
    }
    
    // 2. 配置监听地址
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    
    // 3. 创建监听器
    listener = evconnlistener_new_bind(base, accept_conn_callback, NULL,
                                       LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
                                       -1, (struct sockaddr*)&sin, sizeof(sin));
    
    if (!listener) {
        fprintf(stderr, "Could not create listener!\n");
        return 1;
    }
    
    evconnlistener_set_error_cb(listener, accept_error_callback);
    
    printf("Server listening on port %d\n", PORT);
    // 4. 启动事件循环
    event_base_dispatch(base);
    
    // 5. 清理资源
    evconnlistener_free(listener);
    event_base_free(base);
    
    return 0;
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define PORT 8888
#define SERVER_IP "127.0.0.1"

/*
 * Sends exactly `len` bytes from `data` on `sock`, retrying on short writes.
 * Returns 0 on success, -1 if send() fails or makes no progress.
 */
static int send_all(int sock, const char *data, size_t len) {
    while (len > 0) {
        ssize_t sent = send(sock, data, len, 0);
        if (sent <= 0) {
            return -1;
        }
        data += sent;
        len -= (size_t)sent;
    }
    return 0;
}

/*
 * Interactive line-based client.
 *
 * Connects to SERVER_IP:PORT, then repeatedly reads a line from stdin,
 * sends it terminated by '\n', and prints one chunk of the server's reply.
 * Stops on "quit", on end of input, or when the connection fails/closes.
 *
 * Returns 0 after a normal session, 1 if the connection cannot be set up.
 */
int main(void) {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("Socket creation failed");
        return 1;
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(PORT);
    if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) != 1) {
        fprintf(stderr, "Invalid server address: %s\n", SERVER_IP);
        close(sock);
        return 1;
    }

    if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("Connection failed");
        close(sock);
        return 1;
    }

    printf("Connected to server. Enter messages (type 'quit' to exit):\n");

    char buffer[1024];
    while (1) {
        printf("You: ");
        fflush(stdout); /* the prompt has no newline, so flush it explicitly */

        /* Stop on EOF or read error instead of reusing stale buffer data. */
        if (fgets(buffer, sizeof(buffer), stdin) == NULL) {
            break;
        }
        buffer[strcspn(buffer, "\n")] = '\0'; /* strip the newline */

        if (strcmp(buffer, "quit") == 0) {
            break;
        }

        /*
         * Send the message plus the terminating newline in one go.
         * fgets() stores at most sizeof(buffer) - 1 characters, so there is
         * always room to put '\n' where the NUL terminator was.
         */
        size_t len = strlen(buffer);
        buffer[len++] = '\n';
        if (send_all(sock, buffer, len) < 0) {
            perror("send");
            break;
        }

        /* Receive the reply; keep one byte free for the NUL terminator. */
        ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
        if (n < 0) {
            perror("recv");
            break;
        }
        if (n == 0) {
            printf("Server closed the connection\n");
            break;
        }
        buffer[n] = '\0';
        printf("Server: %s\n", buffer);
    }

    close(sock);
    return 0;
}

高级用法

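下面给出一个模仿 Memcached 线程模型的精简示例(主线程负责 accept,多个工作线程各自拥有独立的 event_base),用来演示 libevent 在多线程场景下的用法。Memcached 是一个自由开源、高性能、分布式内存对象缓存系统。该示例使用了 C++17 特性以及 libevent 的 pthreads 支持,编译时需要额外链接 event_pthreads,例如 g++ -std=c++17 -o kvserver kvserver.cpp -levent -levent_pthreads -lpthread。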

/**
 * Memcached架构精简版 - 简化命令版本
 * SET命令: set key value
 * GET命令: get key
 * DELETE命令: delete key
 * STATS命令: stats(显示统计信息)
 * QUIT命令: quit
 * FLUSH_ALL命令: flush_all(清空所有数据)
 * HELP命令: help(显示帮助信息)
 */

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <string>
#include <memory>
#include <vector>
#include <atomic>
#include <thread>

#define THREAD_COUNT 4              // number of worker threads
#define PORT 11211                  // Memcached's default port
#define MAX_CONN_QUEUE 10000        // maximum connection queue length
#define MAX_KEY_LEN 250             // maximum key length
// Parenthesized so the macro behaves as a single value inside larger
// expressions (e.g. `x / MAX_VALUE_LEN`).
#define MAX_VALUE_LEN (1024 * 1024) // maximum value length (1 MB)

// ==================== 数据结构定义 ====================

// Stored payload for one key: a thin wrapper around a std::string value.
struct KVItem {
    std::string value;  // the stored value; empty for a default-built item

    // Creates an item holding an empty value.
    KVItem() : value() {}

    // Creates an item holding a copy of `v`.
    KVItem(const std::string& v) : value(v) {}
};

// 全局共享存储(所有线程共享)
static std::unordered_map<std::string, KVItem> global_storage;
static std::mutex global_storage_mutex;

// One client connection, handed from the accepting thread to a worker.
//
// Ownership: the destructor frees `bev` when it is non-null. `fd` itself is
// never closed by this class. Copying is disabled because two copies would
// both free the same bufferevent.
class Connection {
public:
    int fd;                      // client socket descriptor
    struct bufferevent* bev;     // bufferevent for this socket; nullptr until created
    Connection* next;            // intrusive link used by the per-worker queue

    Connection(int fd_) : fd(fd_), bev(nullptr), next(nullptr) {}

    Connection(const Connection&) = delete;
    Connection& operator=(const Connection&) = delete;

    ~Connection() {
        if (bev) {
            bufferevent_free(bev);
        }
    }
};

// Per-worker state: the thread, its private event loop, the notification
// pipe, and the queue of connections waiting to be picked up by the worker.
class WorkerThread {
public:
    std::thread thread;                      // the worker's C++ thread
    struct event_base* base = nullptr;       // event loop private to this worker
    struct event* notify_event = nullptr;    // event watching the pipe's read end
    int notify_receive_fd = -1;              // pipe read end (worker side)
    int notify_send_fd = -1;                 // pipe write end (used by the main thread)

    // Pending-connection queue (singly linked through Connection::next)
    Connection* conn_queue = nullptr;        // head of the queue
    Connection* conn_queue_tail = nullptr;   // tail of the queue
    std::mutex queue_mutex;                  // guards the queue fields
    int queue_size = 0;                      // number of queued connections
    int thread_idx = -1;                     // worker index, -1 until assigned

    // Builds an unassigned worker with no loop, no pipe and an empty queue.
    WorkerThread() {}

    // Builds a worker and records its index.
    WorkerThread(int idx) : WorkerThread() {
        thread_idx = idx;
    }

    ~WorkerThread() {
        cleanup();
    }

    // Releases the notify event, the event loop and both pipe ends, resetting
    // each field so that a second call does nothing.
    void cleanup() {
        if (notify_event != nullptr) {
            event_free(notify_event);
            notify_event = nullptr;
        }
        if (base != nullptr) {
            event_base_free(base);
            base = nullptr;
        }

        auto close_fd = [](int& fd) {
            if (fd >= 0) {
                close(fd);
                fd = -1;
            }
        };
        close_fd(notify_receive_fd);
        close_fd(notify_send_fd);
    }
};

// 全局变量
static std::vector<std::unique_ptr<WorkerThread>> worker_threads;
static struct event_base* main_base = nullptr;
static struct evconnlistener* main_listener = nullptr;
static std::atomic<bool> shutdown_requested{false};

// ==================== 存储操作函数(线程安全) ====================

// Stores `value` under `key` in the shared map, replacing any existing entry.
// Holds the global storage mutex for the duration of the call.
// Returns true on success; logs to stderr and returns false if the insert
// throws a std::exception.
bool set_kv_item(const std::string& key, const std::string& value) {
    std::lock_guard<std::mutex> guard(global_storage_mutex);

    bool stored = false;
    try {
        global_storage[key] = KVItem(value);
        stored = true;
    } catch (const std::exception& ex) {
        fprintf(stderr, "Error setting key '%s': %s\n", key.c_str(), ex.what());
    }
    return stored;
}

// Looks `key` up in the shared map under the global storage mutex.
// Returns {true, value} when the key exists, otherwise {false, ""}.
std::pair<bool, std::string> get_kv_item(const std::string& key) {
    std::lock_guard<std::mutex> guard(global_storage_mutex);

    const auto hit = global_storage.find(key);
    if (hit == global_storage.end()) {
        return {false, ""};
    }
    return {true, hit->second.value};
}

// Removes `key` from the shared map under the global storage mutex.
// Returns true if an entry was removed, false if the key was absent.
bool delete_kv_item(const std::string& key) {
    std::lock_guard<std::mutex> guard(global_storage_mutex);

    // erase-by-key reports how many entries were removed (0 or 1 here).
    return global_storage.erase(key) > 0;
}

// Returns the number of entries currently in the shared map, read under the
// global storage mutex. The size is narrowed to int.
int get_storage_count() {
    std::lock_guard<std::mutex> guard(global_storage_mutex);
    const auto entries = global_storage.size();
    return static_cast<int>(entries);
}

// Removes every entry from the shared map while holding the global storage
// mutex.
void flush_all() {
    std::unique_lock<std::mutex> guard(global_storage_mutex);
    global_storage.clear();
}

// ==================== 工具函数 ====================

// Appends `conn` to the tail of `worker`'s pending-connection queue and
// writes one byte to the worker's notification pipe. Both steps happen while
// the queue mutex is held. A failed pipe write is reported with perror().
void add_conn_to_worker(WorkerThread& worker, Connection* conn) {
    std::lock_guard<std::mutex> guard(worker.queue_mutex);

    // Link the connection in at the tail (or as the only element).
    if (worker.conn_queue_tail == nullptr) {
        worker.conn_queue = conn;
    } else {
        worker.conn_queue_tail->next = conn;
    }
    worker.conn_queue_tail = conn;
    ++worker.queue_size;

    // Signal the worker that a new connection is waiting.
    const char wakeup = 1;
    const ssize_t written = write(worker.notify_send_fd, &wakeup, 1);
    if (written != 1) {
        perror("write to notify pipe");
    }
}

// Pops the connection at the head of `worker`'s pending queue under the
// queue mutex. Returns nullptr when the queue is empty.
Connection* get_conn_from_worker(WorkerThread& worker) {
    std::lock_guard<std::mutex> guard(worker.queue_mutex);

    Connection* head = worker.conn_queue;
    if (head != nullptr) {
        worker.conn_queue = head->next;
        if (worker.conn_queue == nullptr) {
            // The last element was removed, so the tail is gone as well.
            worker.conn_queue_tail = nullptr;
        }
        --worker.queue_size;
    }
    return head;
}

// ==================== 工作线程回调函数 ====================

// 工作线程:读取回调(处理客户端请求)
void worker_read_cb(struct bufferevent* bev, void* arg) {
    Connection* conn = static_cast<Connection*>(arg);
    WorkerThread* worker = nullptr;
    
    // 找到对应的worker(通过遍历)
    for (size_t i = 0; i < worker_threads.size(); i++) {
        if (worker_threads[i]->base == bufferevent_get_base(bev)) {
            worker = worker_threads[i].get();
            break;
        }
    }
    
    if (!worker) {
        fprintf(stderr, "[Worker] Cannot find worker for connection\n");
        return;
    }
    
    struct evbuffer* input = bufferevent_get_input(bev);
    
    // 读取一行数据
    size_t n_read;
    char* line = evbuffer_readln(input, &n_read, EVBUFFER_EOL_CRLF_STRICT);
    if (!line) return;
    
    printf("[Worker %d] Received: '%s'\n", worker->thread_idx, line);
    
    // 简化的命令处理
    if (strcmp(line, "quit") == 0) {
        const char* response = "BYE\n";
        bufferevent_write(bev, response, strlen(response));
        bufferevent_free(bev);
        conn->bev = nullptr; // 避免双重释放
    } else if (strncmp(line, "get ", 4) == 0) {
        // GET命令: get key
        char* key = line + 4;
        
        // 去除开头和结尾的空白字符
        while (*key == ' ') key++;
        char* end = key + strlen(key) - 1;
        while (end >= key && (*end == ' ' || *end == '\r' || *end == '\n')) {
            *end = '\0';
            end--;
        }
        
        if (*key == '\0') {
            bufferevent_write(bev, "ERROR: key is empty\r\n", 22);
        } else {
            auto [found, value] = get_kv_item(key);
            if (found) {
                std::string response = "VALUE: " + std::string(key) + " = " + value + "\r\n";
                bufferevent_write(bev, response.c_str(), response.length());
                printf("[Worker %d] GET %s = %s\n", worker->thread_idx, key, value.c_str());
            } else {
                std::string response = "NOT FOUND: key '" + std::string(key) + "' does not exist\r\n";
                bufferevent_write(bev, response.c_str(), response.length());
                printf("[Worker %d] GET %s = NOT FOUND\n", worker->thread_idx, key);
            }
        }
    } else if (strncmp(line, "set ", 4) == 0) {
        // SET命令: set key value
        // 格式: set <key> <value>
        
        char* rest = line + 4;
        while (*rest == ' ') rest++;  // 跳过空格
        
        // 找到key(第一个空格之前)
        char* key_end = strchr(rest, ' ');
        if (!key_end) {
            bufferevent_write(bev, "ERROR: invalid set command, format: set key value\r\n", 53);
            free(line);
            return;
        }
        
        *key_end = '\0';  // 分割key和value
        char* key = rest;
        char* value = key_end + 1;
        
        // 去除value开头的空格
        while (*value == ' ') value++;
        
        // 去除key和value末尾的空白字符
        char* key_trim_end = key + strlen(key) - 1;
        while (key_trim_end >= key && (*key_trim_end == ' ' || *key_trim_end == '\r' || *key_trim_end == '\n')) {
            *key_trim_end = '\0';
            key_trim_end--;
        }
        
        char* value_trim_end = value + strlen(value) - 1;
        while (value_trim_end >= value && (*value_trim_end == ' ' || *value_trim_end == '\r' || *value_trim_end == '\n')) {
            *value_trim_end = '\0';
            value_trim_end--;
        }
        
        // 检查key和value是否为空
        if (*key == '\0' || *value == '\0') {
            bufferevent_write(bev, "ERROR: key and value cannot be empty\r\n", 39);
            free(line);
            return;
        }
        
        // 检查key长度
        if (strlen(key) > MAX_KEY_LEN) {
            bufferevent_write(bev, "ERROR: key too long\r\n", 22);
            free(line);
            return;
        }
        
        // 检查value长度
        if (strlen(value) > MAX_VALUE_LEN) {
            bufferevent_write(bev, "ERROR: value too long\r\n", 24);
            free(line);
            return;
        }
        
        // 存储到全局存储
        if (set_kv_item(key, value)) {
            std::string response = "STORED: key '" + std::string(key) + "' with value '" + std::string(value) + "'\r\n";
            bufferevent_write(bev, response.c_str(), response.length());
            printf("[Worker %d] SET %s = %s\n", worker->thread_idx, key, value);
        } else {
            bufferevent_write(bev, "ERROR: failed to store\r\n", 24);
        }
    } else if (strncmp(line, "delete ", 7) == 0) {
        // DELETE命令: delete key
        char* key = line + 7;
        while (*key == ' ') key++;
        
        char* end = key + strlen(key) - 1;
        while (end >= key && (*end == ' ' || *end == '\r' || *end == '\n')) {
            *end = '\0';
            end--;
        }
        
        if (delete_kv_item(key)) {
            std::string response = "DELETED: key '" + std::string(key) + "'\r\n";
            bufferevent_write(bev, response.c_str(), response.length());
            printf("[Worker %d] DELETE %s\n", worker->thread_idx, key);
        } else {
            std::string response = "NOT FOUND: key '" + std::string(key) + "' does not exist\r\n";
            bufferevent_write(bev, response.c_str(), response.length());
        }
    } else if (strcmp(line, "stats") == 0) {
        // 统计信息
        int count = get_storage_count();
        
        char stats[256];
        int len = snprintf(stats, sizeof(stats), 
                          "STAT items: %d\r\n"
                          "STAT threads: %d\r\n"
                          "END\r\n",
                          count, THREAD_COUNT);
        bufferevent_write(bev, stats, len);
    } else if (strcmp(line, "flush_all") == 0) {
        // 清空所有数据
        flush_all();
        bufferevent_write(bev, "OK: all data flushed\r\n", 23);
        printf("[Worker] All data flushed\n");
    } else if (strcmp(line, "help") == 0) {
        // 帮助信息
        const char* help = 
            "Available commands:\r\n"
            "  set key value      - Set a key-value pair\r\n"
            "  get key            - Get the value of a key\r\n"
            "  delete key         - Delete a key\r\n"
            "  stats              - Show server statistics\r\n"
            "  flush_all          - Clear all data\r\n"
            "  help               - Show this help message\r\n"
            "  quit               - Close connection\r\n";
        bufferevent_write(bev, help, strlen(help));
    } else {
        // 未知命令
        const char* response = "ERROR: unknown command. Type 'help' for available commands\r\n";
        bufferevent_write(bev, response, strlen(response));
    }
    
    free(line);
}

// Worker thread: event callback (handles errors and connection close).
//
// On EOF or error the bufferevent is freed and the Connection is deleted.
// Other events are ignored.
//
// bev:    the connection's bufferevent.
// events: BEV_EVENT_* flags describing what happened.
// arg:    the Connection* registered with bufferevent_setcb().
void worker_event_cb(struct bufferevent* bev, short events, void* arg) {
    Connection* conn = static_cast<Connection*>(arg);

    if (events & BEV_EVENT_ERROR) {
        perror("Connection error");
    }

    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
        printf("[Worker] Connection closed fd=%d\n", conn->fd);

        // The Connection destructor frees conn->bev when it is non-null.
        // Clear the pointer before freeing here so the bufferevent is
        // released exactly once.
        conn->bev = nullptr;
        bufferevent_free(bev);
        delete conn;
    }
}

// Worker thread: handles the "new connection" notification.
//
// Drains the notification pipe, then takes every queued connection, wraps
// its socket in a bufferevent on this worker's loop, installs the
// read/event callbacks, enables reading and sends the welcome banner.
//
// fd:     read end of the notification pipe.
// events: libevent event flags (unused).
// arg:    the WorkerThread* registered with event_new().
void worker_notify_cb(evutil_socket_t fd, short events, void* arg) {
    WorkerThread* self = static_cast<WorkerThread*>(arg);

    // Discard all pending notification bytes; the queue is the real payload.
    char scratch[1024];
    while (read(fd, scratch, sizeof(scratch)) > 0) {
    }

    const char* const banner =
        "========================================\n"
        "  Welcome to Simple Key-Value Store!   \n"
        "========================================\n"
        "Type 'help' for available commands.\n\n";

    for (Connection* pending = get_conn_from_worker(*self);
         pending != nullptr;
         pending = get_conn_from_worker(*self)) {
        printf("[Worker %d] Processing new connection fd=%d\n",
               self->thread_idx, pending->fd);

        // BEV_OPT_CLOSE_ON_FREE: freeing the bufferevent closes the socket.
        pending->bev = bufferevent_socket_new(
            self->base, pending->fd, BEV_OPT_CLOSE_ON_FREE);

        if (pending->bev == nullptr) {
            fprintf(stderr, "[Worker %d] Error creating bufferevent\n",
                   self->thread_idx);
            // No bufferevent owns the socket, so close it by hand.
            close(pending->fd);
            delete pending;
            continue;
        }

        bufferevent_setcb(pending->bev, worker_read_cb, nullptr,
                          worker_event_cb, pending);
        bufferevent_enable(pending->bev, EV_READ);
        bufferevent_write(pending->bev, banner, strlen(banner));
    }
}

// ==================== 主线程回调函数 ====================

// Main thread: accept callback.
//
// Picks a worker round-robin, wraps the accepted socket in a Connection and
// queues it on that worker.
//
// listener: listener that accepted the connection (unused).
// fd:       accepted client socket.
// address:  peer address (unused).
// socklen:  length of `address` (unused).
// ctx:      user pointer given to evconnlistener_new_bind(); unused.
void master_accept_cb(struct evconnlistener* listener,
                     evutil_socket_t fd,
                     struct sockaddr* address,
                     int socklen,
                     void* ctx) {
    // Round-robin cursor. It is wrapped on every step so it always stays in
    // [0, THREAD_COUNT) and can never overflow into a negative index.
    static int next_worker = 0;
    const int worker_idx = next_worker;
    next_worker = (next_worker + 1) % THREAD_COUNT;

    printf("[Master] New connection fd=%d, assigned to worker %d\n",
           fd, worker_idx);

    // The Connection is handed to the worker through its queue.
    Connection* conn = new Connection(fd);

    add_conn_to_worker(*worker_threads[worker_idx], conn);
}

// Main thread: listener error callback. Logs the socket error and breaks
// out of the event loop that owns the listener.
//
// listener: the listener that reported the error.
// ctx:      user pointer given to evconnlistener_new_bind(); unused.
void master_error_cb(struct evconnlistener* listener, void* ctx) {
    struct event_base* owner = evconnlistener_get_base(listener);
    const int code = EVUTIL_SOCKET_ERROR();
    const char* reason = evutil_socket_error_to_string(code);

    fprintf(stderr,
            "[Master] Got error %d (%s) on listener. Shutting down.\n",
            code, reason);

    event_base_loopbreak(owner);
}

// ==================== 信号处理 ====================

// Signal event callback. For SIGINT/SIGTERM it records the shutdown
// request, stops the listener from accepting and breaks the main loop.
// Any other signal is only logged.
//
// sig:    the signal number delivered.
// events: libevent event flags (unused).
// arg:    user pointer given to evsignal_new(); unused.
void signal_cb(evutil_socket_t sig, short events, void* arg) {
    printf("\n[Signal] Caught signal %d\n", sig);

    const bool wants_shutdown = (sig == SIGINT) || (sig == SIGTERM);
    if (!wants_shutdown) {
        return;
    }

    shutdown_requested.store(true);
    printf("[Signal] Shutting down gracefully...\n");

    // Stop accepting new connections, then leave the main event loop.
    evconnlistener_disable(main_listener);
    event_base_loopbreak(main_base);
}

// ==================== 工作线程主函数 ====================

// Body of each worker thread: runs the worker's event loop until it exits,
// logging start and exit.
void worker_thread_func(WorkerThread& worker) {
    // %p expects a void*, so convert the pointer explicitly.
    printf("[Worker %d] Thread started (base=%p)\n",
           worker.thread_idx, static_cast<void*>(worker.base));

    // Blocks here until the loop is told to stop.
    event_base_dispatch(worker.base);

    printf("[Worker %d] Thread exiting\n", worker.thread_idx);
}

// ==================== 初始化工作线程 ====================

// Creates THREAD_COUNT workers. Each gets its own event_base, a
// non-blocking notification pipe, and a persistent read event on the pipe's
// read end that calls worker_notify_cb.
//
// Returns true when every worker is ready. On the first failure it logs and
// returns false; workers created so far stay in `worker_threads`, and their
// destructors release whatever was already set up.
bool init_worker_threads() {
    worker_threads.reserve(THREAD_COUNT);

    for (int i = 0; i < THREAD_COUNT; i++) {
        worker_threads.emplace_back(std::make_unique<WorkerThread>(i));
        WorkerThread& worker = *worker_threads.back();

        // Each thread gets its own event_base.
        worker.base = event_base_new();
        if (!worker.base) {
            fprintf(stderr, "Failed to create event_base for worker %d\n", i);
            return false;
        }

        // Pipe used by the main thread to wake this worker.
        int fds[2];
        if (pipe(fds) < 0) {
            perror("pipe");
            return false;
        }
        worker.notify_receive_fd = fds[0];
        worker.notify_send_fd = fds[1];

        // Both ends must be non-blocking: worker_notify_cb reads until the
        // pipe is empty, which would hang on a blocking descriptor.
        if (evutil_make_socket_nonblocking(worker.notify_receive_fd) < 0 ||
            evutil_make_socket_nonblocking(worker.notify_send_fd) < 0) {
            fprintf(stderr, "Failed to make notify pipe non-blocking for worker %d\n", i);
            return false;
        }

        // Persistent read event on the pipe's read end.
        worker.notify_event = event_new(
            worker.base,
            worker.notify_receive_fd,
            EV_READ | EV_PERSIST,
            worker_notify_cb,
            &worker);

        if (!worker.notify_event) {
            fprintf(stderr, "Failed to create notify event for worker %d\n", i);
            return false;
        }

        // Without a registered notify event the worker would never see new
        // connections, so treat a failed event_add() as fatal.
        if (event_add(worker.notify_event, nullptr) < 0) {
            fprintf(stderr, "Failed to add notify event for worker %d\n", i);
            return false;
        }

        printf("[Init] Worker %d initialized (pipe=%d)\n", i, fds[0]);
    }

    return true;
}

// ==================== 启动工作线程 ====================

bool start_worker_threads() {
    for (auto& worker : worker_threads) {
        worker->thread = std::thread(worker_thread_func, std::ref(*worker));
        printf("[Start] Worker %d thread started\n", worker->thread_idx);
    }
    
    return true;
}

// ==================== 主函数 ====================

int main(int argc, char** argv) {
    printf("========================================\n");
    printf("  Simple Key-Value Store Server        \n");
    printf("========================================\n\n");
    
    // 1. 初始化libevent线程支持
    evthread_use_pthreads();
    
    // 2. 创建主线程的event_base
    main_base = event_base_new();
    if (!main_base) {
        fprintf(stderr, "Failed to create main event_base\n");
        return 1;
    }
    
    // 3. 初始化工作线程
    if (!init_worker_threads()) {
        fprintf(stderr, "Failed to initialize worker threads\n");
        return 1;
    }
    
    // 4. 启动工作线程
    if (!start_worker_threads()) {
        fprintf(stderr, "Failed to start worker threads\n");
        return 1;
    }
    
    // 5. 配置监听地址
    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    
    // 6. 创建监听器(主线程负责accept)
    main_listener = evconnlistener_new_bind(
        main_base, master_accept_cb, nullptr,
        LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
        -1, (struct sockaddr*)&sin, sizeof(sin));
    
    if (!main_listener) {
        fprintf(stderr, "Failed to create listener\n");
        return 1;
    }
    
    evconnlistener_set_error_cb(main_listener, master_error_cb);
    
    // 7. 注册信号处理
    struct event* sigint_event = evsignal_new(main_base, SIGINT, signal_cb, nullptr);
    struct event* sigterm_event = evsignal_new(main_base, SIGTERM, signal_cb, nullptr);
    event_add(sigint_event, nullptr);
    event_add(sigterm_event, nullptr);
    
    // 8. 输出服务器信息
    printf("[INFO] Server started successfully!\n");
    printf("[INFO] Listening on port %d\n", PORT);
    printf("[INFO] Worker threads: %d\n", THREAD_COUNT);
    printf("[INFO] Max key length: %d characters\n", MAX_KEY_LEN);
    printf("[INFO] Max value length: %d bytes\n", MAX_VALUE_LEN);
    printf("[INFO] Press Ctrl+C to stop.\n\n");
    
    // 9. 启动主线程事件循环
    event_base_dispatch(main_base);
    
    // 10. 清理资源
    printf("\n[INFO] Cleaning up resources...\n");
    
    // 释放监听器
    evconnlistener_free(main_listener);
    
    // 通知所有工作线程退出
    for (auto& worker : worker_threads) {
        // 向工作线程发送退出通知
        event_base_loopbreak(worker->base);
        
        // 等待线程结束
        if (worker->thread.joinable()) {
            worker->thread.join();
        }
        
        printf("[Cleanup] Worker %d cleaned up\n", worker->thread_idx);
    }
    
    // 释放主线程资源
    event_free(sigint_event);
    event_free(sigterm_event);
    event_base_free(main_base);
    
    printf("[INFO] Server stopped. Goodbye!\n");
    
    return 0;
}