ElasticSearch的API(java/scala)
这里小编先将需要的pom.xml的依赖提供给大家:(根据自己的版本进行修改)
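<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <spark.version>2.3.2</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.10.3</version>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20180813</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>
(注意:scala-library 的版本需要与 spark-core_2.11 / spark-sql_2.11 所对应的 Scala 2.11 保持一致,请根据自己的环境调整。)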
1. 创建ES的编程入口
主要是提供一个Utils,通过读取配置文件进行创建ES的编程入口。
#elasticsearch.conf(文件名需与代码中加载的 "elasticsearch.conf" 完全一致,注意大小写)
elastic.host=192.168.130.131
elastic.port=9300
elastic.cluster.name=zzy-application
#Constants
/**
 * Property keys used to look up the Elasticsearch connection settings
 * in the configuration file.
 */
public interface Constants {

    /** Key of the Elasticsearch host setting. */
    public static final String ELASTIC_HOST = "elastic.host";

    /** Key of the Elasticsearch port setting. */
    public static final String ELASTIC_PORT = "elastic.port";

    /** Key of the Elasticsearch cluster-name setting. */
    public static final String ELASTIC_CLUSTER_NAME = "elastic.cluster.name";
}
#ElasticSearchUtil
import com.zy.es.constant.Constants;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.settings.Setting;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.transport.TransportAddress;import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.io.IOException;import java.io.InputStream;import java.net.InetAddress;import java.util.Properties;/** * 一般情况下的工具类都是单例 * 里面若干方法一般都是static * 如果在连接集群的时候,集群的名称对应不上: * NoNodeAvailableException[None of the configured nodes are available: */public class ElasticSearchUtil { private static TransportClient client; private static Properties ps; static { try { InputStream resourceAsStream = ElasticSearchUtil.class.getClassLoader().getResourceAsStream("elasticsearch.conf"); ps =new Properties(); ps.load(resourceAsStream); String host=ps.getProperty(Constants.ELASTIC_HOST); int port = Integer.parseInt(ps.getProperty(Constants.ELASTIC_PORT)); String clusterName=ps.getProperty(Constants.ELASTIC_CLUSTER_NAME); Settings settings =Settings.builder() .put("cluster.name",clusterName) .build(); client=new PreBuiltTransportClient(settings); //这里可以有多个,集群模式 TransportAddress ta=new TransportAddress( InetAddress.getByName(host), port ); //addTransportAddresses(TransportAddress... transportAddress),参数为一个可变参数 client.addTransportAddresses(ta); } catch (IOException e) { e.printStackTrace(); } } public static TransportClient getTransportClient(){ return client; } public static void close(TransportClient client){ if(client!=null){ client.close(); } }}
2. 创建索引
小编这里提供了json、map、javabean、XContentBuilder四种创建方式。
import java.util

import com.zy.es.pojo.Book
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject

/**
 * Demonstrates four ways of indexing a document into `library/books`:
 * a JSON string, a `java.util.Map`, a Java bean and an `XContentBuilder`.
 */
object createIndex {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      createIndexByJson()
      //createIndexByMap()
      // createIndexByBean()
      // createIndexByXContentBuilder()
    } finally {
      // Close the ES connection even when indexing throws.
      ElasticSearchUtil.close(client)
    }
  }

  /**
   * 1. Index a document from a JSON string.
   *
   * Without a content type the call fails with
   * `java.lang.IllegalArgumentException: The number of object passed must be even but was [1]`.
   * From ES 5.x on, the second argument of
   * `setSource(json.toString(), XContentType.JSON)` is mandatory.
   */
  def createIndexByJson(): Unit = {
    val json = new JSONObject
    json.put("name", "我爱你中国")
    json.put("author", "周迅")
    json.put("date", "2018-6-6")
    // Result of the index operation.
    val response: IndexResponse = client.prepareIndex(index, `type`, "9")
      .setSource(json.toString, XContentType.JSON)
      .get()
    // Print the document version.
    println(response.getVersion)
  }

  /**
   * 2. Index a document from a `java.util.Map`.
   */
  def createIndexByMap(): Unit = {
    val sourceMap = new util.HashMap[String, String]()
    sourceMap.put("name", "朝花夕拾")
    sourceMap.put("author", "鲁迅")
    sourceMap.put("date", "2009-4-5")
    val response: IndexResponse = client.prepareIndex(index, `type`, "2")
      .setSource(sourceMap)
      .get()
    // Print the document version.
    println(response.getVersion)
  }

  /**
   * 3. Index a document from a plain Java bean, serialised through `JSONObject`.
   */
  def createIndexByBean(): Unit = {
    val book: Book = new Book("斗破苍穹", "天蚕土豆", "2012-2-6")
    val json = new JSONObject(book)
    // Result of the index operation.
    val response: IndexResponse = client.prepareIndex(index, `type`, "3")
      .setSource(json.toString, XContentType.JSON)
      .get()
    // Print the document version.
    println(response.getVersion)
  }

  /**
   * 4. Index a document built with an `XContentBuilder`.
   */
  def createIndexByXContentBuilder(): Unit = {
    val builder: XContentBuilder = JsonXContent.contentBuilder()
    builder.startObject()
      .field("name", "西游记")
      .field("author", "吴承恩")
      .field("version", "1.0")
      .endObject()
    val response: IndexResponse = client.prepareIndex(index, `type`, "4")
      .setSource(builder)
      .get()
    println(response.getVersion)
  }
}
3.删除数据 & 更新数据 &批量处理
小编这里提供了删除数据,更新数据,批量操作。
import java.util

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.update.{UpdateRequestBuilder, UpdateResponse}
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject

/**
 * Demonstrates deleting a document, partially updating a document and
 * sending several index requests in one bulk call against `library/books`.
 */
object ElasticsearchCRUD {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      // Delete a document.
      testDelete()
      // Update a document.
      //testUpdate()
      // Bulk operations.
      //testBulk()
    } finally {
      // Close the connection even when an operation throws.
      ElasticSearchUtil.close(client)
    }
  }

  /** Deletes the document with id "2" and prints the resulting version. */
  def testDelete(): Unit = {
    val response: DeleteResponse = client.prepareDelete(index, `type`, "2").get()
    println(s"version:${response.getVersion}")
  }

  /** Sets `version` to "3.0" on the document with id "4" and prints the resulting version. */
  def testUpdate(): Unit = {
    val builder: XContentBuilder = JsonXContent.contentBuilder()
    builder.startObject()
      .field("version", "3.0")
      .endObject()
    val response: UpdateResponse = client.prepareUpdate(index, `type`, "4")
      .setDoc(builder)
      .get()
    println(s"version:${response.getVersion}")
  }

  /**
   * Indexes two documents (ids "7" and "8") in a single bulk request, one from a
   * map and one from a JSON string, then prints the version of every item.
   */
  def testBulk(): Unit = {
    val map = new util.HashMap[String, String]()
    map.put("name", "无双")
    map.put("author", "周润发")
    map.put("version", "2")

    val json = new JSONObject
    json.put("name", "红楼梦")
    json.put("author", "曹雪芹")
    json.put("version", "1.0")

    val responses: BulkResponse = client.prepareBulk()
      .add(client.prepareIndex(index, `type`, "7").setSource(map))
      .add(client.prepareIndex(index, `type`, "8").setSource(json.toString(), XContentType.JSON))
      .get()

    // A bulk request can succeed as a whole while individual items fail;
    // surface those failures instead of silently ignoring them.
    if (responses.hasFailures) {
      println(responses.buildFailureMessage())
    }
    for (response <- responses.getItems) {
      print(response.getVersion)
    }
  }
}
4.全文索引、分页索引、高亮显示
import java.util

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder
import org.elasticsearch.search.{SearchHit, SearchHits}
import org.json.JSONObject

import scala.collection.JavaConversions

/**
 * Demonstrates a full-text search, a paged search and a highlighted search
 * against the `library` index.
 */
object testSearch {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      // Full-text search.
      //fullTextSearch()
      // Paged search.
      //pagingSearch()
      // Highlighted search.
      highlightSearch()
    } finally {
      // Close the connection, as the other example objects do.
      ElasticSearchUtil.close(client)
    }
  }

  /**
   * Runs a match query on `author` and prints every hit as a JSON object with the
   * keys `_index`, `_id`, `_type`, `_score` and `_source`.
   */
  def fullTextSearch(): Unit = {
    val response = client.prepareSearch(index)
      .setSearchType(SearchType.DEFAULT) // search type
      .setQuery(QueryBuilders.matchQuery("author", "天蚕土豆")) // query to run
      .get()
    val hits = response.getHits // search result
    println("totals:" + hits.getTotalHits) // number of matching documents
    println("maxScore:" + hits.getMaxScore) // highest score
    // Contents of the individual hits.
    for (hit <- hits.getHits) {
      val hitIndex = hit.getIndex
      val id = hit.getId
      val hitType = hit.getType
      val source = hit.getSourceAsString
      val score = hit.getScore
      // A fresh object per hit so no value can carry over from a previous hit.
      val json = new JSONObject()
      json.put("_index", hitIndex)
      json.put("_id", id)
      json.put("_type", hitType)
      json.put("_score", score)
      json.put("_source", new JSONObject(source))
      println(json.toString())
    }
  }

  /**
   * Runs a match query on `name` and prints author, name and version of each hit
   * on the requested page.
   *
   * @param from offset of the first hit to return
   * @param size maximum number of hits to return
   */
  def pagingSearch(from: Int = 0, size: Int = 10): Unit = {
    val response: SearchResponse = client.prepareSearch(index)
      .setSearchType(SearchType.QUERY_THEN_FETCH)
      .setQuery(QueryBuilders.matchQuery("name", "西游记"))
      .setFrom(from)
      .setSize(size)
      .get()
    val myhits: SearchHits = response.getHits
    val total = myhits.totalHits
    println("zzy为您查询出" + total + "记录:")
    val hits: Array[SearchHit] = myhits.getHits
    for (hit <- hits) {
      val map: util.Map[String, AnyRef] = hit.getSourceAsMap
      val author = map.get("author")
      val name = map.get("name")
      val version = map.get("version")
      print(
        s"""
           |author:${author}
           |name:${name}
           |version:${version}
         """.stripMargin)
    }
  }

  /**
   * Runs a match query on `author` with highlighting enabled for that field and
   * prints the highlighted fragments of every hit.
   */
  def highlightSearch(): Unit = {
    // NOTE(review): both tags are empty strings, so the fragments come back without
    // any visible marker. This looks like HTML tags that were stripped from the
    // original listing - confirm the intended pre/post tags.
    val response = client.prepareSearch(index)
      .setSearchType(SearchType.DEFAULT)
      .setQuery(QueryBuilders.matchQuery("author", "周润发"))
      .highlighter(new HighlightBuilder()
        .field("author") // field to highlight
        .preTags("") // tag inserted before a match
        .postTags("")) // tag inserted after a match
      .get()
    val myHits = response.getHits
    val total = myHits.totalHits
    println("zzy为您查询出" + total + "记录:")
    val hits: Array[SearchHit] = myHits.getHits
    for (hit <- hits) {
      // Highlighted text is only available through the highlight fields,
      // not through the regular source.
      val hlFields = hit.getHighlightFields
      // Key: name of the highlighted field (author); value: its highlighted fragments.
      for ((_, highlight) <- JavaConversions.mapAsScalaMap(hlFields)) {
        print(highlight.getFragments.mkString)
      }
    }
  }
}
5. 中文分词
(1)错误演示
首先我们先在自己的ES集群中添加一些数据:
#创建索引库
curl -H "Content-Type: application/json" -XPUT 'http://192.168.130.131:9200/chinese'
#添加数据
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美国留给伊拉克的是个烂摊子吗"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校车将享最高路权"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}'
#然后使用不同的查询看看效果:
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders

/**
 * Runs a match query for "中国" on the `content` field of the `chinese` index
 * and prints the source of every hit.
 */
object ChineseParticipleSearch {
  private val index = "chinese"
  private val `type` = "fulltext"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      val response: SearchResponse = client.prepareSearch(index)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.matchQuery("content", "中国"))
        .get()
      for (hit <- response.getHits.getHits) {
        println(hit.getSourceAsString)
      }
    } finally {
      // Release the connection once the query has run (or failed).
      ElasticSearchUtil.close(client)
    }
  }
}
注意:我们这里使用match查询,查询的是"中国"
看看运行结果:
这里为什么美国也会被查询出来?
这是因为:原生的查询将'中国'这两个字分开之后再进行检索,所以会出现上图中查询错误的情况。
那我们该怎么办呢,我只想查询出来有关中国的内容啊,没关系中文分词帮你解决。
(2)ES配置中文分词
常见的中文分词插件:IK,庖丁解牛中文分词等等。这里我们使用IK分词。
① 下载: https://github.com/medcl/elasticsearch-analysis-ik 版本对应
② 使用maven对源代码进行编译(在IK_HOME下):(mvn clean install -DskipTests)
③ 把编译后的target/releases下的zip文件拷贝到 ES_HOME/plugins/analysis-ik目录下面,然后解压将其中的plugin-descriptor.properties 和plugin-security.policy文件中的ES的版本改为自己使用的版本
④ 修改ES_HOME/config/elasticsearch.yml文件,添加以下配置(ES6.x以上版本无需此操作):
index.analysis.analyzer.default.type: ik
⑤ 重启es服务
这里小编就有些粗暴了:
#ps -aux|grep elasticsearch
#kill -9 pid
#/ES_HOME/bin/elasticsearch -d 启动
(3)重新测试
第一步: 将之前数据进行删除
curl -XDELETE 'http://192.168.130.131:9200/chinese/fulltext/1'
curl -XDELETE 'http://192.168.130.131:9200/chinese/fulltext/2'
curl -XDELETE 'http://192.168.130.131:9200/chinese/fulltext/3'
curl -XDELETE 'http://192.168.130.131:9200/chinese/fulltext/4'
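第二步: 重新加载数据,并设置为IK分词
(注意:已存在字段的分词器不能直接修改,仅删除文档并不会删除字段映射。如果下面设置映射时提示与已有映射冲突,需要先删除整个索引库并重新创建:curl -XDELETE 'http://192.168.130.131:9200/chinese',然后 curl -XPUT 'http://192.168.130.131:9200/chinese',再执行下面的映射设置。)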
#设置为ik分词
curl -XPOST http://192.168.130.131:9200/chinese/fulltext/_mapping -H 'Content-Type:application/json' -d'
{
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
}'
#添加数据
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美国留给伊拉克的是个烂摊子吗"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校车将享最高路权"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}'
第三步:
重新执行刚刚上面的代码,这里我们看看结果:
6.Elasticsearch On Spark
整合条件:
ES官网:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
maven依赖:https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/6.2.4
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.2.4</version>
</dependency>
// Reading Elasticsearch data from Spark requires the implicit conversions
// brought in by `org.elasticsearch.spark._`.
import java.util.Date

import com.zy.es.utils.ElasticSearchUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.spark._

/**
 * Spark / Elasticsearch integration example.
 *
 * Reads the documents of `library/books` through Spark and writes each of them
 * back to Elasticsearch under `es-spark/book`.
 */
object EsOnSpark {
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("EsOnSpark")
      .setMaster("local[2]")
      .set("es.index.auto.create", "true") // create the target index on write if it does not exist
      .set("es.nodes", "192.168.130.131") // Elasticsearch node(s)
      .set("es.port", "9200") // Elasticsearch HTTP port
    val sc = new SparkContext(conf)
    try {
      val esRDD: RDD[(String, String)] = sc.esJsonRDD("library/books") // index/type to read
      val index = "es-spark"
      val `type` = "book"
      esRDD.foreach { case (id, json) =>
        // Reuse the source document id. A millisecond timestamp is not unique when
        // several documents are indexed within the same millisecond (two worker
        // threads run under local[2]), so later documents would overwrite earlier ones.
        client.prepareIndex(index, `type`, id)
          .setSource(json, XContentType.JSON)
          .get()
        println(id + "" + json)
      }
    } finally {
      // Release Spark and the ES connection even when the job fails.
      sc.stop()
      ElasticSearchUtil.close(client)
    }
  }
}
这里只是小编介绍一些常见的API操作,大家知道ES最大的优势在于他的查询,后期小编会进一步的补充关于ElasticSearch强大的查询功能的API。