打怪升级rocketMq如何保证消息不重复消费青柠fisher

如何确定rocket会不会在消息处理成功后还会消费到重复的消息,我们可以根据消费者在生命周期启动到核心拉取消息消费的能力来决定。

上面是一个consumer的主要工作流程图,通过图我们可以观察到:

consumer启动后先进行消费者的注册,以及一些初始化init处理。

获取对应的消费点位offSet,如果是广播消费则通过本地存储获取,如果是集群消费则通过broker通信获取对应consumerGroup的消费点位。

开启异步线程,定时拉取消息。

开启异步线程,定时做消费重排。

更新本地配置工厂。

检查对应consumer配置及初始化clientFactory,初始化策略、pullApi。

获取consumer对应的消费点位,其中广播模式通过LocalFile本地文件获取对应offSet消费偏移量,并存储本地缓存offSetTable;集群模式通过向broker发送数据包请求对应consumergrop的消费点位。

异步线程池开启定时通过nameServer拉取对应的broker集群信息。

异步线程池开启通过broker拉取消息消费:

通过本地consumerTable获取对应的consumer集群信息。

根据对应pullRequest拉取消息处理缓存processQueue,如果当前处理对象已经被drop,则放弃处理;校验当前consumer服务的状态。

校验当前消费者缓存消息是否已经存在消息挤压、待处理消息超过阈值或当前服务处于busy状态,如果是将pullRequest放入延时线程队列executePullRequestLater,避免消息挤压。

如果当前消费者是广播消费,则处理前将pullRequest锁定,防止其他消费者进行消费。(这里可以体现广播消息不会出现重复消费,因为它们无法获取到已经锁定的pullRequest,并且广播消息是不支持重试的)

如果广播消息,则在锁定后立即触发一次rebalance,进行消费重排,并刷新消费节点,具体刷新规则依靠消费者设置的消费策略:从最开始节点消费或消费当前节点。后续同理。

根据consumer指定topic获取本地订阅的缓存数据,定义消息拉取callBack回调方法(这里的callBack是指consumer从broker拉取的response)

broker:

接收到pullMessage后,校验当前broker状态、校验消费者组订阅状态、校验topic状态。

获取consumergroup信息,校验消费者订阅是否合法、校验topic、校验队列数量。

构建messageFilter,校验consumer类型是否匹配;获取consumer同步到broker的consumerGroupInfo,校验版本、Tag、Filter。

在rocket5.0版本之前,采用了直接通过计算MinOffSet和MaxOffSet拉取,在5.0及之后,通过future拉取了一个message的任务(5.0之后支持了POP处理,这个后面再说):

broker拉取消息:

根据topic和队列id计算处理消息最大最小offset、校验offset点位

拉取对应的offset消息,采用零拷贝:mmap,获取到buffer流

messageFilter过滤符合条件的消息

通过commitLog计算当前消息的存储

计算下次拉取消息的建议slave,根据当前可拉取消息的长度及broker支持的长度对比

(这里的消息还是buffer流数组,是通过零拷贝直接获取的文件流)

broker后置操作:

获取到拉取的消息列表,组装responseHeader

根据状态给出下次拉取建议,master或slave

根据状态设置返回信息

执行对应的钩子接口

同步master的offset

根据拉取消息获取的buffer流数组进行组装,生成messageList

如果没有拉取到消息,broker允许挂起,将pullRequest挂起并开启子线程重试拉取

写入netty返回response

pullAPI拉取消息到processQueue:

当consumer从broker中拉取到了message后,回调pullCallBack方法:

当拉取到消息后,对消息做基本处理,然后放入processQueue缓存中。这时它就要包装consumeMessageContext并执行一些前置的钩子函数,如果这时消息并没有达到阈值,没有消息积压!这时就会通知messageListener,就是我们业务实现的方法,并对consumeMessage的返回值做封装、再执行定义的after钩子函数,然后要看这个消息是否是被丢弃的!!!注意,这里是消费完后采取处理,如果是isDrop,则不需要做一些offSet偏移!如果这次拉取并没有拉取到消息,那么根据消息拉取间隔将pullRequest再放入异步延时队列中等待下一次拉取!(拉取的核心)。

当没有拉取到消息或者拉取异常后,将pullRequest放入延时队列等待下一次拉取。

先从clientFactory拉取一个broker节点,组装requestHeader,向broker发送netty包,请求拉取消息。

