一个 tip 成为 kafka sarama 老中医

X1a0t,3 min read

在使用 kafka 处理消息量比较大的 topic 时,可能会遇到各种问题导致消息没有被及时消费。这类问题最直接的思路,就是排查消费机器状态。但是有时机器是完全健康的,甚至 broker 的监控(生产/消费带宽,磁盘等等)也是正常的。那么有没有一招,让我们找到顺藤摸瓜的藤呢?答案是有的,查看 sarama 的日志。

Sarama 的实现记录了丰富的日志信息,只不过默认是关闭的。

// Logger is the instance of a StdLogger interface that Sarama writes connection
// management events to. By default it is set to discard all log messages via io.Discard,
// but you can set it to redirect wherever you want.
Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)

我们可以替换成自己实现的 logger,甚至直接集成现有的日志框架,这样日志文件大小、输出位置、滚动存储、上报等都可以无缝集成到生产环境。废话不多说,直接上代码:

import "trpc.group/trpc-go/trpc-go/log"
 
type saramaLogger struct{}
 
func (s *saramaLogger) Print(v ...interface{}) {
	log.Error(append([]interface{}{"[sarama]"}, v...)...)
}
 
func (s *saramaLogger) Printf(format string, v ...interface{}) {
	log.Errorf("[sarama]"+format, v...)
}
 
func (s *saramaLogger) Println(v ...interface{}) {
	log.Error(append([]interface{}{"[sarama]"}, v...)...)
}
 
func main() {
  sarama.Logger = &saramaLogger{}
  // ...
}

即使没有直接使用 sarama 也是可以的,例如使用 trpc-database/kafka,但是间接依赖了 sarama。

日志内容 Belike:

一点吐槽,sarama 的实现非常一把梭(一个比较明显的风格暗示,各种 Request entity (反)序列化和真正 producer,consumer 的实现都在一个目录)实际遇到问题的时候再查就比较清楚了。

最后

如果是 sarama 的 bug,又恰好没在对应位置打 log,日志一切正常怎么办?那当然是 fork 一份啦,或者下载下来传到内网 git,在构建系统把 repo 指到自己的 repo,就可以继续加 log 排查了。

CC BY-NC 4.0 2025 © Powered by Nextra.