Springboot整合RabbitMQ(二)——Fanout扇形交换机

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

1、Maven依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.4.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

2、配置文件

server:
  port: 30200

spring:
  rabbitmq:
    host: 148.70.153.63
    port: 5672
    username: libai
    password: password

3、生产者

3.1、配置文件

声明队列和交换机。

@Configuration
public class FanoutRabbitConfig {
    @Bean
    public Queue queueA() {
        return new Queue("fanoutA", true, false, false);
    }

    @Bean
    public Queue queueB() {
        return new Queue("fanoutB", true, false, false);
    }

    @Bean
    public Queue queueC() {
        return new Queue("fanoutC", true, false, false);
    }

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange", true, false);
    }

    @Bean
    public Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

创建三个队列 :fanoutAfanoutBfanoutC,将三个队列都绑定在交换机fanoutExchange上。

因为是扇型交换机, 路由键无需配置,配置也不起作用。

3.2、发送消息

@RestController
public class SendMessageController {
    @Autowired
    private RabbitTemplate rabbitTemplate;  // 使用RabbitTemplate,这提供了接收/发送等等方法

    @PostMapping("/sendFanoutMessage")
    public String sendFanoutMessage() {
        Map<String, Object> map = new HashMap<>();
        map.put("messageId", String.valueOf(UUID.randomUUID()));
        map.put("messageData", "testFanoutMessage");
        map.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        rabbitTemplate.convertAndSend("fanoutExchange", null, map);
        return "ok";
    }
}

发送到指定的交换机上,因为是扇形交换机,所以会把消息广播到所有和该交换机绑定的队列上。

启动服务,用postman调用发送消息接口。

3.3、查看RabbitMQ的后台管理界面

可以看到已经有3条消息推送到队列中,等待被消费。
coding-Springboot整合RabbitMQ(二)——Fanout扇形交换机-

3.4、查看交换机

可以看到和3个队列绑定,每发到该交换机上的一条消息都会被广播到这3个队列上。
coding-Springboot整合RabbitMQ(二)——Fanout扇形交换机-

4、消费者

通过注解@RabbitListener指定要消费的队列。

@Component
@RabbitListener(queues = "fanoutA")
@Slf4j
public class FanoutReceiverA {
    @RabbitHandler
    public void process(Map testMessage) {
        log.info("FanoutReceiverA消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
    }
}
@Component
@RabbitListener(queues = "fanoutB")
@Slf4j
public class FanoutReceiverB {
    @RabbitHandler
    public void process(Map testMessage) {
        log.info("FanoutReceiverB消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
    }
}
@Component
@RabbitListener(queues = "fanoutC")
@Slf4j
public class FanoutReceiverC {
    @RabbitHandler
    public void process(Map testMessage) {
        log.info("FanoutReceiverC消费者收到消息:{}", JSONUtil.toJsonPrettyStr(testMessage));
    }
}

重新启动服务,可以看到控制台打印输出,说明该条消息已经被3个消费者消费成功了。

2020-09-13 21:47:56,509 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] [com.zxb.rabbitmq.consumer.fanout.FanoutReceiverA:25] [] FanoutReceiverA消费者收到消息:{
    "createTime": "2020-09-13 21:38:57",
    "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
    "messageData": "testFanoutMessage"
}
2020-09-13 21:47:56,678 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#2-1] [com.zxb.rabbitmq.consumer.fanout.FanoutReceiverB:25] [] FanoutReceiverB消费者收到消息:{
    "createTime": "2020-09-13 21:38:57",
    "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
    "messageData": "testFanoutMessage"
}
2020-09-13 21:47:56,885 [INFO] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] [com.zxb.rabbitmq.consumer.fanout.FanoutReceiverC:25] [] FanoutReceiverC消费者收到消息:{
    "createTime": "2020-09-13 21:38:57",
    "messageId": "765c1218-6c4c-4a72-af2c-6f386ff8e0d2",
    "messageData": "testFanoutMessage"
}

参考资料

Springboot 整合RabbitMq ,用心看完这一篇就够了

代码地址

github:https://github.com/senlinmu1008/spring-boot/tree/master/rabbitmqgitee:https://gitee.com/ppbin/spring-boot/tree/master/rabbitmq

个人网站

Gitee PagesGithub Pages

匿名

发表评论

匿名网友