FlinkSQL怎么搭建
发表于:2024-10-09 作者:千家信息网编辑
千家信息网最后更新 2024年10月09日,本篇内容主要讲解"FlinkSQL怎么搭建",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"FlinkSQL怎么搭建"吧!1.背景由于公司内部需求较多,并不想
千家信息网最后更新 2024年10月09日FlinkSQL怎么搭建
本篇内容主要讲解"FlinkSQL怎么搭建",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"FlinkSQL怎么搭建"吧!
1.背景
由于公司内部需求较多,并不想每次都写一个 streaming 程序,故而开始搭建 flinksql 平台,基于 jdk1.8,flink1.12.x
2.效果
传一个 sql 文件给 jar 包,然后 sql 文件内的 sql 将自动执行
3. jar 包 vs web 界面
调研了基于 web 的 zeppline
zeppline 设计的初衷其实是为了交互式分析
基于 zeppline rest api 与现有的监控不兼容,需要修改现有监控的代码
虽然带有 web 界面的对用户很是友好,对于分析人员来说,是一个不错的选择,但对于开发人员来说,真正的线上长时间的运行程序,开发成 HA 的 server 还是有必要的
基于以上 3 点最终选择 jar 作为最终的方式
4. 使用
将 sql 写入 xxx.sql 文件中,如
CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;-- ExecutionCheckpointingOptionsset execution.checkpointing.mode=EXACTLY_ONCE;set execution.checkpointing.timeout=30 min;-- 30minset execution.checkpointing.interval=1 min ; -- 1minset execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;-- ExecutionConfigOptionsset table.exec.state.ttl=1 day; -- 1 dayset table.exec.mini-batch.enabled=true; -- enable mini-batch optimizationset table.exec.mini-batch.allow-latency=1 s; -- 1sset table.exec.mini-batch.size=1000;set table.exec.sink.not-null-enforcer=drop;-- -- dadadadadadaCREATE TABLE orders( status int, courier_id bigint, id bigint, finish_time BIGINT)WITH ( 'connector' = 'kafka','topic' = 'canal_monitor_order', 'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup', 'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'earliest-offset');-- flink.partition-discovery.interval-millis;CREATE TABLE infos( info_index int, order_id bigint)WITH ( 'connector' = 'kafka','topic' = 'canal_monitor_order', 'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup', 'format' = 'ss-canal-json','ss-canal-json.table.include' = 'infos','scan.startup.mode' = 'earliest-offset');CREATE TABLE redisCache( finishOrders BIGINT, courier_id BIGINT, dayStr String)WITH ( 'connector' = 'redis', 'hostPort'='localhost:6400', 'keyType'='hash', 'keyTemplate'='test2_${courier_id}', 'fieldTemplate'='${dayStr}', 'valueNames'='finishOrders', 'expireTime'='259200');create view temp asselect o.courier_id, (CASE WHEN sum(infosMaxIndex.info_index) is null then 0 else sum(infosMaxIndex.info_index) end) finishOrders, o.status, dayStrfrom ((select courier_id, id, last_value(status) status, MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr from orders where status = 60 group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd'))) oleft join (select max(info_index) info_index, order_id from infos group by order_id) infosMaxIndex on o.id = infosMaxIndex.order_idgroup by o.courier_id, o.status, dayStr;INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;
将 flinksql-platform 打包并上传至服务器
将必要的 connector jar 放入到相应的目录下
执行,如
flink-1.12.0/bin/flink run -p 3 -yt ./flinkjar/ -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar -m yarn-cluster -ynm sqlDemo -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql
其中
-C 添加 udfJar 等第三方 jar 包 -C 参数apply到了client端生成的JobGraph里,然后提交JobGraph来运行的
-yt 目录 将 udfJar 等第三方 jar 包提交到 TaskManager 上
到此,相信大家对"FlinkSQL怎么搭建"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
文件
必要
人员
内容
界面
目录
程序
第三方
分析
学习
开发
监控
运行
选择
不错
实用
更深
交互式
代码
公司
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器遭黑客入侵
国家治理高峰论坛网络安全
软件开发需培训多久
69俄罗斯服务器下载
数据库一定有主码吗
网络显示服务器通讯异常
软件开发需报什么专业
学软件开发什么学历
网络技术各层级
福州高维质子软件开发有限公司
庆阳大数据中心服务器数量
适合学数据库的电脑
web 文章 数据库
广播电视网络安全系统
财政部 软件开发人员单价
昆明品质软件开发价格走势
王者荣耀 服务器在哪
广州软件开发好找工作么
乡镇网络安全教育会议记录
二维码背后的数据库
云服务器怎么增加配置
pe 服务器维护
软件开发程序 信息安全
服务器管理视频教程
谷歌服务器租的价格
vs怎么调试服务器数据库
计算机五级网络技术
包含网络安全的etf
网络安全和信息化文章总结
皮书数据库分析