Implementing a Data Ingestion Program with the Kafka Java API

Published: 2025-02-03

This article explains how to implement a data ingestion program with the Kafka Java API. The explanations are simple and clear and easy to follow; work through the material step by step below.

Walkthrough

Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>

Connecting to Kafka

Properties props = new Properties();
props.put("acks", "all");  // wait for all replicas to acknowledge each record
props.put("bootstrap.servers", Config.ipList);  // multiple addresses may be listed
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("retries", "2");
KafkaProducer<byte[], byte[]> produce = new KafkaProducer<>(props);
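With the producer created, sending a message is a matter of wrapping byte arrays in a ProducerRecord. A minimal sketch, assuming the produce instance from the snippet above and a placeholder topic named "test":

// "test" is a placeholder topic; keys and values are byte[] to match the serializers configured above
byte[] key = "key-1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
byte[] value = "hello kafka".getBytes(java.nio.charset.StandardCharsets.UTF_8);
produce.send(new ProducerRecord<>("test", key, value));  // asynchronous; returns a Future<RecordMetadata>
produce.flush();   // force any buffered records onto the wire
produce.close();   // release network and buffer resources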

Kerberos authentication

  • Kerberos is the security authentication scheme of the big-data platform, and it can be completed as a first step at project startup. Two implementations are described here.

Approach 1

  • Point the JVM at existing authentication files.

// load the Kerberos configuration file
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
// load the Kerberos user (JAAS) file
System.setProperty("java.security.auth.login.config", "/etc/kafka/conf/kafka_jaas.conf");
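For reference, a kafka_jaas.conf for keytab-based login typically looks like the following (the keytab path and principal are placeholders); it has the same shape as the template generated programmatically in approach 2 below:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka_client.keytab"
  principal="kafka-client@EXAMPLE.COM";
};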

Approach 2

  • Sometimes, with user switching and different credentials on different machines, maintaining a configuration file on every host is cumbersome. Instead, the JAAS file can be generated on the fly with Java's temporary-file facility (mainly to show off -- smile).

// load the Kerberos configuration file
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
KafkaUtil.configureJAAS(Config.tabFile, Config.principal);  // keytab file and principal

/**
 * Generate a temporary jaas.conf file
 * @param keyTab    path to the keytab authentication file
 * @param principal Kerberos principal
 */
public static void configureJAAS(String keyTab, String principal) {
    String JAAS_TEMPLATE =
            "KafkaClient {\n"
            + "com.sun.security.auth.module.Krb5LoginModule required\n"
            + "useKeyTab=true\n"
            + "keyTab=\"%1$s\"\n"
            + "principal=\"%2$s\";\n"
            + "};";
    String content = String.format(JAAS_TEMPLATE, keyTab, principal);
    File jaasConf = null;
    PrintWriter writer = null;
    try {
        jaasConf = File.createTempFile("jaas", ".conf");
        writer = new PrintWriter(jaasConf);
        writer.println(content);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        if (writer != null) {
            writer.close();
        }
    }
    if (jaasConf != null) {  // guard against the temp file never having been created
        jaasConf.deleteOnExit();
        System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
    }
}

Production use

  • For actual production use, the following optimizations are needed for transfer efficiency and stability (a driver sketch combining them appears after the KafkaUtil class below).

    • Make the upload class a thread managed by a thread pool, to raise transfer throughput.

    • Upload data in batches.

    • Add a Callback handler so that failed sends are not lost.

  • The upload thread class follows.

// Imports for reference; Config, KafkaUtil, LogModel and ControlTask are project-local classes (not shown).
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.tools.ThroughputThrottler;  // ships in the kafka-tools artifact
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Performance extends Thread {

    private final static Logger log = LoggerFactory.getLogger(Performance.class);

    private List<ProducerRecord<byte[], byte[]>> recordList;

    public Performance(List<ProducerRecord<byte[], byte[]>> recordList) {
        this.recordList = recordList;
    }

    /**
     * Ingestion benchmark method
     */
    public static void test() {
        log.info("Kafka Tool Test");
        try {
            String topicName = "test40";
            /* total number of records to send */
            long numRecords = 10000000000L;
            /* record size in bytes */
            int recordSize = 1500;
            /* maximum records to send per second */
            int throughput = 10000000;

            Properties props = new Properties();
            props.put("acks", "1");
            props.put("bootstrap.servers", "ip:6667,ip:6667");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            /* build the test payload */
            byte[] payload = new byte[recordSize];
            Random random = new Random(0);
            for (int i = 0; i < payload.length; ++i)
                payload[i] = (byte) (random.nextInt(26) + 65);

            /* record object to send */
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);

            /* stats model: total record count and reporting interval */
            Stats stats = new Stats(numRecords, 5000);

            /* start time */
            long startMs = System.currentTimeMillis();

            /* throttler that caps the producer's send rate */
            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);

            /* the loop index must be long: numRecords exceeds Integer.MAX_VALUE */
            for (long i = 0; i < numRecords; i++) {
                long sendStartMs = System.currentTimeMillis();
                Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats, record.topic(), record.value());
                producer.send(record, cb);
                if (throttler.shouldThrottle(i, sendStartMs)) {
                    throttler.throttle();
                }
            }

            /* finish up */
            producer.close();
            stats.printTotal();
        } catch (Exception e) {
            log.info("Test Error:" + e);
        }
    }

    /**
     * Actual ingestion method
     */
    @Override
    public void run() {
        super.run();
        KafkaUtil kafkaUtil = new KafkaUtil();
        KafkaProducer<byte[], byte[]> produce = kafkaUtil.create();

        /* total record count */
        long size = recordList.size();

        /* maximum records to send per second */
        int throughput = 900000;

        /* stats model: total record count and reporting interval */
        Stats stats = new Stats(size, 5000);

        /* start time */
        long startMs = System.currentTimeMillis();

        /* throttler that caps the producer's send rate */
        ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);

        int i = 0;
        for (ProducerRecord<byte[], byte[]> record : recordList) {
            long sendStartMs = System.currentTimeMillis();
            /* args: send start time, payload length, stats model, topic, payload */
            Callback cb = stats.nextCompletion(sendStartMs, record.value().length, stats, record.topic(), record.value());
            produce.send(record, cb);
            if (throttler.shouldThrottle(i, sendStartMs)) {
                throttler.throttle();
            }
            i++;
        }
        produce.close();
        log.info("Finish Data To Send");
        LogModel.sendNum++;
    }

    private static class Stats {
        private long start;
        private long windowStart;
        private int[] latencies;
        private int sampling;
        private int iteration;
        private int index;
        private long count;
        private long bytes;
        private int maxLatency;
        private long totalLatency;
        private long windowCount;
        private int windowMaxLatency;
        private long windowTotalLatency;
        private long windowBytes;
        private long reportingInterval;

        public Stats(long numRecords, int reportingInterval) {
            this.start = System.currentTimeMillis();
            this.windowStart = System.currentTimeMillis();
            this.iteration = 0;
            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
            this.index = 0;
            this.maxLatency = 0;
            this.totalLatency = 0;
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
            this.reportingInterval = reportingInterval;
        }

        public void record(int iter, int latency, int bytes, long time) {
            this.count++;
            this.bytes += bytes;
            this.totalLatency += latency;
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.windowCount++;
            this.windowBytes += bytes;
            this.windowTotalLatency += latency;
            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
            if (iter % this.sampling == 0) {
                this.latencies[index] = latency;
                this.index++;
            }
            /* maybe report the recent perf */
            if (time - windowStart >= reportingInterval) {
                printWindow();
                newWindow();
            }
        }

        public Callback nextCompletion(long start, int bytes, Stats stats, String topic, byte[] data) {
            Callback cb = new PerfCallback(this.iteration, start, bytes, stats, topic, data);
            this.iteration++;
            return cb;
        }

        /**
         * Report throughput for the current window
         */
        public void printWindow() {
            long elapsed = System.currentTimeMillis() - windowStart;
            double recsPerSec = 1000.0 * windowCount / (double) elapsed;
            double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0);
            System.out.printf("%d ms elapsed, %d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
                    elapsed,
                    windowCount,
                    recsPerSec,
                    mbPerSec,
                    windowTotalLatency / (double) windowCount,
                    (double) windowMaxLatency);
        }

        public void newWindow() {
            this.windowStart = System.currentTimeMillis();
            this.windowCount = 0;
            this.windowMaxLatency = 0;
            this.windowTotalLatency = 0;
            this.windowBytes = 0;
        }

        /**
         * Report overall throughput
         */
        public void printTotal() {
            long elapsed = System.currentTimeMillis() - start;
            double recsPerSec = 1000.0 * count / (double) elapsed;
            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
            System.out.printf("%d ms elapsed, %d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
                    elapsed,
                    count,
                    recsPerSec,
                    mbPerSec,
                    totalLatency / (double) count,
                    (double) maxLatency,
                    percs[0],
                    percs[1],
                    percs[2],
                    percs[3]);
        }

        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
            int size = Math.min(count, latencies.length);
            Arrays.sort(latencies, 0, size);
            int[] values = new int[percentiles.length];
            for (int i = 0; i < percentiles.length; i++) {
                int index = (int) (percentiles[i] * size);
                values[i] = latencies[index];
            }
            return values;
        }
    }

    private static final class PerfCallback implements Callback {
        private final long start;
        private final int iteration;
        private final int bytes;
        private final Stats stats;
        private final String topic;
        private final byte[] data;

        public PerfCallback(int iter, long start, int bytes, Stats stats, String topic, byte[] data) {
            this.start = start;
            this.stats = stats;
            this.iteration = iter;
            this.bytes = bytes;
            this.topic = topic;
            this.data = data;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int) (now - start);
            this.stats.record(iteration, latency, bytes, now);
            if (exception != null) {
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, data);
                // re-queue the record for a second upload attempt so it is not lost
                ControlTask.recordList.add(record);
                log.error("Send Error And Second To Send", exception);
            }
        }
    }
}
  • KafkaUtil.java

// Imports for reference; Config is a project-local class (not shown).
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaUtil {

    private KafkaProducer<byte[], byte[]> produce;

    /**
     * Create a producer connection
     * @return the configured producer
     */
    public KafkaProducer<byte[], byte[]> create() {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("bootstrap.servers", Config.ipList);
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120000);  // optionally increase the blocking timeout
        props.put("retries", "2");

        // Kerberos security authentication
        if (Config.kerberos == 0) {
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "GSSAPI");
            props.put("sasl.kerberos.service.name", "kafka");
        }

        produce = new KafkaProducer<>(props);
        return produce;
    }

    /**
     * Send a record
     * @param record record to send
     * @param cb     completion callback
     */
    public void send(ProducerRecord<byte[], byte[]> record, Callback cb) {
        produce.send(record, cb);
    }

    /**
     * Close the connection
     */
    public void close() {
        produce.flush();
        produce.close();
    }

    /**
     * Generate a temporary jaas.conf file
     * @param keyTab    path to the keytab authentication file
     * @param principal Kerberos principal
     */
    public static void configureJAAS(String keyTab, String principal) {
        String JAAS_TEMPLATE =
                "KafkaClient {\n"
                + "com.sun.security.auth.module.Krb5LoginModule required\n"
                + "useKeyTab=true\n"
                + "keyTab=\"%1$s\"\n"
                + "principal=\"%2$s\";\n"
                + "};";
        String content = String.format(JAAS_TEMPLATE, keyTab, principal);
        File jaasConf = null;
        PrintWriter writer = null;
        try {
            jaasConf = File.createTempFile("jaas", ".conf");
            writer = new PrintWriter(jaasConf);
            writer.println(content);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                writer.close();
            }
        }
        if (jaasConf != null) {  // guard against the temp file never having been created
            jaasConf.deleteOnExit();
            System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
        }
    }
}
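Putting the pieces together: below is a minimal, hypothetical driver that batches records and hands each Performance task to a fixed-size thread pool, as the optimization list above suggests. The pool size, batch size, topic name, and the loadPayloads() data source are all illustrative stand-ins, not part of the original program.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDriver {

    private static final int POOL_SIZE = 4;       // illustrative pool size
    private static final int BATCH_SIZE = 10000;  // illustrative batch size

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        List<ProducerRecord<byte[], byte[]>> batch = new ArrayList<>(BATCH_SIZE);
        for (byte[] payload : loadPayloads()) {                  // hypothetical data source
            batch.add(new ProducerRecord<>("test", payload));    // "test" is a placeholder topic
            if (batch.size() >= BATCH_SIZE) {
                // Performance extends Thread, so it is also a Runnable; the pool,
                // not the Thread itself, decides when and where it runs
                pool.execute(new Performance(batch));
                batch = new ArrayList<>(BATCH_SIZE);
            }
        }
        if (!batch.isEmpty()) {
            pool.execute(new Performance(batch));                // flush the final partial batch
        }
        pool.shutdown();
    }

    // hypothetical stand-in for whatever produces the raw payloads
    private static List<byte[]> loadPayloads() {
        return new ArrayList<>();
    }
}

Driving the tasks through a fixed pool bounds concurrency: each Performance.run() opens its own producer via KafkaUtil, sends its batch under the throttler, and closes, so a runaway input stream cannot spawn an unbounded number of threads.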

Thank you for reading. That concludes "Implementing a Data Ingestion Program with the Kafka Java API". Having worked through the article, you should have a deeper understanding of the approach; the specifics still need to be verified in practice.
