访问量
访客数

Kafka详解

2021.10.19 阅读量

JMS消息模型

JMS即Java Message Service是Java平台的消息传递标准,用于实现消息中间件的通信。JMS提供了两种消息模型:点对点模型和发布订阅模型。

  • 点对点模型:消息发送者发送消息,消息代理将其放入消息队列中,消息接受者从队列中获取消息,消息读取后被移除消息队列。 每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。

    MQ详解-001

    虽然可能有多个客户端在队列中侦听消息,但只有一个可以读取到消息,之后消息将不存在,其他消费者将无法读取。 也就是说消息队列只有唯一一个发送者和接受者,但是并不能说只有一个接收者。

    它的特点是每个消息只有一个消费者,即消息一旦被消费,消息就不在消息队列中; 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列; 接收者在成功接收消息之后需向队列应答成功;

  • 发布订阅模型:发布者将消息发送到主题Topic中,多个订阅者订阅这个主题,订阅者不断的去轮询监听消息队列中的消息,那么就会在消息到达的同时接收消息。

    MQ详解-002

    它的特点是每个消息可以有多个消费者,消费完消息之后消息不会清除; 发布者和订阅者之间有时间上的依赖性:针对某个主题的订阅者,它必须创建一个订阅之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。 当然了缓和这种严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样即使订阅者没有运行,它也能接收到发布者的消息;

MQ概览

JMS定义了消息传递的标准接口。它并不是一个消息中间件,而是一个规范,用于与各种实现消息中间件,如ActiveMQRabbitMQRocketMQKafka进行交互。

MQ即messagequeue消息队列,是分布式系统的重要组件,主要解决异步消息,应用解耦,流量消峰等问题。从而实现高可用,高性能,可伸缩和最终一致性的架构。 使用较多的MQ有,activeMQrabbitMQKafka

  • 异步消息处理:可以将一些非核心流程,如日志,短信,邮件等,通过MQ的方式异步去处理。这样做的好处是缩短主流程的响应时间,提升用户体验;
  • 应用解耦:商品服务和订单服务之间。用户下单后,订单服务会通知商品服务。不使用MQ的情况是订单服务调用商品服务的接口,这样订单服务和商品服务之间是耦合的;使用MQ,订单服务完成持久化处理,将消息写入MQ消息队列中,返回用户订单下单成功,商品服务来订阅这个下单的消息,采用拉或推的方式获得下单信息,商品服务根据商品下单信息进行商品库存信息修改,这样当下单时商品服务不可用时,也不影响正常下单,这就完成了订单服务和商品服务之间的解耦;
  • 流量消峰:秒杀活动流量过大,导致流量暴增,最终可能导致应用挂掉。一般会在应用前端加入消息队列来控制活动人数,假如消息队列超过最大数量,应该直接抛弃用户请求或者跳转到错误页面。秒杀业务根据消息队列中的请求信息在做后续的业务处理。比如在抢购时,可能一下子过来了10万个请求,但MQ只接受前100个用户的请求,超过100个不接收了。这样就成功限制了用户请求;
特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 同 ActiveMQ 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
Topic 数量对吞吐量的影响 吞吐量基本稳定 吞吐量基本稳定 Topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 Topic Topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 Topic 数量不要过多,如果要支撑大规模的 Topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 同 ActiveMQ 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 Erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
消息模型 点对点和发布订阅 点对点和发布订阅 点对点和发布/订阅 发布/订阅

一般的业务系统要引入MQ,最早大家都用ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了。

后来大家开始用RabbitMQ,但是确实Erlang语言阻止了大量的Java工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高。

不过现在确实越来越多的公司会去用RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险,目前RocketMQ已捐给Apache,但GitHub上的活跃度其实不算高,对自己公司技术实力有绝对自信的,推荐用RocketMQ,否则回去老老实实用RabbitMQ吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用RabbitMQ是不错的选择。大型公司,基础架构研发实力较强,用RocketMQ是很好的选择。 如果是大数据领域的实时计算、日志采集等场景,用Kafka是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

Kafka

Kafka是一个分布式的基于发布订阅模式的消息队列,由Apache软件基金会开发。它主要用于处理大规模的实时数据流,适用于各种数据管道、流处理和日志收集场景。

基础架构

