Kafka 是什么?

Posted by kalos Aner on November 19, 2024

Kafka 是什么?

一、为什么要用 Kafka?

为什么要使用Kafka,Kafka到底解决了什么问题?

Apache Kafka 是一个分布式系统,旨在解决多个微服务的协调问题。

举个例子:

公司刚刚启动一个新项目,业务很简单,你三五下搞定。直接部署一个web服务和数据库,架构简单清晰,服务很快就上线了,老板很满意,你也很开心。架构如下图:

Snipaste_2024-11-19_09-23-51

此时项目只有一个服务,该架构足以胜任。但是随着项目的增大,服务需求增多,如:为了能对数据进行快速搜索,需要增加个搜索服务;为了能实时查看业务状态,需要把数据汇总到数仓以提供报表;为了数据安全审查,需要增加审核服务。每个子服务间可能又存在数据的交集。那么我们将看到下面的架构图:

Snipaste_2024-11-19_09-27-50

这样造成了服务间的强依赖,高耦合,每次增加新服务都需要对架构进行修改,对未来服务架构的扩展和维护将是极大的挑战。

这时可以引入 Kafka,Kafka提供的消息系统,让生产者将数据直接添加到队列中,而消费者从队列中依次读取然后处理。架构如下图:

Snipaste_2024-11-19_09-34-54

从上面的小例子,我们应该可以感受到Kafka的一些优点,接下来总结一下: 1.系统解耦。生产端的服务和消息端的服务在遵守同样的接口约束条件下,可以独立扩展和修改,而互不影响。 2.流量削峰。面对突发大流量,也即生产端生产速度比消费端的消费速度快的时候,消费端服务不会因为超负荷的请求而完全崩溃。 3.可扩展性。因为生产者与消费者已经隔离解耦,所以一旦想增加生产端或消费端的处理逻辑,或者服务实例等都变得十分容易。 4.高吞吐量。以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。 5.数据冗余存储。Kafka支持多副本冗余存储机制,保障不正常宕机之后数据不丢失。 6.消息顺序性。Kafka分布式的单位是partition,Kafka保证同一个partition中的消息的有序性。一个topic多个partition时,则不能保证Topic级别的消息有序性。 7.回溯消费。指的是Kafka重新设置消息位移offset。kafka支持两种方式回溯。一种是基于消息偏移量回溯,一种是基于时间点的消息回溯。

二、Kafka 是什么?

1 定义

  1. Kafka 是一个分布式流媒体平台,或者说分布式消息队列,是互联网实时数据流的实际标准。Kafka 对消息保存时根据 Topic 进行归类,发送消息 者称为 Producer,消息接受者称为 Consumer,此外 kafka 集群有多个 kafka 实例组成,每个 实例(server)称为 代理(broker)。

  2. Apache Kafka 是一个开源 消息 系统,由 Scala 写成。

  3. 无论是 kafka 集群,还是 consumer 都依赖于 Zookeeper 集群保存一些 meta 信息, 来保证系统可用性。

2 优势

kafka 之所以受到越来越多的青睐,与它所扮演的三大角色是分不开的的:

  • 消息系统:kafka 与传统的消息中间件都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯性消费的功能。
  • 存储系统:kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效的降低了消息丢失的风险。这得益于其消息持久化和多副本机制。也可以将 kafka 作为长期的存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题日志压缩功能。
  • 流式处理平台:kafka 为流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理框架,比如窗口、连接、变换和聚合等各类操作。

3 概念

一个典型的 Kafka 包含若干 Producer、若干 Broker、若干 Consumer 以及一个 Zookeeper 集群。Zookeeper 是 Kafka 用来负责集群元数据管理、控制器选举等操作的。Producer 是负责将消息发送到 Broker 的,Broker 负责将消息持久化到磁盘,而 Consumer 是负责从 Broker 订阅并消费消息。Kafka 体系结构如下所示:

595a2a9e9ec5361c6167e76ab77fe7d6

概念一:生产者(Producer)与消费者(Consumer)

45efd58f8838145920ba825dd22063eb

