千家信息网

Flink中Connectors如何连接RabbitMq

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,这篇文章给大家分享的是有关Flink中Connectors如何连接RabbitMq的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。通过使用Flink DataStream C
千家信息网最后更新 2025年02月05日Flink中Connectors如何连接RabbitMq

这篇文章给大家分享的是有关Flink中Connectors如何连接RabbitMq的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

通过使用Flink DataStream Connectors 数据流连接器连接到RabbitMq消息队列中间件,并提供数据流输入与输出操作;

示例环境

java.version: 1.8.xflink.version: 1.11.1rabbitMq:3.5.7

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 DataStream Connectors 与 示例模块

数据流输入

DataStreamSource.java

package com.flink.examples.rabbitmq;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/** * @Description 从MQ中获取数据并输出到DataStream流中 */public class DataStreamSource {    /**     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html     */    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()                .setHost("127.0.0.1")                .setPort(5672)                .setUserName("admin")                .setPassword("admin")                .setVirtualHost("datastream")                .build();        final DataStream stream = env                .addSource(new RMQSource( connectionConfig, "test", true, new SimpleStringSchema()))                .setParallelism(1);        stream.print();        env.execute("flink rabbitMq source");    }}

数据流输出

DataStreamSink.java

package com.flink.examples.rabbitmq;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/** * @Description 将DataStream流中的数据输出到rabbitMq队列中 */public class DataStreamSink {    /**     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html     */    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()                .setHost("127.0.0.1")                .setPort(5672)                .setUserName("admin")                .setPassword("admin")                .setVirtualHost("datastream")                .build();        String [] words = new String[]{"props","student","build","name","execute"};        final DataStream stream = env.fromElements(words);        stream.addSink(new RMQSink(connectionConfig,"test",new SimpleStringSchema()));        env.execute("flink rabbitMq sink");    }}

数据展示

感谢各位的阅读!关于"Flink中Connectors如何连接RabbitMq"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

0