FlinkSQL怎么搭建
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,本篇内容主要讲解"FlinkSQL怎么搭建",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"FlinkSQL怎么搭建"吧!1.背景由于公司内部需求较多,并不想
千家信息网最后更新 2025年02月04日FlinkSQL怎么搭建
本篇内容主要讲解"FlinkSQL怎么搭建",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"FlinkSQL怎么搭建"吧!
1.背景
由于公司内部需求较多,并不想每次都写一个 streaming 程序,故而开始搭建 flinksql 平台,基于 jdk1.8,flink1.12.x
2.效果
传一个 sql 文件给 jar 包,然后 sql 文件内的 sql 将自动执行
3. jar 包 vs web 界面
调研了基于 web 的 zeppline
zeppline 设计的初衷其实是为了交互式分析
基于 zeppline rest api 与现有的监控不兼容,需要修改现有监控的代码
虽然带有 web 界面的对用户很是友好,对于分析人员来说,是一个不错的选择,但对于开发人员来说,真正的线上长时间的运行程序,开发成 HA 的 server 还是有必要的
基于以上 3 点最终选择 jar 作为最终的方式
4. 使用
将 sql 写入 xxx.sql 文件中,如
CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;-- ExecutionCheckpointingOptionsset execution.checkpointing.mode=EXACTLY_ONCE;set execution.checkpointing.timeout=30 min;-- 30minset execution.checkpointing.interval=1 min ; -- 1minset execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;-- ExecutionConfigOptionsset table.exec.state.ttl=1 day; -- 1 dayset table.exec.mini-batch.enabled=true; -- enable mini-batch optimizationset table.exec.mini-batch.allow-latency=1 s; -- 1sset table.exec.mini-batch.size=1000;set table.exec.sink.not-null-enforcer=drop;-- -- dadadadadadaCREATE TABLE orders( status int, courier_id bigint, id bigint, finish_time BIGINT)WITH ( 'connector' = 'kafka','topic' = 'canal_monitor_order', 'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup', 'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'earliest-offset');-- flink.partition-discovery.interval-millis;CREATE TABLE infos( info_index int, order_id bigint)WITH ( 'connector' = 'kafka','topic' = 'canal_monitor_order', 'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup', 'format' = 'ss-canal-json','ss-canal-json.table.include' = 'infos','scan.startup.mode' = 'earliest-offset');CREATE TABLE redisCache( finishOrders BIGINT, courier_id BIGINT, dayStr String)WITH ( 'connector' = 'redis', 'hostPort'='localhost:6400', 'keyType'='hash', 'keyTemplate'='test2_${courier_id}', 'fieldTemplate'='${dayStr}', 'valueNames'='finishOrders', 'expireTime'='259200');create view temp asselect o.courier_id, (CASE WHEN sum(infosMaxIndex.info_index) is null then 0 else sum(infosMaxIndex.info_index) end) finishOrders, o.status, dayStrfrom ((select courier_id, id, last_value(status) status, MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr from orders where status = 60 group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd'))) oleft join (select max(info_index) info_index, order_id from infos group by order_id) infosMaxIndex on o.id = infosMaxIndex.order_idgroup by o.courier_id, o.status, dayStr;INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;
将 flinksql-platform 打包并上传至服务器
将必要的 connector jar 放入到相应的目录下
执行,如
flink-1.12.0/bin/flink run -p 3 -yt ./flinkjar/ -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar -m yarn-cluster -ynm sqlDemo -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql
其中
-C 添加 udfJar 等第三方 jar 包 -C 参数apply到了client端生成的JobGraph里,然后提交JobGraph来运行的
-yt 目录 将 udfJar 等第三方 jar 包提交到 TaskManager 上
到此,相信大家对"FlinkSQL怎么搭建"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
文件
必要
人员
内容
界面
目录
程序
第三方
分析
学习
开发
监控
运行
选择
不错
实用
更深
交互式
代码
公司
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器管理器禁止启动项
弥勒服务器显卡厂家
无线网络安全图片
中职网络技术课件
无线网络技术标准有几代
阿里云服务器怎么登录文件管理
影响期刊数据因子的数据库
关系数据库完整有哪些
手机挂vpm服务器加速软件
网信办网络安全局副局长郭涛
网络安全的突出性
连接服务器提示不安全模式
在线数据库网站
不一样的数据库复制进去无法识别
9月网络安全股票
go游戏服务器网关
数据库配套软件
山东文化产业数据库
华三交换机服务器端口聚合
软件开发公司纳税是多少
dayz版本比服务器更新
防治校园欺凌 维护网络安全
网络安全法 安全管理
互联网科技含量
高级网络技术开发工程师
夏普电视连接服务器错误
中国网络安全产品分析
数据传输自己服务器
cod13怎么切换服务器
my sql数据库软件开发