MLSQL Stack如何让流调试更加简单详解
发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,前言有一位同学正在调研MLSQL Stack对流的支持。然后说了流调试其实挺困难的。经过实践,希望实现如下三点:能随时查看最新固定条数的Kafka数据调试结果(sink)能打印在web控制台流程序能自
千家信息网最后更新 2025年02月08日MLSQL Stack如何让流调试更加简单详解
前言
有一位同学正在调研MLSQL Stack对流的支持。然后说了流调试其实挺困难的。经过实践,希望实现如下三点:
- 能随时查看最新固定条数的Kafka数据
- 调试结果(sink)能打印在web控制台
- 流程序能自动推测json schema(现在spark是不行的)
实现这三个点之后,我发现调试确实就变得简单很多了。
流程
首先我新建了一个kaf_write.mlsql,里面方便我往Kafka里写数据:
set abc='''{ "x": 100, "y": 200, "z": 200 ,"dataType":"A group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}{ "x": 120, "y": 100, "z": 260 ,"dataType":"B group"}''';load jsonStr.`abc` as table1;select to_json(struct(*)) as value from table1 as table2;save append table2 as kafka.`wow` where kafka.bootstrap.servers="127.0.0.1:9092";
这样我每次运行,数据就能写入到Kafka.
接着,我写完后,需要看看数据是不是真的都写进去了,写成了什么样子:
!kafkaTool sampleData 10 records from "127.0.0.1:9092" wow;
这句话表示,我要采样Kafka 10条Kafka数据,该Kafka的地址为127.0.0.1:9092,主题为wow.运行结果如下:
没有什么问题。接着我写一个非常简单的流式程序:
-- the stream name, should be uniq.set streamName="streamExample";-- use kafkaTool to infer schema from kafka!kafkaTool registerSchema 2 records from "127.0.0.1:9092" wow;load kafka.`wow` options kafka.bootstrap.servers="127.0.0.1:9092"as newkafkatable1;select * from newkafkatable1as table21;-- print in webConsole instead of terminal console.save append table21 as webConsole.`` options mode="Append"and duration="15"and checkpointLocation="/tmp/s-cpl4";
运行结果如下:
在终端我们也可以看到实时效果了。
补充
当然,MLSQL Stack 还有对流还有两个特别好地方,第一个是你可以对流的事件设置http协议的callback,以及对流的处理结果再使用批SQL进行处理,最后入库。参看如下脚本:
-- the stream name, should be uniq.set streamName="streamExample";-- mock some data.set data='''{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}''';-- load data as tableload jsonStr.`data` as datasource;-- convert table as stream sourceload mockStream.`datasource` options stepSizeRange="0-3"as newkafkatable1;-- aggregation select cast(value as string) as k from newkafkatable1as table21;!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";-- output the the result to console.save append table21 as custom.`` options mode="append"and duration="15"and sourceTable="jack"and code='''select count(*) as c from jack as newjack;save append newjack as parquet.`/tmp/jack`; '''and checkpointLocation="/tmp/cpl15";
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对的支持。
数据
结果
对流
运行
内容
程序
处理
学习
支持
不行
困难
三个
两个
事件
价值
前言
同学
地址
地方
实时
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
ssms新建数据库
网络设置不能访问服务器
数据库有多大
网络安全设备场景
安徽定制网络技术服务以客为尊
拼多多提示服务器有问题
辰安科技是互联网公司吗
软件开发必须懂的知识
数据库怎么更改名字
网络安全法规定网络运营者应健全
软件开发切面方法
计算机二级数据库含金量排名
部队手机网络安全讨论个人发言
数据库有后台搜索不到
软件开发有几年发展前景
梅州数据链软件开发费用
java实现sql数据库
个人数据库用哪种磁盘
河南农大网络安全
pdo连接数据库测试
大兴区进口软件开发差异
oracle 停止数据库
能代表网络技术的词汇
商品参数 数据库设计
武汉飞扬网络技术有限公司
什么是mysql数据库
网络安全知识从零开始
数据库数据到前端
监察委 网络安全保卫
软件开发课程达内培训