How to Import MySQL Data into Elasticsearch

This article explains how to import MySQL data into Elasticsearch. The walkthrough is kept simple and clear and is easy to follow; work through the two approaches below step by step.

1. Bulk import with the official Elasticsearch API

Add the Maven dependencies
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
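With the 2.3.2.RELEASE parent, spring-boot-starter-data-elasticsearch brings in the Elasticsearch 7.x client libraries (7.6.x, unless the version is overridden), which provide the RestHighLevelClient and BulkProcessor used in the service below.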

The JDBC connection class

import java.sql.Connection;
import java.sql.DriverManager;

public class DBHelper {
    public static final String url =
            "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";
    public static Connection conn = null;

    public static Connection getConn() {
        try {
            Class.forName(name);
            // Obtain the connection
            conn = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }
}
The import logic
@Service("positionService")public class PositionService {    @Autowired    ElasticsearchRestTemplate elasticsearchTemplate;    @Autowired    RestHighLevelClient client;    private static final String POSITIOIN_INDEX = "position";    public void importAll() throws IOException {        writeMysqlDataToES(POSITIOIN_INDEX);    }    /** 讲数据批量写入ES中 */    private void writeMysqlDataToES(String tableName) {        BulkProcessor bulkProcessor = getBulkProcessor(client);        Connection conn = null;        PreparedStatement ps = null;        ResultSet rs = null;        try {            conn = DBHelper.getConn();            System.out.println("Start handle data :" + tableName);            String sql = "SELECT * from " + tableName;            ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,                    ResultSet.CONCUR_READ_ONLY);            // 根据自己需要 设置            ps.setFetchSize(20);            rs = ps.executeQuery();            ResultSetMetaData colData = rs.getMetaData();            ArrayList> dataList = new                    ArrayList>();            // bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的方式,故笔者在此将查出来的数据转换成hashMap方式            HashMap map = null;            int count = 0;            String c = null;            String v = null;            while (rs.next()) {                count++;                map = new HashMap(128);                for (int i = 1; i <= colData.getColumnCount(); i++) {                    c = colData.getColumnName(i);                    v = rs.getString(c);                    map.put(c, v);                }                dataList.add(map);                // 每1万条写一次,不足的批次的最后再一并提交                if (count % 10000 == 0) {                    System.out.println("Mysql handle data number : " + count);                    // 将数据添加到 bulkProcessor 中                    for (HashMap hashMap2 : dataList) {                        bulkProcessor.add(                                new IndexRequest(POSITIOIN_INDEX).source(hashMap2));                    }                    // 每提交一次便将map与list清空                    map.clear();                    dataList.clear();                }            }            // 处理未提交的数据            for (HashMap hashMap2 : dataList) {                bulkProcessor.add(                        new IndexRequest(POSITIOIN_INDEX).source(hashMap2));                System.out.println(hashMap2);            }            System.out.println("-------------------------- Finally insert number total: " + count);            // 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的刷新时间            bulkProcessor.flush();        } catch (Exception e) {            System.out.println(e.getMessage());        } finally {            try {                rs.close();                ps.close();                conn.close();                boolean terminatedFlag = bulkProcessor.awaitClose(150L,                        TimeUnit.SECONDS);                System.out.println(terminatedFlag);            } catch (Exception e) {                System.out.println(e.getMessage());            }        }    }    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {        BulkProcessor bulkProcessor = null;        try {            BulkProcessor.Listener listener = new BulkProcessor.Listener() {                @Override                public void beforeBulk(long executionId, BulkRequest request) {                    System.out.println("Try to insert data number : " + request.numberOfActions());                }                @Override                public void afterBulk(long 
executionId, BulkRequest request,                                      BulkResponse response) {                    System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId);                }                @Override                public void afterBulk(long executionId, BulkRequest request,                                      Throwable failure) {                    System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId);                }            };            BiConsumer> bulkConsumer =                    (request, bulkListener) -> client                            .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,                    listener);            builder.setBulkActions(5000);            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));            builder.setConcurrentRequests(10);            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));            // 注意点:让参数设置生效            bulkProcessor = builder.build();        } catch (Exception e) {            e.printStackTrace();            try {                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);            } catch (Exception e1) {                System.out.println(e1.getMessage());            }        }        return bulkProcessor;    }}
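Because the BulkProcessor flushes asynchronously (here every 5000 actions, 100 MB, or 100 seconds), documents may not be searchable immediately after importAll() returns. A minimal way to spot-check progress, assuming the same RestHighLevelClient bean and index name as above (this snippet is an illustration and not part of the original code):

// Requires org.elasticsearch.client.core.CountRequest and CountResponse (ES 7.x client).
// The count may lag behind the number of rows read until the BulkProcessor has
// flushed and the index has refreshed.
CountRequest countRequest = new CountRequest("position");
CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
System.out.println("Documents currently in the position index: " + countResponse.getCount());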
The controller entry point
import java.io.IOException;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PositionController {

    @Autowired
    PositionService positionService;

    @RequestMapping("query")
    public List query(String positionName) {
        if (positionName == null) {
            return null;
        }
        return positionService.queryPositions(positionName);
    }

    @RequestMapping("/importAll")
    public String importAll() {
        try {
            positionService.importAll();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
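With the application running (on Spring Boot's default port 8080, unless configured otherwise), requesting http://localhost:8080/importAll triggers the full import, and http://localhost:8080/query?positionName=... searches the imported index via queryPositions, which is not shown in this article.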
The entity corresponding to the imported table
import java.io.Serializable;
import java.util.Date;

import com.fasterxml.jackson.annotation.JsonFormat;

public class Position implements Serializable {
    // Primary key
    private String id;
    // Company name
    private String companyName;
    // Position name
    private String positionName;
    // Position perks
    private String positionAdvantage;
    // Salary
    private String salary;
    // Salary lower bound
    private int salaryMin;
    // Salary upper bound
    private int salaryMax;
    // Education
    private String education;
    // Years of work experience
    private String workYear;
    // Publish time
    private String publishTime;
    // Work city
    private String city;
    // Work address
    private String workAddress;
    // Creation time
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
    // Job nature (work mode)
    private String jobNature;
}
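The article does not show how Position is mapped for the ElasticsearchRestTemplate queries; with spring-data-elasticsearch 4.x it would typically carry an annotation along these lines (a sketch under that assumption, not taken from the original):

import org.springframework.data.elasticsearch.annotations.Document;

// Hypothetical mapping: ties the entity to the "position" index used above.
@Document(indexName = "position")
public class Position implements Serializable {
    // fields as listed above, plus getters/setters
}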

2. Import with Logstash

Prerequisite: Logstash is already installed.

import.conf
input {
    stdin {
    }
    jdbc {
        jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
        jdbc_user => "root"
        jdbc_password => "root"
        jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "1000"
        statement_filepath => "D:/import.sql"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "position"
        document_type => "_doc"
    }
    stdout {
        codec => json_lines
    }
}
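As written, the jdbc input executes the statement from import.sql once when the pipeline starts. If the import needs to be re-run periodically, the jdbc input plugin also accepts a cron-style schedule option, for example schedule => "*/5 * * * *" inside the jdbc block (an optional addition, not part of the original configuration).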
import.sql
select * from position
Start Logstash
logstash -f ../import.conf
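Once Logstash has processed the rows (the stdout output also echoes each event to the console as JSON lines), the result can be spot-checked with a count query against http://localhost:9200/position/_count, assuming Elasticsearch runs on the default localhost:9200 as in the configuration above.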

Thanks for reading. That covers how to import MySQL data into Elasticsearch; after working through this article you should have a better feel for the problem, although the details still need to be verified in practice.
