消息队列工具 ActiveMQ

小兔叽 可爱的博主

时间: 2020-10-14 阅读: 88 字数:9255

{}
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

消息队列工具 ActiveMQ

1 、简介

  • 同类产品: RabbitMQ 、 Kafka、Redis(List)

1.1 对比RabbitMQ

  • 最接近的同类型产品,经常拿来比较,性能伯仲之间,基本上可以互相替代。最主要区别是二者的协议不同RabbitMQ的协议是AMQP(Advanced Message Queueing Protoco),而ActiveMQ使用的是JMS(Java Messaging Service )协议。顾名思义JMS是针对Java体系的传输协议,队列两端必须有JVM,所以如果开发环境都是java的话推荐使用ActiveMQ,可以用Java的一些对象进行传递比如Map、BLob、Stream等。而AMQP通用行较强,非java环境经常使用,传输内容就是标准字符串。 另外一点就是RabbitMQ用Erlang开发,安装前要装Erlang环境,比较麻烦。ActiveMQ解压即可用不用任何安装。

1.2 对比KafKa

  • Kafka性能超过ActiveMQ等传统MQ工具,集群扩展性好。 弊端是: 在传输过程中可能会出现消息重复的情况, 不保证发送顺序 一些传统MQ的功能没有,比如消息的事务功能。 所以通常用Kafka处理大数据日志。

1.3 对比Redis

  • 其实Redis本身利用List可以实现消息队列的功能,但是功能很少,而且队列体积较大时性能会急剧下降。对于数据量不大、业务简单的场景可以使用。

2 安装 ActiveMQ

拷贝apache-activemq-5.14.4-bin.tar.gz到Linux服务器的/opt下

解压缩

tar -zxvf apache-activemq-5.14.4-bin.tar.gz

重命名

mv  apache-activemq-5.14.4  activemq

更改配置文件

  vim /opt/activemq/bin/activemq  

增加两行

JAVA_HOME="/opt/jdk1.8.0_152"
JAVA_CMD="/opt/jdk1.8.0_152/bin"

图片的描述

注册服务

ln -s  /opt/activemq/bin/activemq  /etc/init.d/activemq
chkconfig --add activemq

启动服务

service activemq start 

图片的描述

关闭服务

service activemq stop

通过netstat 查看端口

图片的描述

activemq两个重要的端口,一个是提供消息队列的默认端口:61616 另一个是控制台端口8161

通过控制台测试 启动消费端

图片的描述

进入网页控制台

图片的描述

账号/密码默认: admin/admin

3 在Java中使用消息队列

3.1 在gmall-service-util中导入依赖坐标

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
   <version>5.15.2</version>
   <exclusions>
      <exclusion>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
   </exclusions>
</dependency>

3.2 producer端

public static void main(String[] args) {

    ConnectionFactory connect = new ActiveMQConnectionFactory("tcp://192.168.67.163:61616");
    try {
        Connection connection = connect.createConnection();
        connection.start();
        //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue testqueue = session.createQueue("TEST1");

        MessageProducer producer = session.createProducer(testqueue);
        TextMessage textMessage=new ActiveMQTextMessage();
        textMessage.setText("今天天气真好!");
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(textMessage);
        session.commit();
        connection.close();

    } catch (JMSException e) {
        e.printStackTrace();
    }

}

3.3 consumer端

public static void main(String[] args) {
    ConnectionFactory connect = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD,"tcp://192.168.67.163:61616");
    try {
        Connection connection = connect.createConnection();
        connection.start();
        //第一个值表示是否使用事务,如果选择true,第二个值相当于选择0
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination testqueue = session.createQueue("TEST1");

        MessageConsumer consumer = session.createConsumer(testqueue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    try {
                        String text = ((TextMessage) message).getText();
                        System.out.println(text);

                        //session.rollback();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        });


    }catch (Exception e){
        e.printStackTrace();;
    }
}

3.4 关于事务控制

producer****提交时的事务 事务开启 *只执行***send并不会提交到队列中,只有当执行session.commit()**时,消息才被真正的提交到队列中。
事务不开启 只要执行send,就进入到队列中。
consumer 接收时的事务 事务开启,签收必须写 Session.SESSION_TRANSACTED *收到消息后,消息并没有真正的被消费。消息只是被锁住。一旦出现该线程死掉、抛异常,或者程序执行了***session.rollback()**那么消息会释放,重新回到队列中被别的消费端再次消费。
事务不开启,签收方式选择 Session.AUTO_ACKNOWLEDGE 只要调用comsumer.receive方法 ,自动确认。
事务不开启,签收方式选择 Session.CLIENT_ACKNOWLEDGE 需要客户端执行 message.acknowledge(),否则视为未提交状态,线程结束后,其他线程还可以接收到。 这种方式跟事务模式很像,区别是不能手动回滚,而且可以单独确认某个消息。
事务不开启,签收方式选择 Session.DUPS_OK_ACKNOWLEDGE 在Topic模式下做批量签收时用的,可以提高性能。但是某些情况消息可能会被重复提交,使用这种模式的consumer要可以处理重复提交的问题。

3.5 持久化与非持久化

通过producer.setDeliveryMode(DeliveryMode.PERSISTENT) 进行设置 持久化的好处就是当activemq宕机的话,消息队列中的消息不会丢失。非持久化会丢失。但是会消耗一定的性能。

四 与springboot整合

1 配置类ActiveMQConfig

@Configuration
public class ActiveMQConfig {

    @Value("${spring.activemq.broker-url:disabled}")
    String brokerURL ;

    @Value("${activemq.listener.enable:disabled}")
    String listenerEnable;

    @Bean
    public    ActiveMQUtil   getActiveMQUtil() throws JMSException {
        if(brokerURL.equals("disabled")){
            return null;
        }
        ActiveMQUtil activeMQUtil=new ActiveMQUtil();
        activeMQUtil.init(brokerURL);
        return  activeMQUtil;
    }

    //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        if(!listenerEnable.equals("true")){
            return null;
        }

        factory.setConnectionFactory(activeMQConnectionFactory);
        //设置并发数
        factory.setConcurrency("5");

        //重连间隔时间
       factory.setRecoveryInterval(5000L);
       factory.setSessionTransacted(false);
       factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

        return factory;
    }


    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
/*        if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
            url=brokerURL;
        }*/
        ActiveMQConnectionFactory activeMQConnectionFactory =
                new ActiveMQConnectionFactory(  brokerURL);
        return activeMQConnectionFactory;
    }

}

2 工具类ActiveMQUtil

public class ActiveMQUtil {
    PooledConnectionFactory pooledConnectionFactory=null;
    public ConnectionFactory init(String brokerUrl) {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
         //加入连接池
        pooledConnectionFactory=new PooledConnectionFactory(factory);
        //出现异常时重新连接
        pooledConnectionFactory.setReconnectOnException(true);
        //
        pooledConnectionFactory.setMaxConnections(5);
        pooledConnectionFactory.setExpiryTimeout(10000);
        return pooledConnectionFactory;
    }

    public ConnectionFactory getConnectionFactory(){
        return pooledConnectionFactory;
    }
}
本文章网址:https://www.sjxi.cn/detil/5bf30cd1a4b24a78a64d6f4d1d71bb48

最新评论

当前未登陆哦
登陆后才可评论哦

湘ICP备2021009447号

×

(穷逼博主)在线接单

QQ: 1164453243

邮箱: abcdsjx@126.com

前端项目代做
前后端分离
Python 爬虫脚本
Java 后台开发
各种脚本编写
服务器搭建
个人博客搭建
Web 应用开发
Chrome 插件编写
Bug 修复