Integrating ActiveMQ message queues with Spring: a process walkthrough

This article walks through integrating the ActiveMQ message queue with Spring, using example configuration files and code. A topic broadcasts goods updates to a Solr listener and a page-generation listener, and a queue carries delete requests.

1. Business logic

When I modify the status of an item, I also want to send a broadcast to the corresponding listeners. One listener stores the item in Solr, and another generates a detail page for the item from a static page template. Because both listeners need the same message, the broadcast (publish/subscribe) mechanism is used here.

When I delete an item, I send a message to the corresponding listener, which deletes the item from Solr. This case uses a point-to-point queue rather than a broadcast.

Broadcast mechanism: a subscriber must be online when a message is published in order to receive it (unless it holds a durable subscription).
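The broadcast rule can be illustrated without a broker. The following is a minimal sketch, not ActiveMQ code: `InMemoryTopic` and `TopicDemo` are hypothetical names, and the class only imitates how a non-durable topic delivers a copy to each subscriber that is online at publish time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a JMS topic; not the ActiveMQ API.
class InMemoryTopic {
    // Subscribers that are currently online, keyed by name, each with its inbox.
    private final Map<String, List<String>> online = new LinkedHashMap<>();

    // Bring a subscriber online and return the inbox that collects its messages.
    List<String> subscribe(String name) {
        List<String> inbox = new ArrayList<>();
        online.put(name, inbox);
        return inbox;
    }

    // Take a subscriber offline; it receives nothing published from now on.
    void unsubscribe(String name) {
        online.remove(name);
    }

    // Deliver a copy of the message to every online subscriber, then forget it.
    void publish(String message) {
        for (List<String> inbox : online.values()) {
            inbox.add(message);
        }
    }
}

class TopicDemo {
    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        List<String> solr = topic.subscribe("solr");
        topic.publish("1001");                        // only "solr" is online
        List<String> page = topic.subscribe("page");  // too late for 1001
        topic.publish("1002");                        // both get a copy
        // prints: solr=[1001, 1002] page=[1002]
        System.out.println("solr=" + solr + " page=" + page);
    }
}
```

In the article's setup, the two subscribers correspond to the Solr listener and the page-generation listener, which is why both services should be running before goods are published.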

Using the message-oriented middleware requires a Spring configuration file. The producer side looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">
  <!-- The ConnectionFactory that actually creates connections; supplied by the JMS provider (ActiveMQ) -->
  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
  </bean>
  <!-- Spring's ConnectionFactory, which wraps and manages the real one -->
  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <!-- The target is the real ConnectionFactory that creates JMS connections -->
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
  </bean>
  <!-- JmsTemplate: Spring's JMS helper class for sending and receiving messages -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- Refers to the Spring-managed ConnectionFactory defined above -->
    <property name="connectionFactory" ref="connectionFactory"/>
  </bean>

  <!-- Publish/subscribe: import goods into the index and generate static pages -->
  <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- The ids of goods being listed are sent to this topic -->
    <constructor-arg value="youlexuan_topic_page_solr"/>
  </bean>
  <!-- Point-to-point -->
  <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- The ids of deleted goods are sent to this queue -->
    <constructor-arg value="youlexuan_queue_solr_delete"/>
  </bean>

</beans>

Publishing the broadcast:

// When the status is "1", publish the goods id to the topic
if ("1".equals(status)) {
  jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
      // The message body is the goods id as text
      return session.createTextMessage(String.valueOf(id));
    }
  });
}

Listener 1: store the current goods in Solr. This is the configuration file of the service that operates on Solr:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">
  <!-- Creates the connections -->
  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
  </bean>
  <!-- Spring-managed ConnectionFactory -->
  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
  </bean>
  <!-- Publish/subscribe: import data into the Solr index -->
  <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="youlexuan_topic_page_solr"/>
  </bean>
  <!-- Publish/subscribe: listener container that imports data into the Solr index -->
  <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicPageAndSolrDestination" />
    <property name="messageListener" ref="pageAndSolrListener" />
  </bean>
  <!-- Listener that handles these messages and saves the inventory items to Solr -->
  <bean id="pageAndSolrListener" class="com.ghh.sellergoods.service.listener.ItemSearchListener"></bean>
  <!-- Point-to-point: delete from the index -->
  <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- Queue from which the ids of removed goods are received -->
    <constructor-arg value="youlexuan_queue_solr_delete"/>
  </bean>
  <!-- Point-to-point: listener container that deletes from the index -->
  <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueSolrDeleteDestination" />
    <property name="messageListener" ref="itemDeleteListener" />
  </bean>
  <bean id="itemDeleteListener" class="com.ghh.sellergoods.service.listener.ItemDeleteListener"></bean>
</beans>

Listener 1 class:

public class ItemSearchListener implements MessageListener {
  @Autowired
  private SearchService searchService;
  @Autowired
  private ItemDao itemDao;
  @Override
  public void onMessage(Message message) {
    // Receive the broadcast from the producer and add the inventory list to Solr
    ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
    try {
      // Read the goods id carried by the broadcast
      Long goodsId = Long.valueOf(atm.getText());
      // Build a query on the inventory table for this goods id
      ItemQuery query = new ItemQuery();
      ItemQuery.Criteria criteria = query.createCriteria();
      criteria.andGoodsIdEqualTo(goodsId);
      // Load the inventory items that belong to the goods id
      List<Item> items = itemDao.selectByExample(query);
      // Add the inventory items of the current goods to Solr
      searchService.importList(items);
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}

Listener 2: configuration file

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:jms="http://www.springframework.org/schema/jms"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">
  <!-- Factory that creates the connections -->
  <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
  </bean>
  <!-- Spring-managed factory -->
  <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
  </bean>
  <!-- Publish/subscribe: generate pages -->
  <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <!-- Topic from which the ids of listed goods are received -->
    <constructor-arg value="youlexuan_topic_page_solr"/>
  </bean>
  <!-- Publish/subscribe: listener container that generates pages -->
  <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="topicPageAndSolrDestination" />
    <property name="messageListener" ref="Pagelistener" />
  </bean>
  <bean id="Pagelistener" class="com.ghh.core.service.listener.Pagelistener"></bean>

</beans>

Listener 2 class: generate the static detail page from the template

public class Pagelistener implements MessageListener {
  @Autowired
  private CmsService cmsService;
  @Override
  public void onMessage(Message message) {
    ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
    try {
      // Read the goods id carried by the broadcast
      Long goodsId = Long.valueOf(atm.getText());
      // Load the data the template needs, then render the static page
      Map<String,Object> goodsData = cmsService.findGoodsData(goodsId);
      cmsService.createStaticPage(goodsId,goodsData);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

Point-to-point

When I delete goods, the corresponding service must delete their inventory information from Solr. Adding and deleting are handled by the same service, so the delete listener is already declared in the Listener 1 configuration file above.
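For contrast with the broadcast case, here is a minimal sketch of point-to-point delivery. It is not ActiveMQ code: `InMemoryQueue` and `QueueDemo` are hypothetical names, and the class only imitates two properties of a JMS queue, namely that a message waits until a consumer takes it and that each message goes to exactly one consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory stand-in for a JMS queue; not the ActiveMQ API.
class InMemoryQueue {
    private final Queue<String> pending = new ArrayDeque<>();

    // Store the message until some consumer takes it.
    void send(String message) {
        pending.add(message);
    }

    // Hand the oldest pending message to the caller, or null when the queue is empty.
    String receive() {
        return pending.poll();
    }
}

class QueueDemo {
    public static void main(String[] args) {
        InMemoryQueue queue = new InMemoryQueue();
        queue.send("1001");   // no consumer is online yet; the message waits
        queue.send("1002");

        List<String> consumerA = new ArrayList<>();
        List<String> consumerB = new ArrayList<>();
        consumerA.add(queue.receive()); // A takes 1001
        consumerB.add(queue.receive()); // B takes 1002, never a copy of 1001
        // prints: A=[1001] B=[1002] left=null
        System.out.println("A=" + consumerA + " B=" + consumerB + " left=" + queue.receive());
    }
}
```

This is why a queue suits the delete request: the request is not lost if the search service happens to be offline, and it is processed only once.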

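// Inject the queue used for delete requests
@Autowired
private ActiveMQQueue queueSolrDeleteDestination;

// In the delete method, send the id of each deleted item to the queue
if (ids.length > 0) {
  // One message per id (assuming ids is a Long[]): String.valueOf(ids) on an
  // array would not produce text the listener can parse with Long.valueOf
  for (final Long id : ids) {
    jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage(String.valueOf(id));
      }
    });
  }
}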
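A note on the message body: the delete listener parses the text with `Long.valueOf`, so each message must carry text that parses as a single id. Calling `String.valueOf` on an array of ids returns the array's type name and hash code, not the ids. If several ids should travel in one message, they need an explicit encoding. The following is a minimal sketch of one option, assuming the ids are a `Long[]`; `IdCodec` is a hypothetical helper, not part of the original project, and the listener would have to call `decode` instead of `Long.valueOf`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for carrying several ids in one text message.
class IdCodec {
    // Join the ids into a comma-separated string such as "1,2,3".
    static String encode(Long[] ids) {
        StringBuilder sb = new StringBuilder();
        for (Long id : ids) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(id);
        }
        return sb.toString();
    }

    // Parse a comma-separated string back into ids; an empty string yields no ids.
    static List<Long> decode(String text) {
        List<Long> ids = new ArrayList<>();
        if (text.isEmpty()) {
            return ids;
        }
        for (String part : text.split(",")) {
            ids.add(Long.valueOf(part.trim()));
        }
        return ids;
    }
}

class IdCodecDemo {
    public static void main(String[] args) {
        Long[] ids = {1001L, 1002L};
        // The array's default string form carries no ids; prints: true
        System.out.println(String.valueOf((Object) ids).startsWith("[Ljava.lang.Long;"));
        // Round trip through the explicit encoding; prints: [1001, 1002]
        System.out.println(IdCodec.decode(IdCodec.encode(ids)));
    }
}
```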
Listener that deletes the inventory information from Solr:

public class ItemDeleteListener implements MessageListener {
  @Autowired
  private SearchService searchService;

  @Override
  public void onMessage(Message message) {
    ActiveMQTextMessage atm = (ActiveMQTextMessage) message;
    try {
      Long goodsId = Long.valueOf(atm.getText());
      searchService.deleteById(goodsId);
    } catch (JMSException e) {
      e.printStackTrace();
    }
  }
}
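One detail worth noting in the listeners that catch only `JMSException`: `Long.valueOf` throws the unchecked `NumberFormatException` when the text is not a number, and that exception would escape `onMessage`. How the container reacts then depends on its acknowledge and transaction settings. A minimal sketch of a defensive parse step follows; `GoodsIdParser` is a hypothetical helper, not part of the original project.

```java
import java.util.Optional;

// Hypothetical helper: turn a message body into a goods id without throwing.
class GoodsIdParser {
    static Optional<Long> parse(String text) {
        if (text == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Long.valueOf(text.trim()));
        } catch (NumberFormatException e) {
            // Not a number: report "no id" instead of letting the exception escape.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("1001").isPresent()); // prints: true
        System.out.println(parse("abc").isPresent());  // prints: false
    }
}
```

A listener could then skip or log a message whose body yields an empty result, rather than failing on it.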
