博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafka重复消费问题--consumer消费能力很低
阅读量:4167 次
发布时间:2019-05-26

本文共 1267 字,大约阅读时间需要 4 分钟。

问题描述
 

采用kafka读取消息进行处理时,consumer会重复读取afka队列中的数据,使用命令查看kafka消费数据时,发现待消费数据一直没变。

问题原因 

kafka的consumer消费数据时首先会从broker里读取一批消息数据进行处理,处理完成后再提交offset。而项目中的consumer消费能力比较低,导致取出的一批数据在session.timeout.ms时间内没有处理完成自动提交offset失败,然后kafka会重新分配partition给消费者,消费者又重新消费之前的一批数据,又出现了消费超时,所以会造成死循环,一直消费相同的数据,无限循环下去

解决方案 

项目中使用的是spring-kafka,所以把kafka消费者的配置enable.auto.commit设为false,禁止kafka自动提交offset,从而使用spring-kafka提供的offset提交策略。spring-kafka中的offset提交策略可以保证一批消息数据没有完成消费的情况下,也能提交offset,从而避免了提交失败而导致永远重复消费的问题。

先来看看spring-kafka的消费线程逻辑if (isRunning() && this.definedPartitions != null) {     initPartitionsIfNeeded();      // we start the invoker here as there will be no rebalance calls to       // trigger it, but only if the container is not set to autocommit       // otherwise we will process records on a separate thread         if (!this.autoCommit) {                  startInvoker();        }}
上面可以看到,
如果auto.commit关掉的话,spring-kafka会启动一个invoker,这个invoker的目的就是启动一个线程去消费数据,他消费的数据不是直接从kafka里面直接取的,那么他消费的数据从哪里来呢?他是从一个spring-kafka自己创建的阻塞队列里面取的。然后会进入一个循环,从源代码中可以看到如果auto.commit被关掉的话, 他会先把之前处理过的数据先进行提交offset,然后再去从kafka里面取数据。

然后把取到的数据丢给上面提到的阻塞列队,由上面创建的线程去消费,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka会调用kafka的pause方法,则consumer会停止从kafka里面继续再拿数据。接着spring-kafka还会处理一些异常的情况,比如失败之后是不是需要commit offset这样的逻辑

参考:

          

转载地址:http://edexi.baihongyu.com/

你可能感兴趣的文章
如何判断变量在内存中如何放置的?低位在前还是高位在前
查看>>
c语言中通过指针将数值赋值到制定内存地址
查看>>
64位与32位linux c开发时默认字节对齐值
查看>>
malloc(malloc在32位编译系统中分配的地址会8字节对齐,64为编译系统中会8或者16字节对齐)
查看>>
初始化时共享内存的key值和信号量初始化的key值可以一样
查看>>
linux创建线程之pthread_create
查看>>
pthread_attr_init线程通俗举例讲解与线程属性
查看>>
进程和线程的区别
查看>>
int main(int argc,char* argv[])详解,以及与int main()有什么区别
查看>>
SourceInsight全工程查找替换方法
查看>>
C语言chdir()函数:改变当前的工作目录
查看>>
Linux下的函数执行时间的统计方法(测试某个函数的执行时间)
查看>>
调整内核printk的打印级别(启动脚本中运行 echo 0 4 0 7 > /proc/sys/kernel/printk 关闭所有内核打印)
查看>>
临时关闭打开console办法
查看>>
Linux中gmtime和localtime的区别(time_t格式转换为tm格式)
查看>>
如果函数传递的是结构体,小心在调用的参数中给指针重新赋值(拿tm结构体举例)
查看>>
使用nm命令获取linux的可执行文件里或动态库中的所有函数名称
查看>>
动态库编写 头文件.h注意事项
查看>>
多个动态库的依赖问题(先后顺序务必注意)
查看>>
二叉树的最大深度
查看>>