MQ详解-003

  • BrokerBrokerKafka集群中的服务器节点,一台Kafka服务器就是一个Broker。每个Broker负责存储和管理消息,处理生产者的写入请求和消费者的读取请求。Kafka集群可以通过增加更多的Brokers来扩展处理能力和存储容量,一个Broker可以容纳多个Topic
  • TopicTopicKafka中的消息分类标准。消息被发送到特定的主题,消费者从主题中读取消息。每个主题可以分为多个Partition,用来提高处理能力和扩展性。
  • PartitionPartitionTopic的一个子集,是Kafka存储消息的实际单位。每个Partition是一个有序的、不可变的消息序列,存储在磁盘上的日志文件中。每个Partition有一个领导者(Leader)和多个副本(Replicas)。
  • Leader: 每个Partition有一个Leader,负责处理所有读写请求,写入消息到分区。
  • Follower: 每个Partition的副本从Leader复制消息,用于提供高可用性和数据备份。副本可以在Leader失败时接管。
  • ProducerProducer是负责将消息发送到Kafka``Topic的客户端应用程序。生产者将消息发布到一个或多个Topic中,可以选择Partition策略决定消息写入到哪个分区。
  • ConsumerConsumer是从Kafka``Topic中读取消息的客户端应用程序。消费者可以独立工作,也可以作为Consumer Group的一部分,订阅一个或多个Topic,从主题的一个或多个Partition中读取消息。
  • Consumer GroupConsumer Group是一组消费者组成的集合,用于共同处理一个或多个主题中的消息。消费者组中的每个消费者负责处理主题的不同Partition,确保消息的负载均衡和避免重复消费。
  • ZookeeperZookeeper是一个分布式协调服务,用于Kafka集群的管理。它维护Kafka集群的元数据,如Broker信息、TopicPartition信息,进行分区的领导者选举,监控Kafka集群的健康状态。

当消费者宕机后,再次启动的时候会继续消费消息,而不是从头消费消息。因为这个特性所以消费者会保存一些消费的进度信息,被称为offset,在Kafka 0.9之前保存在Zookeeper当中,在此之后保存在Kafka本地。即最终Kafka会将消息保存在本地磁盘中,默认保留 168 个小时,即 7 天。

Kafka中消息是以Topic进行分类的,Producer生产消息,Consumer消费消息,都是面向Topic的。Topic是逻辑上的概念,而Partition是物理上的概念,每个Partition对应于一个log文件,该log文件中存储的就是Producer生产的数据。

Producer生产的数据会被不断追加到该log文件末端,且每条数据都有自己的offsetConsumer组中的每个Consumer,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

发布订阅工作流程

MQ详解-004

  1. 生产者发布消息
    • 消息创建:生产者创建消息,并指定消息内容及相关元数据。
    • 选择主题和分区:生产者决定将消息发送到哪个Topic。如果未指定分区策略,Kafka会根据配置选择一个分区。分区策略可能包括按键分配或轮询。
    • 消息发送:消息被发送到指定的Topic的分区中。每个分区在本地磁盘上有一个日志文件,生产者将消息追加到日志文件末端。
    • 消息确认:生产者等待Kafka确认消息已成功写入,确认可以是来自单个副本或所有副本,取决于acks配置。
  2. 消息存储
    • 消息存储在分区的日志文件中,每条消息都有一个唯一的offset,表示在该分区中的位置。消息存储在磁盘上,Kafka默认保留消息 168 小时(即 7 天),可以通过配置调整保留时间。
  3. 消费者订阅消息
    • 消费者组:消费者可以独立工作,也可以作为消费者组的一部分。每个消费者组由一个或多个消费者组成,共同处理来自同一Topic的消息。
    • 主题订阅:消费者订阅一个或多个TopicKafka会将相应Topic的分区分配给消费者组中的消费者。
  4. 消息消费
    • 拉取消息:消费者从分区中拉取消息,消费者可以定期或按需拉取消息。
    • 处理消息:消费者处理拉取到的消息。处理完成后,消费者可以选择提交当前的offset,记录处理进度。
    • 提交offset:消费者将当前的offset提交到Kafka,以便在恢复或重新启动时从正确的位置继续消费。offset提交到Kafka(在Kafka 0.9及之后版本)或Zookeeper(在Kafka 0.8及之前版本)。
  5. 消息确认和备份
    • 消息确认:消费者可以选择自动或手动提交offset。自动提交是Kafka定期提交消费者的offset,手动提交是消费者显式提交。
    • 容错和恢复:如果消费者宕机或失败,Kafka会根据提交的offset进行消息恢复,确保消息不会丢失。Kafka使用LeaderFollower模型来提供高可用性和数据备份。