oneWay单向消息,发送直接返回;Async异步消息,发送后回调CallBack的方法:成功后处理消费节点、并消费consumeService、调用钩子函数并通知consumeListener进行消费、如果消息出现了积压则延时后处理消息,如果消费异常,观察是否需要进行重试,并延时重试消费步骤。Sync同步消息,首先对处理的messageQueue进行一个大的synchronized:如果对应的processQueue已经被锁定,它可能正在被消费,将通知延后并重试获取锁,尝试通过processQueue获取对应的消息缓存,获取到后对当前队列处理的消费者进行加锁,通知消费者进行消费,并阻塞等待返回结果,处理完成后释放锁,消费成功修改offSet点位信息,并将当前消息从缓存中去除。(这是consumer在请求拉取之后的处理,如果是单向消息或异步消息,则开启线程处理回调方法,主线程直接返回)

异步线程开启定时定时清除过期消息,rocket认为消息在重试后必须消费成功后才会在processQueue中清除已经过期不需要,并且偏移量已经超过当前重试消费的消息。(这里抛砖引玉,会出现消息重复消费的问题)

异步线程开启定时rebalance重排,重排的机制不仅仅在与consumer启动注册时,在broker中也有对应的异步线程定时去通知所有consumer和processQueue进行重新匹配。

首先触发rebalance时,将消费进程暂停,通过CountDownLatch2挂起或唤醒,rocket中client很多都是基于封装的ServiceThread去做线程通信的,具体可以参考common-ServiceThread代码实现。

publicvoidwakeup(){if(hasNotified.compareAndSet(false,true)){waitPoint.countDown();//notify}}触发rebalance时的机制有这几种,首先在consumer启动时,注册节点后就会启动rebalance,broker在启动或者订阅信息发生变化后会主动要求rebalance。这里我们在后面rebalance中再具体的说明。

if(isNotifyConsumerIdsChangedEnable){this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE,group,consumerGroupInfo.getAllChannel());}}registerConsumerrebalance重排篇rocket的rebalance不仅仅是consumer的动作,broker也有定时强制要求rebalance的动作。消费重排是为了在consumer或broker发生变动的情况下,及时让消费绑定的消息发生变化。例如:

首先,通过ClientInstance获取到所有的消费者实例Inner,对消费实例进行doBalance。根据实例获取对应的订阅缓存,针对sub下每一个topic去做rebalance。

this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();如果是集群消息,要先获取对应topic下所有的消息队列和订阅的消费者,进行排序,然后根据指定策略进行消费者分配:

allocateResult=strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId()allocate根据选中分配获取到对应的结果后,再进行处理:处理方式与上文中同步消息类似,只是不是orderLy实例,不需要获取队列的锁,并创建一个processQueue计算对应的offSet消费点位,然后保存messageQueue与processQueue的关系,然后创建一个pullRequest进行拉取消息。当包含新的变动时,则需要将更新后的关系推送给broker。

当队列认为是被移除或者已经超时拉取时,都会给processQueue对应的队列设置isDrop,这时对应的processQueue是不会被消费者进行消费的

根据rocket的rebalance机制,因为它的分配机制可以帮助我们绑定一个队列只会被一个消费者进行消费,但一个消费者可以在subTable中订阅多个队列。但是rebalance在处理时会对processQueue和broker中的mq进行锁定,可能会发生短暂的消息无法拉取。

从源码的角度,其实在处理消费点位上,就能看出来当前的结论:

首先,我们可以确定的是,rocket不会帮助我们实现消息唯一消费,如果业务需要保证消费一致,一定需要通过强一致性,例如做幂等性业务来实现。

在rebalance时,异步处理消息时,如果消费者进行扩容,就会触发rebalance,这时A队列可能由Q1分配到了Q2,这时由于异步写可能offSet还没有经过处理,这时Q2在分配时已经订阅到了A,它就会通过broker获取A的消费点位,这时候可能会短暂触发A已经在Q1消费过后又被Q2拉取了一次进行消费,但是这样的做法是保证最终一致性,对整理来说并不存在问题。

从生产者考虑,如果producer发送了一条消息并等待响应,但是由于网络波动没有获取到响应时,如果设置了retryTimesWhenSendFailed或retryTimesWhenSendAsyncFailed,则会进行重发,在messageQueue中可能存在相同messageId,相同消息的问题。

