RabbitMQ理解和使用

## JMS—Java消息服务

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

RabbitMQ客户端、ActiveMQ、Kafka、RocketMQ等都有使用JMS API实现

异步:订阅者或消费者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

同步:订阅者或消费者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞

消息模型

点对点

每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

发布者/订阅者(Pub/Sub)

多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

消息队列

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。

  • 秒杀业务:把扣减库存服务和生成订单服务拆分
  • 用户注册后发送邮件和短信,无需等待发送,只需提交给RabbitMQ队列异步处理

RabbitMQ—基于AMQP的消息队列

使用ErLang编写,使用JMS API集成,但是RabbitMQ路由过程除了消息、发送者、接受者还有交换机绑定者的角色,决定发送到那个队列

消息中间件:分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。

  • 异步处理
  • 应用解耦(订单服务和库存服务)
  • 流量削峰
  • 不需要及时得到结果的消息放到MQ中
  1. Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  2. Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  3. Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  4. Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  5. Broker 表示消息队列服务器实体。

Exchange类型

Direct

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式.

Fanout

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。

Topic

将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。#匹配0个或多个单词,*匹配不多不少一个单词。

常用命令行操作

  • sudo rabbitmq-server 启动RabbitMQ

  • systemctl status rabbitmq-server.service 查看RabbitMQ状态

  • sudo systemctl restart rabbitmq-server 重启RabbitMQ

  • sudo rabbitmq-plugins enable rabbitmq_management 启用 RabbitMQ web 管理插件 可以在 15672端口进入RabbitMQ后台

  • sudo rabbitmqctl stop 停止RabbitMQ(关闭整个节点)

  • sudo rabbitmqctl stop_app 只关闭应用程序不关闭节点

  • sudo rabbitmqctl start_app启动应用程序

  • sudo rabbitmqctl list_queues 查看RabbitMQ中的队列

  • sudo rabbitmqctl list_exchanges查看交换器

  • sudo rabbitmqctl list_bindings查看绑定

  • sudo rabbitmqctl reset 重置RabbitMQ节点(清空队列),但是在使用此命令前,要先关闭应用,否则不能清除

  • sudo rabbitmqctl add_user dzou 1234 添加用户

rabbitmq-client

  • maven依赖
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.0</version>
        </dependency>
  • 发送者
public class P {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("P [x] Sent '" + message + "'");

    channel.close();
    connection.close();
  }
}
  • 接收者
public class C {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("C [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("C [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

AmqpTemplate

Maven依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 手动应答
        concurrency: 5 # 消费端最小并发数
        max-concurrency: 10 # 消费端最大并发数
        prefetch: 5 # 一次请求中预处理的消息数量
    cache:
      channel:
        size: 50 # 缓存的channel数量
### 自定义配置 可不需要
mq:
  defaultExchange: amqpExchange # 默认交换器
  queue: queue # 队列名
  routeKey: queue_key # 路由key
配置MQ队列
@Configuration
public class MQConfig {

    public static final String SECKILL_QUEUE = "seckill.queue";

    @Bean
    public Queue secKillQueue(){
        return new Queue(SECKILL_QUEUE);
    }
}
发送者
@Service
@Slf4j
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    RedisService redisService;

    public void sendSeckillInfo(SecKillMessage secKillMessage) {
        String msg = redisService.beanToString(secKillMessage);
        log.info("send message :{}",msg);
        amqpTemplate.convertAndSend(MQConfig.SECKILL_QUEUE,msg);
    }

    public void send(Object message){
        String msg = redisService.beanToString(message);
        log.info("send message :{}",msg);
        amqpTemplate.convertAndSend(MQConfig.QUEUE_NAME,msg);
    }
    //以Topic模式发送
    public void sendTopic(Object message){
        String msg = redisService.beanToString(message);
        log.info("send message :{}",msg);
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE_NAME,MQConfig.ROUTING_KEY1_NAME,msg);
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE_NAME,MQConfig.ROUTING_KEY_NAME,msg+"17231");
    }
    //以fanout模式发送
    public void sendFanout(Object message) {
        String msg = redisService.beanToString(message);
        log.info("send fanout message:"+msg);
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE_NAME, "", msg);
    }
    //以Header模式发送
    public void sendHeader(Object message) throws UnsupportedEncodingException {
        String msg = redisService.beanToString(message);
        log.info("send header message:" + msg);
        MessageProperties properties = new MessageProperties();
        properties.setHeader("header1", "value1");
        properties.setHeader("header2", "value2");
        Message obj = new Message(msg.getBytes("UTF-8"), properties);
        amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE_NAME, "", obj);
    }
}
接收者
@Slf4j
@Service
@RabbitListener(queues = MQConfig.SECKILL_QUEUE)
public class MQReceive {

    @Autowired
    private RedisService redisService;
    @Autowired
    private SecKillService secKillService;

    @RabbitHandler
    public void receiveMsg(String message){
        log.info("receive msg:{}",message);
        SecKillMessage msg = redisService.stringToBean(message,SecKillMessage.class);
            //处理相应的业务逻辑
    }
    //测试Topic模式
    @RabbitListener(queues = MQConfig.TOPIC_QUEUE1_NAME)
    public void receiveMsg(String message){
        log.info("receive msg:{}",message);
    }
    @RabbitListener(queues = MQConfig.TOPIC_QUEUE2_NAME)
    public void receiveMsg2(String message){
        log.info("receive msg:{}",message);
    }
    //测试Headers模式
    @RabbitListener(queues=MQConfig.HEADERS_QUEUE2_NAME)
    public void receiveHeaderQueue(byte[] message) throws UnsupportedEncodingException {
        log.info(" header  queue message:"+new String(message,"UTF-8"));
    }
}
String和Bean互转

