千家信息网

结构化Kafka sql的代码框架是怎样的

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,本篇文章给大家分享的是有关结构化Kafka sql的代码框架是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。结构化流的典型应用是持
千家信息网最后更新 2025年02月04日结构化Kafka sql的代码框架是怎样的

本篇文章给大家分享的是有关结构化Kafka sql的代码框架是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

结构化流的典型应用是持续的读取kafka流。实现机制从SparkSession的readStream开始,readStream就是DataStreamReader:

def readStream: DataStreamReader = new DataStreamReader(self)

下面从DataStreamReader开始。可以想象得到,最终肯定是生成一个RDD来持续读取kafka流数据。

例子:

// Create DataFrame representing the stream of input lines from connection to localhost:9999val lines = spark.readStream  .format("socket")  .option("host", "localhost")  .option("port", 9999)  .load()

分两步:找到TableProvider;找到SupportRead然后生成StreamingRelationV2。

最后用StreamingRelationV2来调用Dataset.ofRows返回DataFrame,DataFrame就是Dataset[Row]。

下面首先要看看TableProvider接口和SupportRead接口是啥东东。

TableProvider

TableProvider接口未找到在哪里定义。

KafkaSourceRDD

先看看kafkaSourceRDD这个类,这是基础类,最基础的来读取kafka数据的RDD,入参包含一个offsetRange,表示读取kafka数据的区间范围。如果是Kafka.lastest则可以表示永久读取kafka。

既然是RDD,那么最重要的方法就是compute方法了,代码不解析了很简单,就是用Kafka的API来读取kafka分区的数据,形成RDD。

KafkaSource

KafkaSource顾名思义就是Kafka的读取者。

KafkaSource的父类是Source,最重要的方法是:getOffset和getBatch。

getBatch返回DataFrame,那么getBatch又是怎么返回DataFrame的呢?看代码就知道原来是通过创建KafkaSourceRDD来达到生成DataFrame的目的的。所以可以认为KafkaSource是KafkaSourceRDD的一种封装形式罢了。

KafkaSourceProvider

The provider class for all Kafka readers and writers。这个类是用来生成各种各样的Kafka的读取者和写入者的,比较重要,先看看这个类的定义:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister

with StreamSourceProvider

with StreamSinkProvider

with RelationProvider

with CreatableRelationProvider

with TableProvider

with Logging

继承了很多的特性或接口。比如:StreamSourceProvider、TableProvider、RelationProvider等等。我们这里就看看和读相关的特性吧,和写相关的不看了(道理差不多)。

(1)createSource

createSource方法返回Source,看代码其实返回的是KafkaSource,KafkaSource前面已经说过了,这里就不涉及了。

(2)createRelation

createRelation返回BaseRelation,实际返回的是KafkaRelation。

KafkaRelation继承BaseRelation,重写父 类的buildScan方法,buildScan方法返回KafkaSourceRDD作为RDD[Row]。

(3)KafkaTable

KafkaTable继承Table并且继承SupportsRead特性,其定义:

class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite

里面辗转反侧看看如何生成ContinuousStream,主要是方法toContinuousStream,返回的ContinuousStream就是KafkaContinuousStream。

(4)KafkaContinuousStream

KafkaContinuousStream继承自ContinuousStream,具体的看代码,最后反正都是调用了Kafka的API来读取数据,所不同的只是外部表现形式的不同罢了。

以上就是结构化Kafka sql的代码框架是怎样的,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

就是 方法 代码 数据 生成 接口 结构 结构化 重要 特性 框架 不同 基础 形式 更多 知识 篇文章 罢了 实用 辗转反侧 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 坦克世界怎么加服务器 曲靖市网络安全宣传标语 数据库模式称为什么 上海云服务器供货厂 口碑好的浙江戴尔服务器云空间 无极数据库读取版本对不上 ncbi数据库使用教程 网络安全宣传周主题板报 关于网络安全答题活动的新闻稿 网络安全内容150个字 知网的数据库几天一更新 只有ftp怎么连数据库 荆州华辉互联网科技有限公司 参观中国网络安全科技馆观后感 云服务器 数据安全性 做好网络安全维护 软件开发最终会使自己过时吗 手机nplayer新建服务器 仅开放一个tcp端口 网络安全 国家网络安全局需要什么专业 安仁软件开发工程师培训多少钱 网络安全教育板报初中 神武手游服务器互通 深圳mcu软件开发兼职 扬州智能化服务器厂家直供 医院网络安全组织建设规范 抽奖转盘软件开发 网络技术培养人才 联想公司服务器服务工程师 网络安全的困惑
0