Home 关于RocketMQ
Post
Cancel

关于RocketMQ

RocketMQ队列的长度

RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除。RocketMQ同其他MQ有非常显著的区别,RocketMQ的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据,例如Broker只保存3天的消息,那么这个Buffer虽然长度无限,但是3天前的数据会被从队尾删除。

RocketMQ的消息优先级

RocketMQ并不遵循任何规范,但参考了各种规范的设计思想。考虑到持久化的消息按照优先级排序开销大,RocketMQ没有特意支持消息优先级。在Message的API中的确没有提供和Priority(优先级)有关的方法。两种变通的处理思路供参考:

  • 使用消息队列来表示不同的优先级:单独配置一个优先级高的队列,和一个普通优先级的队列,将不同优先级发送到不同队列即可;
  • 使用t来表示不同的优先级:每个优先级可以用不同的Topic表示,发消息时,指定不同的Topic来表示优先级。

RocketMQ提供2级消息分类,方便使用者灵活控制

Topic

Topic表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息……一条消息必须有一个Topic。

Tag

Tag表示消息的第二级类型,比如交易消息又可以分为:交易创建消息,交易完成消息…..一条消息可以没有Tag。

订阅指定Topic下Tags分别等于TagA或TagC或TagD

1
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

设置为广播消费模式

1
consumer.setMessageModel(MessageModel.BROADCASTING);

消费者的消费位置

ConsumeFromWhere consumeFromWhere 

  • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 
  • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 
  • CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费   以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在Broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始。

如何拿到消费者的处理结果

绝大部分队列用于削峰和解耦,也有用于处理分布式事务。削峰是用于当应用处理不过来过大的并发请求时,将请求存于队列中,用单个或多个消费者来处理请求。这时候想要拿到请求怎么办呢?

  1. 每个消费者处理完成请求,再发送消息到消息队列中,生产者再实现消费者来消费这些处理结果信息;
  2. 将处理结果存于缓存等高性能组件中,通过轮询的方式获取任务处理结果,消息结果可以用推送的方式告知(移动端),也可以让请求方每个几秒轮询一次处理结果。

队列重新消费的间隔

每次重新消费间隔:10s,30s,1m,2m,3m

默认消息重发次数

为16次。

This post is licensed under CC BY 4.0 by the author.