How to Implement a Kafka Java API Ingestion Program
Posted: 2025-02-03 | Author: 千家信息网 editor
Last updated by 千家信息网 on 2025-02-03
This article explains how to implement a data ingestion program with the Kafka Java API. It covers the Maven dependency, connecting to Kafka, Kerberos authentication, and a multi-threaded sender with rate limiting and failure callbacks. Follow along step by step.
Walkthrough
Maven dependency
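```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
```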
Connecting to Kafka
```java
Properties props = new Properties();
// The leader waits until all in-sync replicas have acknowledged the message
props.put("acks", "all");
// Several brokers can be listed, comma-separated
props.put("bootstrap.servers", Config.ipList);
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("retries", "2");
KafkaProducer<byte[], byte[]> produce = new KafkaProducer<>(props);
```
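If `Config.ipList` is assembled from a list of brokers, a small helper can build the same producer settings in one place and reject malformed addresses early. This is a hypothetical sketch: `ProducerProps` and `build` are illustrative names, not part of the original program or of the Kafka API; only the property keys and the serializer class name come from the snippet above. It uses only the JDK, so it can be checked without a broker.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: collects the producer settings used in the snippet above.
final class ProducerProps {
    static final String BYTES = "org.apache.kafka.common.serialization.ByteArraySerializer";

    private ProducerProps() {}

    /** Builds producer properties for the given "host:port" broker addresses. */
    static Properties build(List<String> brokers) {
        if (brokers == null || brokers.isEmpty()) {
            throw new IllegalArgumentException("at least one broker is required");
        }
        for (String b : brokers) {
            // Each entry must look like host:port
            int colon = b.lastIndexOf(':');
            if (colon <= 0 || colon == b.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got " + b);
            }
        }
        Properties props = new Properties();
        props.put("acks", "all");                                   // wait for all in-sync replicas
        props.put("bootstrap.servers", String.join(",", brokers)); // comma-separated broker list
        props.put("key.serializer", BYTES);
        props.put("value.serializer", BYTES);
        props.put("retries", "2");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(Arrays.asList("node1:6667", "node2:6667"));
        System.out.println(props.getProperty("bootstrap.servers")); // node1:6667,node2:6667
    }
}
```

The returned `Properties` object can then be passed to `new KafkaProducer<>(props)` as above.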
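Kerberos authentication
Kerberos is the security authentication mechanism used on the big data platform. It can be set up first, when the project starts, before any producer is created. Two ways of doing this are shown here.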
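Approach 1
Specify the authentication files
```java
// Load the Kerberos configuration file
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
// Load the JAAS file that holds the Kerberos user settings
System.setProperty("java.security.auth.login.config", "/etc/kafka/conf/kafka_jaas.conf");
```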
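Both system properties must be set before the first `KafkaProducer` is created, and a wrong path typically only surfaces later, when the producer tries to log in. A minimal sketch, assuming you want to fail fast at startup: the hypothetical `KerberosSetup.apply` checks that both files are readable before setting the two property names used above. The paths in `main` are the example paths from the snippet; adjust them for your cluster.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: validates the Kerberos files before pointing the JVM at them.
final class KerberosSetup {
    private KerberosSetup() {}

    static void apply(String krb5Conf, String jaasConf) throws IOException {
        requireReadable(krb5Conf);
        requireReadable(jaasConf);
        // Same system properties as in Approach 1
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", jaasConf);
    }

    private static void requireReadable(String file) throws IOException {
        Path path = Paths.get(file);
        if (!Files.isReadable(path)) {
            throw new IOException("Kerberos file not readable: " + path.toAbsolutePath());
        }
    }

    public static void main(String[] args) {
        try {
            apply("/etc/krb5.conf", "/etc/kafka/conf/kafka_jaas.conf");
            System.out.println("Kerberos properties set");
        } catch (IOException e) {
            System.err.println(e.getMessage());
        }
    }
}
```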
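Approach 2
Sometimes users switch, and different machines carry different user credentials, so maintaining a separate configuration file for each one is tedious. Instead, the Java program can write a temporary JAAS file at startup (mostly just to show off :) ).
```java
// Load the Kerberos configuration file
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
// Generate the JAAS file from the user's keytab file and principal
KafkaUtil.configureJAAS(Config.tabFile, Config.principal);

/**
 * Generates a temporary jaas.conf file and points the JVM at it.
 * Requires java.io.File, IOException, PrintWriter and UncheckedIOException.
 * @param keyTab    path to the keytab file
 * @param principal Kerberos principal (user) to authenticate as
 */
public static void configureJAAS(String keyTab, String principal) {
    String jaasTemplate = "KafkaClient {\n"
            + "com.sun.security.auth.module.Krb5LoginModule required\n"
            + "useKeyTab=true\n"
            + "keyTab=\"%1$s\"\n"
            + "principal=\"%2$s\";\n"
            + "};";
    String content = String.format(jaasTemplate, keyTab, principal);
    File jaasConf;
    try {
        jaasConf = File.createTempFile("jaas", ".conf");
        // Remove the temporary file when the JVM exits
        jaasConf.deleteOnExit();
        try (PrintWriter writer = new PrintWriter(jaasConf)) {
            writer.println(content);
        }
    } catch (IOException e) {
        // Without the JAAS file the Kerberos login cannot work, so fail fast
        throw new UncheckedIOException("Failed to create jaas.conf", e);
    }
    System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
}
```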
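Since Kafka 0.10.2 the client also accepts the JAAS entry directly through the `sasl.jaas.config` producer property, which avoids the temporary file altogether. This is a different mechanism from the `java.security.auth.login.config` file used above, shown here only as an alternative. A sketch, assuming the same keytab and principal: the hypothetical `JaasConfig.inline` builds the property value, which is the login module line from the template without the `KafkaClient { ... }` wrapper. The keytab path and principal in `main` are placeholders.

```java
import java.util.Properties;

// Hypothetical helper: builds the value for Kafka's sasl.jaas.config property.
final class JaasConfig {
    private JaasConfig() {}

    static String inline(String keyTab, String principal) {
        // Same login module and options as the JAAS template, on one line, ending with ';'
        return String.format(
                "com.sun.security.auth.module.Krb5LoginModule required "
                        + "useKeyTab=true keyTab=\"%1$s\" principal=\"%2$s\";",
                keyTab, principal);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.jaas.config", inline("/path/to/user.keytab", "user@EXAMPLE.COM"));
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

Because the setting lives in each producer's `Properties`, different producers in the same JVM can use different principals, which a single JVM-wide system property cannot do.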
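Production use
In real production use, the following optimizations improve transfer efficiency and stability:
1. Make the sender a thread class and manage the threads with a thread pool to increase throughput.
2. Upload data in batches.
3. Add a Callback handler so that failed sends are not silently lost.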
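The first two points can be combined: split the pending records into fixed-size batches and hand each batch to a worker from a pool, the way each `Performance` thread takes its own `recordList`. This is a JDK-only sketch under stated assumptions: `BatchDispatcher`, `dispatch`, the batch size and the pool size are hypothetical, and the `Consumer<List<T>>` sender stands in for something like `new Performance(batch).run()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical dispatcher: splits records into batches and sends each batch on a pool thread.
final class BatchDispatcher {
    private BatchDispatcher() {}

    static <T> int dispatch(List<T> records, int batchSize, int threads, Consumer<List<T>> sender)
            throws InterruptedException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        int batches = 0;
        for (int from = 0; from < records.size(); from += batchSize) {
            int to = Math.min(from + batchSize, records.size());
            // Copy the slice so each worker owns its batch
            List<T> batch = new ArrayList<>(records.subList(from, to));
            pool.execute(() -> sender.accept(batch));
            batches++;
        }
        pool.shutdown();                          // accept no new work; queued batches still run
        pool.awaitTermination(1, TimeUnit.HOURS); // wait up to an hour for them to finish
        return batches;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> records = new ArrayList<>();
        for (int i = 0; i < 25; i++) records.add(i);
        int n = dispatch(records, 10, 4,
                batch -> System.out.println(Thread.currentThread().getName() + " sent " + batch.size()));
        System.out.println(n + " batches"); // 3 batches
    }
}
```

A fixed-size pool bounds how many producers run at once; each `Performance.run()` creates and closes its own producer, so the pool size also caps the number of open broker connections.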
The upload thread class is as follows.
```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
// ThroughputThrottler ships in the kafka-tools artifact, not in kafka-clients
import org.apache.kafka.tools.ThroughputThrottler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Performance extends Thread {
    private static final Logger log = LoggerFactory.getLogger(Performance.class);
    private final List<ProducerRecord<byte[], byte[]>> recordList;

    public Performance(List<ProducerRecord<byte[], byte[]>> recordList) {
        this.recordList = recordList;
    }

    /**
     * Ingestion load test.
     */
    public static void test() {
        log.info("Kafka Tool Test");
        try {
            String topicName = "test40";
            // Total number of records to send
            long numRecords = 10000000000L;
            // Record size in bytes
            int recordSize = 1500;
            // Target send rate in records per second
            int throughput = 10000000;
            Properties props = new Properties();
            props.put("acks", "1");
            props.put("bootstrap.servers", "ip:6667,ip:6667");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            // Create the test payload: random uppercase letters A-Z
            byte[] payload = new byte[recordSize];
            Random random = new Random(0);
            for (int i = 0; i < payload.length; ++i) {
                payload[i] = (byte) (random.nextInt(26) + 65);
            }
            // The record that is sent repeatedly
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);
            // Statistics: total record count, report every 5000 ms
            Stats stats = new Stats(numRecords, 5000);
            // Start time
            long startMs = System.currentTimeMillis();
            // Caps the producer's send rate at the target throughput
            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
            // long counter: an int counter would overflow before reaching numRecords
            for (long i = 0; i < numRecords; i++) {
                long sendStartMs = System.currentTimeMillis();
                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats, record.topic(), record.value());
                producer.send(record, cb);
                if (throttler.shouldThrottle(i, sendStartMs)) {
                    throttler.throttle();
                }
            }
            // Finish the task; close() waits for in-flight sends
            producer.close();
            stats.printTotal();
        } catch (Exception e) {
            log.error("Test error", e);
        }
    }

    /**
     * Actual ingestion method.
     */
    @Override
    public void run() {
        // Stats divides by the record count, so an empty batch would throw
        if (recordList.isEmpty()) {
            return;
        }
        KafkaUtil kafkaUtil = new KafkaUtil();
        KafkaProducer<byte[], byte[]> produce = kafkaUtil.create();
        // Total number of records
        long size = recordList.size();
        // Target send rate in records per second
        int throughput = 900000;
        // Statistics: total record count, report every 5000 ms
        Stats stats = new Stats(size, 5000);
        // Start time
        long startMs = System.currentTimeMillis();
        // Caps the producer's send rate at the target throughput
        ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
        int i = 0;
        for (ProducerRecord<byte[], byte[]> record : recordList) {
            long sendStartMs = System.currentTimeMillis();
            // Arguments: send time, payload length, stats object, topic, payload
            Callback cb = stats.nextCompletion(sendStartMs, record.value().length, stats, record.topic(), record.value());
            produce.send(record, cb);
            if (throttler.shouldThrottle(i, sendStartMs)) {
                throttler.throttle();
            }
            i++;
        }
        produce.close();
        log.info("Finish Data To Send");
        LogModel.sendNum++;
    }

    private static class Stats {
        private long start;
        private long windowStart;
        private int[] latencies;
        private int sampling;
        private int iteration;
        private int index;
        private long count;
        private long bytes;
        private int maxLatency;
        private long totalLatency;
        private long windowCount;
        private int windowMaxLatency;
        private long windowTotalLatency;
        private long windowBytes;
        private long reportingInterval;

        public Stats(long numRecords, int reportingInterval) {
            this.start = System.currentTimeMillis();
            this.windowStart = System.currentTimeMillis();
            this.iteration = 0;
            // Keep at most about 500,000 latency samples in memory
            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
            this.index = 0;
            this.maxLatency = 0;
            this.totalLatency = 0;
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
            this.reportingInterval = reportingInterval;
        }

        public void record(int iter, int latency, int bytes, long time) {
            this.count++;
            this.bytes += bytes;
            this.totalLatency += latency;
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.windowCount++;
            this.windowBytes += bytes;
            this.windowTotalLatency += latency;
            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
            if (iter % this.sampling == 0) {
                this.latencies[index] = latency;
                this.index++;
            }
            // Report recent performance once per reporting interval
            if (time - windowStart >= reportingInterval) {
                printWindow();
                newWindow();
            }
        }

        public Callback nextCompletion(long start, int bytes, Stats stats, String topic, byte[] data) {
            Callback cb = new PerfCallback(this.iteration, start, bytes, stats, topic, data);
            this.iteration++;
            return cb;
        }

        /**
         * Prints throughput for the current reporting window.
         */
        public void printWindow() {
            long elapsed = System.currentTimeMillis() - windowStart;
            double recsPerSec = 1000.0 * windowCount / (double) elapsed;
            double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0);
            System.out.printf("%d ms elapsed, %d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f ms max latency.\n",
                    elapsed, windowCount, recsPerSec, mbPerSec, windowTotalLatency / (double) windowCount, (double) windowMaxLatency);
        }

        public void newWindow() {
            this.windowStart = System.currentTimeMillis();
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
        }

        /**
         * Prints overall throughput and latency percentiles.
         */
        public void printTotal() {
            long elapsed = System.currentTimeMillis() - start;
            double recsPerSec = 1000.0 * count / (double) elapsed;
            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
            System.out.printf("%d ms elapsed, %d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
                    elapsed, count, recsPerSec, mbPerSec, totalLatency / (double) count, (double) maxLatency,
                    percs[0], percs[1], percs[2], percs[3]);
        }

        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
            int size = Math.min(count, latencies.length);
            Arrays.sort(latencies, 0, size);
            int[] values = new int[percentiles.length];
            for (int i = 0; i < percentiles.length; i++) {
                int index = (int) (percentiles[i] * size);
                values[i] = latencies[index];
            }
            return values;
        }
    }

    private static final class PerfCallback implements Callback {
        private final long start;
        private final int iteration;
        private final int bytes;
        private final Stats stats;
        private final String topic;
        private final byte[] data;

        public PerfCallback(int iter, long start, int bytes, Stats stats, String topic, byte[] data) {
            this.start = start;
            this.stats = stats;
            this.iteration = iter;
            this.bytes = bytes;
            this.topic = topic;
            this.data = data;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int) (now - start);
            this.stats.record(iteration, latency, bytes, now);
            if (exception != null) {
                // Put the record back on the data queue for a second upload attempt.
                // Callbacks run on the producer's I/O thread, so this queue must be thread-safe.
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, data);
                ControlTask.recordList.add(record);
                log.error("Send error, record queued for a second send", exception);
            }
        }
    }
}
```
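In `PerfCallback`, a failed record goes straight back onto `ControlTask.recordList` with no limit, so a record that keeps failing (for example, one larger than the broker's size limit) is retried forever. A sketch of one way to cap this, assuming you are free to change the queue type: the hypothetical `RetryQueue` stores an attempt count with each payload in a `ConcurrentLinkedQueue`, which is safe to call from the producer's I/O thread, and reports records that have used up `maxAttempts` instead of re-queuing them. The names and the give-up handling are illustrative, not part of the original program.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical bounded retry queue for records whose send callback reported an exception.
final class RetryQueue {
    /** A payload plus how many times it has been sent so far. */
    static final class Pending {
        final String topic;
        final byte[] data;
        final int attempts;

        Pending(String topic, byte[] data, int attempts) {
            this.topic = topic;
            this.data = data;
            this.attempts = attempts;
        }
    }

    private final Queue<Pending> queue = new ConcurrentLinkedQueue<>();
    private final int maxAttempts;

    RetryQueue(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /**
     * Called from a send callback after a failure. attemptsSoFar counts the failed send,
     * so a resent Pending should report p.attempts + 1 if it fails again.
     * Returns true if re-queued, false if the record should be logged or stored elsewhere.
     */
    boolean onFailure(String topic, byte[] data, int attemptsSoFar) {
        if (attemptsSoFar >= maxAttempts) {
            return false;
        }
        queue.add(new Pending(topic, data, attemptsSoFar));
        return true;
    }

    /** Next record to resend, or null if none is waiting. */
    Pending poll() {
        return queue.poll();
    }

    public static void main(String[] args) {
        RetryQueue retries = new RetryQueue(2);
        byte[] payload = "hello".getBytes();
        System.out.println(retries.onFailure("test40", payload, 1)); // true: re-queued
        System.out.println(retries.onFailure("test40", payload, 2)); // false: give up
        System.out.println(retries.poll().attempts);                 // 1
    }
}
```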
KafkaUtil.java
```java
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaUtil {
    private KafkaProducer<byte[], byte[]> produce;

    /**
     * Creates the producer connection.
     * @return the new producer
     */
    public KafkaProducer<byte[], byte[]> create() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("bootstrap.servers", Config.ipList);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120000); // increase the wait time
        props.put("retries", "2");
        // Kerberos security authentication
        if (Config.kerberos == 0) {
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "GSSAPI");
            props.put("sasl.kerberos.service.name", "kafka");
        }
        produce = new KafkaProducer<>(props);
        return produce;
    }

    /**
     * Sends one record.
     * @param record the record to send
     * @param cb     callback invoked when the send completes
     */
    public void send(ProducerRecord<byte[], byte[]> record, Callback cb) {
        produce.send(record, cb);
    }

    /**
     * Flushes pending records and closes the connection.
     */
    public void close() {
        produce.flush();
        produce.close();
    }

    /**
     * Generates a temporary jaas.conf file and points the JVM at it.
     * @param keyTab    path to the keytab file
     * @param principal Kerberos principal (user) to authenticate as
     */
    public static void configureJAAS(String keyTab, String principal) {
        String jaasTemplate = "KafkaClient {\n"
                + "com.sun.security.auth.module.Krb5LoginModule required\n"
                + "useKeyTab=true\n"
                + "keyTab=\"%1$s\"\n"
                + "principal=\"%2$s\";\n"
                + "};";
        String content = String.format(jaasTemplate, keyTab, principal);
        File jaasConf;
        try {
            jaasConf = File.createTempFile("jaas", ".conf");
            // Remove the temporary file when the JVM exits
            jaasConf.deleteOnExit();
            try (PrintWriter writer = new PrintWriter(jaasConf)) {
                writer.println(content);
            }
        } catch (IOException e) {
            // Without the JAAS file the Kerberos login cannot work, so fail fast
            throw new UncheckedIOException("Failed to create jaas.conf", e);
        }
        System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
    }
}
```
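`create()` only adds the SASL settings when Kerberos is switched on. That branch can be isolated and checked without a broker. In this sketch, `KerberosProps.apply` and its boolean flag are hypothetical stand-ins for the `Config.kerberos == 0` check, while the three property keys and values are exactly the ones `create()` uses.

```java
import java.util.Properties;

// Hypothetical helper mirroring the Kerberos branch in KafkaUtil.create().
final class KerberosProps {
    private KerberosProps() {}

    static Properties apply(Properties props, boolean kerberosEnabled) {
        if (kerberosEnabled) {
            props.put("security.protocol", "SASL_PLAINTEXT"); // SASL authentication without TLS
            props.put("sasl.mechanism", "GSSAPI");            // GSSAPI is the Kerberos mechanism
            props.put("sasl.kerberos.service.name", "kafka"); // Kerberos service name the brokers run as
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = apply(new Properties(), true);
        System.out.println(props.getProperty("sasl.mechanism")); // GSSAPI
    }
}
```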
Thanks for reading. That covers how to implement a Kafka Java API ingestion program; how it behaves in your own environment still needs to be verified in practice.