libevent 快速上手

Posted by Kalos Aner on April 14, 2026

简介

libevent 是一个用C语言编写的、轻量级的开源高性能事件驱动网络库。它为开发高性能、可扩展的网络应用程序提供了跨平台的抽象接口,使开发者能够使用事件回调机制来管理网络连接、定时器和信号等事件。libevent 底层采用 Reactor 模型实现,但是从编程应用视角上看它是异步事件驱动。

安装libevent

下面是一些安装方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Ubuntu/Debian
sudo apt-get install libevent-dev

# CentOS/RHEL
sudo yum install libevent-devel

# macOS (使用Homebrew)
brew install libevent

# 源码安装
wget https://github.com/libevent/libevent/releases/download/release-2.1.12-stable/libevent-2.1.12-stable.tar.gz
tar -xzf libevent-2.1.12-stable.tar.gz
cd libevent-2.1.12-stable
./configure
make && sudo make install

使用libevent编译项目的时候需要链接这个库,一个简单的命令如 gcc -o example example.c -levent。对于一些中大型项目一般都会使用 make 或者 cmake 编译项目,并在其中链接需要的库。

快速入门

服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define PORT 8888

// 读取回调:客户端发送数据
void read_callback(struct bufferevent *bev, void *ctx) {
    struct evbuffer *input = bufferevent_get_input(bev);
    struct evbuffer *output = bufferevent_get_output(bev);
    
    char *data = evbuffer_readln(input, NULL, EVBUFFER_EOL_LF);
    if (data) {
        printf("Received: %s\n", data);
        
        // 回复客户端
        char reply[256];
        snprintf(reply, sizeof(reply), "Server echo: %s", data);
        bufferevent_write(bev, reply, strlen(reply));
        
        free(data);
    }
}

// 事件回调:处理错误和连接关闭
void event_callback(struct bufferevent *bev, short events, void *ctx) {
    if (events & BEV_EVENT_ERROR) {
        perror("Error from bufferevent");
    }
    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
        printf("Connection closed\n");
        bufferevent_free(bev);
    }
}

// 连接回调:新客户端连接
void accept_conn_callback(struct evconnlistener *listener,
                          evutil_socket_t fd,
                          struct sockaddr *address,
                          int socklen,
                          void *ctx) {
    struct event_base *base = evconnlistener_get_base(listener);
    struct bufferevent *bev;
    
    // 创建带缓冲区的事件
    bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    
    // 设置回调函数
    bufferevent_setcb(bev, read_callback, NULL, event_callback, NULL);
    
    // 启用读写事件
    bufferevent_enable(bev, EV_READ | EV_WRITE);
    
    printf("New connection accepted\n");
}

// 错误回调:监听器错误
void accept_error_callback(struct evconnlistener *listener, void *ctx) {
    struct event_base *base = evconnlistener_get_base(listener);
    int err = EVUTIL_SOCKET_ERROR();
    fprintf(stderr, "Got an error %d (%s) on the listener. "
            "Shutting down.\n", err, evutil_socket_error_to_string(err));
    
    event_base_loopexit(base, NULL);
}

int main() {
    struct event_base *base;
    struct evconnlistener *listener;
    struct sockaddr_in sin;
    
    // 1. 创建事件循环
    base = event_base_new();
    if (!base) {
        fprintf(stderr, "Could not initialize libevent!\n");
        return 1;
    }
    
    // 2. 配置监听地址
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    
    // 3. 创建监听器
    listener = evconnlistener_new_bind(base, accept_conn_callback, NULL,
                                       LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
                                       -1, (struct sockaddr*)&sin, sizeof(sin));
    
    if (!listener) {
        fprintf(stderr, "Could not create listener!\n");
        return 1;
    }
    
    evconnlistener_set_error_cb(listener, accept_error_callback);
    
    printf("Server listening on port %d\n", PORT);
    // 4. 启动事件循环
    event_base_dispatch(base);
    
    // 5. 清理资源
    evconnlistener_free(listener);
    event_base_free(base);
    
    return 0;
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#define PORT 8888
#define SERVER_IP "127.0.0.1"

int main() {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("Socket creation failed");
        return 1;
    }
    
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(PORT);
    inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr);
    
    if (connect(sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        perror("Connection failed");
        return 1;
    }
    
    printf("Connected to server. Enter messages (type 'quit' to exit):\n");
    
    char buffer[1024];
    while (1) {
        printf("You: ");
        fgets(buffer, sizeof(buffer), stdin);
        buffer[strcspn(buffer, "\n")] = 0; // 去除换行符
        
        if (strcmp(buffer, "quit") == 0) {
            break;
        }
        
        // 发送消息
        send(sock, buffer, strlen(buffer), 0);
        send(sock, "\n", 1, 0); // 添加换行符
        
        // 接收回复
        memset(buffer, 0, sizeof(buffer));
        int n = recv(sock, buffer, sizeof(buffer), 0);
        if (n > 0) {
            printf("Server: %s\n", buffer);
        }
    }
    
    close(sock);
    return 0;
}

