100字范文,内容丰富有趣,生活中的好帮手!
100字范文 > java 消息中间件 mq_RabbitMQ消息中间件示例详解

java 消息中间件 mq_RabbitMQ消息中间件示例详解

时间:2021-06-16 17:58:03

相关推荐

java 消息中间件 mq_RabbitMQ消息中间件示例详解

前言

RabbitMQ 是使用 Erlang 语言开发的消息中间件, 其遵循了高级消息队列协议(Advanced Message Queuing Protocol, AMQP)。

与 Kafka 等消息队列相比,RabbitMQ 最大的优势在于其较高的可靠性:

提供确认(ACK)和重传机制保证消息完成消费, 消费者异常不会导致消息丢失

提供消息持久化机制, broker 崩溃不会导致消息丢失

集群模式下工作, 保证高可用

因为具有较高可靠性和一致性, RabbitMQ 可以胜任订单处理、秒杀等一致性要求较高的业务场景。

RabbitMQ 概念与机制

RabbitMQ 中的概念模型:

Broker: 消息中间件实例, 可能是单个节点也可能是运行在多节点集群上的逻辑实体

消息(Message): 消息由消息头和消息体两部分组成。消息头中包括routing-key、priority等标准消息头以及其它自定义消息头,用于定义RabbitMQ对消息行为。消息体是字节流,包含消息内容。

连接(Connection): 客户端与 Broker 之间的 TCP连接

信道(Channel): Channel 是建立在 TCP 连接上的逻辑(虚拟)连接。多个 Channel 复用同一个 TCP 连接, 以避免建立 TCP 连接的巨大开销。 RabbitMQ 官方要求每个线程使用独立的 Channel, 禁止多个线程共用 Channel。

生产者(Publisher): 发送消息的客户端线程

消费者(Consumer): 处理消息的客户端线程

交换机(Exchange): 交换机负责将消息投递到相应的队列

队列(Queue): 接收并保存交换机投递的消息,直至被消费者成功消费。逻辑结构遵循先进先出FIFO。

绑定(Binding): 将队列(Queue)注册到交换机(Exchange)的路由表

虚拟主机(Vhost): 每个Broker下可建立多个vhost, 每个 vhost 可建立独立的 Exchange、Queue、绑定及权限系统。同一个 Broker 下的 vhost 共享 Connection、Channel 和 用户系统,就是说可以使用同一个用户身份使用同一个 Channel 访问不同 vhost。

交换机(Exchange)

生产者发送的消息会首先送到交换机(Exchange), 交换机根据自身类型和消息的 routing-key 等信息将消息投递到绑定的消息队列中。

RabbitMQ中的四种标准交换机:

direct: 如果消息的 routing-key 与队列的 binding-key 完全相同,direct类型的交换机则会将消息投递到该队列中。

多个队列可以使用相同的 binding-key 绑定到同一个 direct 交换机,direct 交换机会把消息投递到所有 binding-key 与消息 routing-key 相同的队列

topic: 允许队列的 binding-key 中包含通配符*和#, topic 交换机会将消息投递到 binding-key 与 routing-key 匹配的队列中。

通配符按照关键字进行匹配,如.a中的关键字是news、cn和a,即关键字按照.分割

#通配符匹配0个或多个关键字, news.#.a可以匹配news.a, .a和.a等

*通配符匹配一个关键字, news.*.a匹配.a不匹配news.a、.a

fanout: fanout 交换机不进行任何匹配, 将消息投递到所有绑定的队列

header: header 交换机根据消息头进行投递,现在已较少使用

我们可以使用 RabbitMQ 的插件机制使用第三方交换机或自行开发交换机。如实现延时投递的delayed-message-exchange。

消息头中的delivery-mode可以设置为 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在处理持久化的消息时都会先将消息写入磁盘中再进行下一步处理, 即使 RabbitMQ 崩溃也不会丢失。

消费者客户端通常使用的channel.basicConsume使用推(push)模式投递消息, 即当有新消息时 Broker 通过 channel 主动向客户端发送消息。客户端也可以使用channel.basicGet从 Broker 拉取消息。

ACK机制

RabbitMQ 提供了确认送达(acknowledge)机制保证消息被正确处理不会丢失。

确认送达的回执有三种:

ACK: 消息已被成功处理

NACK: 消息处理异常, 需要重新投递

REJECT: 消息非法, 丢弃消息

RabbitMQ 的 Queue 可以设置 no_ack=true, 则消息被投递后即删除不等待回执。

channel.basicConsume 可以指定auto_ack模式,若auto_ack=true当客户端收到完整消息后即会自动发出ACK回执,否则必须显式的发出回执。

Java 代码示例

首先安装并启动RabbitMQ实例, Mac用户可以使用 Homebrew 进行安装:

brew install rabbitmq

启动服务:

brew services start rabbitmq

或者使用官方docker镜像:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management

RabbitMQ官网提供了Ubuntu、RPM以及Windows等多种平台安装方式。

RabbitMQ默认TCP端口为5672, Web控制台默认端口15672。

在Maven中添加依赖:

com.rabbitmq

amqp-client

5.5.1

编写生产者:

package rabbit;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

/**

* @author finley

*/

public class RabbitProducer {

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("guest");

factory.setPassword("guest");

factory.setHost("localhost");

try (Connection conn = factory.newConnection();

Channel channel = conn.createChannel()) {

String exchangeName = "test-exchange";

channel.exchangeDeclare(exchangeName, "direct", true);

String routingKey = "hello";

byte[] msg = "hello world".getBytes();

AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();

propsBuilder.deliveryMode(2); // persistent

propsBuilder.priority(0); // normal

propsBuilder.contentType("text/plain");

channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg);

}

}

}

编写消费者:

package rabbit;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

/**

* @author finley

*/

public class RabbitConsumer {

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("guest");

factory.setPassword("guest");

factory.setHost("localhost");

try (Connection conn = factory.newConnection();

Channel channel = conn.createChannel()) {

String exchangeName = "test-exchange";

channel.exchangeDeclare(exchangeName, "direct", true);

String queueName = channel.queueDeclare().getQueue();

String bindingKey = "hello";

channel.queueBind(queueName, exchangeName, bindingKey);

while(true) {

channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String routingKey = envelope.getRoutingKey();

String contentType = properties.getContentType();

String bodyStr = new String(body, "UTF-8");

System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr);

long deliveryTag = envelope.getDeliveryTag();

channel.basicAck(deliveryTag, false);

}

});

}

}

}

}

RabbitMQ 的消息为字节, 可以将 Java 对象序列化后作为消息体发送。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。