Integrating ActiveMQ message queues with Spring: a walkthrough
This article walks through integrating the ActiveMQ message broker with Spring, using example configuration and code for both the producer and the listeners.
1. Business logic
When I change the status of an item, I also want to broadcast a message to the corresponding listeners. One listener stores the item in Solr, and another generates a detail page for the item from a static page template. This case uses the broadcast (publish/subscribe) mechanism.
When I delete an item, I send a message to the corresponding listener, which deletes the item from Solr.
Broadcast mechanism: a subscriber must be online when the message is published in order to receive it. Messages published while a (non-durable) subscriber is offline are not delivered to it later.
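The difference between broadcasting to a topic and sending to a queue can be illustrated with a small in-memory model. This is only a sketch of the delivery semantics, not how ActiveMQ is implemented; the class names ToyTopic, ToyQueue and DeliveryDemo are hypothetical, and durable subscriptions (which let a broker keep topic messages for offline subscribers) are deliberately ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of a topic: a message reaches only the subscribers connected at publish time. */
final class ToyTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        // Every currently connected subscriber gets its own copy; nothing is stored.
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

/** Toy model of a queue: a message waits until a single consumer takes it. */
final class ToyQueue {
    private final Deque<String> pending = new ArrayDeque<>();

    void send(String message) {
        pending.addLast(message);
    }

    /** Returns the oldest pending message, or null if the queue is empty. */
    String receive() {
        return pending.pollFirst();
    }
}

final class DeliveryDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        topic.subscribe(early::add);
        topic.publish("item-1");   // only `early` is connected
        topic.subscribe(late::add);
        topic.publish("item-2");   // both subscribers are connected

        System.out.println("early=" + early + " late=" + late);

        ToyQueue queue = new ToyQueue();
        queue.send("item-3");      // no consumer yet; the message waits
        System.out.println("queued=" + queue.receive());
    }
}
```

Running DeliveryDemo prints `early=[item-1, item-2] late=[item-2]` followed by `queued=item-3`: the late subscriber never sees item-1, while the queued message is still there when a consumer arrives.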
To use the message-oriented middleware, the producer imports the following Spring configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- The ConnectionFactory that actually creates connections; provided by the JMS vendor -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
    </bean>

    <!-- The Spring ConnectionFactory that manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target ConnectionFactory is the real one that creates JMS connections -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Spring's JMS helper class for sending and receiving messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This refers to the Spring ConnectionFactory defined above -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <!-- Publish/subscribe: import items into the index and generate static pages -->
    <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- The ids of items put on sale are sent to this topic -->
        <constructor-arg value="youlexuan_topic_page_solr"/>
    </bean>

    <!-- Point-to-point -->
    <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- The ids of items being removed are sent to this queue -->
        <constructor-arg value="youlexuan_queue_solr_delete"/>
    </bean>
</beans>
Publishing the broadcast:
if ("1".equals(status)) {
    // When the status is "1", broadcast the item id to the topic subscribers.
    jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(String.valueOf(id));
        }
    });
}
Listener 1: store the current item in Solr. Spring configuration file of the search service:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Creates connections -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
    </bean>

    <!-- Spring-managed connectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Publish/subscribe: import data into the Solr index -->
    <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="youlexuan_topic_page_solr"/>
    </bean>

    <!-- Publish/subscribe: message listener container for importing data into the Solr index -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicPageAndSolrDestination" />
        <property name="messageListener" ref="pageAndSolrListener" />
    </bean>

    <!-- Listener that handles the messages for saving inventory to Solr -->
    <bean id="pageAndSolrListener" class="com.ghh.sellergoods.service.listener.ItemSearchListener"></bean>

    <!-- Point-to-point: delete from the index -->
    <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- The ids of removed items are received from this queue -->
        <constructor-arg value="youlexuan_queue_solr_delete"/>
    </bean>

    <!-- Point-to-point: message listener container for deleting from the index -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueSolrDeleteDestination" />
        <property name="messageListener" ref="itemDeleteListener" />
    </bean>

    <bean id="itemDeleteListener" class="com.ghh.sellergoods.service.listener.ItemDeleteListener"></bean>
</beans>
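Listener 1 class:
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.beans.factory.annotation.Autowired;

// Project-specific imports (SearchService, ItemDao, ItemQuery, Item) are omitted.
public class ItemSearchListener implements MessageListener {

    @Autowired
    private SearchService searchService;

    @Autowired
    private ItemDao itemDao;

    @Override
    public void onMessage(Message message) {
        // Receive the producer's broadcast and add the inventory list to Solr.
        ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
        try {
            // Read the goods id carried by the broadcast.
            Long goodsId = Long.valueOf(atm.getText());
            // Build a query on the inventory table for this goods id.
            ItemQuery query = new ItemQuery();
            ItemQuery.Criteria criteria = query.createCriteria();
            criteria.andGoodsIdEqualTo(goodsId);
            // Load the inventory rows for the goods id.
            List<Item> items = itemDao.selectByExample(query);
            // Add the inventory information for this item to Solr.
            searchService.importList(items);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}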
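The listeners in this article convert the message text with Long.valueOf, which throws an unchecked NumberFormatException for text that is not a number; a catch block for JMSException does not cover that case. The following is a minimal sketch of a defensive parser that a listener could call before touching Solr. The class MessageIds and its method parseId are hypothetical helpers, not part of JMS, Spring, or the original project.

```java
import java.util.OptionalLong;

/** Hypothetical helper for turning a text message body into an item id. */
final class MessageIds {
    private MessageIds() {
    }

    /** Returns the parsed id, or an empty result for null, blank, or non-numeric text. */
    static OptionalLong parseId(String text) {
        if (text == null) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(text.trim()));
        } catch (NumberFormatException e) {
            // Malformed payload: report "no id" instead of propagating the exception.
            return OptionalLong.empty();
        }
    }
}

final class MessageIdsDemo {
    public static void main(String[] args) {
        for (String body : new String[] {" 42 ", "abc", ""}) {
            OptionalLong id = MessageIds.parseId(body);
            if (id.isPresent()) {
                System.out.println("id " + id.getAsLong());
            } else {
                System.out.println("skipping malformed message body");
            }
        }
    }
}
```

Whether a malformed message should be skipped, logged, or sent for redelivery is a design decision for the application; the sketch only shows how to detect the condition.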
Listener 2: Spring configuration file of the page service
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Factory that creates connections -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
    </bean>

    <!-- Spring-managed connection factory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>

    <!-- Publish/subscribe: generate pages -->
    <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- The ids of items put on sale are received from this topic -->
        <constructor-arg value="youlexuan_topic_page_solr"/>
    </bean>

    <!-- Publish/subscribe: message listener container for generating pages -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicPageAndSolrDestination" />
        <property name="messageListener" ref="Pagelistener" />
    </bean>

    <bean id="Pagelistener" class="com.ghh.core.service.listener.Pagelistener"></bean>
</beans>
Listener 2 class: generate the static detail page from the template
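import java.util.Map;

import javax.jms.Message;
import javax.jms.MessageListener;

import org.apache.activemq.command.ActiveMQTextMessage;
import org.springframework.beans.factory.annotation.Autowired;

// The project-specific import for CmsService is omitted.
public class Pagelistener implements MessageListener {

    @Autowired
    private CmsService cmsService;

    @Override
    public void onMessage(Message message) {
        ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
        try {
            // Read the goods id carried by the broadcast.
            Long goodsId = Long.valueOf(atm.getText());
            // Load the data for the detail page and render the static page.
            Map<String, Object> goodsData = cmsService.findGoodsData(goodsId);
            cmsService.createStaticPage(goodsId, goodsData);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}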
Point-to-point
When I delete an item, the corresponding service needs to delete its inventory information from Solr. Adding and deleting are handled by the same search service, using its configuration file shown above. Unlike a topic, a queue delivers each message to a single consumer.
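// Publishing side. The destinations are injected, for example:
// @Autowired private ActiveMQTopic topicPageAndSolrDestination;
// @Autowired private ActiveMQQueue queueSolrDeleteDestination;
// In the delete method, send the id of each deleted item to the queue.
// One message is sent per id, because the listener parses a single id from each message
// and String.valueOf(ids) on an array does not produce a parseable id.
if (ids.length > 0) {
    for (final Long id : ids) {
        jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(String.valueOf(id));
            }
        });
    }
}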
// Deletes the inventory information from Solr.
public class ItemDeleteListener implements MessageListener {

    @Autowired
    private SearchService searchService;

    @Override
    public void onMessage(Message message) {
        ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
        try {
            // Read the goods id from the message and delete its entries from Solr.
            Long goodsId = Long.valueOf(atm.getText());
            searchService.deleteById(goodsId);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
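If several ids have to travel in one text message, they need to be serialized explicitly, because calling String.valueOf on an array of Long values returns the array's default string form rather than the ids. The following is a minimal sketch that assumes a comma-separated payload; the class IdPayload and its methods are hypothetical, and the listener would have to decode the list and delete each id in turn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper that packs item ids into one text payload and unpacks them again. */
final class IdPayload {
    private IdPayload() {
    }

    /** Joins the ids with commas, e.g. {1, 2, 3} becomes "1,2,3". Ids must not be null. */
    static String encode(Long[] ids) {
        StringJoiner joiner = new StringJoiner(",");
        for (Long id : ids) {
            joiner.add(String.valueOf(id));
        }
        return joiner.toString();
    }

    /** Splits a comma-separated payload back into ids; a null or blank payload yields an empty list. */
    static List<Long> decode(String payload) {
        List<Long> ids = new ArrayList<>();
        if (payload == null || payload.trim().isEmpty()) {
            return ids;
        }
        for (String part : payload.split(",")) {
            ids.add(Long.valueOf(part.trim()));
        }
        return ids;
    }
}

final class IdPayloadDemo {
    public static void main(String[] args) {
        String payload = IdPayload.encode(new Long[] {1L, 2L, 3L});
        System.out.println(payload);                    // text that would go into the message
        System.out.println(IdPayload.decode(payload));  // ids recovered on the listener side
    }
}
```

Sending one message per id keeps the listener simpler, while a single combined message reduces the number of messages; which trade-off fits depends on how many items are deleted at once.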