千家信息网

怎么进行Pulsar Kafka Client的简单分析

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。为了方便
千家信息网最后更新 2025年02月03日怎么进行Pulsar Kafka Client的简单分析

本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

为了方便 Kafka 用户使用 Pulsar,Pulsar 对 Kafka Client 做了一些封装,让 Kafka 用户更方便的使用 Pulsar。

下面主要介绍 Kafka Client 如何将消息发送到 Pulsar, 并从 Pulsar 消费消息,以及如何使用 Pulsar Schema。

⌨️ 引入依赖

  org.apache.pulsar  pulsar-client-kafka  {project.version}
依赖引入了 Kafka 的 0.10.2.1 版本的客户端,还有 Pulsar 对 Kafka Client 封装后的客户端。

⌨️ 使用 Kafka Schema

>>> 添加生产者代码

String topic = "persistent://public/default/test";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());
Producer producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord(topic, i, Integer.toString(i)));}
producer.close();
在上述配置中 topic 是指 Pulsar 中的 Topic,接着使用 Kafka 的配置方式来初始化各种配置,包括 Server 地址、key 的序列化与 value 的序列化类,然后构造一个 ProducerRecord 的类将其发送出去。

>>> 添加消费者代码

String topic = "persistent://public/default/test";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());
@SuppressWarnings("resource")Consumer consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));
while (true) { ConsumerRecords records = consumer.poll(100); records.forEach(record -> { log.info("Received record: {}", record); });
// Commit last offset consumer.commitSync();}
有些配置同生产者代码的配置是类似的,例如 topic,Server 等。另外使用 Kafka 的 group.id 作为配置 Pulsar 中的订阅名称,关闭自动提交,在消费者端为 key 和 value 配置的是反序列化的类。然后同常规的消费者类似,开始消费消息。

⌨️ 使用 Pulsar Schema

在上述情况中使用的是 Kafka 的 Schema 来进行序列化与反序列化,当然也支持使用 Pulsar 的 Schema 来进行此过程。下面使用 AVRO 进行简单的介绍。
首先定义 Schema 所需要使用的 pojo 类。
@Data@ToString@EqualsAndHashCodepublic class Foo {    @Nullable    private String field1;    @Nullable    private String field2;    private int field3;}
@Data@ToString@EqualsAndHashCodepublic class Bar { private boolean field1;}

>>> 生产者端代码

String topic = "persistent://public/default/test-avro";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());
AvroSchema barSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(Bar.class).build());AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build());
Bar bar = new Bar();bar.setField1(true);
Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);

Producer producer = new KafkaProducer<>(props, fooSchema, barSchema);
for (int i = 0; i < 10; i++) { producer.send(new ProducerRecord(topic, i, foo, bar)); log.info("Message {} sent successfully", i);}
producer.close();
可以看到大部分配置同上面使用 Kafka Client 的配置是类似的,但是中间加入了一些 Pulsar 的 Schema,使用 Foo 作为 key,使用 Bar 类作为 value。

>>> 消费者端代码

String topic = "persistent://public/default/test-avro";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());
AvroSchema barSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(Bar.class).build());AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build());
Bar bar = new Bar();bar.setField1(true);
Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);
@SuppressWarnings("resource")Consumer consumer = new PulsarKafkaConsumer<>(props, fooSchema, barSchema);consumer.subscribe(Arrays.asList(topic));
while (true) { ConsumerRecords records = consumer.poll(100); records.forEach(record -> { log.info("Received record: {}", record); });
// Commit last offset consumer.commitSync();}
消费者端同样是类似的配置,使用与生产者端相同的 Schema 进行数据的反序列化。

以上就是怎么进行Pulsar Kafka Client的简单分析,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

配置 消费 序列 代码 消费者 消息 生产者 生产 分析 客户 客户端 更多 用户 知识 篇文章 封装 实用 相同 产者 名称 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 web系统开发与软件开发 江西学而互联网科技有限公司 mvc文件上传至数据库 税务系统网络安全宣传总结 从数据库截取字符串 cf进去服务器很慢 进入游戏服务器怎么看不见 优质软件开发行业排行榜 上海佳速网络技术有... 数据库中m和n是什么意思 相机显示影像数据库文件错误 联想服务器有哪些 服务器网络维护怎么做 公司网络服务器对孕妇有影响吗 移动adsl电信服务器怎么样 北京推广网络技术咨询口碑推荐 域名服务器的分级 计算机软件开发是什么职业 北邮网络安全考研报考人数 网络安全产品与方案云智 扬州网络安全准入控制研发公司 计算机网络技术建设背景 u8的加密服务器在哪修改 济南租服务器哪里好 网络安全密钥英文缩写 移动adsl电信服务器怎么样 工业园区诚信软件开发服务电话 保密网络安全清查 Dell服务器r750xa 全国公安院校网络技术学院
0