一个 tip 成为 kafka sarama 老中医
在使用 kafka 处理消息量比较大的 topic 时,可能会遇到各种问题导致消息没有被及时消费。这类问题最直接的思路就是排查消费机器状态。但是有时机器是完全健康的,kafka 的生产/消费带宽,磁盘等也没有成为瓶颈。那么有没有一招,让我们找到顺藤摸瓜的藤呢?答案是有的,查看 sarama 的日志。
Sarama 的实现记录了丰富的日志信息,只不过默认是关闭的。
// Logger is the instance of a StdLogger interface that Sarama writes connection
// management events to. By default it is set to discard all log messages via io.Discard,
// but you can set it to redirect wherever you want.
Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)
我们可以替换成自己实现的 logger,甚至直接集成现有的日志框架,这样日志文件大小、输出位置、滚动存储、上报等都可以无缝集成到生产环境。废话不多说,直接上代码:
import "trpc.group/trpc-go/trpc-go/log"
type saramaLogger struct{}
func (s *saramaLogger) Print(v ...interface{}) {
log.Error(append([]interface{}{"[sarama]"}, v...)...)
}
func (s *saramaLogger) Printf(format string, v ...interface{}) {
log.Errorf("[sarama]"+format, v...)
}
func (s *saramaLogger) Println(v ...interface{}) {
log.Error(append([]interface{}{"[sarama]"}, v...)...)
}
func main() {
sarama.Logger = &saramaLogger{}
// ...
}
即使没有直接使用 sarama 也是可以的,例如使用 trpc-database/kafka,但是间接依赖了 sarama。
日志内容包含了消费的分区,重平衡状态,broker 的连接状态,心跳等信息。如果有些位置 sarama 的日志没有打印,比如这个问题 fix: add locking around broker throttle timer to prevent race condition (opens in a new tab),可以自己 fork 一份,上传到内网 git,修改构建系统把依赖指到自己的 repo,就可以继续加 log 愉快地排查了。