如何实现elasticsearch导入mysql数据
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章主要讲解了"如何实现elasticsearch导入mysql数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何实现elasticsearc
千家信息网最后更新 2025年02月02日如何实现elasticsearch导入mysql数据
这篇文章主要讲解了"如何实现elasticsearch导入mysql数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何实现elasticsearch导入mysql数据"吧!
一、基于elasticsearch的官方API批量导入
引入maven依赖
org.springframework.boot spring-boot-starter-parent 2.3.2.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-data-elasticsearch mysql mysql-connector-java runtime
jdbc连接类
public class DBHelper { public static final String url = "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"; public static final String name = "com.mysql.cj.jdbc.Driver"; public static final String user = "root"; public static final String password = "root"; public static Connection conn = null; public static Connection getConn() { try { Class.forName(name); conn = DriverManager.getConnection(url, user, password);//获取连接 } catch (Exception e) { e.printStackTrace(); } return conn; }}
导入逻辑
@Service("positionService")public class PositionService { @Autowired ElasticsearchRestTemplate elasticsearchTemplate; @Autowired RestHighLevelClient client; private static final String POSITIOIN_INDEX = "position"; public void importAll() throws IOException { writeMysqlDataToES(POSITIOIN_INDEX); } /** 讲数据批量写入ES中 */ private void writeMysqlDataToES(String tableName) { BulkProcessor bulkProcessor = getBulkProcessor(client); Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = DBHelper.getConn(); System.out.println("Start handle data :" + tableName); String sql = "SELECT * from " + tableName; ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // 根据自己需要 设置 ps.setFetchSize(20); rs = ps.executeQuery(); ResultSetMetaData colData = rs.getMetaData(); ArrayList> dataList = new ArrayList >(); // bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的方式,故笔者在此将查出来的数据转换成hashMap方式 HashMap map = null; int count = 0; String c = null; String v = null; while (rs.next()) { count++; map = new HashMap (128); for (int i = 1; i <= colData.getColumnCount(); i++) { c = colData.getColumnName(i); v = rs.getString(c); map.put(c, v); } dataList.add(map); // 每1万条写一次,不足的批次的最后再一并提交 if (count % 10000 == 0) { System.out.println("Mysql handle data number : " + count); // 将数据添加到 bulkProcessor 中 for (HashMap hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); } // 每提交一次便将map与list清空 map.clear(); dataList.clear(); } } // 处理未提交的数据 for (HashMap hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); System.out.println(hashMap2); } System.out.println("-------------------------- Finally insert number total: " + count); // 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的刷新时间 bulkProcessor.flush(); } catch (Exception e) { System.out.println(e.getMessage()); } finally { try { rs.close(); ps.close(); conn.close(); boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS); System.out.println(terminatedFlag); } catch (Exception e) { System.out.println(e.getMessage()); } } } private BulkProcessor getBulkProcessor(RestHighLevelClient client) { BulkProcessor bulkProcessor = null; try { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { System.out.println("Try to insert data number : " + request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId); } }; BiConsumer > bulkConsumer = (request, bulkListener) -> client .bulkAsync(request, RequestOptions.DEFAULT, bulkListener); BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); builder.setBulkActions(5000); builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB)); builder.setConcurrentRequests(10); builder.setFlushInterval(TimeValue.timeValueSeconds(100L)); builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // 注意点:让参数设置生效 bulkProcessor = builder.build(); } catch (Exception e) { e.printStackTrace(); try { bulkProcessor.awaitClose(100L, TimeUnit.SECONDS); } catch (Exception e1) { System.out.println(e1.getMessage()); } } return bulkProcessor; }}
调用入口
@RestControllerpublic class PositionController { @Autowired PositionService positionService; @RequestMapping("query") public List
导入的数据表
public class Position implements Serializable { //主键 private String id; //公司名称 private String companyName; //职位名称 private String positionName; //职位诱惑 private String positionAdvantage; //薪资 private String salary; //薪资下限 private int salaryMin; //薪资上限 private int salaryMax; //学历 private String education; //工作年限 private String workYear; //发布时间 private String publishTime; //工作城市 private String city; //工作地点 private String workAddress; //发布时间 @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; //工作模式 private String jobNature;}
二、基于logstash导入
前提:安装好logstash
import.conf
input { stdin { } jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai" jdbc_user => "root" jdbc_password => "root" jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "1000" statement_filepath => "D:/import.sql" }} filter { json { source => "message" remove_field => ["message"] }} output { elasticsearch { hosts => ["localhost:9200"] index => "position" document_type => "_doc" } stdout { codec => json_lines }}
import.sql
select * from position
启动logstash
logstash -f ../import.conf
感谢各位的阅读,以上就是"如何实现elasticsearch导入mysql数据"的内容了,经过本文的学习后,相信大家对如何实现elasticsearch导入mysql数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
工作
方式
时间
薪资
学习
内容
名称
职位
支持
万条
上限
下限
入口
公司
前提
参数
取决于
地点
城市
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
腾讯云服务器什么时候上线
网络安全系统单位名称
hcna网络技术pdf下载
武汉工程大学网络安全中心
云南网络服务器在哪
用友 错误数据库连接失败
数据库上机实验报告
软件开发工作要疯了怎么安慰
兰州软件开发公司排名
数据库和前段怎么交互
sql2005数据库
sql导入txt表数据库中
上传失败无法连接服务器怎么办
软件开发团队数量介绍
衢州网络技术有限公司
昌平区综合网络技术服务包括什么
h3c服务器网络接口怎么启用
杭州纽麦得网络技术
济南康健网络技术有限公司顾问
数据库性别中文显示
网络安全公益大讲堂在贵阳开课
怎么把导出sql执行到数据库
医院数据库管理制度
数据库的连接查询方式
魅族崩坏是哪个服务器
网络安全法 孙佑海
网络技术员工资怎么样
单位如何组织网络安全会议
Fafair找不到服务器
软件开发工程师技术分享