生产者

生产者是Kafka中负责将消息发送到Topic的客户端应用程序。它将消息发布到指定的TopicKafka中的消息是按Topic进行分类的。

生产者文件存储

MQ详解-005

Kafka中的消息存储在Broker服务器上的磁盘中,每个Topic被分为一个或多个Partition。 每个Partition在磁盘上对应一个日志文件,生产者发布的消息被追加到该日志文件的末尾。 由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大,导致数据定位效率低下,Kafka采取了分片和索引机制,将每个Partition分为多个Segment。 每个Segment对应两个文件,“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为,Topic名称+分区序号。 例如,first这个Topic有三个分区,则其对应的文件夹为first-0first-1first-2。其中,每个Segment中的日志数据文件大小均相等。 “.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中文件的命名是以第一个数据的偏移量来命名的。

该日志数据文件的大小可以通过在Kafka Broker的config/server.properties配置文件的中的“log.segment.bytes”进行设置,默认为1G大小(1073741824字节),在顺序写入消息时如果超出该设定的阈值,将会创建一组新的日志数据和索引文件。

Kafka如何通过index文件快速找到log文件中的数据?

MQ详解-006

根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。 然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的position即实际物理位置。

根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。 由于index文件中的每条对应log文件中存储内容大小都相同,所以想要找到指定的消息,只需要用index文件中的该条的大小加上该条的偏移量即可得出log文件中指定消息的位置。

生产者分区策略

分区是Kafka中的物理存储单位,每个Topic可以有多个Partition。消息在Partition中按顺序存储,每个消息有一个唯一的offset

分区策略决定了消息如何分配到不同的Partition中,直接影响到Kafka的负载均衡和性能。合理的分区策略可以提高并发处理能力,优化资源利用。

分区策略类型:

  • 轮询:轮询分区策略是Kafka的默认策略。生产者将消息按顺序轮询地发送到各个分区,确保负载均衡。 优点是实现简单,能够均匀分配负载,适用于消息内容无关的场景。但无法控制消息在分区中的顺序。
  • 按键分配:在这种策略中,生产者使用消息的Key来决定消息发送到哪个分区。消息的Key被哈希,确保具有相同Key的消息总是发送到同一个分区。 优点是保证具有相同Key的消息顺序一致,适用于需要顺序处理的场景。但可能导致某些分区负载过重,从而造成负载不均衡。
  • 自定义分区器:允许开发者实现自定义的分区策略。通过实现Partitioner接口,生产者可以定义复杂的分区逻辑以满足特定的业务需求。 优点是灵活性高,可以根据具体业务需求实现自定义分区规则。但需要开发者实现和维护自定义分区器类。

每种分区策略有其特定的应用场景和优缺点。根据具体的业务需求和系统性能要求,选择合适的分区策略可以优化Kafka的负载均衡和消息处理效率。

生产者数据可靠性保证

为保证生产者发送的数据,能可靠的发送到指定的TopicTopic的每个分区收到生产者发送的数据后,都需要向生产者发送ACK(acknowledgement确认收到)。 如果生产者收到ACK,就会进行下一轮的发送,否则重新发送数据。

MQ详解-008

Kafka确保消息在Leader副本和Follower副本中都得到确认。消息在Leader副本写入成功后,Leader会将消息复制到其他Follower副本。 副本确认机制保证了即使某个副本出现故障,数据也不会丢失。ACK配置项决定了生产者需要等待多少个副本确认消息已写入才能认为请求成功。可以设置为以下值:

  • 0:生产者不会等待任何确认,可能导致消息丢失。
  • 1:生产者等待领导者副本的确认,提供基本的可靠性。
  • all-1:生产者等待所有副本的确认,提供最高级别的数据可靠性。