对于 Kafka 来说客户端有两种基本类型:生产者(Producer)和 消费者(Consumer)。除此之外,还有用来做数据集成的 Kafka Connect API 和流式处理的 Kafka Streams 等高阶客户端,但这些高阶客户端底层仍然是生产者和消费者 API,只不过是在上层做了封装。

  • Producer :消息生产者,就是向 Kafka broker 发消息的客户端;
  • Consumer :消息消费者,向 Kafka broker 取消息的客户端;
概念二:Broker 和集群(Cluster)

一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。

若干个 Broker 组成一个 集群Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader;当然一个分区可以被复制到多个 Broker 上来实现冗余,这样当存在 Broker 故障时可以将其分区重新分配到其他 Broker 来负责。下图是一个样例:

05a4e26d510e07d5b00ec9feaf35d755

概念三:主题(Topic)与分区(Partition)

0a5755ecfc6b2b79f6e58aa152fc245e

在 Kafka 中,消息以 主题Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。

我们使用一个生活中的例子来说明:现在 A 城市生产的某商品需要运输到 B 城市,走的是公路,那么单通道的高速公路不论是在「A 城市商品增多」还是「现在 C 城市也要往 B 城市运输东西」这样的情况下都会出现「吞吐量不足」的问题。所以我们现在引入 分区Partition)的概念,类似“允许多修几条道”的方式对我们的主题完成了水平扩展。

三、Kafka 工作流程

bc4253e612f382855a5c6eee6f09e784

1 生产流程

写入方式

producer 采用推(push)模式将消息发布到 broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)

分区(Partition)

消息发送时都被发送到一个 topic,其本质就是一个目录,而 topic 是由一些 Partition Logs(分区日志)组成,其组织结构如下图所示:

bdab635370bee00e39225fb70f8f8511

我们可以看到,每个 Partition 中的消息都是 有序 的,生产的消息被不断追加到 Partition log 上,其中的每一个消息都被赋予了一个唯一的 offset 值。

1)分区的原因

  1. 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
  2. 可以提高并发,因为可以以 Partition 为单位读写了。

2)分区的原则

  1. 指定了 patition,则直接使用;
  2. 未指定 patition 但指定 key,通过对 key 的 value 进行 hash 出一个 patition;
  3. patition 和 key 都未指定,使用轮询选出一个 patition。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
DefaultPartitioner  
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 
  List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 
  int numPartitions = partitions.size(); 
  if (keyBytes == null) {
    int nextValue = nextValue(topic); 
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) { 
    int part = Utils.toPositive(nextValue) % availablePartitions.size(); 
    return availablePartitions.get(part).partition();
     } else { 
     // no partitions are available, give a non-available partition 
     return Utils.toPositive(nextValue) % numPartitions; 
     } 
    } else { 
    // hash the keyBytes to choose a partition 
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 
    }
 }

副本(Replication)

同 一 个 partition 可 能 会 有 多 个 replication ( 对 应 server.properties 配 置 中 的 default.replication.factor=N)。没有 replication 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入 replication 之后,同一个 partition 可能会有多个 replication,而这时需要在这些 replication 之间选出一 个 leader,producer 和 consumer 只与这个 leader 交互,其它 replication 作为 follower 从 leader 中复制数据。

写入流程

producer 写入消息流程如下:

6d03a91b6298e975e70b0f3e7844b37e

1)producer 先从 zookeeper 的 “/brokers/…/state”节点找到该 partition 的 leader ;2)producer 将消息发送给该 leader ;3)leader 将消息写入本地 log ;4)followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK ;5)leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 发送 ACK ;

2 Broker 保存消息

存储方式

物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配 置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文 件),如下:

1
2
3
4
5
6
7
8
9
10
11
[root@hadoop102 logs]$ ll 
drwxrwxr-x. 2 demo demo 4096 8 月 6 14:37 first-0 
drwxrwxr-x. 2 demo demo 4096 8 月 6 14:35 first-1 
drwxrwxr-x. 2 demo demo 4096 8 月 6 14:37 first-2 

[root@hadoop102 logs]$ cd first-0 
[root@hadoop102 first-0]$ ll 
-rw-rw-r--. 1 demo demo 10485760 8 月 6 14:33 00000000000000000000.index 
-rw-rw-r--. 1 demo demo 219 8 月 6 15:07 00000000000000000000.log 
-rw-rw-r--. 1 demo demo 10485756 8 月 6 14:33 00000000000000000000.timeindex 
-rw-rw-r--. 1 demo demo 8 8 月 6 14:37 leader-epoch-checkpoint
存储策略

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

  • 基于时间:log.retention.hours=168
  • 基于大小:log.retention.bytes=1073741824

