一个 tip 成为 kafka sarama 老中医
在使用 kafka 处理消息量比较大的 topic 时,可能会遇到各种问题导致消息没有被及时消费。这类问题最直接的思路,就是排查消费机器状态。但是有时机器是完全健康的,甚至 broker 的监控(生产/消费带宽,磁盘等等)也是正常的。那么有没有一招,让我们找到顺藤摸瓜的藤呢?答案是有的,查看 sarama 的日志。
Sarama 的实现记录了丰富的日志信息,只不过默认是关闭的。
// Logger is the instance of a StdLogger interface that Sarama writes connection
// management events to. By default it is set to discard all log messages via io.Discard,
// but you can set it to redirect wherever you want.
Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)
我们可以替换成自己实现的 logger,甚至直接集成现有的日志框架,这样日志文件大小、输出位置、滚动存储、上报等都可以无缝集成到生产环境。废话不多说,直接上代码:
import "trpc.group/trpc-go/trpc-go/log"
type saramaLogger struct{}
func (s *saramaLogger) Print(v ...interface{}) {
log.Error(append([]interface{}{"[sarama]"}, v...)...)
}
func (s *saramaLogger) Printf(format string, v ...interface{}) {
log.Errorf("[sarama]"+format, v...)
}
func (s *saramaLogger) Println(v ...interface{}) {
log.Error(append([]interface{}{"[sarama]"}, v...)...)
}
func main() {
sarama.Logger = &saramaLogger{}
// ...
}
即使没有直接使用 sarama 也是可以的,例如使用 trpc-database/kafka,但是间接依赖了 sarama。
日志内容 Belike:
一点吐槽,sarama 的实现非常一把梭(一个比较明显的风格暗示,各种 Request entity (反)序列化和真正 producer,consumer 的实现都在一个目录)实际遇到问题的时候再查就比较清楚了。
最后
如果是 sarama 的 bug,又恰好没在对应位置打 log,日志一切正常怎么办?那当然是 fork 一份啦,或者下载下来传到内网 git,在构建系统把 repo 指到自己的 repo,就可以继续加 log 排查了。
CC BY-NC 4.0 2025 © Powered by Nextra.