同步方案 优点 缺点
半数以上完成同步,就发送ACK 延迟低 选举新的Leader时,容忍n台 节点的故障,需要2n+1个副本
全部完成同步,才发送ACK 选举新的Leader时,容忍n台节点的故障,需要n+1个副本 延迟高

理解 2n+1: 半数以上完成同步才可以发ACK,如果挂了n台有副本的服务器,那么就需要有另外n台正常发送(这样正常发送的刚好是总数(挂的和没挂的)的一半(n(挂的)+n(正常的)=2n)),因为是半数以上所以2n+1。所以总数2n+1的时候最多只能容忍n台有故障。即如果挂了n台有副本的服务器,那么存在副本的服务器的总和为 2n+1

Kafka选择了第二种方案,原因是,同样为了容忍n台节点的故障,第一种方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一种方案会造成大量数据的冗余; 虽然第二种方案的网络延迟会比较高,但网络延迟对Kafka的影响较小。

采用第二种方案之后,设想以下情景,Leader收到数据,所有Follower都开始同步数据,但有一个Follower,因为某种故障,迟迟不能与Leader进行同步,那Leader就要一直等下去,直到它完成同步,才能发送ACK。这个问题怎么解决呢?

Leader维护了一个动态的in-sync replica setISR。 当和Leader保持同步的Follower集合。当ISR中的Follower完成数据的同步之后,就会给Leader发送ACK。 如果Follower长时间未向Leader同步数据,则该Follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的Leader

生产者数据一致性保证

MQ详解-009

  • LEOLog End offset每个副本的最后一个offset
  • HWHigh Watermark高水位,指的是消费者能见到的最大的offsetISR队列中最小的LEO

Follower发生故障后会被临时踢出ISR,待该Follower恢复后,Follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。等该FollowerLEO大于等于该PartitionHW,即 Follower 追上Leader之后,就可以重新加入ISR了。 当Leader发生故障之后,会从ISR中选出一个新的Leader,之后,为保证多个副本之间的数据一致性, 其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据;如果少于Leader中的数据则会从Leader中进行同步。

生产者发送消息流程

Kafka的生产者发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程main线程和Sender线程,以及一个线程共享变量即RecordAccumulator

MQ详解-010

在生产者发送消息时,main线程将消息发送给RecordAccumulator,当数据积累到batch.size之后,sender线程才会不断从RecordAccumulator中拉取消息发送到KafkaBroker;如果数据迟迟未达到batch.sizesender线程等待linger.time之后就会发送数据。

消费者

Kafka消费者是用于从Kafka主题中读取消息的客户端应用程序。

消费者分区分配策略

一个消费者组中有多个消费者,一个主题有多个分区,所以必然会涉及到分区的分配问题,即确定哪个分区由哪个消费者来消费。消费分配策略:

  • round-robin:采用轮询的方式将当前所有的分区依次分配给所有的消费者,这种方法确保每个消费者都能获得一个相对均匀的分区负载;
  • range:首先会计算每个消费者可以消费的分区个数,然后按照顺序将指定个数范围的分区分配给各个消费者;
  • sticky:这种分区策略是最新版本中新增的一种策略,将现有的分区尽可能均衡的分配给各个消费者,存在此目的的原因在于round-robinrange分配策略实际上都会导致某几个消费者承载过多的分区,从而导致消费压力不均衡;

range按范围分配,先将所有的分区放到一起然后排序,按照平均分配的方式计算每个消费者会得到多少个分区,如果没有除尽,则会将多出来的分区依次计算到前面几个消费者。 比如这里是三个分区和两个消费者,那么每个消费者至少会得到1个分区,而3除以2后还余1,那么就会将多余的部分依次算到前面几个消费者,也就是这里的1会分配给第一个消费者。 如果按照range分区方式进行分配,其本质上是依次遍历每个Topic,然后将这些Topic的分区按照其所订阅的消费者数量进行平均的范围分配。这种方式从计算原理上就会导致排序在前面的消费者分配到更多的分区,从而导致各个消费者的压力不均衡。

消费者重复消费

消费者在消费的时候,需要维护一个offset,用于记录消费的位置。当提交的offset小于当前程序处理的最后一条消息的offset,会造成重复消费。 就是先消费,后提交offset,如果消费成功、提交失败,消费者下次获取的offset还是以前的,所以会造成重复消费。

