如何进行flink中的kafka源码分析
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,今天就跟大家聊聊有关如何进行flink中的kafka源码分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。最近一直在弄flink sql相关的
千家信息网最后更新 2025年01月31日如何进行flink中的kafka源码分析
今天就跟大家聊聊有关如何进行flink中的kafka源码分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
最近一直在弄flink sql相关的东西,第一阶段的目标是从解决kafka的消费和写入的问题。不过也有些同学并不是很了解,今天我们来详细分析一下包的继承层次。
flink源码如下:
public class KafkaTableSourceFactory implements StreamTableSourceFactory{ private ConcurrentHashMap
kafkaTableSources = new ConcurrentHashMap<>(); @Override public Map requiredContext() { Map context = new HashMap<>(); context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE); context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION)); return context; } @Override public List supportedProperties() { List properties = new ArrayList<>(); properties.add(KafkaConnectorDescriptor.DATABASE_KEY); properties.add(KafkaConnectorDescriptor.TABLE_KEY); return properties; } @Override public StreamTableSource createStreamTableSource(Map
properties) { //避免频繁的触发 是否需要加缓存 KafkaTableSource kafkaTableSource; String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY); String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY); if (!kafkaTableSources.containsKey(dataBase + table)) { Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder(); kafkaTableSource = builder .cluster(dataBase) .subject(table) .build(); kafkaTableSources.put(dataBase + table,kafkaTableSource); } else { kafkaTableSource = kafkaTableSources.get(dataBase + table); } return kafkaTableSource; }}
class Kafka08PBTableSource protected(topic: String, properties: Properties, schema: TableSchema, typeInformation: TypeInformation[Row], paramMap: util.LinkedHashMap[String, AnyRef], entryClass: String) extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap,entryClass)) { override def createKafkaConsumer(topic: String, properties: Properties, deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = { this.setStartupMode(StartupMode.EARLIEST) new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest() }}
下面用户自定义的kafka的sink类:
class Kafka08UDMPBTableSink (topic: String, properties: Properties, partitioner: Optional[FlinkKafkaPartitioner[Row]], paramMap: util.LinkedHashMap[String, AnyRef], serializationSchema: SerializationSchema[Row], fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]] ) extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) { override def createKafkaProducer(topic: String, properties: Properties, serializationSchema: SerializationSchema[Row], partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row]={ new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) } override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes) override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = { super.configure(this.fieldNames, this.fieldTypes) } override def getFieldNames: Array[String]=this.fieldNames /** Returns the types of the table fields. */ override def getFieldTypes: Array[TypeInformation[_]]=this.fieldTypes override def emitDataStream(dataStream: DataStream[Row]): Unit = { val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner) dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames)) }}
public class TrackRowDeserializationSchema implements SerializationSchema, DeserializationSchema
{ private static final long serialVersionUID = -2885556750743978636L; /** Type information describing the input type. */ private TypeInformation
typeInfo = null; private LinkedHashMap paraMap; private String inSchema; private String outSchema; private String inClass; private String outClass;}
public class TrackRowFormatFactory extends TableFormatFactoryBaseimplements SerializationSchemaFactory
, DeserializationSchemaFactory
{ public TrackRowFormatFactory() { super(TrackValidator.FORMAT_TYPE_VALUE, 1, false); } public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) { super(type, version, supportsSchemaDerivation); } @Override protected List
supportedFormatProperties() { final List properties = new ArrayList<>(); properties.add(TrackValidator.FORMAT_IN_SCHEMA); properties.add(TrackValidator.FORMAT_IN_CLASS); properties.add(TrackValidator.FORMAT_OUT_CLASS); properties.add(TrackValidator.FORMAT_OUT_SCHEMA); properties.add(TrackValidator.FORMAT_TYPE_INFORMATION); properties.add(TrackValidator.FORMAT_TYPE_VALUE); return properties; }}
看完上述内容,你们对如何进行flink中的kafka源码分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
源码
分析
内容
频繁
东西
同学
层次
更多
用户
目标
知识
篇文章
缓存
行业
资讯
资讯频道
问题
阶段
频道
进一
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发人员调
计算机网络安全设计需求方案
外汇软件开发公司
英雄联盟网通大区哪个服务器最好
江阴重型软件开发项目信息
数据库管理系统安全技术要求
北京互联网软件开发费用是多少
北航的网络安全专业排名怎么样
玩游戏服务器总是断开连接
西游 服务器
安徽通用软件开发代理价格
网上书店系统 数据库设计
杰航软件开发
连接wifi服务器不响怎么回事
确认测试应该由软件开发者
国外期刊数据库
12306数据库技术类
互联网第三次科技革命
pi数据库计算点引用常数
计算机软件开发职业分析
关于网络安全纪录片
厦门软件开发工资低
smtp服务器验证
迷你型号的服务器
sql数据库2008ppt
ipv6与中国网络安全
数据库外键约束技巧
安卓软件开发学哪些语言
查网址服务器
达梦数据库下载linux