高级用法

下面给出一个 Memcached 项目中使用 libevent 的用法。Memcached 是一个自由开源、高性能、分布式内存对象缓存系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
/**
 * Memcached架构精简版 - 简化命令版本
 * SET命令: set key value
 * GET命令: get key
 * DELETE命令: delete key
 * STATS命令: stats(显示统计信息)
 * QUIT命令: quit
 * FLUSH_ALL命令: flush_all(清空所有数据)
 * HELP命令: help(显示帮助信息)
 */

#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <string>
#include <memory>
#include <vector>
#include <atomic>
#include <thread>

#define THREAD_COUNT 4          // 工作线程数量
#define PORT 11211              // Memcached默认端口
#define MAX_CONN_QUEUE 10000    // 最大连接队列
#define MAX_KEY_LEN 250         // 最大key长度
#define MAX_VALUE_LEN 1024*1024 // 最大value长度(1MB)

// ==================== 数据结构定义 ====================

// 键值对存储(使用C++ STL容器)
struct KVItem {
    std::string value;
    KVItem() : value("") {}
    KVItem(const std::string& v) : value(v) {}
};

// 全局共享存储(所有线程共享)
static std::unordered_map<std::string, KVItem> global_storage;
static std::mutex global_storage_mutex;

// 连接对象(使用C++类)
class Connection {
public:
    int fd;                      // 客户端文件描述符
    struct bufferevent* bev;     // bufferevent
    Connection* next;            // 链表指针
    
    Connection(int fd_) : fd(fd_), bev(nullptr), next(nullptr) {}
    
    ~Connection() {
        if (bev) {
            bufferevent_free(bev);
        }
    }
};

// 工作线程类
class WorkerThread {
public:
    std::thread thread;          // C++线程
    struct event_base* base;     // 每个线程独立的event_base
    struct event* notify_event;  // 通知事件(监听管道)
    int notify_receive_fd;       // 接收通知的管道读端
    int notify_send_fd;          // 发送通知的管道写端(主线程使用)
    
    // 连接队列
    Connection* conn_queue;      // 队列头指针
    Connection* conn_queue_tail; // 队列尾指针
    std::mutex queue_mutex;      // 队列锁
    int queue_size;              // 队列大小
    int thread_idx;              // 线程索引

    WorkerThread() : thread(), thread_idx(-1), base(nullptr), notify_event(nullptr),
                     notify_receive_fd(-1), notify_send_fd(-1),
                     conn_queue(nullptr), conn_queue_tail(nullptr), queue_size(0) {}

    WorkerThread(int idx) : WorkerThread() {
        thread_idx = idx;
    }

    ~WorkerThread() {
        cleanup();
    }
    
    void cleanup() {
        if (notify_event) {
            event_free(notify_event);
            notify_event = nullptr;
        }
        if (base) {
            event_base_free(base);
            base = nullptr;
        }
        if (notify_receive_fd >= 0) {
            close(notify_receive_fd);
            notify_receive_fd = -1;
        }
        if (notify_send_fd >= 0) {
            close(notify_send_fd);
            notify_send_fd = -1;
        }
    }
};

// 全局变量
static std::vector<std::unique_ptr<WorkerThread>> worker_threads;
static struct event_base* main_base = nullptr;
static struct evconnlistener* main_listener = nullptr;
static std::atomic<bool> shutdown_requested{false};

// ==================== 存储操作函数(线程安全) ====================

// 设置键值对
bool set_kv_item(const std::string& key, const std::string& value) {
    std::lock_guard<std::mutex> lock(global_storage_mutex);
    
    try {
        global_storage[key] = KVItem(value);
        return true;
    } catch (const std::exception& e) {
        fprintf(stderr, "Error setting key '%s': %s\n", key.c_str(), e.what());
        return false;
    }
}

