简介
libevent 是一个用 C 语言编写的轻量级、开源、高性能的事件驱动网络库。它为开发高性能、可扩展的网络应用程序提供了跨平台的抽象接口,使开发者能够通过事件回调机制来管理网络连接、定时器和信号等事件。libevent 底层基于 Reactor 模型实现(对 epoll、kqueue、select 等 I/O 多路复用机制做了统一封装),而从应用编程的视角看,它呈现为异步的事件驱动接口。
安装libevent
下面是一些安装方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Ubuntu/Debian
sudo apt-get install libevent-dev
# CentOS/RHEL
sudo yum install libevent-devel
# macOS (使用Homebrew)
brew install libevent
# 源码安装
wget https://github.com/libevent/libevent/releases/download/release-2.1.12-stable/libevent-2.1.12-stable.tar.gz
tar -xzf libevent-2.1.12-stable.tar.gz
cd libevent-2.1.12-stable
./configure
make && sudo make install
使用 libevent 的项目在编译时需要链接该库,最简单的命令形如:
gcc -o example example.c -levent
对于中大型项目,一般使用 make 或 cmake 构建,并在构建脚本中链接所需的库。
快速入门
服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* TCP port the echo server binds its listener to (used in main). */
#define PORT 8888
/*
 * Read callback: echoes every complete line the client has sent.
 *
 * One invocation may find several LF-terminated lines already buffered, and
 * the callback is not invoked again until more data arrives, so the input
 * buffer is drained in a loop instead of handling a single line.
 *
 * bev: the connection's bufferevent.
 * ctx: user argument registered with bufferevent_setcb (unused).
 */
void read_callback(struct bufferevent *bev, void *ctx) {
    struct evbuffer *input = bufferevent_get_input(bev);
    struct evbuffer *output = bufferevent_get_output(bev);
    char *data;

    (void)ctx;

    while ((data = evbuffer_readln(input, NULL, EVBUFFER_EOL_LF)) != NULL) {
        printf("Received: %s\n", data);
        /* Format straight into the output buffer so long lines are not
         * truncated by a fixed-size stack array. */
        if (evbuffer_add_printf(output, "Server echo: %s", data) < 0) {
            fprintf(stderr, "Failed to queue reply\n");
        }
        free(data);  /* evbuffer_readln returns a heap-allocated string */
    }
}
/*
 * Event callback: reports socket errors and tears the connection down on
 * EOF or error. Other event flags are ignored.
 */
void event_callback(struct bufferevent *bev, short events, void *ctx) {
    const short closing = BEV_EVENT_EOF | BEV_EVENT_ERROR;
    const int failed = (events & BEV_EVENT_ERROR) != 0;

    if (failed) {
        perror("Error from bufferevent");
    }

    if ((events & closing) == 0) {
        return;
    }

    printf("Connection closed\n");
    /* The bufferevent was created with BEV_OPT_CLOSE_ON_FREE, so this also
     * closes the socket. */
    bufferevent_free(bev);
}
/*
 * Accept callback: wraps a freshly accepted socket in a bufferevent and
 * registers the read/event callbacks for it.
 *
 * listener: the listener that accepted the connection.
 * fd:       the accepted socket.
 * address, socklen, ctx: unused.
 *
 * If the bufferevent cannot be created or enabled, the connection is dropped
 * and the socket closed instead of dereferencing a NULL pointer.
 */
void accept_conn_callback(struct evconnlistener *listener,
                          evutil_socket_t fd,
                          struct sockaddr *address,
                          int socklen,
                          void *ctx) {
    struct event_base *base = evconnlistener_get_base(listener);
    struct bufferevent *bev;

    (void)address;
    (void)socklen;
    (void)ctx;

    bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (!bev) {
        fprintf(stderr, "Could not create bufferevent; dropping connection\n");
        evutil_closesocket(fd);  /* nothing owns fd yet, close it ourselves */
        return;
    }

    bufferevent_setcb(bev, read_callback, NULL, event_callback, NULL);

    if (bufferevent_enable(bev, EV_READ | EV_WRITE) < 0) {
        fprintf(stderr, "Could not enable bufferevent; dropping connection\n");
        bufferevent_free(bev);   /* closes fd via BEV_OPT_CLOSE_ON_FREE */
        return;
    }

    printf("New connection accepted\n");
}
/*
 * Listener error callback: logs the pending socket error and asks the event
 * loop that owns the listener to exit.
 */
