博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
JAVA 多用户商城系统b2b2c-Spring Cloud Stream 介绍
阅读量:7234 次
发布时间:2019-06-29

本文共 3033 字,大约阅读时间需要 10 分钟。

介绍Spring Cloud Stream

电子商务平台源码请加企鹅求求:一零三八七七四六二六。 Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream基于Spring Boot建立独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。它提供了来自几家供应商的中间件的意见配置,介绍了持久发布订阅语义,消费者组和分区的概念。

您可以将@EnableBinding注释添加到应用程序,以便立即连接到消息代理,并且可以将@StreamListener添加到方法中,以使其接收流处理的事件。以下是接收外部消息的简单接收器应用程序。

@SpringBootApplication@EnableBinding(Sink.class)public class VoteRecordingSinkApplication {  public static void main(String[] args) {    SpringApplication.run(VoteRecordingSinkApplication.class, args);  }  @StreamListener(Sink.INPUT)  public void processVote(Vote vote) {      votingService.recordVote(vote);  }}复制代码

@EnableBinding注释需要一个或多个接口作为参数(在这种情况下,该参数是单个Sink接口)。接口声明输入和/或输出通道。Spring Cloud Stream提供了接口Source,Sink和Processor; 您还可以定义自己的界面。

以下是Sink接口的定义:

public interface Sink {  String INPUT = "input";  @Input(Sink.INPUT)  SubscribableChannel input();}复制代码

@Input注释标识输入通道,通过该输入通道接收到的消息进入应用程序; @Output注释标识输出通道,发布的消息将通过该通道离开应用程序。@Input和@Output注释可以使用频道名称作为参数; 如果未提供名称,将使用注释方法的名称。

Spring Cloud Stream将为您创建一个界面的实现。您可以在应用程序中通过自动连接来使用它,如下面的测试用例示例。

@RunWith(SpringJUnit4ClassRunner.class)@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)@WebAppConfiguration@DirtiesContextpublic class StreamApplicationTests {  @Autowired  private Sink sink;  @Test  public void contextLoads() {    assertNotNull(this.sink.input());  }}复制代码

编程模型

Binder

Binder 是 Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的粘合剂。 目前 Spring Cloud Stream 实现了 Kafka 和 Rabbit MQ 的binder。通过 binder ,可以很方便的连接中间件,可以动态的改变消息的destinations(对应于 Kafka 的topic,Rabbit MQ 的 exchanges),这些都可以通过外部配置项来做到。甚至可以任意的改变中间件的类型而不需要修改一行代码。

Publish-Subscribe

消息的发布(Publish)和订阅(Subscribe)是事件驱动的经典模式。Spring Cloud Stream 的数据交互也是基于这个思想。生产者把消息通过某个 topic 广播出去(Spring Cloud Stream 中的 destinations)。其他的微服务,通过订阅特定 topic 来获取广播出来的消息来触发业务的进行。

这种模式,极大的降低了生产者与消费者之间的耦合。即使有新的应用的引入,也不需要破坏当前系统的整体结构。

Consumer Groups

“Group”, Kafka 中的概念。Spring Cloud Stream 的这个分组概念的意思基本和 Kafka 一致。

微服务中动态的缩放同一个应用的数量以此来达到更高的处理能力是非常必须的。对于这种情况,同一个事件防止被重复消费,只要把这些应用放置于同一个 “group” 中,就能够保证消息只会被其中一个应用消费一次。

Message

Message,就是所说的消息体,用来承载传输的信息用的。Message分为两部分,header和payload。header是头部信息,用来存储传输的一些特性属性参数。payload是用来装载数据的,他可以携带的任何Object对象  不同的对象在binder中传输 可以指定不同的mini类型 具体参考

可以通过application.yml中设置 输入input和输出output的mini类型

spring.cloud.stream.bindings..content-type

MessageChannel

消息管道,生产者生产一个消息到channel,消费者从channel消费一个消息,所以channel可以对消息组件解耦,并且提供一个方便的拦截功能和监控功能。  默认的通道

输入(SubscribableChannel)和输出通道(MessageChannel)参考 Processor接口

springcloudstream提供通道的定义 比如自定义通过可以使用接口

public interface OrderChannel {   String INPUT = "input_order";   String OUTPUT="ouput_order";   /**    * input注解制定通道的名称  将来在yml中配置该通道的实际绑定的topic或者订阅组    * @return    */   @Input(INPUT)   SubscribableChannel orderInput();   /**    * output注解指定输出通道的名称    * @return    */   @Output(OUTPUT)   MessageChannel orderOutput();}复制代码

以下 代码参考 Source Sink Processor接口 将来在yml关于该通道的配置既可以

spring:     cloud:         stream:             bindings:                 通道名称:                     destination: mydest复制代码

转载于:https://juejin.im/post/5cb3e148e51d456e2907f216

你可能感兴趣的文章
Node入门教程(10)第八章:Node 的事件处理
查看>>
html5 css3构造的漂亮表格
查看>>
m2014_c->c语言容器类工具列
查看>>
spider-抓取网页内容
查看>>
在Ubuntu下安装和配置Rails 3详解 (LightTPD + FastCGI)
查看>>
DRBD试用手记
查看>>
argparse – Command line option and argument parsing.¶
查看>>
UML 图使用心得
查看>>
《肖申克的救赎》- 阅后小记
查看>>
Zookeeper系列五:Master选举、ZK高级特性:基本模型
查看>>
关于 DataRow 中为 DataRowState.Deleted 状态的 字段列值取值方法
查看>>
nginx配置解决vue单页面打包文件大,首次加载慢的问题
查看>>
win7方面API學習
查看>>
mongodb 安装
查看>>
BATJ等公司必问的8道Java经典面试题,你都会了吗?
查看>>
开学季学生宿舍竟然限电,学校管理因噎废食?
查看>>
奇点汽车回应欠薪3月传闻:多轮融资顺利 不存在资金问题
查看>>
孕妇高速上产女 交警医生合力架起生命绿色通道
查看>>
西藏尼阿底遗址项目获“2018年中国考古新发现”入围奖
查看>>
火箭队再遭伤病打击 曝中锋卡佩拉至少缺阵1月
查看>>