// 查找键值对
std::pair<bool, std::string> get_kv_item(const std::string& key) {
    std::lock_guard<std::mutex> lock(global_storage_mutex);
    
    auto it = global_storage.find(key);
    if (it != global_storage.end()) {
        return {true, it->second.value};
    }
    return {false, ""};
}

// 删除键值对
bool delete_kv_item(const std::string& key) {
    std::lock_guard<std::mutex> lock(global_storage_mutex);
    
    auto it = global_storage.find(key);
    if (it != global_storage.end()) {
        global_storage.erase(it);
        return true;
    }
    return false;
}

// 获取存储统计信息
int get_storage_count() {
    std::lock_guard<std::mutex> lock(global_storage_mutex);
    return global_storage.size();
}

// 清空所有数据
void flush_all() {
    std::lock_guard<std::mutex> lock(global_storage_mutex);
    global_storage.clear();
}

// ==================== 工具函数 ====================

// 向工作线程添加连接
void add_conn_to_worker(WorkerThread& worker, Connection* conn) {
    std::lock_guard<std::mutex> lock(worker.queue_mutex);
    
    // 添加到队列尾部
    if (worker.conn_queue_tail) {
        worker.conn_queue_tail->next = conn;
    } else {
        worker.conn_queue = conn;
    }
    worker.conn_queue_tail = conn;
    worker.queue_size++;
    
    // 通知工作线程有新连接
    char buf = 1;
    if (write(worker.notify_send_fd, &buf, 1) != 1) {
        perror("write to notify pipe");
    }
}

// 从工作线程获取连接
Connection* get_conn_from_worker(WorkerThread& worker) {
    std::lock_guard<std::mutex> lock(worker.queue_mutex);
    
    if (!worker.conn_queue) {
        return nullptr;
    }
    
    // 从队列头部取出
    Connection* conn = worker.conn_queue;
    worker.conn_queue = conn->next;
    if (!worker.conn_queue) {
        worker.conn_queue_tail = nullptr;
    }
    worker.queue_size--;
    
    return conn;
}

// ==================== 工作线程回调函数 ====================