void accept_error_callback(struct evconnlistener *listener, void *ctx) {
    struct event_base *loop = evconnlistener_get_base(listener);
    const int code = EVUTIL_SOCKET_ERROR();
    const char *reason = evutil_socket_error_to_string(code);

    fprintf(stderr,
            "Got an error %d (%s) on the listener. Shutting down.\n",
            code, reason);
    event_base_loopexit(loop, NULL);
}
int main() {
struct event_base *base;
struct evconnlistener *listener;
struct sockaddr_in sin;
// 1. 创建事件循环
base = event_base_new();
if (!base) {
fprintf(stderr, "Could not initialize libevent!\n");
return 1;
}
// 2. 配置监听地址
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
sin.sin_addr.s_addr = htonl(INADDR_ANY);
// 3. 创建监听器
listener = evconnlistener_new_bind(base, accept_conn_callback, NULL,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
-1, (struct sockaddr*)&sin, sizeof(sin));
if (!listener) {
fprintf(stderr, "Could not create listener!\n");
return 1;
}
evconnlistener_set_error_cb(listener, accept_error_callback);
printf("Server listening on port %d\n", PORT);
// 4. 启动事件循环
event_base_dispatch(base);
// 5. 清理资源
evconnlistener_free(listener);
event_base_free(base);
return 0;
}
客户端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
/* Address of the echo server the client connects to (used in main). */
#define PORT 8888
#define SERVER_IP "127.0.0.1"
/* Sends exactly len bytes on sock; returns 0 on success, -1 on error. */
static int send_all(int sock, const char *data, size_t len) {
    while (len > 0) {
        ssize_t sent = send(sock, data, len, 0);
        if (sent < 0) {
            return -1;
        }
        data += sent;
        len -= (size_t)sent;
    }
    return 0;
}

/*
 * Interactive line client: connects to SERVER_IP:PORT, sends each line typed
 * on stdin (terminated with '\n') and prints one reply per line.
 *
 * The loop ends on "quit", on end-of-input, when the server closes the
 * connection, or on a socket error. Returns 0 on a normal exit, 1 on failure.
 */
int main(void) {
    int status = 0;
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        perror("Socket creation failed");
        return 1;
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(PORT);
    if (inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr) != 1) {
        fprintf(stderr, "Invalid server address: %s\n", SERVER_IP);
        close(sock);
        return 1;
    }

    if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("Connection failed");
        close(sock);  /* do not leak the descriptor on the error path */
        return 1;
    }
    printf("Connected to server. Enter messages (type 'quit' to exit):\n");

    char buffer[1024];
    for (;;) {
        printf("You: ");
        fflush(stdout);

        /* fgets returns NULL on EOF or error; without this check the loop
         * would spin forever on a stale buffer. */
        if (fgets(buffer, sizeof(buffer), stdin) == NULL) {
            break;
        }
        buffer[strcspn(buffer, "\n")] = '\0';  /* strip the newline */

        if (strcmp(buffer, "quit") == 0) {
            break;
        }

        /* The server reads line by line, so terminate the message with LF. */
        if (send_all(sock, buffer, strlen(buffer)) < 0 ||
            send_all(sock, "\n", 1) < 0) {
            perror("Send failed");
            status = 1;
            break;
        }

        /* Leave room for the terminator: recv does not NUL-terminate. */
        ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
        if (n < 0) {
            perror("Receive failed");
            status = 1;
            break;
        }
        if (n == 0) {
            printf("Server closed the connection\n");
            break;
        }
        buffer[n] = '\0';
        printf("Server: %s\n", buffer);
    }

    close(sock);
    return status;
}
高级用法
下面给出一个仿照 Memcached 线程模型的精简示例(并非 Memcached 源码):主线程负责 accept 新连接,并通过管道把连接分发给多个工作线程,每个工作线程拥有独立的 event_base。示例代码使用 C++17 编写,需要用 C++ 编译器编译。Memcached 是一个自由开源、高性能的分布式内存对象缓存系统。
/**
* Memcached架构精简版 - 简化命令版本
* SET命令: set key value
* GET命令: get key
* DELETE命令: delete key
* STATS命令: stats(显示统计信息)
* QUIT命令: quit
* FLUSH_ALL命令: flush_all(清空所有数据)
* HELP命令: help(显示帮助信息)
*/
#include <event2/event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <event2/thread.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <string>
#include <memory>
#include <vector>
#include <atomic>
#include <thread>
#define THREAD_COUNT 4                // number of worker threads
#define PORT 11211                    // memcached's default port
#define MAX_CONN_QUEUE 10000          // maximum connection queue length
#define MAX_KEY_LEN 250               // maximum key length
// Parenthesized so the macro expands as a single value inside any expression
// (e.g. `x % MAX_VALUE_LEN`).
#define MAX_VALUE_LEN (1024 * 1024)   // maximum value length (1 MB)
// ==================== 数据结构定义 ====================
// Value record kept in the key-value table; wraps a single string payload.
struct KVItem {
    std::string value;  // stored payload; empty for a default-constructed item

