一个 tip 成为 kafka sarama 老中医

X1a0t,3 min read

在使用 kafka 处理消息量比较大的 topic 时,可能会遇到各种问题导致消息没有被及时消费。这类问题最直接的思路就是排查消费机器状态。但是有时机器是完全健康的,kafka 的生产/消费带宽,磁盘等也没有成为瓶颈。那么有没有一招,让我们找到顺藤摸瓜的藤呢?答案是有的,查看 sarama 的日志。

Sarama 的实现记录了丰富的日志信息,只不过默认是关闭的。

// Logger is the instance of a StdLogger interface that Sarama writes connection
// management events to. By default it is set to discard all log messages via io.Discard,
// but you can set it to redirect wherever you want.
Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)

我们可以替换成自己实现的 logger,甚至直接集成现有的日志框架,这样日志文件大小、输出位置、滚动存储、上报等都可以无缝集成到生产环境。废话不多说,直接上代码:

import "trpc.group/trpc-go/trpc-go/log"
 
type saramaLogger struct{}
 
func (s *saramaLogger) Print(v ...interface{}) {
	log.Error(append([]interface{}{"[sarama]"}, v...)...)
}
 
func (s *saramaLogger) Printf(format string, v ...interface{}) {
	log.Errorf("[sarama]"+format, v...)
}
 
func (s *saramaLogger) Println(v ...interface{}) {
	log.Error(append([]interface{}{"[sarama]"}, v...)...)
}
 
func main() {
  sarama.Logger = &saramaLogger{}
  // ...
}

即使没有直接使用 sarama 也是可以的,例如使用 trpc-database/kafka,但是间接依赖了 sarama。

日志内容包含了消费的分区,重平衡状态,broker 的连接状态,心跳等信息。如果有些位置 sarama 的日志没有打印,比如这个问题 fix: add locking around broker throttle timer to prevent race condition (opens in a new tab),可以自己 fork 一份,上传到内网 git,修改构建系统把依赖指到自己的 repo,就可以继续加 log 愉快地排查了。

CC BY-NC 4.0 2025 © Powered by Nextra.