// 工作线程:读取回调(处理客户端请求)
void worker_read_cb(struct bufferevent* bev, void* arg) {
    Connection* conn = static_cast<Connection*>(arg);
    WorkerThread* worker = nullptr;
    
    // 找到对应的worker(通过遍历)
    for (size_t i = 0; i < worker_threads.size(); i++) {
        if (worker_threads[i]->base == bufferevent_get_base(bev)) {
            worker = worker_threads[i].get();
            break;
        }
    }
    
    if (!worker) {
        fprintf(stderr, "[Worker] Cannot find worker for connection\n");
        return;
    }
    
    struct evbuffer* input = bufferevent_get_input(bev);
    
    // 读取一行数据
    size_t n_read;
    char* line = evbuffer_readln(input, &n_read, EVBUFFER_EOL_CRLF_STRICT);
    if (!line) return;
    
    printf("[Worker %d] Received: '%s'\n", worker->thread_idx, line);
    
    // 简化的命令处理
    if (strcmp(line, "quit") == 0) {
        const char* response = "BYE\n";
        bufferevent_write(bev, response, strlen(response));
        bufferevent_free(bev);
        conn->bev = nullptr; // 避免双重释放
    } else if (strncmp(line, "get ", 4) == 0) {
        // GET命令: get key
        char* key = line + 4;
        
        // 去除开头和结尾的空白字符
        while (*key == ' ') key++;
        char* end = key + strlen(key) - 1;
        while (end >= key && (*end == ' ' || *end == '\r' || *end == '\n')) {
            *end = '\0';
            end--;
        }
        
        if (*key == '\0') {
            bufferevent_write(bev, "ERROR: key is empty\r\n", 22);
        } else {
            auto [found, value] = get_kv_item(key);
            if (found) {
                std::string response = "VALUE: " + std::string(key) + " = " + value + "\r\n";
                bufferevent_write(bev, response.c_str(), response.length());
                printf("[Worker %d] GET %s = %s\n", worker->thread_idx, key, value.c_str());
            } else {
                std::string response = "NOT FOUND: key '" + std::string(key) + "' does not exist\r\n";
                bufferevent_write(bev, response.c_str(), response.length());
                printf("[Worker %d] GET %s = NOT FOUND\n", worker->thread_idx, key);
            }
        }
    } else if (strncmp(line, "set ", 4) == 0) {
        // SET命令: set key value
        // 格式: set <key> <value>
        
        char* rest = line + 4;
        while (*rest == ' ') rest++;  // 跳过空格
        
        // 找到key(第一个空格之前)
        char* key_end = strchr(rest, ' ');
        if (!key_end) {
            bufferevent_write(bev, "ERROR: invalid set command, format: set key value\r\n", 53);
            free(line);
            return;
        }
        
        *key_end = '\0';  // 分割key和value
        char* key = rest;
        char* value = key_end + 1;
        
        // 去除value开头的空格
        while (*value == ' ') value++;
        
        // 去除key和value末尾的空白字符
        char* key_trim_end = key + strlen(key) - 1;
        while (key_trim_end >= key && (*key_trim_end == ' ' || *key_trim_end == '\r' || *key_trim_end == '\n')) {
            *key_trim_end = '\0';
            key_trim_end--;
        }
        
        char* value_trim_end = value + strlen(value) - 1;
        while (value_trim_end >= value && (*value_trim_end == ' ' || *value_trim_end == '\r' || *value_trim_end == '\n')) {
            *value_trim_end = '\0';
            value_trim_end--;
        }
        
        // 检查key和value是否为空
        if (*key == '\0' || *value == '\0') {
            bufferevent_write(bev, "ERROR: key and value cannot be empty\r\n", 39);
            free(line);
            return;
        }
        
        // 检查key长度
        if (strlen(key) > MAX_KEY_LEN) {
            bufferevent_write(bev, "ERROR: key too long\r\n", 22);
            free(line);
            return;
        }
        
        // 检查value长度
        if (strlen(value) > MAX_VALUE_LEN) {
            bufferevent_write(bev, "ERROR: value too long\r\n", 24);
            free(line);
            return;
        }
        
        // 存储到全局存储
        if (set_kv_item(key, value)) {
            std::string response = "STORED: key '" + std::string(key) + "' with value '" + std::string(value) + "'\r\n";
            bufferevent_write(bev, response.c_str(), response.length());
            printf("[Worker %d] SET %s = %s\n", worker->thread_idx, key, value);
        } else {
            bufferevent_write(bev, "ERROR: failed to store\r\n", 24);
        }
    } else if (strncmp(line, "delete ", 7) == 0) {
        // DELETE命令: delete key
        char* key = line + 7;
        while (*key == ' ') key++;
        
        char* end = key + strlen(key) - 1;
        while (end >= key && (*end == ' ' || *end == '\r' || *end == '\n')) {
            *end = '\0';
            end--;
        }
        
        if (delete_kv_item(key)) {
            std::string response = "DELETED: key '" + std::string(key) + "'\r\n";
            bufferevent_write(bev, response.c_str(), response.length());
            printf("[Worker %d] DELETE %s\n", worker->thread_idx, key);
        } else {
            std::string response = "NOT FOUND: key '" + std::string(key) + "' does not exist\r\n";
            bufferevent_write(bev, response.c_str(), response.length());
        }
    } else if (strcmp(line, "stats") == 0) {
        // 统计信息
        int count = get_storage_count();
        
        char stats[256];
        int len = snprintf(stats, sizeof(stats), 
                          "STAT items: %d\r\n"
                          "STAT threads: %d\r\n"
                          "END\r\n",
                          count, THREAD_COUNT);
        bufferevent_write(bev, stats, len);
    } else if (strcmp(line, "flush_all") == 0) {
        // 清空所有数据
        flush_all();
        bufferevent_write(bev, "OK: all data flushed\r\n", 23);
        printf("[Worker] All data flushed\n");
    } else if (strcmp(line, "help") == 0) {
        // 帮助信息
        const char* help = 
            "Available commands:\r\n"
            "  set key value      - Set a key-value pair\r\n"
            "  get key            - Get the value of a key\r\n"
            "  delete key         - Delete a key\r\n"
            "  stats              - Show server statistics\r\n"
            "  flush_all          - Clear all data\r\n"
            "  help               - Show this help message\r\n"
            "  quit               - Close connection\r\n";
        bufferevent_write(bev, help, strlen(help));
    } else {
        // 未知命令
        const char* response = "ERROR: unknown command. Type 'help' for available commands\r\n";
        bufferevent_write(bev, response, strlen(response));
    }
    
    free(line);
}