    KVItem() = default;
    KVItem(const std::string& payload) : value(payload) {}
};
// Process-wide key/value table shared by all worker threads.
// The storage helpers in this file take global_storage_mutex before touching
// the map.
static std::unordered_map<std::string, KVItem> global_storage;
static std::mutex global_storage_mutex;
// One accepted client connection, also usable as a node of a singly linked
// hand-off queue (see `next`).
//
// Ownership: the destructor frees `bev` when it is non-null. It never closes
// `fd` itself; the socket is only closed through the bufferevent when that was
// created with BEV_OPT_CLOSE_ON_FREE.
class Connection {
public:
    int fd;                    // client socket descriptor
    struct bufferevent* bev;   // bufferevent wrapping fd; null until attached
    Connection* next;          // next node in the hand-off queue

    Connection(int fd_) : fd(fd_), bev(nullptr), next(nullptr) {}

    // The destructor releases `bev`, so a copy would free the same
    // bufferevent twice. Forbid copying outright.
    Connection(const Connection&) = delete;
    Connection& operator=(const Connection&) = delete;

    ~Connection() {
        if (bev) {
            bufferevent_free(bev);
            bev = nullptr;
        }
    }
};
// Per-worker state: the thread handle, its private event loop, the pipe used
// to wake it up, and the queue of connections waiting to be adopted.
class WorkerThread {
public:
    std::thread thread;                     // the worker's C++ thread
    struct event_base* base = nullptr;      // event loop private to this worker
    struct event* notify_event = nullptr;   // read event on the pipe's read end
    int notify_receive_fd = -1;             // pipe read end (worker side)
    int notify_send_fd = -1;                // pipe write end (used by the main thread)

    // Hand-off queue of pending connections, guarded by queue_mutex.
    Connection* conn_queue = nullptr;       // head
    Connection* conn_queue_tail = nullptr;  // tail
    std::mutex queue_mutex;
    int queue_size = 0;

    int thread_idx = -1;                    // worker index, -1 when unassigned

    WorkerThread() = default;

    WorkerThread(int idx) : thread_idx(idx) {}

    ~WorkerThread() {
        cleanup();
    }

