千家信息网

Pulsar IO 中怎么调用Schema

发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,这篇文章给大家介绍Pulsar IO 中怎么调用Schema ,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Schema 是一种描述数据的数据 。例如,数据库中表的信息和字段类型
千家信息网最后更新 2024年09月22日Pulsar IO 中怎么调用Schema

这篇文章给大家介绍Pulsar IO 中怎么调用Schema ,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

Schema 是一种描述数据的数据 。例如,数据库中表的信息和字段类型等都是 Schema。Pulsar 对 Schema 也有比较好的支持。


>>> Schema 简单应用 <<<

在使用 pub/sub 生产和消费消息时,可以通过以下代码使用 Schema:
              
public class SensorReading { public float temperature;
public SensorReading(float temperature) { this.temperature = temperature; }
// A no-arg constructor is required public SensorReading() { }
public float getTemperature() { return temperature; }
public void setTemperature(float temperature) { this.temperature = temperature; } } Producer producer = client.newProducer(JSONSchema.of(SensorReading.class)) .topic("my-topic") .create(); Consumer consumer = client.newConsumer(JSONSchema.of(SensorReading.class)) .topic("my-topic") .subscriptionName("my-subscription") .subscribe();


通过以上操作,生产者和消费者可以识别出关于 SensorReading 这个类的含义。这是 Schema 在客户端的应用,也是比较普遍的使用方法。

前文已经提到,Source 和 Sink 是对 pub/sub 的封装,因此,Schema 的应用也是基于以上原理。以下为详细说明。

>>> Source 中的 Schema <<<

在内建的 Sink 中,实现了一个 Consumer,用于接收从 Pulsar 发来的数据。
              if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {              schema = (Schema) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true);          } else {              schema = (Schema) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true);          }


getSerdeClassName 会获取用户指定的用于序列化与反序列化的类,通过指定 -- custom-serde-inputs 参数,从而构建真正的 Schema。
              
case NONE: return (Schema) Schema.BYTES;
case AUTO_CONSUME: case AUTO: return (Schema) Schema.AUTO_CONSUME();
case STRING: return (Schema) Schema.STRING;
case AVRO: return AvroSchema.of(SchemaDefinition.builder().withPojo(clazz).build());
case JSON: return JSONSchema.of(SchemaDefinition.builder().withPojo(clazz).build());
case KEY_VALUE: return (Schema)Schema.KV_BYTES();
case PROTOBUF: return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap()); }


关于Pulsar IO 中怎么调用Schema 就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

数据 应用 内容 序列 更多 帮助 消费 生产 不错 中表 代码 使用方法 信息 兴趣 原理 参数 可以通过 含义 字段 客户 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 数据库数据类型的查询向导怎么弄 选择服务器角色 访问数据库的思维导图 网络安全四个字的写法 公安落实网络安全综述 学校网络安全教育知识讲座 互联网科技股份公司部门职务 惠普服务器闪红灯 数据库是一个服务器吗 桓台快消品软件开发公司 增强网络安全意识手抄报内容 惠普380服务器安全模式 sql用什么模型构建数据库 哈尔滨应用软件开发前十名 黄冈市公安局网络安全支队 落实网络安全工作责任制报告 北京互联网软件开发价格表 广东第七城网络技术公司怎么样 软件开发 变量设计文档 反馈服务器为空是怎么回事 方舟怎么转服务器 做券商网络安全的公司 无名杀本地服务器未运行 腾讯公司用的数据库 新路由3改打印服务器 济南直播软件开发外包公司 赵志宏共建网络安全环境 江苏省水利厅网络安全 网络安全定义属性 顺科软件开发
0