
How Spring Cloud Stream and Kafka work and what they do


This post covers how Spring Cloud Stream and Kafka work and what they do. The steps described here are simple, quick, and practical; if that sounds interesting, read along.

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.

Wild translation: Spring Cloud Stream is the man who intends to unify the harem of message middleware. He's agile, has the mighty Spring behind him, and wields all eighteen weapons (pub/sub messaging, consumer groups, stateful partitions and the like). The harem currently holds Empress Kafka in the East Palace and Empress RabbitMQ in the West Palace.

Gossip corner: today we dig up the relationship between Spring Cloud Stream and Kafka. RabbitMQ can stay put in the Cold Palace.

1. First on stage, the Empress: Kafka

Apache Kafka® is a distributed streaming platform. What exactly does that mean?

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

  • Store streams of records in a fault-tolerant durable way.

  • Process streams of records as they occur.

Wild translation: I'm a stream-processing platform, and I can do a lot:

  • handle publish/subscribe messaging

  • store streams of messages in a very reliable way

  • process messages the moment they arrive, really fast

In short: fast, steady, accurate.

Getting Kafka running is very simple: download it from the Apache Kafka site, then start zookeeper first. The latest Kafka download bundles a zookeeper you can use directly. Once zookeeper is up, give Kafka its ip and port in the config file, config/server.properties:

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

Then run the startup script in the bin directory and Kafka is up:

bin/kafka-server-start.sh -daemon config/server.properties
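If you're using the zookeeper bundled in the Kafka download, the whole startup is just two commands; a sketch assuming the stock scripts and default config files:

# start the bundled zookeeper first (listens on port 2181 by default)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# then start the kafka broker against it
bin/kafka-server-start.sh -daemon config/server.properties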

2. Kafka's personal steward: kafka-manager

Kafka may be running, but to know what she's up to we still need a steward reporting in. I use kafka-manager; the download is here. Unfortunately only the source code is offered, with no runnable build, so you have to compile it yourself, and the build is rather slow. I've put a compiled version up for you here.

kafka-manager likewise needs to be told how to reach Kafka, in its conf/application.conf file, except what you configure isn't Kafka itself but the zookeeper that Kafka hangs off of:

kafka-manager.zkhosts="localhost:2181"

Then launch bin/kafka-manager and you're done (on Windows there's also a kafka-manager.bat).

There's a pitfall here: on Windows, startup may fail with an "input line is too long" error.

That's because the path is too long; shorten the kafka-manager-2.0.0.2 directory name and it runs fine.

Once it's up, use Add Cluster to fill the zookeeper address and port into Cluster Zookeeper Hosts. The Kafka Version must match the Kafka you're actually running, otherwise you may not see any of Kafka's content.

After that we can see Kafka's brokers, topics, consumers, partitions and so on.
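If you'd rather skip kafka-manager entirely, the scripts shipped with Kafka can answer the same questions from the command line; a sketch (recent Kafka versions take --bootstrap-server, older ones want --zookeeper localhost:2181 instead):

# list all topics on the broker
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

# show partitions, replicas and leaders for one topic
bin/kafka-topics.sh --describe --topic test1 --bootstrap-server localhost:9092

# list consumer groups known to the broker
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092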

3. His Majesty arrives: Spring Cloud Stream

As always, everything starts at start.spring.io.

(The pitch-black page is Spring's little Halloween stunt.) What matters to us are the two dependencies on the right, which correspond to the following entries in pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
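Note that ${spring-cloud.version} is a placeholder resolved from a property that start.spring.io also generates; a sketch of that block, where the release-train name is an assumption (pick whichever matches your Boot version):

<properties>
    <spring-cloud.version>Hoxton.SR1</spring-cloud.version>
</properties>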

These dependencies alone aren't enough, though; run the app as-is and it complains:

Caused by: java.lang.IllegalStateException: Unknown binder configuration: kafka

You need to add one more dependency:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

4. Sending messages, pew pew pew

With the Spring Cloud Stream project skeleton in place, we split the work in two: one part sends messages, the other receives them. First the sending side, starting with its configuration file, application.yml:

spring:
  cloud:
    stream:
      default-binder: kafka # the default binder
      kafka: # if you use rabbitMQ, this reads rabbit instead
        binder:
          brokers: # Kafka broker address(es)
          - localhost:9092
      bindings:
        output: # channel name
          binder: kafka
          destination: test1 # where messages go; maps to a Kafka topic
          group: output-group-1 # maps to a Kafka consumer group
          content-type: text/plain # message format

Note the output here: it marks the publishing side and is matched by the subscribing side later. This output is the name of the message channel, and it can be customized, as we'll see below.

Next we need to create a publisher:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
public class Producer {

    private Source mySource;

    public Producer(Source mySource) {
        super();
        this.mySource = mySource;
    }

    public Source getMysource() {
        return mySource;
    }

    public void setMysource(Source mysource) {
        this.mySource = mysource;
    }
}

@EnableBinding is exactly what it sounds like: it binds a channel, and the channel bound here is the output above. Source.class is provided by Spring and marks this as a bindable publishing channel whose channel name is output, matching the output in application.yml.

The source code makes that clear:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();
}

If we want to define our own channel, we can write our own interface, like the one below, which renames the output channel to my-out:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface MySource {

    String INPUT = "my-in";
    String OUTPUT = "my-out";

    @Input(INPUT)
    SubscribableChannel myInput();

    @Output(OUTPUT)
    MessageChannel myOutput();
}

With that, application.yml has to change as well; under spring.cloud.stream.bindings we add:

        my-out:
          binder: kafka
          destination: mytest # where messages go; maps to a Kafka topic
          group: output-group-2 # maps to a Kafka consumer group
          content-type: text/plain # message format

Producer's @EnableBinding needs to change accordingly; to keep the two variants side by side, I wrote a separate MyProducer instead:

import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(MySource.class)
public class MyProducer {

    private MySource mySource;

    public MyProducer(MySource mySource) {
        super();
        this.mySource = mySource;
    }

    public MySource getMysource() {
        return mySource;
    }

    public void setMysource(MySource mysource) {
        this.mySource = mysource;
    }
}

With that, the publishing side is done. Let's write a controller to send messages:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.wphmoon.kscs.service.ChatMessage;
import com.wphmoon.kscs.service.MyProducer;
import com.wphmoon.kscs.service.Producer;

@RestController
public class MyController {

    @Autowired
    private Producer producer;

    @Autowired
    private MyProducer myProducer;

    // receive a String via HTTP, publish it to the broker using spring cloud stream
    @RequestMapping(value = "/sendMessage/string", method = RequestMethod.POST)
    public String publishMessageString(@RequestBody String payload) {
        // send the message to channel output
        producer.getMysource().output()
                .send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
        return "success";
    }

    @RequestMapping(value = "/sendMyMessage/string", method = RequestMethod.POST)
    public String publishMyMessageString(@RequestBody String payload) {
        // send the message to channel my-out
        myProducer.getMysource().myOutput()
                .send(MessageBuilder.withPayload(payload).setHeader("type", "string").build());
        return "success";
    }
}

Very simple: just call the producer and send a string. I used postman to fire the requests.
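If you don't have postman handy, curl does the same job; a sketch assuming the sender app runs on the default port 8080:

# publish via the default output channel (topic test1)
curl -X POST -H "Content-Type: text/plain" -d "hello kafka" http://localhost:8080/sendMessage/string

# publish via the custom my-out channel (topic mytest)
curl -X POST -H "Content-Type: text/plain" -d "hello custom channel" http://localhost:8080/sendMyMessage/string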

The messages are on their way. So how do we receive them? Read on.

5. Receiving messages, come and get them

Likewise, we use the same Spring Cloud Stream project skeleton for the receiving side, starting again with application.yml:

server:
  port: 8081
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers:
          - localhost:9092
      bindings:
        input:
          binder: kafka
          destination: test1
          content-type: text/plain
          group: input-group-1
        my-in:
          binder: kafka
          destination: mytest
          content-type: text/plain
          group: input-group-2

The things to watch are input and my-in, which pair one-to-one with the earlier output and my-out.

The default counterpart of Source is Sink, which Spring also provides; its code is:

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @author Dave Syer
 * @author Marius Bogoevici
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 */
public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();
}

The Consumer class that uses it to receive messages looks like this:

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        logger.info("received a string message : " + message);
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='chat'")
    public void handle(@Payload ChatMessage message) {
        final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
                .withZone(ZoneId.systemDefault());
        final String time = df.format(Instant.ofEpochMilli(message.getTime()));
        logger.info("received a complex message : [{}]: {}", time, message.getContents());
    }
}

And our custom-channel classes, MySink and MyConsumer:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {

    String INPUT = "my-in";

    @Input(INPUT)
    SubscribableChannel myInput();
}
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(MySink.class)
public class MyConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MyConsumer.class);

    @StreamListener(target = MySink.INPUT)
    public void consume(String message) {
        logger.info("received a string message : " + message);
    }

    @StreamListener(target = MySink.INPUT, condition = "headers['type']=='chat'")
    public void handle(@Payload ChatMessage message) {
        final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM)
                .withZone(ZoneId.systemDefault());
        final String time = df.format(Instant.ofEpochMilli(message.getTime()));
        logger.info("received a complex message : [{}]: {}", time, message.getContents());
    }
}
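Both consumers call getTime() and getContents() on a ChatMessage, but the post never shows that class (on the sender side the controller imports it from com.wphmoon.kscs.service). A minimal sketch that satisfies those calls; the field names are assumptions:

public class ChatMessage {

    // message body, read by the consumers via getContents()
    private String contents;

    // epoch millis, formatted into a local time string by the consumers
    private long time;

    public String getContents() {
        return contents;
    }

    public void setContents(String contents) {
        this.contents = contents;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }
}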

That's everything. Once we fire messages with postman as above, they show up straight away in the logs on this side:

2019-10-29 18:42:39.455  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.MyConsumer        : received a string message : 你瞅啥
2019-10-29 18:43:17.017  INFO 13556 --- [container-0-C-1] com.wphmoon.kscsclient.Consumer          : received a string message : 你瞅啥

6. Another peek inside kafka-manager

The destination we defined in application.yml is a Kafka topic, and it shows up in kafka-manager's topic list.

The receiving consumers are visible there as well.
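You can double-check from the command line too: the stock console consumer shipped with Kafka will tail a topic directly and print whatever the controller published:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1 --from-beginning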

And that is the imperial romance of Spring Cloud Stream and Kafka. But a political marriage like theirs is never that simple; we'll get into the complicated parts later, so stay tuned. The royal carriage now returns to the palace (wild translation: The Return of the King).

By now you should have a much better grasp of how Spring Cloud Stream and Kafka work and what they do. Go try it out yourself!
