layout: post title: "RabbitMQ的一些基本概念" categories: RabbitMQ tags: 消息队列 中间件
MQ 的模型从大体上看,都是类似的,如下:
而 RabbitMQ 由于是基于 AMQP 协议的开源实现,AMQP 协议比 MQ 模型有更加详细的模型概念,如下:
如果项目需要发布消息,那么必须要链接到 RabbitMQ,而项目于 RabbitMQ之间使用 TCP 连接,加入每次发布消息都要连接TCP,这不仅会造成连接资源严重浪费,会造成服务器性能瓶颈,所以 RabbitMQ 为所有的线程只用一条 TCP 连接,怎么实现的呢?RabbitMQ 引入了信道的概念,所有需要发布消息的线程都包装成一条信道在 TCP 中传输,理论上 一条 TCP 连接支持无限多个信道,模型如下:
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表,如下:
我们向 RabbitMQ 发送消息,实际上是把消息发到交换器了,再由交换器根据相关路由规则发到特定队列上,在队列上监听的消费者就可以进行消费了,目前 RabbitMQ 共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
每个发送到 fanout 交换器中的消息,他不会去匹配路由键,直接把消息投递到所有绑定到 fanout 交换器中的队列上,它就像一个广播站一样,它会向所有收听广播的用户发送消息。对应到系统上,它允许你针对一个消息作不同操作,比如用户上传了一张新的图片,系统要同时对这个事件进行不同的操作,比如删除旧的图片缓存、增加积分奖励等等。这样就大大降低了系统之间的耦合度了。
topic 交换器有点类似于 direct 交换器,它通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
$ yum install rabbitmq-server
$ /sbin/rabbitmq-server // 开启
$ /sbin/rabbitmq-server -detached // 守护线程开启
$ /sbin/rabbitmqctl reset // 重启节点
$ /sbin/rabbitmqctl stop // 停止
$ /sbin//rabbitmqctl stop_app // 停止应用
$ /sbin/rabbitmqctl -n rabbit@server.example.com stop // 停止特定节点
这里需要特别注意一下这两个命令的区别:由于 RabbitmMQ 是用 Erlang 写的,Erlang 有节点的概念,也就是在一个 Erlang 节点上,可以运行很多个 Erlang 应用,比如 RabbitMQ。stop 命令是使得整个 Erlang 节点停止工作,而 stop_app 则是使得当前应用停止工作,不会影响其它应用的正常运行。
$ /sbin/rabbitmqctl status
$ /sbin/rabbitmqctl list_bindings
$ /sbin/rabbitmqctl list_exchanges
$ /sbin/rabbitmqctl list_queues
$ sudo vim /etc/rabbitmq/rabbitmq.config // 配置文件位置
RabbitMQ 客户端支持多种语言,作为一个 Javaer,这里当然是要用 Java 语言啦:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
public class Producer {
public static void main(String[] argv) throws Exception {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置 RabbitMQ 地址
factory.setHost("localhost");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola";
//发布消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
}
}
public class Consumer {
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
System.out.println("声明队列名称:" + queueName);
String routingKey = "hola";
//绑定队列,通过键 hola 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}