ElasticSearch Notes (Part 3): Using the Java API and Chinese Analysis in ES
[TOC]
pom.xml
The test project for the ES Java API is built with Maven and uses the following dependencies:
```xml
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.7.0</version>
</dependency>
<dependency>
    <groupId>org.dom4j</groupId>
    <artifactId>dom4j</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.10</version>
</dependency>
```
Basic CRUD with the ES API
The tests use JUnit; the shared fields and the setup method are as follows:
```java
private TransportClient client;
private String index = "bigdata";   // the index to operate on is "bigdata"
private String type = "product";    // the type to operate on is "product"

@Before
public void setup() throws UnknownHostException {
    // We connect to an ES cluster, so the cluster name must be set,
    // otherwise the client cannot be created.
    Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
    client = TransportClient.builder().settings(settings).build();
    TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
    TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
    TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
    client.addTransportAddresses(ta1, ta2, ta3);

    /*settings = client.settings();
    Map<String, String> asMap = settings.getAsMap();
    for (Map.Entry<String, String> setting : asMap.entrySet()) {
        System.out.println(setting.getKey() + "::" + setting.getValue());
    }*/
}
```
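This class does not show a teardown method; a minimal sketch that closes the client after each test (mirroring the cleanUp method of the test class shown later in this note) could look like this:

```java
// Requires org.junit.After.
// Closing the TransportClient releases its sockets and thread pools after each test.
@After
public void cleanUp() {
    if (client != null) {
        client.close();
    }
}
```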
Adding a document: JSON
```java
/**
 * Note: there are 4 ways to add data to ES:
 * 1. JSON
 * 2. Map
 * 3. Java Bean
 * 4. XContentBuilder
 *
 * 1. The JSON way
 */
@Test
public void testAddJSON() {
    String source = "{\"name\":\"sqoop\", \"author\": \"apache\", \"version\": \"1.4.6\"}";
    IndexResponse response = client.prepareIndex(index, type, "4").setSource(source).get();
    System.out.println(response.isCreated());
}
```
Adding a document: Map
```java
/**
 * Adding data:
 * 2. The Map way
 */
@Test
public void testAddMap() {
    Map<String, Object> source = new HashMap<>();
    source.put("name", "flume");
    source.put("author", "Cloudera");
    source.put("version", "1.8.0");
    IndexResponse response = client.prepareIndex(index, type, "5").setSource(source).get();
    System.out.println(response.isCreated());
}
```
Adding a document: Java Bean
```java
/**
 * Adding data:
 * 3. The Java Bean way
 *
 * If the object is not converted to a JSON string first, the following exception is thrown:
 * The number of object passed must be even but was [1]
 */
@Test
public void testAddObj() throws JsonProcessingException {
    Product product = new Product("kafka", "linkedIn", "0.10.0.1", "kafka.apache.org");
    ObjectMapper objectMapper = new ObjectMapper();
    String json = objectMapper.writeValueAsString(product);
    System.out.println(json);
    IndexResponse response = client.prepareIndex(index, type, "6").setSource(json).get();
    System.out.println(response.isCreated());
}
```
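The Product bean itself is not shown in this note; a hypothetical sketch follows, with field names assumed from the constructor call above (the original project may instead have used Lombok annotations, since lombok is on the classpath):

```java
// Hypothetical Product bean; getters are needed so that Jackson can serialize it to JSON.
public class Product {
    private String name;
    private String author;
    private String version;
    private String url;

    public Product() {
    }

    public Product(String name, String author, String version, String url) {
        this.name = name;
        this.author = author;
        this.version = version;
        this.url = url;
    }

    public String getName() { return name; }
    public String getAuthor() { return author; }
    public String getVersion() { return version; }
    public String getUrl() { return url; }
}
```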
Adding a document: XContentBuilder
```java
/**
 * Adding data:
 * 4. The XContentBuilder way
 */
@Test
public void testAddXContentBuilder() throws IOException {
    XContentBuilder source = XContentFactory.jsonBuilder();
    source.startObject()
            .field("name", "redis")
            .field("author", "redis")
            .field("version", "3.2.0")
            .field("url", "redis.cn")
            .endObject();
    IndexResponse response = client.prepareIndex(index, type, "7").setSource(source).get();
    System.out.println(response.isCreated());
}
```
Getting a document
```java
/**
 * Get a specific document from the index.
 */
@Test
public void testGet() {
    GetResponse response = client.prepareGet(index, type, "6").get();
    Map<String, Object> map = response.getSource();
    /*for (Map.Entry<String, Object> me : map.entrySet()) {
        System.out.println(me.getKey() + "=" + me.getValue());
    }*/
    // Lambda expression, available since JDK 1.8.
    map.forEach((k, v) -> System.out.println(k + "=" + v));
    // map.keySet().forEach(key -> System.out.println(key + "xxx"));
}
```
Updating a document
```java
/**
 * A partial update works the same way as the curl version:
 * curl -XPOST http://uplooking01:9200/bigdata/product/AWA184kojrSrzszxL-Zs/_update -d '{"doc":{"name":"sqoop", "author":"apache"}}'
 *
 * For a full update, do not use prepareUpdate; simply use prepareIndex instead.
 */
@Test
public void testUpdate() throws Exception {
    /*String source = "{\"doc\":{\"url\": \"http://flume.apache.org\"}}";
    UpdateResponse response = client.prepareUpdate(index, type, "4").setSource(source.getBytes()).get();*/
    // The following works as well:
    String source = "{\"url\": \"http://flume.apache.org\"}";
    UpdateResponse response = client.prepareUpdate(index, type, "4").setDoc(source.getBytes()).get();
    System.out.println(response.getVersion());
}
```
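As the comment says, a full update does not need prepareUpdate at all: re-indexing a document under the same id replaces it. A minimal sketch, assuming the same index/type and an illustrative document body:

```java
@Test
public void testFullUpdate() {
    // Indexing with an existing id overwrites the whole document (illustrative values).
    String source = "{\"name\":\"flume\", \"author\":\"Cloudera\", \"url\":\"http://flume.apache.org\"}";
    IndexResponse response = client.prepareIndex(index, type, "4").setSource(source).get();
    // isCreated() returns false when an existing document was replaced rather than created.
    System.out.println(response.isCreated());
}
```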
Deleting a document
```java
/**
 * Delete a document.
 */
@Test
public void testDelete() {
    DeleteResponse response = client.prepareDelete(index, type, "5").get();
    System.out.println(response.getVersion());
}
```
Bulk operations
```java
/**
 * Bulk operations.
 */
@Test
public void testBulk() {
    IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, type, "8")
            .setSource("{\"name\":\"elasticsearch\", \"url\":\"http://www.elastic.co\"}");
    UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(index, type, "1")
            .setDoc("{\"url\":\"http://hadoop.apache.org\"}");
    BulkRequestBuilder bulk = client.prepareBulk();
    BulkResponse bulkResponse = bulk.add(indexRequestBuilder).add(updateRequestBuilder).get();
    Iterator<BulkItemResponse> it = bulkResponse.iterator();
    while (it.hasNext()) {
        BulkItemResponse response = it.next();
        System.out.println(response.getId() + "<--->" + response.getVersion());
    }
}
```
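A bulk request does not throw when individual items fail, so it is worth checking the response for failures. A hedged sketch of that check (in the 2.x API, BulkResponse is iterable and exposes hasFailures()/buildFailureMessage(); the document added here is illustrative):

```java
@Test
public void testBulkWithFailureCheck() {
    BulkRequestBuilder bulk = client.prepareBulk();
    // Illustrative item; any number of index/update/delete requests can be added.
    bulk.add(client.prepareIndex(index, type, "9")
            .setSource("{\"name\":\"hive\", \"url\":\"http://hive.apache.org\"}"));
    BulkResponse bulkResponse = bulk.get();
    if (bulkResponse.hasFailures()) {
        // Aggregated message describing every failed item.
        System.out.println(bulkResponse.buildFailureMessage());
    }
    for (BulkItemResponse item : bulkResponse) {
        System.out.println(item.getId() + " failed? " + item.isFailed());
    }
}
```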
Counting documents in an index
```java
/**
 * Get the number of documents in an index.
 */
@Test
public void testCount() {
    CountResponse response = client.prepareCount(index).get();
    System.out.println("Number of documents in the index: " + response.getCount());
}
```
Advanced queries with the ES API
These tests are also based on JUnit; the setUp and showResult methods they use are as follows:
Shared fields and setUp:
```java
private TransportClient client;
private String index = "bigdata";
private String type = "product";
private String[] indics = {"bigdata", "bank"};

@Before
public void setUp() throws UnknownHostException {
    Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
    client = TransportClient.builder().settings(settings).build();
    TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
    TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
    TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
    client.addTransportAddresses(ta1, ta2, ta3);
}
```
showResult:
```java
/**
 * Pretty-print the search results.
 * @param response
 */
private void showResult(SearchResponse response) {
    SearchHits searchHits = response.getHits();
    float maxScore = searchHits.getMaxScore();   // highest document score in the result set
    System.out.println("maxScore: " + maxScore);
    long totalHits = searchHits.getTotalHits();  // total number of matching documents
    System.out.println("totalHits: " + totalHits);
    SearchHit[] hits = searchHits.getHits();     // the hits returned in this response
    System.out.println("Number of hits returned in this response: " + hits.length);
    for (SearchHit hit : hits) {
        long version = hit.version();
        String id = hit.getId();
        String index = hit.getIndex();
        String type = hit.getType();
        float score = hit.getScore();
        System.out.println("===================================================");
        String source = hit.getSourceAsString();
        System.out.println("version: " + version);
        System.out.println("id: " + id);
        System.out.println("index: " + index);
        System.out.println("type: " + type);
        System.out.println("score: " + score);
        System.out.println("source: " + source);
    }
}
```
ES search types
There are four search types:
- query and fetch (fastest; returns N times the requested number of hits)
- query then fetch (the default search type)
- DFS query and fetch
- DFS query then fetch (allows more precise control over scoring and ranking)
The Javadoc in the API reads:
```java
/**
 * Same as {@link #QUERY_THEN_FETCH}, except for an initial scatter phase which goes and computes the distributed
 * term frequencies for more accurate scoring.
 */
DFS_QUERY_THEN_FETCH((byte) 0),
/**
 * The query is executed against all shards, but only enough information is returned (not the document content).
 * The results are then sorted and ranked, and based on it, only the relevant shards are asked for the actual
 * document content. The return number of hits is exactly as specified in size, since they are the only ones that
 * are fetched. This is very handy when the index has a lot of shards (not replicas, shard id groups).
 */
QUERY_THEN_FETCH((byte) 1),
/**
 * Same as {@link #QUERY_AND_FETCH}, except for an initial scatter phase which goes and computes the distributed
 * term frequencies for more accurate scoring.
 */
DFS_QUERY_AND_FETCH((byte) 2),
/**
 * The most naive (and possibly fastest) implementation is to simply execute the query on all relevant shards
 * and return the results. Each shard returns size results. Since each shard already returns size hits, this
 * type actually returns size times number of shards results back to the caller.
 */
QUERY_AND_FETCH((byte) 3),
```
About DFS:
What does DFS stand for? The D is probably Distributed, the F is probably frequency, and the S is probably Scatter, so the name most likely means a distributed scatter of term and document frequencies. What, then, is the initial scatter phase? From the official ES site we learn that before the actual query is executed, the term and document frequencies of every shard are collected first; during the subsequent term search, each shard scores and ranks its hits against these global frequencies. Clearly DFS_QUERY_THEN_FETCH is the least efficient search type, since a single search may need three round trips to the shards, but searches that use DFS should also produce the most accurate scoring.
Summary:
To summarize: in terms of performance, QUERY_AND_FETCH is the fastest and DFS_QUERY_THEN_FETCH is the slowest; in terms of accuracy, the DFS variants score more accurately than the non-DFS ones.
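The search type is chosen per request via setSearchType; a minimal sketch comparing the default with the DFS variant, using the client, indics, and showResult members shown above (matchAllQuery is used here only for illustration):

```java
@Test
public void testSearchTypes() {
    // QUERY_THEN_FETCH is the default search type.
    SearchResponse defaultResponse = client.prepareSearch(indics)
            .setSearchType(SearchType.QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.matchAllQuery())
            .get();
    showResult(defaultResponse);

    // DFS_QUERY_THEN_FETCH adds an initial scatter phase that collects global
    // term/document frequencies, trading an extra round trip for better scoring.
    SearchResponse dfsResponse = client.prepareSearch(indics)
            .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.matchAllQuery())
            .get();
    showResult(dfsResponse);
}
```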
Exact (term) queries
```java
/**
 * 1. Exact query
 * termQuery
 * A term refers to a single field value.
 */
@Test
public void testSearch2() {
    SearchRequestBuilder searchQuery = client.prepareSearch(indics)  // the arguments of prepareSearch() are the indices to query
            .setSearchType(SearchType.DEFAULT)  // search type: QUERY_AND_FETCH, QUERY_THEN_FETCH, DFS_QUERY_AND_FETCH or DFS_QUERY_THEN_FETCH
            .setQuery(QueryBuilders.termQuery("author", "apache"));  // the query used for retrieval; for termQuery, name is a field of the doc and value is the value to look for
    // Without a query condition, all documents are returned.
    SearchResponse response = searchQuery.get();
    showResult(response);
}
```
Fuzzy queries
```java
/**
 * 2. Fuzzy query
 * prefixQuery
 */
@Test
public void testSearch3() {
    SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.prefixQuery("name", "h"))
            .get();
    showResult(response);
}
```
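prefixQuery only matches terms that begin with the given string. If looser matching is needed, QueryBuilders also provides wildcardQuery and fuzzyQuery; a hedged sketch (the field names and values are illustrative):

```java
@Test
public void testWildcardAndFuzzy() {
    // wildcardQuery supports * (any characters) and ? (exactly one character) inside a term.
    SearchResponse wildcard = client.prepareSearch(indics)
            .setQuery(QueryBuilders.wildcardQuery("name", "h*p"))
            .get();
    showResult(wildcard);

    // fuzzyQuery matches terms within a small edit distance of the given value.
    SearchResponse fuzzy = client.prepareSearch(indics)
            .setQuery(QueryBuilders.fuzzyQuery("name", "hadop"))
            .get();
    showResult(fuzzy);
}
```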
Paged queries
```java
/**
 * 3. Paged query
 * Query the index "bank" for documents with age in the range (25, 35].
 *
 * Paging:
 * given the page number and the page size (here 10 records per page),
 * to fetch page 4 use
 *   setFrom(30 = (4 - 1) * size)
 *   setSize(10)
 * so the offset of page N is (N - 1) * pageSize.
 */
@Test
public void testSearch4() {
    // Note that QUERY_THEN_FETCH and QUERY_AND_FETCH return different numbers of hits:
    // the former defaults to 10, the latter to 50 (with 5 shards).
    SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            // setFrom and setSize below paginate the result set
            .setFrom(0)
            .setSize(5)
            .get();
    showResult(response);
}
```
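The paging arithmetic from the comment can be wrapped in a tiny helper; a sketch (the method name is illustrative):

```java
// Offset of the first record on page N (1-based) for the given page size.
private int pageStart(int page, int pageSize) {
    return (page - 1) * pageSize;
}

// Usage: page 4 with 10 records per page starts at offset 30, i.e.
// .setFrom(pageStart(4, 10)).setSize(10)
```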
Queries with highlighting
```java
/**
 * 4. Query with highlighting
 * Search for "apache", which may appear not only in author but also in url and name.
 *   author or url  ---> the should clause of a bool query
 *   and semantics  ---> the must clause
 *   not semantics  ---> the mustNot clause
 * A match-style operation compares the keyword against the value of the field
 * and returns the document when they match.
 */
@Test
public void testSearch5() {
    SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DEFAULT)
            // .setQuery(QueryBuilders.multiMatchQuery("apache", "author", "url"))
            // .setQuery(QueryBuilders.regexpQuery("url", ".*apache.*"))
            // .setQuery(QueryBuilders.termQuery("author", "apache"))
            .setQuery(QueryBuilders.boolQuery()
                    .should(QueryBuilders.regexpQuery("url", ".*apache.*"))
                    .should(QueryBuilders.termQuery("author", "apache")))
            // highlighting: set the pre and post tags
            .setHighlighterPreTags("")
            .setHighlighterPostTags("")
            // which fields should be highlighted
            .addHighlightedField("author")
            .addHighlightedField("url")
            .get();
    SearchHits searchHits = response.getHits();
    float maxScore = searchHits.getMaxScore();   // highest document score in the result set
    System.out.println("maxScore: " + maxScore);
    long totalHits = searchHits.getTotalHits();  // total number of matching documents
    System.out.println("totalHits: " + totalHits);
    SearchHit[] hits = searchHits.getHits();     // the hits returned in this response
    System.out.println("Number of hits returned in this response: " + hits.length);
    for (SearchHit hit : hits) {
        System.out.println("========================================================");
        Map<String, HighlightField> highlightFields = hit.getHighlightFields();
        for (Map.Entry<String, HighlightField> me : highlightFields.entrySet()) {
            System.out.println("--------------------------------------");
            String key = me.getKey();
            HighlightField highlightField = me.getValue();
            String name = highlightField.getName();
            System.out.println("key: " + key + ", name: " + name);
            Text[] texts = highlightField.fragments();
            String value = "";
            for (Text text : texts) {
                // System.out.println("text: " + text.toString());
                value += text.toString();
            }
            System.out.println("value: " + value);
        }
    }
}
```
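The should clauses above give OR semantics; the must and mustNot clauses mentioned in the comment would look like this (a sketch using the same illustrative fields):

```java
// AND semantics: both conditions must match.
QueryBuilder andQuery = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("author", "apache"))
        .must(QueryBuilders.regexpQuery("url", ".*apache.*"));

// NOT semantics: documents matching the mustNot clause are excluded.
QueryBuilder notQuery = QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("author", "apache"))
        .mustNot(QueryBuilders.termQuery("name", "flume"));
```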
Sorted queries
```java
/**
 * 5. Sorted query
 * Sort the result set by balance, from high to low.
 */
@Test
public void testSearch6() {
    // Note that QUERY_THEN_FETCH and QUERY_AND_FETCH return different numbers of hits:
    // the former defaults to 10, the latter to 50 (with 5 shards).
    SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            .addSort("balance", SortOrder.DESC)
            // setFrom and setSize below paginate the result set
            .setFrom(0)
            .setSize(5)
            .get();
    showResult(response);
}
```
Aggregation queries: computing an average
```java
/**
 * 6. Aggregation query: compute an average.
 */
@Test
public void testSearch7() {
    indics = new String[]{"bank"};
    // Note that QUERY_THEN_FETCH and QUERY_AND_FETCH return different numbers of hits:
    // the former defaults to 10, the latter to 50 (with 5 shards).
    SearchResponse response = client.prepareSearch(indics).setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
            .setQuery(QueryBuilders.rangeQuery("age").gt(25).lte(35))
            /*
               select avg(age) as avg_name from person;
               avg("avg_balance") here plays the same role as the alias avg_name in the SQL above
             */
            .addAggregation(AggregationBuilders.avg("avg_balance").field("balance"))
            .addAggregation(AggregationBuilders.max("max").field("balance"))
            .get();
    // System.out.println(response);
    /*
       The aggregations contained in the response:
        "aggregations" : {
          "max" : {
            "value" : 49741.0
          },
          "avg_balance" : {
            "value" : 25142.137373737372
          }
        }
       so a single aggregation looks like:
        {
          "value" : 49741.0
        }
     */
    Aggregations aggregations = response.getAggregations();
    List<Aggregation> aggregationList = aggregations.asList();
    for (Aggregation aggregation : aggregationList) {
        System.out.println("========================================");
        String name = aggregation.getName();
        // Map<String, Object> map = aggregation.getMetaData();
        System.out.println("name: " + name);
        // System.out.println(map);
        Object obj = aggregation.getProperty("value");
        System.out.println(obj);
    }
    /*Aggregation avgBalance = aggregations.get("avg_balance");
    Object obj = avgBalance.getProperty("value");
    System.out.println(obj);*/
}
```
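Instead of reading values through getProperty("value"), the 2.x API also allows typed access via the metric aggregation classes; a minimal sketch, assuming the aggregation names registered above:

```java
// Requires: org.elasticsearch.search.aggregations.metrics.avg.Avg
//           org.elasticsearch.search.aggregations.metrics.max.Max
// Typed access to the aggregations named "avg_balance" and "max" above.
Avg avgBalance = response.getAggregations().get("avg_balance");
Max maxBalance = response.getAggregations().get("max");
System.out.println("avg balance: " + avgBalance.getValue());
System.out.println("max balance: " + maxBalance.getValue());
```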
Chinese analysis in ES: integrating the IK analyzer
If our data contains Chinese and we want queries to support Chinese word segmentation, the Lucene-based analysis that ES ships with does not handle Chinese well, so another analyzer should be considered, such as the IK Chinese analyzer described here. The steps to integrate it into ES are as follows:
1) Download: https://github.com/medcl/elasticsearch-analysis-ik
2) Build the source with Maven (`mvn clean install -DskipTests`, or `package`)
3) Copy the zip file under target/releases of the build to ES_HOME/plugins/analysis-ik and unpack it there
4) Copy the conf/ik directory of the downloaded plugin to ES_HOME/config
5) Edit ES_HOME/config/elasticsearch.yml and add `index.analysis.analyzer.default.type: ik` (this makes IK the default analyzer and is optional)
6) Restart the ES service
7) Test the segmentation (see the sketch after the note below)
Note that the data has to be re-inserted so that it is analyzed with IK; in other words, any index that should use the IK Chinese analyzer must be rebuilt.
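To verify step 7 (testing the segmentation), the _analyze API can also be driven from the Java client; a hedged sketch, assuming an index named "chinese" whose fields use the ik analyzer:

```java
// Requires org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse.
// Ask ES to analyze a Chinese sentence with the ik analyzer and print the resulting terms.
AnalyzeResponse analyzeResponse = client.admin().indices()
        .prepareAnalyze("chinese", "中华人民共和国")
        .setAnalyzer("ik")
        .get();
for (AnalyzeResponse.AnalyzeToken token : analyzeResponse.getTokens()) {
    System.out.println(token.getTerm());
}
```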
The test code is as follows:
```java
package cn.xpleaf.bigdata.elasticsearch;

import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;

/**
 * Operate the ES cluster with the Java API.
 * A TransportClient represents a connection to a cluster:
 * the client talks to the cluster through the TransportClient.
 *
 * Full-text search with prepareSearch: Chinese word segmentation.
 */
public class ElasticSearchTest3 {

    private TransportClient client;
    private String index = "bigdata";
    private String type = "product";
    private String[] indics = {"chinese"};

    @Before
    public void setUp() throws UnknownHostException {
        Settings settings = Settings.builder().put("cluster.name", "bigdata-08-28").build();
        client = TransportClient.builder().settings(settings).build();
        TransportAddress ta1 = new InetSocketTransportAddress(InetAddress.getByName("uplooking01"), 9300);
        TransportAddress ta2 = new InetSocketTransportAddress(InetAddress.getByName("uplooking02"), 9300);
        TransportAddress ta3 = new InetSocketTransportAddress(InetAddress.getByName("uplooking03"), 9300);
        client.addTransportAddresses(ta1, ta2, ta3);
    }

    /**
     * Chinese segmentation tests:
     * 1. Query for data starting with "中":    2 hits
     * 2. Query for data starting with "中国":  0 hits
     * 3. Query for data containing "烂":       1 hit
     * 4. Query for data containing "烂摊子":   0 hits
     *
     * Segmentation: why do we get these results when searching
     * "China is the greatest country~" (Chinese: 中国最牛逼)?
     * A phrase such as 中华人民共和国 can be segmented into terms like:
     *   中华 / 人民 / 共和国 / 中华人民 / 人民共和国 / 华人 / 共和
     * Dedicated Chinese analyzers include:
     *   Paoding (庖丁解牛), IK, Sogou
     */
    @Test
    public void testSearch2() {
        SearchResponse response = client.prepareSearch(indics) // the arguments of prepareSearch() are the indices to query
                .setSearchType(SearchType.DEFAULT) // search type: QUERY_AND_FETCH, QUERY_THEN_FETCH, DFS_QUERY_AND_FETCH or DFS_QUERY_THEN_FETCH
                //.setQuery(QueryBuilders.prefixQuery("content", "烂摊子")) // the query used for retrieval; for termQuery, name is a field of the doc and value is the value to look for
//                .setQuery(QueryBuilders.regexpQuery("content", ".*烂摊子.*"))
                .setQuery(QueryBuilders.prefixQuery("content", "中国"))
                .get();
        showResult(response);
    }

    /**
     * Pretty-print the search results.
     * @param response
     */
    private void showResult(SearchResponse response) {
        SearchHits searchHits = response.getHits();
        float maxScore = searchHits.getMaxScore();   // highest document score in the result set
        System.out.println("maxScore: " + maxScore);
        long totalHits = searchHits.getTotalHits();  // total number of matching documents
        System.out.println("totalHits: " + totalHits);
        SearchHit[] hits = searchHits.getHits();     // the hits returned in this response
        System.out.println("Number of hits returned in this response: " + hits.length);
        for (SearchHit hit : hits) {
            long version = hit.version();
            String id = hit.getId();
            String index = hit.getIndex();
            String type = hit.getType();
            float score = hit.getScore();
            System.out.println("===================================================");
            String source = hit.getSourceAsString();
            System.out.println("version: " + version);
            System.out.println("id: " + id);
            System.out.println("index: " + index);
            System.out.println("type: " + type);
            System.out.println("score: " + score);
            System.out.println("source: " + source);
        }
    }

    @After
    public void cleanUp() {
        client.close();
    }
}
```
The test code for these notes has been uploaded to GitHub: https://github.com/xpleaf/elasticsearch-study