怎么用C语言与java实现kafka avro生产者和消费者
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,本篇内容介绍了"怎么用C语言与java实现kafka avro生产者和消费者"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔
千家信息网最后更新 2025年02月02日怎么用C语言与java实现kafka avro生产者和消费者
本篇内容介绍了"怎么用C语言与java实现kafka avro生产者和消费者"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
原始数据格式
请求IP 应答IP 域名 类型
3183375114 3729673322 "mx.hc.spdrb.com" A
以上数据是test文件的内容
schema定义如下
{
"type":"record",
"name":"data",
"fields":
[
{"name":"qip","type":"long"},
{"name":"aip","type":"long"},
{"name":"domain","type":"string"},
{"name":"type","type":"string"}
]
}
C语言生产者代码如下
#include#include #include #include #include "avro.h" #include "producer.h"const char PERSON_SCHEMA[] = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}";const char *file = "avro_file.dat";const char *brokers = "xxxxx:9092"; const char *topic = "topic1";void print_avro_value(avro_value_t *value) { char *json; if (!avro_value_to_json(value, 1, &json)) { printf("%s\n", json); free(json); } }if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA), &test_schema, &error)) { fprintf(stderr, "schema error\n"); exit(EXIT_FAILURE);}return test_schema;avro_schema_t init_schema() { avro_schema_t test_schema; avro_schema_error_t error;}void add_data(avro_writer_t writer, avro_schema_t schema, int64_t qip, uint64_t aip, const char* domain, const char* type) { avro_datum_t data = avro_record(schema); avro_datum_t dqip = avro_int64(qip); avro_datum_t daip = avro_int64(aip); avro_datum_t ddomain = avro_string(domain); avro_datum_t dtype = avro_string(type); avro_record_set(data, "qip", dqip); avro_record_set(data, "aip", daip); avro_record_set(data, "domain", ddomain); avro_record_set(data, "type", dtype); avro_write_data(writer, NULL, f2c); avro_datum_decref(dqip); avro_datum_decref(daip); avro_datum_decref(ddomain); avro_datum_decref(dtype); avro_datum_decref(data);}int main(int argc, char* argv[]) { int len = 0; avro_schema_t schema; avro_writer_t mem_writer; char buf[1024]; char tmp[4][500]={{0x00}}; FILE *fp = fopen("test","r"); if(!fp) { printf("open test file error!\n"); return -1; } schema = init_schema(); mem_writer = avro_writer_memory(buf, 1024); while(fgets(buf, 1024,fp)!=NULL) { if(buf[strlen(buf)] == '\n') buf[strlen(buf)] = '\0'; if(sscanf(buf, "%s%s%s%s", tmp[0],tmp[1],tmp[2],tmp[3])!=4) continue; add_data(mem_writer,schema,atol(tmp[0]),atol(tmp[1]),tmp[2],tmp[3]); printf("data len = %ld\n", avro_writer_tell(mem_writer)); len = avro_writer_tell(mem_writer); kafka_putdata(buf, len,brokers,topic);//librdkafka实现的生产者代码 未列出 memset(tmp, 0x00, sizeof(tmp)); memset(buf, 0x00, sizeof(buf)); avro_writer_reset(mem_writer); } fclose(fp); avro_writer_free(mem_writer); return 0;}
C语言实现的消费者如下
#include "consumer.h" #include "avro.h" #include#include const char *brokers = "xxxx:9092"; const char *topic = "topic1"; const char *group = "avrotest"; const char PERSON_SCHEMA[] = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}";avro_schema_t init_schema() { avro_schema_t test_schema; avro_schema_error_t error; if (avro_schema_from_json(PERSON_SCHEMA, sizeof(PERSON_SCHEMA), &test_schema, &error)) { fprintf(stderr, "schema error\n"); exit(EXIT_FAILURE); } return test_schema;}void print_data(avro_reader_t reader, avro_schema_t schema) { avro_datum_t data; if(avro_read_data(reader, schema, schema, &data) == 0) { int64_t qip; int64_t aip; char *domain; char *type; avro_datum_t q_datum,a_datum,d_datum,t_datum; avro_record_get(data, "qip", &q_datum); avro_int64_get(q_datum, &qip); avro_record_get(data, "aip", &a_datum); avro_int64_get(a_datum, &aip); avro_record_get(data, "domain", &d_datum); avro_string_get(d_datum, &domain); avro_record_get(data, "type", &t_datum); avro_string_get(t_datum, &type); printf("qip: %lld, aip: %lld,domain: %s,type:%s\n", qip,aip,domain,type); avro_datum_decref(data);}int main(int argc, char* argv[]) { rd_kafka_t *rk; rd_kafka_topic_partition_list_t *topics; if(initKafka(&rk, brokers, group, topic, &topics)<0){return -1;} char buf[1024] = {0x00}; int len = 0; avro_schema_t schema; avro_reader_t mem_reader; schema = init_schema(); mem_reader = avro_reader_memory(buf, 1024); while(1) { get_consumer_msg(rk, buf, &len); //librdkafka实现的消费者 代码未列出 if(len == 0) continue; printf("len=%d\n",len); print_data(mem_reader,schema); avro_reader_reset(mem_reader); memset(buf, 0x00, sizeof(buf)); } return 0;}
C编译的Makefile如下 两个C程序通用
TARGET=avro-test INCLUDE=./avrolib/include/ SLIB=./avrolib/lib/libavro.a DLIB=-lz -llzma -lrdkafka INC = -I. -I./avrolib/include SOURCES =$(wildcard *.c) OBJECTS =$(SOURCES:.c=.o) RM=rm -rf CC=gcc -g CFLAGS= -Wall $(INC) all:$(TARGET) $(TARGET): $(OBJECTS) $(CC) -o $@ $? $(SLIB) $(DLIB) $(CFLAGS) :$(SOURCES) $(CC) -c clean: $(RM) $(TARGET) $(OBJECTS) *~
java消费者 gradle配置
dependencies { testCompile group: 'junit', name: 'junit', version: '4.12' compile group: 'org.apache.avro', name: 'avro', version: '1.9.1' compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0' }
avro解析 借鉴别人 言作者未知 请作者见谅
package zc;import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.specific.SpecificDatumReader;import java.io.IOException;public class MyRecordDecoder { public static GenericRecord genericRecord; datumReader; static MyRecordDecoder myRecordDecoder = new MyRecordDecoder(); final String USER_SCHEMA = "{"type":"record","name":"data","fields":[{"name":"qip","type":"long"},{"name":"aip","type":"long"},{"name":"domain","type":"string"},{"name":"type","type":"string"}]}"; public MyRecordDecoder() { Schema schema = null; schema = new Schema.Parser().parse(USER_SCHEMA); datumReader = new SpecificDatumReader(schema); } public GenericRecord getGenericRecord(BinaryDecoder decoder, byte[] value) throws IOException{ return datumReader.read(null, decoder); } public static MyRecordDecoder getInstance() { if (myRecordDecoder==null) myRecordDecoder = new MyRecordDecoder(); return myRecordDecoder; }}
java消费者 package zc;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Collections; import java.util.Properties;public class KafkaMessageAvro{ public static void main(String[] args) throws Exception { String inTopic = args[0]; Properties props = new Properties(); props.setProperty("bootstrap.servers", "xxxxx:9092"); props.setProperty("group.id", "flink-topn-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); KafkaConsumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(inTopic)); try { while (true) { ConsumerRecords records = consumer.poll(1000); for (ConsumerRecord record : records) { byte[] ss = record.value(); if (ss==null) { continue; } System.out.println(ss.toString()); GenericRecord genericRecord = null; BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(ss, null); while (!decoder.isEnd()) { genericRecord = MyRecordDecoder.getInstance().getGenericRecord(decoder, ss); System.out.println(genericRecord.get("qip").toString()+" "+genericRecord.get("aip").toString()+" "+genericRecord.get("domain").toString()+" "+genericRecord.get("type").toString()); } } } } finally { consumer.close(); }}
"怎么用C语言与java实现kafka avro生产者和消费者"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
消费者
消费
生产者
语言
生产
代码
内容
作者
数据
更多
知识
原始
实用
学有所成
接下来
两个
困境
域名
实际
情况
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
深蓝网络技术
mac电脑服务器地址
东莞社交软件开发费用是多少
软件开发动态建模
个性化网络技术服务哪里好
有关网络技术的理念
杭州手机软件开发哪家正规
轻松有效学习网络技术
中国香港超频服务器解决方案
网吧的服务器有没有内存条
供电合规库边界网络安全风险评估
软件开发南宁
浮动型数据库怎么连
开展网络安全大会
wps表格查找一行数据库
全面战争战锤怎么连接服务器
国内顶尖的网络安全企业
数据库跨进程事务
印度股票数据库
如何使用云服务器数据库
大数据中心网络安全管理办法
3d设计软件开发计划书
网络安全周启动p
帝释天网络技术
无锡本地服务器租用
王者多久不登服务器会注销
做软件开发电脑要求
RISC架构服务器
国家网络安全宣传周团会总结
海南风战网络技术有限公司