解决重复消费:

  • 将接口设计具有幂等性。处理消息时使用唯一标识符来检测是否已经处理过该消息,从而避免重复处理。
  • Kafka支持事务机制,允许将消息处理和偏移量提交放在同一个事务中,确保操作的原子性。使用事务可以减少因消费者失败导致的消息重复处理。
  • offset保存在数据库中,使当前业务与offset提交绑定起来,这样可以一定程度避免重复消费问题。

消费者漏消费

消费者在消费时,当提交的offset大于当前程序处理的最后一条消息的offset,会造成漏消费。 就是先提交offset,后消费,如果提交成功、消费失败,消费者下次获取的offset已经是新的,所以会造成漏消费。

解决漏消费:

  • 在处理完消息并确保处理成功后,再提交偏移量。可以使用commitSync方法来保证偏移量的同步提交,从而确保只有在消息处理成功后才提交偏移量。
     try {
         processMessage(message);
         consumer.commitSync(Collections.singletonMap(new TopicPartition(topic, partition), new OffsetAndMetadata(lastOffset + 1)));
     } catch (Exception e) {
         // 处理异常情况
     }
    
  • 使用事务来保证消息处理和偏移量提交的原子性。Kafka提供了事务机制来确保消息处理和偏移量提交的可靠性。
     consumer.beginTransaction();
     try {
         processMessage(message);
         consumer.commitTransaction();
     } catch (Exception e) {
         consumer.abortTransaction();
     }
    

消费者消息积压

消费者消息积压是指消费者处理消息的速度跟不上生产者发送消息的速度,导致消息在Kafka的分区中堆积,从而造成系统的延迟增加和资源消耗增加。 如果线上遇到大量消息积压,那就是线上故障了,最可能的原因是消费者出现故障。

一般这个时候,只能临时紧急扩容了。先修复消费者的问题,确保其恢复消费速度,然后将现有消费者都停掉。 新建一个TopicPartition是原来的 10 倍,临时建立好原先10倍的queue数量。 然后写一个临时的分发数据的Consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的queue。 接着临时征用10倍的机器来部署Consumer,每一批Consumer消费一个临时queue的数据。这种做法相当于是临时将queue资源和Consumer资源扩大 10 倍,以正常的 10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的Consumer机器来消费消息。

如果没有消费者没有出现问题,而出现了消费积压的情况可以参考以下思路:

  • 提高消费并行度;
  • 批量方式消费;
  • 跳过非重要方式消费;
  • 优化消息消费业务处理过程,简化过程;

Kafka事务

Kafka0.11版本开始引入了事务支持。Kafka的事务机制允许生产者以原子性方式写入消息到多个分区和主题中。这意味着生产者可以将一组消息作为一个事务提交,要么全部成功,要么全部失败。 这种机制对于保证消息的一致性和防止数据丢失至关重要。

为了管理 Transaction,Kafka引入了一个新的组件 Transaction Coordinator。 Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。 Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

生产者事务工作流程:

  • 初始化事务:生产者需要通过initTransactions()方法初始化事务资源,为事务操作做准备。
  • 开始事务:通过beginTransaction()方法启动一个事务。这个事务会有一个全局唯一的事务ID,和生产者的PID绑定。如果生产者重启,能够通过这个事务ID找到原来的PID,从而继续事务。
  • 发送消息:在事务进行中,可以将消息发送到Kafka主题。消息会被标记为事务的一部分,直到事务完成。
  • 提交事务:使用commitTransaction()方法来提交事务。如果所有消息都成功发送,事务被提交,消息对消费者可见。
  • 中止事务:如果在事务期间出现问题,可以调用abortTransaction()方法来中止事务。这样未提交的消息不会对消费者可见。

对于消费者而言,事务的保证就会相对较弱,尤其时无法保证提交的信息被精确消费。这是由于消费者可以通过offset访问任意信息,而且不同的Segment File生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

消费者事务工作流程:

  • 配置隔离级别:设置isolation.level=read_committed,让消费者只读取已经提交的消息。
  • 消费消息:消费者从Kafka读取消息。由于配置了事务隔离级别,只会处理已经提交的消息,未提交的消息不会被读取。
发表评论