RocketMQ队列的长度
RocketMQ没有内存Buffer概念,RocketMQ的队列都是持久化磁盘,数据定期清除。RocketMQ同其他MQ有非常显著的区别,RocketMQ的内存Buffer抽象成一个无限长度的队列,不管有多少数据进来都能装得下,这个无限是有前提的,Broker会定期删除过期的数据,例如Broker只保存3天的消息,那么这个Buffer虽然长度无限,但是3天前的数据会被从队尾删除。
RocketMQ的消息优先级
RocketMQ并不遵循任何规范,但参考了各种规范的设计思想。考虑到持久化的消息按照优先级排序开销大,RocketMQ没有特意支持消息优先级。在Message的API中的确没有提供和Priority(优先级)有关的方法。两种变通的处理思路供参考:
- 使用消息队列来表示不同的优先级:单独配置一个优先级高的队列,和一个普通优先级的队列,将不同优先级发送到不同队列即可;
- 使用t来表示不同的优先级:每个优先级可以用不同的Topic表示,发消息时,指定不同的Topic来表示优先级。
RocketMQ提供2级消息分类,方便使用者灵活控制
Topic
Topic表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息……一条消息必须有一个Topic。
Tag
Tag表示消息的第二级类型,比如交易消息又可以分为:交易创建消息,交易完成消息…..一条消息可以没有Tag。
订阅指定Topic下Tags分别等于TagA或TagC或TagD
1
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
设置为广播消费模式
1
consumer.setMessageModel(MessageModel.BROADCASTING);
消费者的消费位置
ConsumeFromWhere consumeFromWhere
CONSUME_FROM_LAST_OFFSET
:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费CONSUME_FROM_FIRST_OFFSET
:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费CONSUME_FROM_TIMESTAMP
:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在Broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始。
如何拿到消费者的处理结果
绝大部分队列用于削峰和解耦,也有用于处理分布式事务。削峰是用于当应用处理不过来过大的并发请求时,将请求存于队列中,用单个或多个消费者来处理请求。这时候想要拿到请求怎么办呢?
- 每个消费者处理完成请求,再发送消息到消息队列中,生产者再实现消费者来消费这些处理结果信息;
- 将处理结果存于缓存等高性能组件中,通过轮询的方式获取任务处理结果,消息结果可以用推送的方式告知(移动端),也可以让请求方每个几秒轮询一次处理结果。
队列重新消费的间隔
每次重新消费间隔:10s,30s,1m,2m,3m
默认消息重发次数
为16次。