需要注意的是,因为 Kafka 读取特定消息的时间复杂度为 O(1),即与文件大小无关, 所以这里删除过期文件与提高 Kafka 性能无关。

Zookeeper 存储结构

a8de8937ab9006fb9617345313f24838

注意:producer 不在 zk 中注册,消费者在 zk 中注册。

3 Kafka 消费过程分析

kafka 提供了两套 consumer API:高级 Consumer API 和低级 Consumer API。

高级 API

1)高级 API 优点

  • 高级 API 写起来简单
  • 不需要自行去管理 offset,系统通过 zookeeper 自行管理。
  • 不需要管理分区,副本等情况,系统自动管理。
  • 消费者断线会自动根据上一次记录在 zookeeper 中的 offset 去接着获取数据(默认设置 1 分钟更新一下 zookeeper 中存的 offset)
  • 可以使用 group 来区分对同一个 topic 的不同程序访问分离开来(不同的 group 记录不同的 offset,这样不同程序读取同一个 topic 才不会因为 offset 互相影响)

2)高级 API 缺点

  • 不能自行控制 offset(对于某些特殊需求来说)
  • 不能细化控制如分区、副本、zk 等
低级 API

1)低级 API 优点

  • 能够让开发者自己控制 offset,想从哪里读取就从哪里读取。
  • 自行控制连接分区,对分区自定义进行负载均衡
  • 对 zookeeper 的依赖性降低(如:offset 不一定非要靠 zk 存储,自行存储 offset 即可, 比如存在文件或者内存中)

2)低级 API 缺点

  • 太过复杂,需要自行控制 offset,连接哪个分区,找到分区 leader 等。
消费者组

7b866bf0a1deef9a394a3a7ad340ad14

消费者是以 consumer group 消费者组的方式工作,由一个或者多个消费者组成一个组, 共同消费一个 topic。每个分区在同一时间只能由 group 中的一个消费者读取,但是多个 group 可以同时消费这个 partition。在图中,有一个由三个消费者组成的 group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。

在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的 group 成员会自动负载均衡读取之前失败的消费者读取的分区。

消费方式

consumer 采用 pull(拉)模式从 broker 中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。 它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费 消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直等待数据 到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达 的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。

四、补充

Kafka 中的数据以*主题形式*存储。主题的基本基础是日志——一种按顺序存储记录的简单有序数据结构。

日志是不可变的,并且具有 O(1) 写入和读取(只要它们来自尾部或头部)。因此,访问其数据的速度不会随着日志变大而降低,并且由于其不可变性,它对于并发读取非常有效。日志的关键优势以及它被选为 Kafka 的主要原因是它针对 HDD 进行了优化。

Kafka 实际上将其所有记录存储到磁盘上,并且不会在内存中明确保存任何内容。消费者一次性获取大量线性块,磁盘上的线性读取/写入速度可能很快。人们通常认为 HDD 速度很慢,因为当你进行多次磁盘寻道时,速度会很慢,因为驱动器磁头在移动到新位置时会受到物理移动的限制。使用线性读取/写入时,这不是问题,因为你会随着磁头的移动连续读取/写入数据。

更进一步说,所述线性操作已被操作系统高度优化。

预读优化会在请求之前预取大块的倍数并将其存储在内存中,从而导致下一次读取不会触及磁盘。

后写优化将小的逻辑写入分组为大的物理写入 - Kafka 不使用 fsync,它的写入以异步方式写入磁盘。

Kafka 支持 零拷贝优化,但是零拷贝不太可能在优化 Kafka 中发挥重要作用,因为1.CPU 很少是优化良好的 Kafka 部署中的瓶颈;2.加密和 SSL/TLS(所有生产部署都必须具备)已经禁止 Kafka 使用零拷贝,因为会在整个路径中修改消息。尽管如此,Kafka 仍然表现良好。

参考:

https://xie.infoq.cn/article/0d832da5558aff98529af397e

https://juejin.cn/post/6974913928161656863

https://highscalability.com/untitled-2/