仓库源文站点原文


layout: post title: "SpringBoot整合RabbitMQ" categories: SpringBoot RabbitMQ tags: rabbitmq springboot

author: 张乘辉

创建项目

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
@Configuration
public class RabbitMQConfig {

  public final static String QUEUE_NAME = "spring-boot-queue";
  public final static String EXCHANGE_NAME = "spring-boot-exchange";
  public final static String ROUTING_KEY = "spring-boot-key";

  // 创建队列
  @Bean
  public Queue queue() {
    return new Queue(QUEUE_NAME);
  }

  // 创建一个 topic 类型的交换器
  @Bean
  public TopicExchange exchange() {
    return new TopicExchange(EXCHANGE_NAME);
  }

  // 使用路由键(routingKey)把队列(Queue)绑定到交换器(Exchange)
  @Bean
  public Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  }

  @Bean
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("xx", 5670);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
  }

  @Bean
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
  }
}

这里我们创建 ConnectionFactory 填写 HAProxy 负载均衡地址与端口 。

@RestController
public class ProducerController {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @GetMapping("/sendMessage")
  public String sendMessage() {
    new Thread(() -> {
      for (int i = 0; i < 100; i++) {
        LocalDateTime time = LocalDateTime.now();
        System.out.println("send message: " + time.toString());
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, time.toString());
      }
    }).start();
    return "ok";
  }

}

直接用 Spring 的 RabbitTemplate 模版,根据交换器和路由键,将消息路由到特定队列。

@Component
public class Consumer {

    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
    public void consumeMessage(String message) {
        System.out.println("consume message:" + message);
    }
}

使用 @RabbitListener 注解,指定需要监听的队列。

server:
  port: 5008

在 application.yml 自定义项目端口。

运行项目

启动项目,打开浏览器,请求http://localhost:5008/sendMessage,得到打印消息:

打开 RabbitMQ web 控制台,也可以看到刚才我们在代码里面配置的交换器和队列,以及绑定信息:

可以看出,我们这次创建的队列,被创建在集群的节点 2 上了,也验证了 HAProxy 的负载均衡。

GitHub 项目地址rabbitmq-tutorial