// 工作线程:事件回调(处理错误和关闭)
void worker_event_cb(struct bufferevent* bev, short events, void* arg) {
    Connection* conn = static_cast<Connection*>(arg);
    
    if (events & BEV_EVENT_ERROR) {
        perror("Connection error");
    }
    
    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
        printf("[Worker] Connection closed fd=%d\n", conn->fd);
        bufferevent_free(bev);
        delete conn;
    }
}

// 工作线程:处理新连接通知
void worker_notify_cb(evutil_socket_t fd, short events, void* arg) {
    WorkerThread* worker = static_cast<WorkerThread*>(arg);
    char buf[1024];
    
    // 读取管道中的通知(清空管道)
    while (read(fd, buf, sizeof(buf)) > 0);
    
    // 处理队列中的所有连接
    Connection* conn;
    while ((conn = get_conn_from_worker(*worker)) != nullptr) {
        printf("[Worker %d] Processing new connection fd=%d\n", 
               worker->thread_idx, conn->fd);
        
        // 创建bufferevent处理连接
        conn->bev = bufferevent_socket_new(
            worker->base, conn->fd, BEV_OPT_CLOSE_ON_FREE);
        
        if (!conn->bev) {
            fprintf(stderr, "[Worker %d] Error creating bufferevent\n", 
                   worker->thread_idx);
            close(conn->fd);
            delete conn;
            continue;
        }
        
        // 设置回调
        bufferevent_setcb(conn->bev, 
                         worker_read_cb, 
                         nullptr, 
                         worker_event_cb, 
                         conn);
        
        // 启用读事件
        bufferevent_enable(conn->bev, EV_READ);
        
        // 发送欢迎消息
        const char* welcome = 
            "========================================\n"
            "  Welcome to Simple Key-Value Store!   \n"
            "========================================\n"
            "Type 'help' for available commands.\n\n";
        bufferevent_write(conn->bev, welcome, strlen(welcome));
    }
}

// ==================== 主线程回调函数 ====================

// 主线程:接受新连接
void master_accept_cb(struct evconnlistener* listener,
                     evutil_socket_t fd,
                     struct sockaddr* address,
                     int socklen,
                     void* ctx) {
    // 简单轮询选择工作线程
    static int next_worker = 0;
    int worker_idx = next_worker++ % THREAD_COUNT;
    
    printf("[Master] New connection fd=%d, assigned to worker %d\n", 
           fd, worker_idx);
    
    // 创建连接对象
    Connection* conn = new Connection(fd);
    
    // 添加到工作线程队列
    add_conn_to_worker(*worker_threads[worker_idx], conn);
}

// 主线程:监听器错误回调
void master_error_cb(struct evconnlistener* listener, void* ctx) {
    struct event_base* base = evconnlistener_get_base(listener);
    int err = EVUTIL_SOCKET_ERROR();
    
    fprintf(stderr, "[Master] Got error %d (%s) on listener. "
            "Shutting down.\n", err, evutil_socket_error_to_string(err));
    
    event_base_loopbreak(base);
}

// ==================== 信号处理 ====================

void signal_cb(evutil_socket_t sig, short events, void* arg) {
    printf("\n[Signal] Caught signal %d\n", sig);
    
    if (sig == SIGINT || sig == SIGTERM) {
        shutdown_requested = true;
        printf("[Signal] Shutting down gracefully...\n");
        
        // 停止监听新连接
        evconnlistener_disable(main_listener);
        
        // 退出主线程事件循环
        event_base_loopbreak(main_base);
    }
}

// ==================== 工作线程主函数 ====================

void worker_thread_func(WorkerThread& worker) {
    printf("[Worker %d] Thread started (base=%p)\n", 
           worker.thread_idx, worker.base);
    
    // 启动工作线程的事件循环
    event_base_dispatch(worker.base);
    
    printf("[Worker %d] Thread exiting\n", worker.thread_idx);
}

// ==================== 初始化工作线程 ====================

