Flink中Connectors如何连接RabbitMq
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,这篇文章给大家分享的是有关Flink中Connectors如何连接RabbitMq的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。通过使用Flink DataStream C
千家信息网最后更新 2025年02月05日Flink中Connectors如何连接RabbitMq
这篇文章给大家分享的是有关Flink中Connectors如何连接RabbitMq的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
通过使用Flink DataStream Connectors 数据流连接器连接到RabbitMq消息队列中间件,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1rabbitMq:3.5.7
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
package com.flink.examples.rabbitmq;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/** * @Description 从MQ中获取数据并输出到DataStream流中 */public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); final DataStreamstream = env .addSource(new RMQSource ( connectionConfig, "test", true, new SimpleStringSchema())) .setParallelism(1); stream.print(); env.execute("flink rabbitMq source"); }}
数据流输出
DataStreamSink.java
package com.flink.examples.rabbitmq;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;/** * @Description 将DataStream流中的数据输出到rabbitMq队列中 */public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); String [] words = new String[]{"props","student","build","name","execute"}; final DataStreamstream = env.fromElements(words); stream.addSink(new RMQSink (connectionConfig,"test",new SimpleStringSchema())); env.execute("flink rabbitMq sink"); }}
数据展示
感谢各位的阅读!关于"Flink中Connectors如何连接RabbitMq"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
数据
数据流
示例
输出
内容
官方
文档
更多
模块
环境
篇文章
队列
输入
不错
实用
中间件
数据源
文章
消息
看吧
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
百度糯米推 网络安全
数据库服务器名称或地址
鹰角用的谁的服务器
放肆网络技术
计算机网络技术专业理解
我的世界重置服务器的后果
vba 数据库
企业软件开发可以专票抵税吗
如何连接格力服务器控制空调
服务器push
数据库安全机制有哪些
wow埃德萨拉服务器
scum服务器连接商城
企业软件开发哪家强
阿里云使用ftp服务器
中国网络安全局
软件定义 软件开发和
DTP数据库
成都理工大学网络安全考研
公司管理系统服务器
榆次公安分局网络安全支队
数据库 res选项
色弱可以学网络安全专业吗
创建企业级数据库英文
数据库原理关系代数学生教师
无锡好的软件开发诚信经营
显示网络技术高超的电视剧
人工智能编程与软件开发
数据库缓存技术 mem
表a不在表b中数据库