Maven依赖
1
2
3
4
5
6
7
fastjson-1.2.44.jar
netty-all-4.1.19.Final.jar
rocketmq-client-4.2.0.jar
rocketmq-common-4.2.0.jar
rocketmq-remoting-4.2.0.jar
slf4j-api-1.7.25.jar
slf4j-nop-1.7.25.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<!-- RocketMQ 4.2.0 client stack plus the networking, JSON and logging libraries it is used with.
     The versions below correspond one-to-one to the jar list given earlier in this document. -->
<!-- Client API: provides the DefaultMQProducer and DefaultMQPushConsumer classes that the
     Spring configuration instantiates. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
<!-- NOTE(review): rocketmq-common and rocketmq-remoting are presumably also resolved
     transitively through rocketmq-client; listing them explicitly pins the version.
     Confirm with "mvn dependency:tree" before removing either one. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-remoting</artifactId>
<version>4.2.0</version>
</dependency>
<!-- Netty: network transport library; presumably required by rocketmq-remoting (confirm). -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.19.Final</version>
</dependency>
<!-- fastjson: JSON library.
     NOTE(review): 1.2.44 is an old 1.2.x release; check the published fastjson security
     advisories and consider a newer version if this is used outside a demo. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<!-- SLF4J logging facade (API only, no binding). -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- No-op SLF4J binding. scope=test puts it on the test classpath only.
     NOTE(review): no other SLF4J binding is declared in this fragment; confirm one is
     supplied elsewhere if log output is wanted at runtime. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
RocketMQ 配置文件 spring-rokectmq.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring bean definitions for a RocketMQ producer and a push consumer.
     Both beans declare init-method="start" and destroy-method="shutdown", so Spring starts
     them when the context is created and shuts them down when the context is closed. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Producer. The bean id "rocketmqProduct" is the exact name the producer code passes to
     getBean(), so it must not be renamed on one side only.
     The name server address is hard-coded here and repeated on the consumer bean below;
     keep the two values in sync. -->
<bean id="rocketmqProduct"
class="org.apache.rocketmq.client.producer.DefaultMQProducer"
init-method="start"
destroy-method="shutdown">
<property name="producerGroup" value="concurrent_producer"/>
<property name="namesrvAddr" value="192.168.0.103:9876"/>
</bean>
<!-- Message callback implementation; injected into the consumer as its messageListener. -->
<bean id="consumerSpringListener"
class="com.ju.biz.mq.rokectmq.quickstartspring.ConsumerSpringListener" />
<!-- Push consumer. All properties, including the listener and the subscription map, are
     set by Spring before the init-method start() is invoked. -->
<bean id="rocketmqConsumer"
class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"
init-method="start" destroy-method="shutdown">
<property name="consumerGroup" value="concurrent_consumer"/>
<property name="namesrvAddr" value="192.168.0.103:9876"/>
<property name="messageListener" ref="consumerSpringListener"/>
<!-- Map of topic name to filter expression. "TopicTest" is the topic the producer code
     sends to; the "*" value presumably selects every tag on that topic (confirm against
     the RocketMQ subscription documentation). -->
<property name="subscription">
<map>
<entry key="TopicTest">
<value>*</value>
</entry>
</map>
</property>
</bean>
</beans>
生产者 ProducerSpring.java
1
2
3
4
5
6
7
8
9
10
11
// Load the Spring context. The beans in spring-rokectmq.xml declare init-method="start",
// so both the RocketMQ producer and the push consumer are started during context creation.
ApplicationContext context =
        new ClassPathXmlApplicationContext("/spring/spring-rokectmq.xml");
// Typed lookup: Spring verifies the bean type itself, so no explicit cast is needed.
// "rocketmqProduct" must match the bean id declared in the XML file.
DefaultMQProducer producer = context.getBean("rocketmqProduct", DefaultMQProducer.class);
try {
    // Topic "TopicTest" matches the consumer's subscription; "TagA" is the message tag.
    Message msg = new Message("TopicTest", "TagA",
            ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Synchronous send; the returned SendResult describes the outcome.
    SendResult sendResult = producer.send(msg);
    // Print the result so a successful send is visible instead of being silently discarded.
    System.out.printf("%s%n", sendResult);
} catch (Exception e) {
    e.printStackTrace();
    // Pause briefly after a failed send. Thread.sleep is outside the try body, so an
    // InterruptedException raised here propagates to the enclosing method.
    Thread.sleep(1000);
}
消费者 ConsumerSpringListener.java
1
2
3
4
5
6
7
8
9
/**
 * Concurrent message listener that prints every delivered batch to standard output.
 *
 * <p>Declared as the {@code consumerSpringListener} bean and injected into the push
 * consumer's {@code messageListener} property in the Spring configuration.
 */
public class ConsumerSpringListener implements MessageListenerConcurrently {

    /**
     * Prints the name of the calling thread followed by the received messages.
     *
     * @param msgs    the messages handed to this listener in one invocation
     * @param context the consume context supplied by the caller; not used here
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
        final String workerName = Thread.currentThread().getName();
        System.out.format("%s Receive New Messages: %s %n", workerName, msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}