How to Parse JSON Data with Flink SQL
1. Flink version: 1.7.2
2. Dependencies
The project is built with Maven, so add the following dependencies to pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.8</version>
</dependency>
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.10.1</version>
</dependency>
3. Google Protobuf Message Definition
3.1 Message definition
response.proto:
syntax = "proto3";package com.google.protos;//搜索响应message SearchResponse { uint64 search_time = 1; uint32 code = 2; Result results = 3;}//搜索结果message Result { string id = 1; repeated Item items = 2;}//搜索结果项message Item{ string id = 1; string name = 2; string title = 3; string url = 4; uint64 publish_time = 5; float score = 6; //推荐或者相似加权分值}
An example message, containing the nested object results and the array field items:
{ "search_time":1553650604, "code":200, "results":{ "id":"449", "items":[ { "id":"47", "name":"name47", "title":"标题47", "url":"https://www.google.com.hk/item-47", "publish_time":1552884870, "score":96.03 }, { "id":"2", "name":"name2", "title":"标题2", "url":"https://www.google.com.hk/item-2", "publish_time":1552978902, "score":16.06 }, { "id":"60", "name":"name60", "title":"标题60", "url":"https://www.google.com.hk/item-60", "publish_time":1553444982, "score":62.58 }, { "id":"67", "name":"name67", "title":"标题67", "url":"https://www.google.com.hk/item-67", "publish_time":1553522957, "score":12.17 }, { "id":"15", "name":"name15", "title":"标题15", "url":"https://www.google.com.hk/item-15", "publish_time":1553525421, "score":32.36 }, { "id":"53", "name":"name53", "title":"标题53", "url":"https://www.google.com.hk/item-53", "publish_time":1553109227, "score":52.13 }, { "id":"70", "name":"name70", "title":"标题70", "url":"https://www.google.com.hk/item-70", "publish_time":1552781921, "score":1.72 }, { "id":"53", "name":"name53", "title":"标题53", "url":"https://www.google.com.hk/item-53", "publish_time":1553229003, "score":5.31 }, { "id":"30", "name":"name30", "title":"标题30", "url":"https://www.google.com.hk/item-30", "publish_time":1553282629, "score":26.51 }, { "id":"36", "name":"name36", "title":"标题36", "url":"https://www.google.com.hk/item-36", "publish_time":1552665833, "score":48.76 } ] }}
3.2 Kafka producer that publishes random response JSON strings
import com.google.protos.GoogleProtobuf.*;
import com.googlecode.protobuf.format.JsonFormat;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author lynn
 * @ClassName com.lynn.kafka.SearchResponsePublisher
 * @Description TODO
 * @Date 19-3-26 8:17 AM
 * @Version 1.0
 **/
public class SearchResponsePublisher {

    private static final Logger LOG = LoggerFactory.getLogger(SearchResponsePublisher.class);

    public String randomMessage(int results) {
        Random random = new Random();
        DecimalFormat fmt = new DecimalFormat("##0.00");
        SearchResponse.Builder response = SearchResponse.newBuilder();
        response.setSearchTime(Instant.now().getEpochSecond())
                .setCode(random.nextBoolean() ? 200 : 404);
        Result.Builder result = Result.newBuilder()
                .setId("" + random.nextInt(1000));
        for (int i = 0; i < results; i++) {
            int number = random.nextInt(100);
            Item.Builder builder = Item.newBuilder()
                    .setId(number + "")
                    .setName("name" + number)
                    .setTitle("标题" + number)
                    .setUrl("https://www.google.com.hk/item-" + number)
                    .setPublishTime(Instant.now().getEpochSecond() - random.nextInt(1000000))
                    .setScore(Float.parseFloat(fmt.format(random.nextInt(99) + random.nextFloat())));
            result.addItems(builder.build());
        }
        response.setResults(result.build());
        return new JsonFormat().printToString(response.build());
    }

    public static void main(String[] args) throws InterruptedException {
        if (args.length < 3) {
            System.err.println("Please input broker.servers and topic and records number!");
            System.exit(-1);
        }
        String brokers = args[0];
        String topic = args[1];
        int recordsNumber = Integer.parseInt(args[2]);
        LOG.info("I will publish {} records...", recordsNumber);
        SearchResponsePublisher publisher = new SearchResponsePublisher();
        // System.out.println(publisher.randomMessage(10));
        // if (recordsNumber == 1000) return;

        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        // acks=all is equivalent to -1
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int count = 0;
        while (count++ < recordsNumber) {
            producer.send(new ProducerRecord<>(topic,
                    String.valueOf(Instant.now().toEpochMilli()),
                    publisher.randomMessage(10)));
            TimeUnit.MILLISECONDS.sleep(100);
        }
        // producer.flush();
        producer.close();
    }
}
4. Java Source Code
4.1 Imports
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.PrintTableSink;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
4.2 Main code:
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

Kafka kafka = new Kafka().version("0.11")
        .topic(sourceTopic)
        .startFromEarliest()
        // .startFromLatest()
        .property("bootstrap.servers", brokers)
        .property("group.id", "res")
        .property("session.timeout.ms", "30000")
        .sinkPartitionerFixed();

tableEnv.connect(kafka)
        .withFormat(new Json()
                .failOnMissingField(false)
                .deriveSchema())
        .withSchema(new Schema()
                .field("search_time", Types.LONG())
                .field("code", Types.INT())
                .field("results", Types.ROW(
                        new String[]{"id", "items"},
                        new TypeInformation[]{
                                Types.STRING(),
                                ObjectArrayTypeInfo.getInfoFor(Row[].class,
                                        // Array.newInstance(Row.class, 10).getClass(),
                                        Types.ROW(
                                                new String[]{"id", "name", "title", "url", "publish_time", "score"},
                                                new TypeInformation[]{Types.STRING(), Types.STRING(), Types.STRING(),
                                                        Types.STRING(), Types.LONG(), Types.FLOAT()}
                                        ))})
                ))
        .inAppendMode()
        .registerTableSource("tb_json");

// items[1] ... items[10]: array subscripts start at 1
String sql4 = "select search_time, code, results.id as result_id, items[1].name as item_1_name, items[2].id as item_2_id\n" +
        "from tb_json";
Table table4 = tableEnv.sqlQuery(sql4);
tableEnv.registerTable("tb_item_2", table4);
LOG.info("------------------print {} schema------------------", "tb_item_2");
table4.printSchema();

tableEnv.registerTableSink("console4",
        new String[]{"f0", "f1", "f2", "f3", "f4"},
        new TypeInformation[]{Types.LONG(), Types.INT(), Types.STRING(), Types.STRING(), Types.STRING()},
        new PrintTableSink());
table4.insertInto("console4");

// execute program
env.execute("Flink Table Json Engine");
4.3 The SQL statement
select search_time,
       code,
       results.id as result_id,        -- nested JSON sub-field
       items[1].name as item_1_name,   -- sub-field of an array element; array subscripts start at 1
       items[2].id as item_2_id
from tb_json
Nested fields can be read directly with the . connector, and array elements with [index]; subscripts start at 1, unlike Java arrays, where indices start at 0.
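For illustration only, a minimal sketch of another query against the same tb_json table registered in section 4.2 (the class and method names here are hypothetical; the field names come from the schema above):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ArrayAccessExample {

    // Assumes the "tb_json" table from section 4.2 is already registered
    // on the given table environment.
    public static Table thirdItem(StreamTableEnvironment tableEnv) {
        // results.id drills into the nested object;
        // items[3] picks the third array element (subscripts are 1-based).
        return tableEnv.sqlQuery(
                "select results.id as result_id, " +
                "       items[3].title as item_3_title, " +
                "       items[3].score as item_3_score " +
                "from tb_json");
    }
}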
4.4 Schema definition
The schema is defined following the nesting and array structure of the JSON object itself, so there is no need to flatten every field. Nested objects are declared as Row types, and arrays as ObjectArrayTypeInfo or BasicArrayTypeInfo. The first argument of ObjectArrayTypeInfo.getInfoFor is the array class, e.g. Row[].class in the example above, or a class obtained via Array.newInstance(Row.class, 10).getClass().
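To make the pattern stand alone, here is a minimal sketch (the class name SchemaSketch and the extra "tags" field are invented for illustration; the rest mirrors the schema used above): a nested object becomes Types.ROW(...), an array of objects becomes ObjectArrayTypeInfo over a Row type, and an array of strings can use BasicArrayTypeInfo.

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;

public class SchemaSketch {
    public static void main(String[] args) {
        // Array of JSON objects -> array of Rows
        TypeInformation<Row[]> items = ObjectArrayTypeInfo.getInfoFor(Row[].class,
                Types.ROW(new String[]{"id", "score"},
                          new TypeInformation[]{Types.STRING(), Types.FLOAT()}));

        // Array of plain strings -> BasicArrayTypeInfo (hypothetical "tags" field)
        TypeInformation<String[]> tags = BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO;

        // Nested object containing both -> a Row type
        TypeInformation<Row> results = Types.ROW(
                new String[]{"id", "items", "tags"},
                new TypeInformation[]{Types.STRING(), items, tags});

        System.out.println(results);
    }
}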
4.5 A code issue in flink-json*.jar found during testing
The type checks in the convert methods use ==. Java does not overload ==, so two TypeInformation instances that describe the same type are not necessarily identical, and the checks can fail (which may be why they misbehaved with this Flink version). The == comparisons were therefore replaced with the .equals() method.
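Before the patched methods below, a small illustration of why the identity comparison is fragile (this sketch is mine, not part of the patch, and TypeInfoCompare is a hypothetical class name): once a TypeInformation instance has gone through a serialization round-trip, as tends to happen when it travels with the job graph to the cluster, it is generally no longer the same object as the local constant, while equals() still holds.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class TypeInfoCompare {
    public static void main(String[] args) throws Exception {
        TypeInformation<Long> original = Types.LONG;

        // Simulate shipping the type information with a plain Java serialization round-trip.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        TypeInformation<?> copy;
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            copy = (TypeInformation<?>) in.readObject();
        }

        // Identity is not guaranteed for the deserialized instance;
        // logical equality still holds.
        System.out.println(copy == Types.LONG);
        System.out.println(copy.equals(Types.LONG));
    }
}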
JsonRowDeserializationSchema.java
private Object convert(JsonNode node, TypeInformation<?> info) {
    if (Types.VOID.equals(info) || node.isNull()) {
        return null;
    } else if (Types.BOOLEAN.equals(info)) {
        return node.asBoolean();
    } else if (Types.STRING.equals(info)) {
        return node.asText();
    } else if (Types.BIG_DEC.equals(info)) {
        return node.decimalValue();
    } else if (Types.BIG_INT.equals(info)) {
        return node.bigIntegerValue();
    } else if (Types.LONG.equals(info)) {
        return node.longValue();
    } else if (Types.INT.equals(info)) {
        return node.intValue();
    } else if (Types.FLOAT.equals(info)) {
        return node.floatValue();
    } else if (Types.DOUBLE.equals(info)) {
        return node.doubleValue();
    } else if (Types.SQL_DATE.equals(info)) {
        return Date.valueOf(node.asText());
    } else if (Types.SQL_TIME.equals(info)) {
        // according to RFC 3339 every full-time must have a timezone;
        // until we have full timezone support, we only support UTC;
        // users can parse their time as string as a workaround
        final String time = node.asText();
        if (time.indexOf('Z') < 0 || time.indexOf('.') >= 0) {
            throw new IllegalStateException(
                "Invalid time format. Only a time in UTC timezone without milliseconds is supported yet. " +
                    "Format: HH:mm:ss'Z'");
        }
        return Time.valueOf(time.substring(0, time.length() - 1));
    } else if (Types.SQL_TIMESTAMP.equals(info)) {
        // according to RFC 3339 every date-time must have a timezone;
        // until we have full timezone support, we only support UTC;
        // users can parse their time as string as a workaround
        final String timestamp = node.asText();
        if (timestamp.indexOf('Z') < 0) {
            throw new IllegalStateException(
                "Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " +
                    "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        }
        return Timestamp.valueOf(timestamp.substring(0, timestamp.length() - 1).replace('T', ' '));
    } else if (info instanceof RowTypeInfo) {
        return convertRow(node, (RowTypeInfo) info);
    } else if (info instanceof ObjectArrayTypeInfo) {
        return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
    } else if (info instanceof BasicArrayTypeInfo) {
        return convertObjectArray(node, ((BasicArrayTypeInfo) info).getComponentInfo());
    } else if (info instanceof PrimitiveArrayTypeInfo &&
            ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
        return convertByteArray(node);
    } else {
        // for types that were specified without JSON schema
        // e.g. POJOs
        try {
            return objectMapper.treeToValue(node, info.getTypeClass());
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
        }
    }
}
JsonRowSerializationSchema.java
private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
    if (Types.VOID.equals(info) || object == null) {
        return container.nullNode();
    } else if (Types.BOOLEAN.equals(info)) {
        return container.booleanNode((Boolean) object);
    } else if (Types.STRING.equals(info)) {
        return container.textNode((String) object);
    } else if (Types.BIG_DEC.equals(info)) {
        // convert decimal if necessary
        if (object instanceof BigDecimal) {
            return container.numberNode((BigDecimal) object);
        }
        return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
    } else if (Types.BIG_INT.equals(info)) {
        // convert integer if necessary
        if (object instanceof BigInteger) {
            return container.numberNode((BigInteger) object);
        }
        return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
    } else if (Types.LONG.equals(info)) {
        if (object instanceof Long) {
            return container.numberNode((Long) object);
        }
        return container.numberNode(Long.valueOf(((Number) object).longValue()));
    } else if (Types.INT.equals(info)) {
        if (object instanceof Integer) {
            return container.numberNode((Integer) object);
        }
        return container.numberNode(Integer.valueOf(((Number) object).intValue()));
    } else if (Types.FLOAT.equals(info)) {
        if (object instanceof Float) {
            return container.numberNode((Float) object);
        }
        return container.numberNode(Float.valueOf(((Number) object).floatValue()));
    } else if (Types.DOUBLE.equals(info)) {
        if (object instanceof Double) {
            return container.numberNode((Double) object);
        }
        return container.numberNode(Double.valueOf(((Number) object).doubleValue()));
    } else if (Types.SQL_DATE.equals(info)) {
        return container.textNode(object.toString());
    } else if (Types.SQL_TIME.equals(info)) {
        final Time time = (Time) object;
        // strip milliseconds if possible
        if (time.getTime() % 1000 > 0) {
            return container.textNode(timeFormatWithMillis.format(time));
        }
        return container.textNode(timeFormat.format(time));
    } else if (Types.SQL_TIMESTAMP.equals(info)) {
        return container.textNode(timestampFormat.format((Timestamp) object));
    } else if (info instanceof RowTypeInfo) {
        if (reuse != null && reuse instanceof ObjectNode) {
            return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object);
        } else {
            return convertRow(null, (RowTypeInfo) info, (Row) object);
        }
    } else if (info instanceof ObjectArrayTypeInfo) {
        if (reuse != null && reuse instanceof ArrayNode) {
            return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        } else {
            return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        }
    } else if (info instanceof BasicArrayTypeInfo) {
        if (reuse != null && reuse instanceof ArrayNode) {
            return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        } else {
            return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        }
    } else if (info instanceof PrimitiveArrayTypeInfo &&
            ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
        return container.binaryNode((byte[]) object);
    } else {
        // for types that were specified without JSON schema
        // e.g. POJOs
        try {
            return mapper.valueToTree(object);
        } catch (IllegalArgumentException e) {
            throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e);
        }
    }
}
4.6 Submitting the jar to run on the cluster
Add the following file:
resources/META-INF/services/org.apache.flink.table.factories.TableFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
After packaging, the identically named service files from the kafka-connector jar and the json jar would otherwise overwrite each other, so the contents of both files must be kept (merged into one file as above).
5. Appendix: PrintTableSink source code
Adapted from Alibaba's blink branch. Scala sources:
BatchCompatibleStreamTableSink.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.sinks

import org.apache.flink.table.api._
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}

/**
 * Defines an external [[TableSink]] to emit a batch [[Table]],
 * for compatibility with stream connector plugins.
 */
trait BatchCompatibleStreamTableSink[T] extends TableSink[T] {

  /** Emits the bounded DataStream. */
  def emitBoundedStream(boundedStream: DataStream[T]): DataStreamSink[_]
}
PrintTableSink.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.sinks

import java.lang.{Boolean => JBool}
import java.util.TimeZone
import java.util.{Date => JDate}
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.runtime.functions.DateTimeFunctions
import org.apache.flink.util.StringUtils

/**
 * A simple [[TableSink]] to output data to the console.
 */
class PrintTableSink()
  extends TableSinkBase[JTuple2[JBool, Row]]
  with BatchCompatibleStreamTableSink[JTuple2[JBool, Row]]
  with UpsertStreamTableSink[Row] {

  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]) = {
    val sink: PrintSinkFunction = new PrintSinkFunction()
    dataStream.addSink(sink).name(sink.toString)
  }

  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = new PrintTableSink()

  override def setKeyFields(keys: Array[String]): Unit = {}

  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {}

  // override def getRecordType: DataType = DataTypes.createRowType(getFieldTypes, getFieldNames)
  override def getRecordType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes, getFieldNames)
  }

  /** Emits the bounded DataStream. */
  override def emitBoundedStream(boundedStream: DataStream[JTuple2[JBool, Row]]) = {
    val sink: PrintSinkFunction = new PrintSinkFunction()
    boundedStream.addSink(sink).name(sink.toString)
  }
}

/**
 * Implementation of the SinkFunction writing every tuple to the standard output.
 */
class PrintSinkFunction() extends RichSinkFunction[JTuple2[JBool, Row]] {
  private var prefix: String = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    prefix = "task-" + (context.getIndexOfThisSubtask + 1) + "> "
  }

  override def invoke(in: JTuple2[JBool, Row]): Unit = {
    val sb = new StringBuilder
    val row = in.f1
    for (i <- 0 until row.getArity) {
      if (i > 0) sb.append(",")
      val f = row.getField(i)
      if (f.isInstanceOf[Date]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd"))
      } else if (f.isInstanceOf[Time]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss"))
      } else if (f.isInstanceOf[Timestamp]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd HH:mm:ss.SSS"))
      } else {
        sb.append(StringUtils.arrayAwareToString(f))
      }
    }
    if (in.f0) {
      System.out.println(prefix + "(+)" + sb.toString())
    } else {
      System.out.println(prefix + "(-)" + sb.toString())
    }
  }

  override def close(): Unit = {
    this.prefix = ""
  }

  override def toString: String = "Print to System.out"
}