
How to analyze the Kafka source code in Flink


Today we'll talk about how to analyze the Kafka source code in Flink. Many people may not be very familiar with it, so to help you understand it better I've summarized the following walkthrough; hopefully you'll take something useful away from this article.

I've been working on Flink SQL recently, and the goal of the first phase is to solve Kafka consumption and writing. Since some readers aren't very familiar with this part, let's analyze the inheritance hierarchy of the relevant classes in detail.

The source code is as follows, starting with the custom table source factory:

public class KafkaTableSourceFactory implements StreamTableSourceFactory<Row> {

    private ConcurrentHashMap<String, KafkaTableSource> kafkaTableSources = new ConcurrentHashMap<>();

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE);
        context.put(CONNECTOR_PROPERTY_VERSION(), String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add(KafkaConnectorDescriptor.DATABASE_KEY);
        properties.add(KafkaConnectorDescriptor.TABLE_KEY);
        return properties;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        // Avoid re-creating the source on every call; consider whether a cache is needed here.
        KafkaTableSource kafkaTableSource;
        String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY);
        String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY);
        if (!kafkaTableSources.containsKey(dataBase + table)) {
            Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder();
            kafkaTableSource = builder
                    .cluster(dataBase)
                    .subject(table)
                    .build();
            kafkaTableSources.put(dataBase + table, kafkaTableSource);
        } else {
            kafkaTableSource = kafkaTableSources.get(dataBase + table);
        }
        return kafkaTableSource;
    }
}
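
For context, the old Flink table API matches such a factory against a flat property map: requiredContext() decides whether the factory applies, and supportedProperties() lists the keys it accepts. The sketch below calls the factory directly just to make that contract visible; in a real job the lookup happens through TableFactoryService and SPI registration. The "demo_db"/"demo_table" values are placeholders, and the literal "connector.type"/"connector.property-version" keys are the standard values behind CONNECTOR_TYPE()/CONNECTOR_PROPERTY_VERSION().

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

public class KafkaTableSourceFactoryDemo {

    public static void main(String[] args) {
        // Property map of the kind the table planner hands to the factory.
        Map<String, String> props = new HashMap<>();
        props.put("connector.type", KafkaConnectorDescriptor.CONNECTOR_TYPE);             // matches requiredContext()
        props.put("connector.property-version",
                String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
        props.put(KafkaConnectorDescriptor.DATABASE_KEY, "demo_db");                      // placeholder values
        props.put(KafkaConnectorDescriptor.TABLE_KEY, "demo_table");

        // Calling the factory directly shows the createStreamTableSource contract;
        // normally TableFactoryService discovers and invokes it for you.
        KafkaTableSourceFactory factory = new KafkaTableSourceFactory();
        StreamTableSource<Row> source = factory.createStreamTableSource(props);
        System.out.println(source.getTableSchema());
    }
}

The factory builds the actual table source; the Kafka-0.8-specific subclass looks like this:
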
class Kafka08PBTableSource protected(topic: String,
                                     properties: Properties,
                                     schema: TableSchema,
                                     typeInformation: TypeInformation[Row],
                                     paramMap: util.LinkedHashMap[String, AnyRef],
                                     entryClass: String)
  extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap, entryClass)) {

  override def createKafkaConsumer(topic: String, properties: Properties, deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    this.setStartupMode(StartupMode.EARLIEST)
    new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest()
  }
}
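
Once built (either through the factory or directly through the Builder used above), the table source can be registered and queried like any other. Below is a minimal sketch assuming the pre-1.9 flink-table APIs that this Kafka 0.8 connector targets; "demo_db" and "demo_table" are placeholders.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaSourceQueryDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Register the custom Kafka 0.8 table source under a SQL-visible name.
        tableEnv.registerTableSource("kafka_source",
                new Kafka08UDMPBTableSource.Builder()
                        .cluster("demo_db")
                        .subject("demo_table")
                        .build());

        // The source always starts from the earliest offsets (StartupMode.EARLIEST above).
        Table result = tableEnv.sqlQuery("SELECT * FROM kafka_source");

        // Convert to a DataStream and print, just so the demo has a sink to execute.
        tableEnv.toAppendStream(result, Row.class).print();
        env.execute("kafka-source-demo");
    }
}
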

Below is the user-defined Kafka sink class:

class Kafka08UDMPBTableSink(topic: String,
                            properties: Properties,
                            partitioner: Optional[FlinkKafkaPartitioner[Row]],
                            paramMap: util.LinkedHashMap[String, AnyRef],
                            serializationSchema: SerializationSchema[Row],
                            fieldNames: Array[String],
                            fieldTypes: Array[TypeInformation[_]])
  extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  override def createKafkaProducer(topic: String, properties: Properties, serializationSchema: SerializationSchema[Row], partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row] = {
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row]))
  }

  override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema

  override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes)

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  override def getFieldNames: Array[String] = this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]] = this.fieldTypes

  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
    dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }
}
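
On the write path, the sink can be registered against the same table environment and fed from a SQL query. Here is a minimal wiring sketch, again assuming the pre-1.9 table API; the field names/types, Kafka properties, and SerializationSchema are placeholders standing in for whatever the PB format actually provides.

import java.util.LinkedHashMap;
import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class KafkaSinkWiringDemo {

    public static void wireSink(StreamTableEnvironment tableEnv) {
        String[] fieldNames = {"id", "name"};                          // placeholder schema
        TypeInformation<?>[] fieldTypes = {Types.LONG, Types.STRING};

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder config
        kafkaProps.setProperty("zookeeper.connect", "localhost:2181"); // Kafka 0.8 clients still need ZooKeeper

        // Placeholder serializer; the real job would pass the PB-based SerializationSchema here.
        SerializationSchema<Row> serializer = row -> row.toString().getBytes();

        Kafka08UDMPBTableSink sink = new Kafka08UDMPBTableSink(
                "demo_topic", kafkaProps, Optional.empty(), new LinkedHashMap<>(),
                serializer, fieldNames, fieldTypes);

        // Register the sink and pipe a query into it ("kafka_source" was registered earlier).
        tableEnv.registerTableSink("kafka_sink", fieldNames, fieldTypes, sink);
        tableEnv.sqlQuery("SELECT id, name FROM kafka_source").insertInto("kafka_sink");
    }
}

The custom format itself is split into a row (de)serialization schema and a format factory that tells the planner which format properties it understands:
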
public class TrackRowDeserializationSchema implements SerializationSchema<Row>, DeserializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation<Row> typeInfo = null;

    private LinkedHashMap<String, Object> paraMap;

    private String inSchema;
    private String outSchema;
    private String inClass;
    private String outClass;
}
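
The excerpt above only shows the fields. Because the class implements both SerializationSchema and DeserializationSchema, it also has to provide the usual contract methods; a skeleton of that contract is sketched below, with placeholder method bodies rather than the article's actual PB encode/decode logic.

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

// Skeleton of the contract a Row (de)serialization schema has to fulfil.
public class RowSchemaSkeleton implements SerializationSchema<Row>, DeserializationSchema<Row> {

    private static final long serialVersionUID = 1L;

    private final TypeInformation<Row> typeInfo;

    public RowSchemaSkeleton(TypeInformation<Row> typeInfo) {
        this.typeInfo = typeInfo;
    }

    @Override
    public byte[] serialize(Row element) {
        throw new UnsupportedOperationException("PB encoding would go here");
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        throw new UnsupportedOperationException("PB decoding would go here");
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false; // a Kafka topic is unbounded
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return typeInfo; // lets Flink type the downstream operators correctly
    }
}
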
public class TrackRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {

    public TrackRowFormatFactory() {
        super(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
    }

    public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) {
        super(type, version, supportsSchemaDerivation);
    }

    @Override
    protected List<String> supportedFormatProperties() {
        final List<String> properties = new ArrayList<>();
        properties.add(TrackValidator.FORMAT_IN_SCHEMA);
        properties.add(TrackValidator.FORMAT_IN_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_SCHEMA);
        properties.add(TrackValidator.FORMAT_TYPE_INFORMATION);
        properties.add(TrackValidator.FORMAT_TYPE_VALUE);
        return properties;
    }
}
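
Like the connector factory, this format factory is matched on a flat property map: format.type must equal the type passed to the super constructor (TrackValidator.FORMAT_TYPE_VALUE), and any other format.* keys must appear in supportedFormatProperties(); the create(De)SerializationSchema methods required by the two factory interfaces are not shown in the excerpt. Below is a rough lookup sketch, assuming the factory is registered for SPI discovery (META-INF/services/org.apache.flink.table.factories.TableFactory) and that the Track* constants resolve to plain property keys and values.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.types.Row;

public class TrackFormatLookupDemo {

    @SuppressWarnings("unchecked")
    public static DeserializationSchema<Row> lookupTrackFormat() {
        Map<String, String> props = new HashMap<>();
        props.put("format.type", TrackValidator.FORMAT_TYPE_VALUE);  // must match the type passed to super(...)
        props.put("format.property-version", "1");
        props.put(TrackValidator.FORMAT_IN_SCHEMA, "<in-schema>");    // placeholder values
        props.put(TrackValidator.FORMAT_IN_CLASS, "<in-class>");

        // The planner performs the same lookup internally when it sees these format properties.
        DeserializationSchemaFactory<Row> factory =
                TableFactoryService.find(DeserializationSchemaFactory.class, props);
        return factory.createDeserializationSchema(props);
    }
}
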

Having read the above, do you have a better understanding of how to analyze the Kafka source code in Flink? If you'd like to learn more about this or related topics, keep following the industry news channel. Thanks for your support.
