Home Spring集成RocketMQ
Post
Cancel

Spring集成RocketMQ

Maven依赖

1
2
3
4
5
6
7
fastjson-1.2.44.jar
netty-all-4.1.19.Final.jar
rocketmq-client-4.2.0.jar
rocketmq-common-4.2.0.jar
rocketmq-remoting-4.2.0.jar
slf4j-api-1.7.25.jar
slf4j-nop-1.7.25.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<!-- RocketMQ -->
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-client</artifactId>
 <version>4.2.0</version>
</dependency>
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-common</artifactId>
 <version>4.2.0</version>
</dependency>
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-remoting</artifactId>
 <version>4.2.0</version>
</dependency>
<dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>4.1.19.Final</version>
</dependency>
<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>1.2.44</version>
</dependency>
<dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-api</artifactId>
 <version>1.7.25</version>
</dependency>
<dependency>
 <groupId>org.slf4j</groupId>
 <artifactId>slf4j-nop</artifactId>
 <version>1.7.25</version>
 <scope>test</scope>
</dependency>

RocketMQ配置文件spring-rokectmq.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
            http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="rocketmqProduct" 
        class="org.apache.rocketmq.client.producer.DefaultMQProducer" 
        init-method="start"
        destroy-method="shutdown">
        <property name="producerGroup" value="concurrent_producer"/>
        <property name="namesrvAddr" value="192.168.0.103:9876"/>
    </bean>

    <bean id="consumerSpringListener" 
        class="com.ju.biz.mq.rokectmq.quickstartspring.ConsumerSpringListener" />

    <bean id="rocketmqConsumer" 
        class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer" 
        init-method="start" destroy-method="shutdown">
        <property name="consumerGroup" value="concurrent_consumer"/>
        <property name="namesrvAddr" value="192.168.0.103:9876"/>
        <property name="messageListener" ref="consumerSpringListener"/>
        <property name="subscription">
            <map>
                <entry key="TopicTest">
                <value>*</value>
                </entry>
            </map>
        </property>
    </bean>
</beans>

生产者ProducerSpring.java

1
2
3
4
5
6
7
8
9
10
11
ApplicationContext context = 
            new ClassPathXmlApplicationContext("/spring/spring-rokectmq.xml"); 
DefaultMQProducer producer = (DefaultMQProducer) context.getBean("rocketmqProduct");
try {
    Message msg = new Message("TopicTest", "TagA", 
                    ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.send(msg);
} catch (Exception e) {
    e.printStackTrace();
    Thread.sleep(1000);
}

消费者ConsumerSpringListener.java

1
2
3
4
5
6
7
8
9
public class ConsumerSpringListener implements MessageListenerConcurrently { 
     @Override 
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
                ConsumeConcurrentlyContext context) { 
        System.out.printf("%s Receive New Messages: %s %n", 
                    Thread.currentThread().getName(), msgs); 
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
    } 
}
This post is licensed under CC BY 4.0 by the author.