kafka 极速消费方案

X1a0t,10 min read

消息队列作为一种常见的解耦方案,通过把业务逻辑中的非关键的部分解耦到单独的消费服务异步处理,保证了原有服务的核心链路使用全部机器资源,以及异步的业务发生故障时不影响主链路的正常运行,提高了主链路的稳定性。

意义

本质上看消息队列是一种异步方案,但是有一些场景,尽管实时性要求没有像交易系统一样极端,从产品逻辑上我们往往也要求“异步”对用户无感。也就是说,在用户体验上,这里就像是在同步处理

比如用户注册时,需要发送一封激活邮件,这个过程往往从注册服务拆出单独服务异步来做,但是同时希望用户在注册账号后立刻收到激活邮件,此时就需要保证消息队列的消费速度。或者一个服务的容量计费系统,用户在使用产品时,希望能“实时”把容量的增/减量计算展示给用户。再或者一个不太恰当的例子,例如 AI Token 的计费界面,如果可以在调用的同时,余额以小数点后 X 位实时跳字,是不是也有查水表的激情(误)。

并且我们还希望即使异步比同步延迟更高,但是也能够保证消息能够被稳定的处理,而不是因为异步,就导致消息被漏掉。

归根结底,Kafka 的快速消费要满足的是 既要拆异步服务保主链路,同时还希望做到 像同步服务一样低时延,稳定处理 的目标。另外低时延额外的好处是可以避开一种极端情况,(以 kafka 为例)当消息消费速度慢到 broker 认为 kafka client 已经失联(迟迟处理不完当前的消息导致不拉取下一个 offset 消息),这时候就要触发 kafka 的重平衡了,经验上看,重平衡的原因通常比较稳定(只有 0 次和无数次)。

寻找瓶颈

异步消费的处理流程其实比较清晰,以 kafka 为例:

kafka client 获取当前消费的分区 -> 从分区当前的 offset 拿到消息 -> 传入业务逻辑处理消息 -> 处理成功(如果失败看情况重试或者跳过) -> 提交消息增加 offset -> 循环处理下一个 offset 消息。(单机器分配多分区的情况同样,并发进行这个过程)

  1. 首先最容易解决的是 Kafka 这类消息中间件的带宽。无论是适当加钱,或者是精简消息内容,或者极端情况只传 id 把真正的内容放到数据库,都可以保证业务可以及时拿到消息。

  2. 耗时的部分往往在业务逻辑处理消息,很多框架(sarama, trpc-kafka, ...) 都是希望业务去实现一个 func (ctx context.Context, *sarama.ConsumerMessage) error, (或者 ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error) 之类的 Handler,里面怎么实现框架不关心。框架只关心当返回成功时,就去帮忙拉下个消息继续用业务提供的方法处理。越快返回越好,如果迟迟不返回,框架的各种兜底措施就基本都要触发了。

    关于业务耗时:

    i. 有一类瓶颈是来自于外部系统,服务在收到消息之后调用外部系统,这种 case 大概率是外部系统需要优化或者加钱的部分。例如邮件的例子瓶颈可能在外部邮件服务的处理速度(第三方 SMTP、SendGrid、阿里云邮件服务等)。

    ii. 另外一类例子就是内部系统了,比较明显的特征就是机器的 CPU 和内存水位都很高。

解决方案

对于内部系统瓶颈的情况,我们简化一下处理模型,假设内存优化比较理想完全够用,主要瓶颈在 CPU。机器的有效处理速度是单个被调度到的线程可以在 10ms ~ 10s 不等的时间处理一条消息(不同用户的业务数据规模可能不同)。

处理消息瓶颈在 CPU 的情况下,为了尽快处理消息,需要考虑的是 如何消费消息可以让 CPU 跑满?

首先考虑业务的 Handler 在拿到消息后,是立刻返回 nil,同时内部 go 出去一个协程处理消息,还是在 Handler 里面同步处理完再返回。前者对应 1. 业务侧异步处理,后者对应 2. 业务侧同步处理。抛开控制协程数量等问题,业务异步处理的最大问题是无法保证消息被稳定处理,例如机器由于故障重启,或者正常发布重启,期间正在处理的消息就会丢失,因为 Kafka 的 Offset 已经向前提交,由此看来异步的方案不能满足需求。

考虑一下同步的方案:

  1. 在业务侧同步处理,也就是在 func (ctx context.Context, *sarama.ConsumerMessage) error 真正处理完再返回,而不是 go 出去一个协程跑完立刻返回。

    i. 最常用的消费模型,拉单个 offset 消息处理再提交,因为同时只有一个消息在被处理,因此只能跑满单个 CPU。

    ii. 如果是批量拉取,例如一个批次拉 32 条消息,等这 32 条全部处理完之后,再一次性把 offset 前移 32。在一批消息最初处理时确实可以跑满,但是在剩下消息数量 < 核数时,就会有空闲的核。极端情况下,如果只剩单个处理非常慢消息,就会回退到一次单条消息处理的模型。

既然方案 ii. 的问题在于,有消息处理完时无法及时提交 mark offset。那么是不是可以预拉取一些消息,维持一定的预拉取窗口大小。机器内部边并发消费,边拉取新的消息,这样既实现了同步消费,也解决了 CPU 空载的问题。这里有个缺陷是机器总会有重启,例如正常发布重启,这时窗口内的消息会被漏消费。

如果我们可以设法在容器重启前给服务发送一个 SIGTERM 信号,并且在服务中拦截,在收到信号后不再处理新消息,而是给刚刚已经提交 offset 的消息留出一些处理的时间,就可以解决绝大部分丢消息的情况了。

因此,我们可以给容器的 pre_stop 生命周期挂上一个 hook, 例如 lifecycle_pre_stop = ["/bin/sh", "-c", "pkill -SIGTERM app; sleep 30"], 并且配好优雅退出时间,就可以实现了。一个最小 Demo:

type MsgHandler = func(ctx context.Context, msg *sarama.ConsumerMessage) error
 
type ConcurrentConsumer struct {
	msgHandler       MsgHandler
	shouldConsumeMsg atomic.Bool
	msgDispatcher    chan struct{}
}
 
func NewConcurrentConsumer(msgHandler MsgHandler) *ConcurrentConsumer {
	msgDispatcher := make(chan struct{}, chanSize)
	for i := 0; i < chanSize; i++ {
		msgDispatcher <- struct{}{}
	}
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, server.DefaultServerCloseSIG...)
	c := &ConcurrentConsumer{
		msgHandler:    msgHandler,
		msgDispatcher: msgDispatcher,
	}
	c.shouldConsumeMsg.Store(true)
	go func() {
		<-signalCh
		c.shouldConsumeMsg.Store(false)
	}()
	return c
}
 
func (c *ConcurrentConsumer) handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	defer() {
		c.msgDispatcher <- struct{}{}
	}()
	_ = c.msgHandler(ctx, msg)
}
 
func (c *ConcurrentConsumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
	if !c.shouldConsumeMsg.Load() {
		log.ErrorContext(ctx, "received ServerCloseSIG, stop consuming")
		select {}
	}
	<-c.msgDispatcher
	go c.handle(ctx, msg)
	return nil
}

一些延伸

  1. 如果机器在处理 Kafka 消息时,按顺序对相同的 Key 处理速度可以更快,应该如何设计消费方式?

  2. 机器故障重启的情况,应该如何保证消息不丢失?

CC BY-NC 4.0 2025 © Powered by Nextra.