Flink如何读取数据源
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章主要为大家展示了"Flink如何读取数据源",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Flink如何读取数据源"这篇文章吧。从集合中读取
千家信息网最后更新 2025年02月02日Flink如何读取数据源
这篇文章主要为大家展示了"Flink如何读取数据源",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"Flink如何读取数据源"这篇文章吧。
从集合中读取
private static void radFromCollection(String[] args) throws Exception { //将参数转成对象 MultipleParameterTool params = MultipleParameterTool.fromArgs(args); //创建批处理执行环境// ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //创建流程处理 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置每个算子的的并行度,默认为cup核数(测试环境下) env.setParallelism(2); //设置最大并行度 env.setMaxParallelism(6); //从集合中读取 ListcollectionData = Arrays.asList("a", "b", "c", "d"); DataStreamSource dataStreamSource = env.fromCollection(collectionData); //从数组中读取 // env.fromElements("a", "b", "c", "d"); dataStreamSource.print(); //dataStreamSource.addSink(new PrintSinkFunction<>()); env.execute(); }
从文件中读取
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcedataStreamSource = env.readTextFile("E:\\GIT\\flink-learn\\flink1\\word.txt", "utf-8"); dataStreamSource.print(); env.execute();
从kafka 中读取
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.put("bootstrap.servers", "10.1.5.130:9092"); properties.put("zookeeper.connect", "10.2.5.135:2181"); properties.put("group.id", "my-flink"); properties.put("auto.offset.reset", "latest"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); FlinkKafkaConsumer010kafkaConsumer010 = new FlinkKafkaConsumer010<>( "flink",// topic new SimpleStringSchema(), properties ); DataStreamSource dataStreamSource = env.addSource(kafkaConsumer010); dataStreamSource.print(); env.execute();
从自定义Source 中读取
实现
org.apache.flink.streaming.api.functions.source.SourceFunction
public static final class MyDataSource implements SourceFunction{ private Boolean running = true; @Override public void run(SourceContext sourceContext) throws Exception { Random random = new Random(); while (running) { double data = random.nextDouble() * 100; sourceContext.collectWithTimestamp(String.valueOf(data), System.currentTimeMillis()); TimeUnit.SECONDS.sleep(1); } } @Override public void cancel() { this.running = false; } }
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcedataStreamSource = env.addSource(new MyDataSource()); dataStreamSource.print(); env.execute();
以上是"Flink如何读取数据源"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
数据
数据源
内容
篇文章
环境
学习
帮助
最大
参数
对象
数组
文件
易懂
更多
条理
核数
流程
知识
算子
编带
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
饥荒专用服务器管理权限
安徽新闻网络安全
市基础设施网络安全管理办法
itmc的数据库类型
数据库表怎么添加指定一列
5g服务器要求
数据库序号排序指令
学习数据库的实训内容
ios14 邮件未从服务器下载
支持建设网络安全相关学科专业
中小企业网络安全的战略意见
河源通信软件开发
35岁自学软件开发
软件开发设计方案模板
北京工业软件开发哪家专业
网络技术应用选修三试题
碑林区网络安全主题绘画
IOS能不能用于服务器
镇党委政府网络安全计划
手游方舟服务器招管理员
饥荒专用服务器管理权限
西安数据库培训哪里有
阿里大牛数据库优化
河南物流运输软件开发电话
服务器里面有数据库吗
浙江财经大学国泰安数据库
查看本地建好的ftp服务器
服务器暴躁八卦图
图数据库深度学习
网络安全规范和常用技术