使用alibaba的fastJson

//bean转为字符串
    public  <T> String beanToString(T value) {
        if (value == null) {
            return null;
        }
        Class<?> clazz = value.getClass();
        if (clazz == int.class || clazz == Integer.class) {
            return "" + value;
        } else if (clazz == String.class) {
            return (String) value;
        } else if (clazz == long.class || clazz == Long.class) {
            return "" + value;
        } else {
            return JSON.toJSONString(value);
        }
    }

    //字符串转为bean
    @SuppressWarnings("unchecked")
    public  <T> T stringToBean(String str, Class<T> clazz) {
        if (str == null || str.length() <= 0 || clazz == null) {
            return null;
        }
        if (clazz == int.class || clazz == Integer.class) {
            return (T) Integer.valueOf(str);
        } else if (clazz == String.class) {
            return (T) str;
        } else if (clazz == long.class || clazz == Long.class) {
            return (T) Long.valueOf(str);
        } else {
            return JSON.parseObject(str, clazz);
        }
    }

RabbitMQ支持集群分发消息

工作队列

将任务封装为消息并发给队列。在后台运行的工作者(consumer)将其取出,然后最终执行。当你运行多个工作者(consumer),队列中的任务被工作进行共享执行。

轮询分发

一个生产者生产消息到队列,多个消费者从队列依次轮询消费消息。这里的轮询机制是你一个,我一个轮询分发机制,比如我有50个消息,无论C1、C2的消费处理能力如何,最后他们都会按照你一个我一个的方式都拿到25个消息消费,这就是所谓的轮询,轮着来的意思。

消息应答和持久化—针对消费者

消息应答—ACK

默认情况下如果一个 Message 被消费者所正确接收则会被从 Queue 中移除

如果一个 Queue 没被任何消费者订阅,那么这个 Queue 中的消息会被 Cache(缓存),当有消费者订阅时则会立即发送,当 Message 被消费者正确接收时,就会被从 Queue 中移除

Message acknowledgment(消息应答):当消息被消费者成功接收到时,就会告诉RabbitMQ你可以从队列删除该消息,我已经接收到了

autoAck = true;//自动确认模式,当消息被接收,就从内存中删除

这种情况下,杀死消费者就会导致消息丢失。

autoAck = false;//手动确认模式,如果有消息挂了,就把该消息交给其他消费者

这种情况下,消费者挂了不会丢失消息,但是如果服务器宕机RabbitMQ挂了,消息也会丢失。

所以我们需要使用其他机制把消息存储起来。

持久化—durable

  • 声明好的队列无法更改持久化

默认队列和消息都是放在内存中的,当RabbitMQ退出或者崩溃,将会丢失队列和消息。为了保证即使RabbitMQ崩溃也不会丢失消息,我们必须把“队列”和“消息”设为持久化,当队列和消息持久化以后即使RabbitMQ崩溃,消息还存在磁盘中,当RabbitMQ再次启动的时候,队列和消息仍然还在。

  • durable设置为true
// 队列持久化
boolean durable = true;  
channel.queueDeclare("hello", durable, false, false, null); 

// 消息持久化 方式一
channel.basicPublish("", "key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

// 消息持久化 方式二
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);  // 设置消息是否持久化,1: 非持久化 2:持久化
channel.basicPublish("", "key", properties.build(), message.getBytes("UTF-8"));

但是消息任然可以没有保存到内存中,如果要完全100%保证写入RabbitMQ的数据必须落地磁盘,不会丢失,需要依靠其他的机制。

消息确认机制—针对生产者

当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?

如果到不了,那么持久化将无法使用,为此,我们还需要对生产者发送消息建立确认机制

RabbitMQ提供两种消息确认机制:

  • 事务:通过AMQP事务机制实现
  • confirm:通过将channel设置成confirm模式来实现

事务

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。


channel.txSelect();//开启事务模式
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
channel.txCommit();

只有消息成功被服务器接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,下面我们还用一种更高效的方法确认消息的提交:confirm

confirm

消息的确认是指生产者投递消息后,如果 Broker 代理服务器接收到消息,则会给生产者一个应答(根据一个指定的唯一id确认消息)。生产者进行接收应答,用来确认这条消息是否正常的发送到 Broker,这种方式也是消息可靠性投递的核心保障

  • 设置为confirm模式
channel.confirmSelect();
confirm模式有三种:
  • 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm。
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
    System.out.println("send message failed.");
}
  • 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
channel.confirmSelect();
for(int i=0;i<batchCount;i++){
    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
    System.out.println("send message failed.");
}

其中一个消息返回false需要消息全部重发

  • 异步confirm模式:提供一个回调方法(监听器),服务端confirm了一条或者多条消息后Client端会回调这个方法。
 channel.confirmSelect();
        channel.addConfirmListener(new ConfirmListener() {
            //发送成功
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                //成功进入这,根据业务实现
            }
            //发送失败
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
                //失败进入这,根据业务实现
            }
        });

        while (true) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
            confirmSet.add(nextSeqNo);
        }

参考博文:

https://juejin.im/post/5a67f7836fb9a01cb74e8931

https://segmentfault.com/a/1190000017130224

安装参考:

https://blog.csdn.net/nextyu/article/details/79250174#commentBox

  • 本文作者: dzou | 微信:17856530567
  • 本文链接: http://www.dzou.top/post/rabbitmq.html
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
  • 并保留本声明和上方二维码。感谢您的阅读和支持!