    // Releases the notify event, the event base and both pipe ends, in that
    // order. Each handle is reset afterwards, so calling this again is a no-op.
    void cleanup() {
        if (notify_event != nullptr) {
            event_free(notify_event);
            notify_event = nullptr;
        }

        if (base != nullptr) {
            event_base_free(base);
            base = nullptr;
        }

        for (int* pipe_end : {&notify_receive_fd, &notify_send_fd}) {
            if (*pipe_end >= 0) {
                close(*pipe_end);
                *pipe_end = -1;
            }
        }
    }
};
// Global server state.
// worker_threads: one entry per worker, filled by init_worker_threads().
// main_base / main_listener: the accept loop and listening socket created in main().
// shutdown_requested: set by signal_cb() on SIGINT/SIGTERM.
static std::vector<std::unique_ptr<WorkerThread>> worker_threads;
static struct event_base* main_base = nullptr;
static struct evconnlistener* main_listener = nullptr;
static std::atomic<bool> shutdown_requested{false};
// ==================== 存储操作函数(线程安全) ====================
// Stores `value` under `key`, replacing any existing entry.
// Returns true on success; if the container throws, the error is logged to
// stderr and false is returned.
bool set_kv_item(const std::string& key, const std::string& value) {
    std::lock_guard<std::mutex> guard(global_storage_mutex);
    bool stored = false;

    try {
        global_storage[key] = KVItem(value);
        stored = true;
    } catch (const std::exception& ex) {
        fprintf(stderr, "Error setting key '%s': %s\n", key.c_str(), ex.what());
    }

    return stored;
}
// Looks up `key`. Returns {true, value} when present, {false, ""} otherwise.
// The value is copied out while the storage lock is held.
std::pair<bool, std::string> get_kv_item(const std::string& key) {
    std::lock_guard<std::mutex> guard(global_storage_mutex);

    const auto hit = global_storage.find(key);
    if (hit == global_storage.end()) {
        return std::make_pair(false, std::string());
    }
    return std::make_pair(true, hit->second.value);
}
// Removes `key` from the table. Returns true if an entry was erased, false if
// the key was not present.
bool delete_kv_item(const std::string& key) {
    std::lock_guard<std::mutex> guard(global_storage_mutex);
    const auto erased = global_storage.erase(key);
    return erased != 0;
}
// Returns the number of entries currently stored, read under the storage lock.
int get_storage_count() {
    std::lock_guard<std::mutex> guard(global_storage_mutex);
    const auto entries = global_storage.size();
    return static_cast<int>(entries);
}
// Removes every entry from the shared table while holding the storage lock.
void flush_all() {
    std::lock_guard<std::mutex> guard(global_storage_mutex);
    global_storage.clear();
}
// ==================== 工具函数 ====================
// Appends `conn` to the worker's hand-off queue and wakes the worker by
// writing one byte to its notify pipe. The queue lock is held for the whole
// call, including the pipe write. A failed write is only logged.
void add_conn_to_worker(WorkerThread& worker, Connection* conn) {
    std::lock_guard<std::mutex> guard(worker.queue_mutex);

    // Pick the slot that must point at the new node: the current tail's `next`
    // when the queue is non-empty, otherwise the head pointer itself.
    Connection*& slot = worker.conn_queue_tail ? worker.conn_queue_tail->next
                                               : worker.conn_queue;
    slot = conn;
    worker.conn_queue_tail = conn;
    ++worker.queue_size;

    // Tell the worker thread that a connection is waiting.
    char wakeup = 1;
    const ssize_t written = write(worker.notify_send_fd, &wakeup, 1);
    if (written != 1) {
        perror("write to notify pipe");
    }
}
// Pops the connection at the head of the worker's hand-off queue.
// Returns nullptr when the queue is empty. The popped node's `next` pointer is
// left as it was.
Connection* get_conn_from_worker(WorkerThread& worker) {
    std::lock_guard<std::mutex> guard(worker.queue_mutex);

    Connection* head = worker.conn_queue;
    if (head != nullptr) {
        worker.conn_queue = head->next;
        if (worker.conn_queue == nullptr) {
            // Removed the last node: the tail must not dangle.
            worker.conn_queue_tail = nullptr;
        }
        --worker.queue_size;
    }
    return head;
}
// ==================== 工作线程回调函数 ====================
// 工作线程:读取回调(处理客户端请求)
void worker_read_cb(struct bufferevent* bev, void* arg) {
Connection* conn = static_cast<Connection*>(arg);
WorkerThread* worker = nullptr;
// 找到对应的worker(通过遍历)
for (size_t i = 0; i < worker_threads.size(); i++) {
if (worker_threads[i]->base == bufferevent_get_base(bev)) {
worker = worker_threads[i].get();
break;
}
}
if (!worker) {
fprintf(stderr, "[Worker] Cannot find worker for connection\n");
return;
}
struct evbuffer* input = bufferevent_get_input(bev);
// 读取一行数据
size_t n_read;
char* line = evbuffer_readln(input, &n_read, EVBUFFER_EOL_CRLF_STRICT);
if (!line) return;
printf("[Worker %d] Received: '%s'\n", worker->thread_idx, line);
// 简化的命令处理
if (strcmp(line, "quit") == 0) {
const char* response = "BYE\n";
bufferevent_write(bev, response, strlen(response));
bufferevent_free(bev);
conn->bev = nullptr; // 避免双重释放
} else if (strncmp(line, "get ", 4) == 0) {
// GET命令: get key
char* key = line + 4;
// 去除开头和结尾的空白字符
while (*key == ' ') key++;
char* end = key + strlen(key) - 1;
while (end >= key && (*end == ' ' || *end == '\r' || *end == '\n')) {
*end = '\0';
end--;
}
if (*key == '\0') {
bufferevent_write(bev, "ERROR: key is empty\r\n", 22);
} else {
auto [found, value] = get_kv_item(key);
if (found) {
std::string response = "VALUE: " + std::string(key) + " = " + value + "\r\n";
bufferevent_write(bev, response.c_str(), response.length());
printf("[Worker %d] GET %s = %s\n", worker->thread_idx, key, value.c_str());
} else {
std::string response = "NOT FOUND: key '" + std::string(key) + "' does not exist\r\n";
bufferevent_write(bev, response.c_str(), response.length());
printf("[Worker %d] GET %s = NOT FOUND\n", worker->thread_idx, key);
}
}
} else if (strncmp(line, "set ", 4) == 0) {
// SET命令: set key value
// 格式: set <key> <value>
char* rest = line + 4;
while (*rest == ' ') rest++; // 跳过空格
// 找到key(第一个空格之前)
char* key_end = strchr(rest, ' ');
if (!key_end) {
bufferevent_write(bev, "ERROR: invalid set command, format: set key value\r\n", 53);
free(line);
return;
}
*key_end = '\0'; // 分割key和value
char* key = rest;
char* value = key_end + 1;
// 去除value开头的空格
while (*value == ' ') value++;
// 去除key和value末尾的空白字符
char* key_trim_end = key + strlen(key) - 1;
while (key_trim_end >= key && (*key_trim_end == ' ' || *key_trim_end == '\r' || *key_trim_end == '\n')) {
*key_trim_end = '\0';
key_trim_end--;
}
char* value_trim_end = value + strlen(value) - 1;
while (value_trim_end >= value && (*value_trim_end == ' ' || *value_trim_end == '\r' || *value_trim_end == '\n')) {
*value_trim_end = '\0';
value_trim_end--;
}
// 检查key和value是否为空
if (*key == '\0' || *value == '\0') {
bufferevent_write(bev, "ERROR: key and value cannot be empty\r\n", 39);
free(line);
return;
}
// 检查key长度
if (strlen(key) > MAX_KEY_LEN) {
bufferevent_write(bev, "ERROR: key too long\r\n", 22);
free(line);
return;
}
// 检查value长度
if (strlen(value) > MAX_VALUE_LEN) {
bufferevent_write(bev, "ERROR: value too long\r\n", 24);
free(line);
return;
}
// 存储到全局存储
if (set_kv_item(key, value)) {
std::string response = "STORED: key '" + std::string(key) + "' with value '" + std::string(value) + "'\r\n";
bufferevent_write(bev, response.c_str(), response.length());
printf("[Worker %d] SET %s = %s\n", worker->thread_idx, key, value);
} else {
bufferevent_write(bev, "ERROR: failed to store\r\n", 24);
}
} else if (strncmp(line, "delete ", 7) == 0) {
// DELETE命令: delete key
char* key = line + 7;
while (*key == ' ') key++;
char* end = key + strlen(key) - 1;
while (end >= key && (*end == ' ' || *end == '\r' || *end == '\n')) {
*end = '\0';
end--;
}
if (delete_kv_item(key)) {
std::string response = "DELETED: key '" + std::string(key) + "'\r\n";
bufferevent_write(bev, response.c_str(), response.length());
printf("[Worker %d] DELETE %s\n", worker->thread_idx, key);
} else {
std::string response = "NOT FOUND: key '" + std::string(key) + "' does not exist\r\n";
bufferevent_write(bev, response.c_str(), response.length());
}
} else if (strcmp(line, "stats") == 0) {
// 统计信息
int count = get_storage_count();
char stats[256];
int len = snprintf(stats, sizeof(stats),
"STAT items: %d\r\n"
"STAT threads: %d\r\n"
"END\r\n",
count, THREAD_COUNT);
bufferevent_write(bev, stats, len);
} else if (strcmp(line, "flush_all") == 0) {
// 清空所有数据
flush_all();
bufferevent_write(bev, "OK: all data flushed\r\n", 23);
printf("[Worker] All data flushed\n");
} else if (strcmp(line, "help") == 0) {
// 帮助信息
const char* help =
"Available commands:\r\n"
" set key value - Set a key-value pair\r\n"
" get key - Get the value of a key\r\n"
" delete key - Delete a key\r\n"
" stats - Show server statistics\r\n"
" flush_all - Clear all data\r\n"
" help - Show this help message\r\n"
" quit - Close connection\r\n";
bufferevent_write(bev, help, strlen(help));
} else {
// 未知命令
const char* response = "ERROR: unknown command. Type 'help' for available commands\r\n";
bufferevent_write(bev, response, strlen(response));
}
free(line);
}
// Worker event callback: logs socket errors and destroys the connection on
// EOF or error.
//
// bev:    the connection's bufferevent.
// events: BEV_EVENT_* flags describing what happened.
// arg:    the Connection registered with bufferevent_setcb.
void worker_event_cb(struct bufferevent* bev, short events, void* arg) {
    Connection* conn = static_cast<Connection*>(arg);

    if (events & BEV_EVENT_ERROR) {
        perror("Connection error");
    }

    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
        printf("[Worker] Connection closed fd=%d\n", conn->fd);
        bufferevent_free(bev);
        // ~Connection frees conn->bev when it is non-null. Clear it so the
        // bufferevent released above is not freed a second time.
        conn->bev = nullptr;
        delete conn;
    }
}
// Worker notify callback: runs when the notify pipe becomes readable. Drains
// the pipe, then adopts every connection waiting in the worker's queue by
// wrapping it in a bufferevent on this worker's event_base.
//
// fd:  read end of the notify pipe.
// arg: the WorkerThread that owns the pipe.
void worker_notify_cb(evutil_socket_t fd, short events, void* arg) {
    WorkerThread* self = static_cast<WorkerThread*>(arg);

    // Swallow all pending wake-up bytes; the read loop stops as soon as read()
    // returns 0 or an error.
    char scratch[1024];
    ssize_t got;
    do {
        got = read(fd, scratch, sizeof(scratch));
    } while (got > 0);

    static const char* const kWelcome =
        "========================================\n"
        " Welcome to Simple Key-Value Store! \n"
        "========================================\n"
        "Type 'help' for available commands.\n\n";

    for (Connection* pending = get_conn_from_worker(*self);
         pending != nullptr;
         pending = get_conn_from_worker(*self)) {
        printf("[Worker %d] Processing new connection fd=%d\n",
               self->thread_idx, pending->fd);

        struct bufferevent* wrapped = bufferevent_socket_new(
            self->base, pending->fd, BEV_OPT_CLOSE_ON_FREE);
        pending->bev = wrapped;

        if (wrapped == nullptr) {
            fprintf(stderr, "[Worker %d] Error creating bufferevent\n",
                    self->thread_idx);
            // No bufferevent owns the socket, so close it by hand.
            close(pending->fd);
            delete pending;
            continue;
        }

        // Route reads and errors to the worker callbacks; no write callback.
        bufferevent_setcb(wrapped, worker_read_cb, nullptr, worker_event_cb, pending);
        bufferevent_enable(wrapped, EV_READ);

        // Greet the client.
        bufferevent_write(wrapped, kWelcome, strlen(kWelcome));
    }
}
// ==================== 主线程回调函数 ====================
// 主线程:接受新连接
void master_accept_cb(struct evconnlistener* listener,
evutil_socket_t fd,
struct sockaddr* address,
int socklen,
void* ctx) {
// 简单轮询选择工作线程
static int next_worker = 0;
int worker_idx = next_worker++ % THREAD_COUNT;
printf("[Master] New connection fd=%d, assigned to worker %d\n",
fd, worker_idx);
// 创建连接对象
Connection* conn = new Connection(fd);
// 添加到工作线程队列
add_conn_to_worker(*worker_threads[worker_idx], conn);
}
// Main-thread listener error callback: logs the pending socket error and
// breaks out of the event loop that owns the listener.
void master_error_cb(struct evconnlistener* listener, void* ctx) {
    struct event_base* loop = evconnlistener_get_base(listener);
    const int code = EVUTIL_SOCKET_ERROR();
    const char* reason = evutil_socket_error_to_string(code);

    fprintf(stderr,
            "[Master] Got error %d (%s) on listener. Shutting down.\n",
            code, reason);
    event_base_loopbreak(loop);
}
// ==================== 信号处理 ====================
// Signal event callback. On SIGINT or SIGTERM it records the shutdown request,
// stops accepting new connections and breaks the main event loop. Any other
// signal is only logged.
void signal_cb(evutil_socket_t sig, short events, void* arg) {
    printf("\n[Signal] Caught signal %d\n", sig);

    const bool wants_shutdown = (sig == SIGINT) || (sig == SIGTERM);
    if (!wants_shutdown) {
        return;
    }

    shutdown_requested.store(true);
    printf("[Signal] Shutting down gracefully...\n");

    evconnlistener_disable(main_listener);  // stop accepting new connections
    event_base_loopbreak(main_base);        // leave the main event loop
}
// ==================== 工作线程主函数 ====================
// Thread body of a worker: runs the worker's event loop until it is asked to
// stop, then returns.
void worker_thread_func(WorkerThread& worker) {
    // %p requires a void* argument.
    printf("[Worker %d] Thread started (base=%p)\n",
           worker.thread_idx, static_cast<void*>(worker.base));

    // event_base_dispatch returns a negative value when the loop fails.
    if (event_base_dispatch(worker.base) < 0) {
        fprintf(stderr, "[Worker %d] Event loop terminated with an error\n",
                worker.thread_idx);
    }

    printf("[Worker %d] Thread exiting\n", worker.thread_idx);
}
// ==================== 初始化工作线程 ====================
// Creates THREAD_COUNT WorkerThread objects, each with its own event_base, a
// non-blocking notify pipe and a persistent read event on the pipe's read end.
// The threads themselves are not started here.
//
// Returns true when every worker was set up, false on the first failure.
// Resources acquired so far stay attached to their WorkerThread, whose
// cleanup()/destructor releases them.
bool init_worker_threads() {
    worker_threads.reserve(THREAD_COUNT);

    for (int i = 0; i < THREAD_COUNT; i++) {
        worker_threads.emplace_back(std::make_unique<WorkerThread>(i));
        WorkerThread& worker = *worker_threads.back();

        // Each worker gets an event loop of its own.
        worker.base = event_base_new();
        if (!worker.base) {
            fprintf(stderr, "Failed to create event_base for worker %d\n", i);
            return false;
        }

        // Pipe used by the main thread to wake this worker.
        int fds[2];
        if (pipe(fds) < 0) {
            perror("pipe");
            return false;
        }
        // Store both ends immediately so cleanup() closes them on any later
        // failure.
        worker.notify_receive_fd = fds[0];
        worker.notify_send_fd = fds[1];

        // The read end must be non-blocking: worker_notify_cb reads until the
        // pipe is empty and would otherwise block inside the event loop.
        if (evutil_make_socket_nonblocking(worker.notify_receive_fd) < 0 ||
            evutil_make_socket_nonblocking(worker.notify_send_fd) < 0) {
            fprintf(stderr, "Failed to make notify pipe non-blocking for worker %d\n", i);
            return false;
        }

        // Persistent read event on the pipe's read end.
        worker.notify_event = event_new(
            worker.base,
            worker.notify_receive_fd,
            EV_READ | EV_PERSIST,
            worker_notify_cb,
            &worker);
        if (!worker.notify_event) {
            fprintf(stderr, "Failed to create notify event for worker %d\n", i);
            return false;
        }

        // Without a registered notify event the worker would never see any
        // connection handed to it, so treat a failed event_add as fatal.
        if (event_add(worker.notify_event, nullptr) != 0) {
            fprintf(stderr, "Failed to add notify event for worker %d\n", i);
            return false;
        }

        printf("[Init] Worker %d initialized (pipe=%d)\n", i, fds[0]);
    }
    return true;
}
// ==================== 启动工作线程 ====================
bool start_worker_threads() {
for (auto& worker : worker_threads) {
worker->thread = std::thread(worker_thread_func, std::ref(*worker));
printf("[Start] Worker %d thread started\n", worker->thread_idx);
}
return true;
}
// ==================== 主函数 ====================
int main(int argc, char** argv) {
printf("========================================\n");
printf(" Simple Key-Value Store Server \n");
printf("========================================\n\n");
// 1. 初始化libevent线程支持
evthread_use_pthreads();
// 2. 创建主线程的event_base
main_base = event_base_new();
if (!main_base) {
fprintf(stderr, "Failed to create main event_base\n");
return 1;
}
// 3. 初始化工作线程
if (!init_worker_threads()) {
fprintf(stderr, "Failed to initialize worker threads\n");
return 1;
}
// 4. 启动工作线程
if (!start_worker_threads()) {
fprintf(stderr, "Failed to start worker threads\n");
return 1;
}
// 5. 配置监听地址
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(PORT);
sin.sin_addr.s_addr = htonl(INADDR_ANY);
// 6. 创建监听器(主线程负责accept)
main_listener = evconnlistener_new_bind(
main_base, master_accept_cb, nullptr,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
-1, (struct sockaddr*)&sin, sizeof(sin));
if (!main_listener) {
fprintf(stderr, "Failed to create listener\n");
return 1;
}
evconnlistener_set_error_cb(main_listener, master_error_cb);
// 7. 注册信号处理
struct event* sigint_event = evsignal_new(main_base, SIGINT, signal_cb, nullptr);
struct event* sigterm_event = evsignal_new(main_base, SIGTERM, signal_cb, nullptr);
event_add(sigint_event, nullptr);
event_add(sigterm_event, nullptr);
// 8. 输出服务器信息
printf("[INFO] Server started successfully!\n");
printf("[INFO] Listening on port %d\n", PORT);
printf("[INFO] Worker threads: %d\n", THREAD_COUNT);
printf("[INFO] Max key length: %d characters\n", MAX_KEY_LEN);
printf("[INFO] Max value length: %d bytes\n", MAX_VALUE_LEN);
printf("[INFO] Press Ctrl+C to stop.\n\n");
// 9. 启动主线程事件循环
event_base_dispatch(main_base);
// 10. 清理资源
printf("\n[INFO] Cleaning up resources...\n");
// 释放监听器
evconnlistener_free(main_listener);
// 通知所有工作线程退出
for (auto& worker : worker_threads) {
// 向工作线程发送退出通知
event_base_loopbreak(worker->base);
// 等待线程结束
if (worker->thread.joinable()) {
worker->thread.join();
}
printf("[Cleanup] Worker %d cleaned up\n", worker->thread_idx);
}
// 释放主线程资源
event_free(sigint_event);
event_free(sigterm_event);
event_base_free(main_base);
printf("[INFO] Server stopped. Goodbye!\n");
return 0;
}