1. 说明

本文介绍以下,在平台中如何使用消息队列收发消息。在平台中集成了spring cloud stream 来整合不同的消息队列来收发消息,他目前支持主流的消息队列,比如 KAFKA,ROCKETMQ,ROCKETMQ ,他的收发消息时接口统一的,因此,只需要调整配置就可以完成消息队列的切换。

2. 实现步骤

2.1 定义收发接口

在平台的 jpaas-share 项目中定义了一个InputOutput 的类,这个类就时负责消息收发的接口。

这个接口类目前定义了 4类的消息队列。

消息 说明
input,output 负责MessageModel类型的消息收发
mailOutput,mailInput 负责邮件的消息收发
eventOutput,eventInput 流程事件收发
fileOutput,fileInput 文件转换的消息收发

如果需要增加其他的消息收发,可以在这个类中添加。

让后在 application.properties 中做添加。

  1. ## 附件转换消息
  2. #spring.cloud.stream.bindings.fileOutput.destination=file
  3. #spring.cloud.stream.bindings.fileOutput.group= file-group
  4. #spring.cloud.stream.bindings.fileOutput.binder = local_rabbit
  5. #spring.cloud.stream.bindings.fileInput.destination= file
  6. #spring.cloud.stream.bindings.fileInput.group= file-group-consumer
  7. #spring.cloud.stream.bindings.fileInput.binder = local_rabbit
  8. # ROCKETMQ系统消息
  9. spring.cloud.stream.bindings.output.destination=bpmmessage
  10. spring.cloud.stream.bindings.output.group= bpmmessage-group
  11. spring.cloud.stream.bindings.input.destination= bpmmessage
  12. spring.cloud.stream.bindings.input.group= bpmmessage-group-consumer

2.2 使用接口

比如BPM项目,需要增加注解 使用上面定义的接口。

  1. @EnableBinding({InputOutput.class})
  2. public class JPaasBpmApplication {

2.3 发送消息

  1. //注入消息收发接口
  2. @Autowired(required = false)
  3. InputOutput inputOutput;
  4. //发送消息,注意 output 是发送 messagemodel 消息
  5. inputOutput.output().send(MessageBuilder.withPayload(messageModel).build());

2.4 接收消息

  1. @StreamListener(InputOutput.INPUT)
  2. public void handMessageModel(MessageModel messageModel) {
  3. }
文档更新时间: 2021-07-20 16:51   作者:zyg