怎么进行Pulsar Kafka Client的简单分析
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。为了方便
千家信息网最后更新 2025年02月03日怎么进行Pulsar Kafka Client的简单分析为了方便 Kafka 用户使用 Pulsar,Pulsar 对 Kafka Client 做了一些封装,让 Kafka 用户更方便的使用 Pulsar。
下面主要介绍 Kafka Client 如何将消息发送到 Pulsar, 并从 Pulsar 消费消息,以及如何使用 Pulsar Schema。
依赖引入了 Kafka 的 0.10.2.1 版本的客户端,还有 Pulsar 对 Kafka Client 封装后的客户端。 在上述配置中 topic 是指 Pulsar 中的 Topic,接着使用 Kafka 的配置方式来初始化各种配置,包括 Server 地址、key 的序列化与 value 的序列化类,然后构造一个 ProducerRecord 的类将其发送出去。 有些配置同生产者代码的配置是类似的,例如 topic,Server 等。另外使用 Kafka 的 group.id 作为配置 Pulsar 中的订阅名称,关闭自动提交,在消费者端为 key 和 value 配置的是反序列化的类。然后同常规的消费者类似,开始消费消息。
在上述情况中使用的是 Kafka 的 Schema 来进行序列化与反序列化,当然也支持使用 Pulsar 的 Schema 来进行此过程。下面使用 AVRO 进行简单的介绍。 首先定义 Schema 所需要使用的 pojo 类。 可以看到大部分配置同上面使用 Kafka Client 的配置是类似的,但是中间加入了一些 Pulsar 的 Schema,使用 Foo 作为 key,使用 Bar 类作为 value。 消费者端同样是类似的配置,使用与生产者端相同的 Schema 进行数据的反序列化。
本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
⌨️ 引入依赖
org.apache.pulsar pulsar-client-kafka {project.version}
⌨️ 使用 Kafka Schema
>>> 添加生产者代码
String topic = "persistent://public/default/test";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
Producer
producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord
(topic, i, Integer.toString(i))); }
producer.close();
>>> 添加消费者代码
String topic = "persistent://public/default/test";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
@SuppressWarnings("resource")
Consumer
consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords
records = consumer.poll(100); records.forEach(record -> {
log.info("Received record: {}", record);
});
// Commit last offset
consumer.commitSync();
}
⌨️ 使用 Pulsar Schema
@Data
@ToString
@EqualsAndHashCode
public class Foo {
@Nullable
private String field1;
@Nullable
private String field2;
private int field3;
}
@Data
@ToString
@EqualsAndHashCode
public class Bar {
private boolean field1;
}
>>> 生产者端代码
String topic = "persistent://public/default/test-avro";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
AvroSchema
barSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Bar.class).build()); AvroSchema
fooSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Foo.class).build());
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
Producer
producer = new KafkaProducer<>(props, fooSchema, barSchema);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord
(topic, i, foo, bar)); log.info("Message {} sent successfully", i);
}
producer.close();
>>> 消费者端代码
String topic = "persistent://public/default/test-avro";
Properties props = new Properties();
props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
AvroSchema
barSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Bar.class).build()); AvroSchema
fooSchema = AvroSchema.of(SchemaDefinition. builder().withPojo(Foo.class).build());
Bar bar = new Bar();
bar.setField1(true);
Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
@SuppressWarnings("resource")
Consumer
consumer = new PulsarKafkaConsumer<>(props, fooSchema, barSchema); consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords
records = consumer.poll(100); records.forEach(record -> {
log.info("Received record: {}", record);
});
// Commit last offset
consumer.commitSync();
}
以上就是怎么进行Pulsar Kafka Client的简单分析,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
配置
消费
序列
代码
消费者
消息
生产者
生产
分析
客户
客户端
更多
用户
知识
篇文章
封装
实用
相同
产者
名称
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
web系统开发与软件开发
江西学而互联网科技有限公司
mvc文件上传至数据库
税务系统网络安全宣传总结
从数据库截取字符串
cf进去服务器很慢
进入游戏服务器怎么看不见
优质软件开发行业排行榜
上海佳速网络技术有...
数据库中m和n是什么意思
相机显示影像数据库文件错误
联想服务器有哪些
服务器网络维护怎么做
公司网络服务器对孕妇有影响吗
移动adsl电信服务器怎么样
北京推广网络技术咨询口碑推荐
域名服务器的分级
计算机软件开发是什么职业
北邮网络安全考研报考人数
网络安全产品与方案云智
扬州网络安全准入控制研发公司
计算机网络技术建设背景
u8的加密服务器在哪修改
济南租服务器哪里好
网络安全密钥英文缩写
移动adsl电信服务器怎么样
工业园区诚信软件开发服务电话
保密网络安全清查
Dell服务器r750xa
全国公安院校网络技术学院