THE END
1.别让先用后付成了消费陷阱付成新浪财经先用后付是指消费者无需提前支付钱款、零元下单即可体验商品,满意后再付款的一种授信赊购服务。消费者在不知情的情况下被开通先用后付,有时只是想收藏商品或先添加到购物车却变为直接下单,如此一来,可能因未在规定时间内付款而产生逾期费用及征信问题。此外,消费者无需输入支付密码就能直接下单,也可能因此出现过...http://finance.sina.com.cn/jjxw/2024-11-13/doc-incvwfmt5646472.shtml
2.如何避免重复消费陷阱?重复消费是一个普遍存在的问题,不仅会造成个人财务负担,也会对整个社会经济产生不利影响。那么,如何避免陷入重复消费的陷阱呢?下面我们就来探讨一下这个问题。 明确需求,合理消费 在进行消费之前,我们首先要明确自己的实际需求。很多人容易受广告或者周围人的影响,盲目地进行消费,结果往往是买到不需要的东西。因此,我们...https://life.yxlady.com/consume/202408/789385.shtml
3.Kafka如何保证消息的消费顺序Kafka如何保证消息不被重复消费...Kafka如何保证消息不被重复消费生产者消息重复发送 生产发送的消息没有收到正确的broke响应,导致producer重试。 详解:producer发出一条消息,broker落盘以后,因为网络等原因,发送端得到一个发送失败的响应或者网络中断,然后producer收到 一个可恢复的Exception重试消息导致消息重复。 解决:enable.idempotence=true //此时会...https://www.dtstack.com/bbs/article/19475
4.公务卡知识问答5、遇到商户不具备刷卡条件怎么办? 答:我市规定,在公务卡结算方式的适用范围内,行政事业单位应优先在具有刷卡条件的商户进行公务消费,其中市直单位原则上只在具有刷卡条件的商户进行公务消费。对确实不具备刷卡条件的公务消费活动,单位职工应先行垫付现金完成交易,在报销时单位财务部门将款项支付至职工的工资卡账户,不得...http://www.wancheng.gov.cn/sitesources/wcqczj/page_pc/tzgg/article50f4cb55dcd144318566e4675a1b7e3b.html
1.如何确保消费行为的合理性和安全性?这种合理性和安全性如何进行衡量...储蓄和债务状况:合理的消费行为应该能够保证有一定的储蓄,以应对突发情况或未来的规划。同时,要控制债务水平,避免过度负债导致财务困境。 消费满意度:消费后对所购买的产品或服务是否满意也是衡量消费合理性和安全性的重要指标。如果频繁出现不满意的情况,可能意味着消费决策存在问题。 https://stock.hexun.com/2024-11-10/215455561.html
2.降随e保重大疾病保险购买攻略阳光保险集团官方网站23、如果购买时出现“交易流水重复”,导致无法支付怎么办? 为了保证您的资金不会重复扣款,登录进会员中心,删除待支付订单,重新进行购买即可。 24、健康随e保的保费多少钱? 每个人按照不同的保额、年龄不同等,有较大的区别,您可按照自己的情况点击“保费计算”计算。 https://www.sinosig.com/page/common/prdt/html/130922.shtml
3.Alibaba最新1000多道Java面试题汇总详解,收藏起来慢慢刷!9、如何确保消息不丢失? 10、消息基于什么传输? 11、如何保证消息的顺序性 12、Kafka、ActiveMQ、RabbitMQ、RocketMQ 都有什么区别? 13、Fanout(广播分发)? 14、如何保证高可用的? 15、mq 的缺点 16、如何保证消息的可靠传输?如果消息丢了怎么办 17、如何避免消息重复投递或重复消费? https://maimai.cn/article/detail?fid=1728969401&efid=esjJLvGGL4fAr1LArgq_cQ
4.消息队列消费失败消息队列重复消费原因消息队列 消费失败 消息队列重复消费原因 1、如何保证消息不被重复消费? 一、为什么会出现重复消费的问题? RabbitMQ、RocketMQ、Kafka 都有可能出现重复消费的问题,导致重复消费的原因可能出现在生产者,也可能出现在 MQ 或 消费者。这里说的重复消费问题是指同一个数据被执行了两次,不单单指 MQ 中一条消息被消费...https://blog.51cto.com/u_16213586/9670405
5.面试官:RocketMQ如何保证消息不丢失,如何保证消息不被重复消费?先消费,消费成功后再提交; 思路一可以解决重复消费的问题但是会丢失消息,因此Rocketmq默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。 手段七:消费消息重试机制 当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此RocketMQ本身提供了重新消费消息的能力。 https://cloud.tencent.com/developer/article/2085783
6.如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性...消费者处理消息失败:当消费者处理消息失败时,可能会导致消息没有被确认,从而出现重复消费的问题。 为了避免这些问题,我们需要采取一些措施来保证消息的可靠性,例如手动确认消息、消费者自身保证幂等性等。 3. 如何保证消息不被重复消费? 在MQ 中,消息的消费是异步的,消费者需要从队列中获取消息并进行处理。 https://www.nowcoder.com/discuss/522882458006618112
7.汽车电话销售话术5、客户要求与车型卖点或定位不符,怎么办? 引导需求 你要引导顾客需求甚至超越顾客需求。想想“和尚买梳”、“非洲卖鞋”,你就会明白这个道理。虽然现在汽车消费越来越崇尚个性,但依然摆脱不了汽车消费群体的“从众心理”,消费者不是专家,而你自己要树立专家形象,形成“权威效应”。 https://mip.jy135.com/zhichang/299913.html
8.公务卡知识问答33、因公务卡还款不及时而产生的循环利息和滞纳金,应由谁负担? 34、怎样避免公务卡还款不及时的问题? 35、如何查询公务卡报销是否已完成? 36、如果遗失了公务卡怎么办? 37、如果使用公务卡进行个人消费,能够保证消费信息的私密性吗? 38、如何获取短信息提示服务? https://caiwu.glut.edu.cn/info/1025/1069.htm
9.薛兆丰经济学课认真做下的笔记利息是对人们延迟消费,接受不确定性的一种补偿 商品紧俏是现货,如果可预见的未来有大量同样的商品补充,那囤货就是自杀;如果未来充满了不确定性,囤起来慢慢销售对社会更有利 23、价格管制 民间借贷不得超过银行利率的4倍,即不得超过24% 根据《合同法》、《最高人民法院关于审理民间借贷案件适用法律若干问题的规定》...https://www.douban.com/doubanapp/dispatch?uri=%2Fnote%2F860309728%3F%26
10.酒店服务礼仪标准8篇(全文)以顾客利益为重,为顾客创造更高消费价值,是酒店全体成员的共同职责。特殊服务的提供,有时涉及几个部门。只有沟通渠道畅通,各部门鼎立合作,才能保证服务的及时、有效提供。管理人员应建立明确的沟通和协作制度,增强部门间理解、上下级沟通,奖励内部服务激励协作精神,使协调工作成为各部门各成员的共同行为准则。 https://www.99xueshu.com/w/file2yyuwrp3.html
11.大数据培训Flink面试知识分享至少一次语义: barrier不对齐,会重复消费。如果不对齐,那么在 chk-100 快照之前,已经处理了一些 chk-100 对应的 offset 之后的数据, 当程序从 chk-100 恢复任务时,chk-100 对应的 offset 之后的数据还会被处理一次, 所以就出现了重复消费。 4. Savapoint了解多少? https://blog.itpub.net/70010293/viewspace-2884751/
12.Kafka中怎么保证消息不会丢失和不重复消费?consumer 采用 pull(拉) 模式从 broker 中读取数据。这个过程只涉及到了服务器和消费者两方,那消费者是怎么保证不丢失和不重复的获取消息呢? 关键在于consumer会维护一个offset,该offset实时记录着自己消费的位置。同时消费者能见到的最大的 offset,是HW, 是ISR 队列中最小的 LEO【这一点看1.3】,所以只要保证off...https://blog.csdn.net/qq_40322236/article/details/127222115
13.小本创业方法任何创业的成功案例,都可以找出更多失败的案例来反证。同样的项目,差不多的团队,操盘手法相仿,甚至办公都在楼上楼下。但是最终的结果可能南辕北辙,这就是艺术的魅力。成功是99%的努力+1%的运气,但是这1%的运气比99%的努力都要重要的多。谋事在人,成事在天。谋只是让成功的概率大一点,但是不保证一定会成功。https://www.yjbys.com/chuangye/zhidao/ruhechuangye/634661.html
14.大学生调查报告集锦15篇(3)动因分析。主要在于消费者自己的选择,其次抒告宣传,然后是亲友介绍,最后才是营业员推荐。不难发现,怎样吸引消费者的注意力,对于企业来说守键。 (二)饮食类产品的`消费情况。 本次调查主要针对一些饮食消费场所和消费者比较喜欢的饮食进行,调查表明,消费有以下几个重要特点: ...https://www.unjs.com/fanwenwang/dcbg/20221127152225_6024906.html