bool init_worker_threads() {
    worker_threads.reserve(THREAD_COUNT);

    for (int i = 0; i < THREAD_COUNT; i++) {
        worker_threads.emplace_back(std::make_unique<WorkerThread>(i));
        WorkerThread& worker = *worker_threads.back();
        
        // 创建每个线程独立的event_base
        worker.base = event_base_new();
        if (!worker.base) {
            fprintf(stderr, "Failed to create event_base for worker %d\n", i);
            return false;
        }
        
        // 创建管道用于线程间通信
        int fds[2];
        if (pipe(fds) < 0) {
            perror("pipe");
            return false;
        }
        worker.notify_receive_fd = fds[0];
        worker.notify_send_fd = fds[1];
        
        // 设置管道为非阻塞
        evutil_make_socket_nonblocking(worker.notify_receive_fd);
        evutil_make_socket_nonblocking(worker.notify_send_fd);
        
        worker.conn_queue = nullptr;
        worker.conn_queue_tail = nullptr;
        worker.queue_size = 0;
        
        // 创建通知事件(监听管道读端)
        worker.notify_event = event_new(
            worker.base,
            worker.notify_receive_fd, 
            EV_READ | EV_PERSIST,
            worker_notify_cb, 
            &worker);
        
        if (!worker.notify_event) {
            fprintf(stderr, "Failed to create notify event for worker %d\n", i);
            return false;
        }
        
        event_add(worker.notify_event, nullptr);
        
        printf("[Init] Worker %d initialized (pipe=%d)\n", i, fds[0]);
    }
    
    return true;
}

// ==================== 启动工作线程 ====================

bool start_worker_threads() {
    for (auto& worker : worker_threads) {
        worker->thread = std::thread(worker_thread_func, std::ref(*worker));
        printf("[Start] Worker %d thread started\n", worker->thread_idx);
    }
    
    return true;
}

// ==================== 主函数 ====================

int main(int argc, char** argv) {
    printf("========================================\n");
    printf("  Simple Key-Value Store Server        \n");
    printf("========================================\n\n");
    
    // 1. 初始化libevent线程支持
    evthread_use_pthreads();
    
    // 2. 创建主线程的event_base
    main_base = event_base_new();
    if (!main_base) {
        fprintf(stderr, "Failed to create main event_base\n");
        return 1;
    }
    
    // 3. 初始化工作线程
    if (!init_worker_threads()) {
        fprintf(stderr, "Failed to initialize worker threads\n");
        return 1;
    }
    
    // 4. 启动工作线程
    if (!start_worker_threads()) {
        fprintf(stderr, "Failed to start worker threads\n");
        return 1;
    }
    
    // 5. 配置监听地址
    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(PORT);
    sin.sin_addr.s_addr = htonl(INADDR_ANY);
    
    // 6. 创建监听器(主线程负责accept)
    main_listener = evconnlistener_new_bind(
        main_base, master_accept_cb, nullptr,
        LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
        -1, (struct sockaddr*)&sin, sizeof(sin));
    
    if (!main_listener) {
        fprintf(stderr, "Failed to create listener\n");
        return 1;
    }
    
    evconnlistener_set_error_cb(main_listener, master_error_cb);
    
    // 7. 注册信号处理
    struct event* sigint_event = evsignal_new(main_base, SIGINT, signal_cb, nullptr);
    struct event* sigterm_event = evsignal_new(main_base, SIGTERM, signal_cb, nullptr);
    event_add(sigint_event, nullptr);
    event_add(sigterm_event, nullptr);
    
    // 8. 输出服务器信息
    printf("[INFO] Server started successfully!\n");
    printf("[INFO] Listening on port %d\n", PORT);
    printf("[INFO] Worker threads: %d\n", THREAD_COUNT);
    printf("[INFO] Max key length: %d characters\n", MAX_KEY_LEN);
    printf("[INFO] Max value length: %d bytes\n", MAX_VALUE_LEN);
    printf("[INFO] Press Ctrl+C to stop.\n\n");
    
    // 9. 启动主线程事件循环
    event_base_dispatch(main_base);
    
    // 10. 清理资源
    printf("\n[INFO] Cleaning up resources...\n");
    
    // 释放监听器
    evconnlistener_free(main_listener);
    
    // 通知所有工作线程退出
    for (auto& worker : worker_threads) {
        // 向工作线程发送退出通知
        event_base_loopbreak(worker->base);
        
        // 等待线程结束
        if (worker->thread.joinable()) {
            worker->thread.join();
        }
        
        printf("[Cleanup] Worker %d cleaned up\n", worker->thread_idx);
    }
    
    // 释放主线程资源
    event_free(sigint_event);
    event_free(sigterm_event);
    event_base_free(main_base);
    
    printf("[INFO] Server stopped. Goodbye!\n");
    
    return 0;
}

测试方法

1
2
3
4
5
6
7
telnet localhost 11211
get key
set key value
get key
delete key
get key
quit