千家信息网

ElasticSearch的API(java/scala)

发表于:2024-09-21 作者:千家信息网编辑
千家信息网最后更新 2024年09月21日,这里小编先将需要的pom.xml的依赖提供给大家:(根据自己的版本进行修改) UTF-8 1.7
千家信息网最后更新 2024年09月21日ElasticSearch的API(java/scala)

这里小编先将需要的pom.xml的依赖提供给大家:(根据自己的版本进行修改)

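<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <spark.version>2.3.2</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <!-- 原文为 2.10.3;Scala 版本必须与下方 Spark 依赖的 _2.11 后缀一致,否则运行时会出现二进制不兼容 -->
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20180813</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.2.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
</dependencies>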

1. 创建ES的编程入口

  主要是提供一个Utils,通过读取配置文件进行创建ES的编程入口。
#elasticsearch.conf(注意:文件名必须与代码中读取的 "elasticsearch.conf" 完全一致,类路径资源名区分大小写)

elastic.host=192.168.130.131
elastic.port=9300
elastic.cluster.name=zzy-application

#Constants

/**
 * Property keys looked up in the Elasticsearch connection configuration file
 * ({@code elasticsearch.conf}).
 */
public interface Constants {
    /** Key of the Elasticsearch host entry. */
    public static final String ELASTIC_HOST = "elastic.host";

    /** Key of the Elasticsearch port entry. */
    public static final String ELASTIC_PORT = "elastic.port";

    /** Key of the Elasticsearch cluster name entry. */
    public static final String ELASTIC_CLUSTER_NAME = "elastic.cluster.name";
}

#ElasticSearchUtil

import com.zy.es.constant.Constants;import org.elasticsearch.client.transport.TransportClient;import org.elasticsearch.common.settings.Setting;import org.elasticsearch.common.settings.Settings;import org.elasticsearch.common.transport.TransportAddress;import org.elasticsearch.transport.client.PreBuiltTransportClient;import java.io.IOException;import java.io.InputStream;import java.net.InetAddress;import java.util.Properties;/** * 一般情况下的工具类都是单例 * 里面若干方法一般都是static * 如果在连接集群的时候,集群的名称对应不上: *  NoNodeAvailableException[None of the configured nodes are available: */public class ElasticSearchUtil {    private static TransportClient client;    private static Properties ps;    static {        try {            InputStream resourceAsStream = ElasticSearchUtil.class.getClassLoader().getResourceAsStream("elasticsearch.conf");            ps =new Properties();            ps.load(resourceAsStream);            String host=ps.getProperty(Constants.ELASTIC_HOST);            int port = Integer.parseInt(ps.getProperty(Constants.ELASTIC_PORT));            String clusterName=ps.getProperty(Constants.ELASTIC_CLUSTER_NAME);            Settings settings =Settings.builder()                    .put("cluster.name",clusterName)                    .build();            client=new PreBuiltTransportClient(settings);            //这里可以有多个,集群模式            TransportAddress ta=new TransportAddress(                    InetAddress.getByName(host),                    port            );            //addTransportAddresses(TransportAddress... transportAddress),参数为一个可变参数            client.addTransportAddresses(ta);        } catch (IOException e) {            e.printStackTrace();        }    }    public static TransportClient getTransportClient(){        return client;    }    public static void close(TransportClient client){        if(client!=null){            client.close();        }    }}

2. 创建索引

  小编这里提供了json、map、javabean、XContentBuilder四种创建方式。

import java.util

import com.zy.es.pojo.Book
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject

/**
  * Four ways of indexing a document into `library/books`: a JSON string, a
  * `java.util.Map`, a JavaBean converted through org.json, and an `XContentBuilder`.
  * Each method prints the version returned by the index response.
  */
object createIndex {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      createIndexByJson()
      // createIndexByMap()
      // createIndexByBean()
      // createIndexByXContentBuilder()
    } finally {
      // Release the client even if indexing throws.
      ElasticSearchUtil.close(client)
    }
  }

  /**
    * 1. Index document "9" from a JSON string.
    *
    * On ES 5.x and later the content type must be passed explicitly:
    * `setSource(json.toString, XContentType.JSON)`. A lone String argument fails with
    * `IllegalArgumentException: The number of object passed must be even but was [1]`.
    */
  def createIndexByJson(): Unit = {
    val json = new JSONObject
    json.put("name", "我爱你中国")
    json.put("author", "周迅")
    json.put("date", "2018-6-6")
    val response: IndexResponse = client.prepareIndex(index, `type`, "9")
      .setSource(json.toString, XContentType.JSON)
      .get()
    println(response.getVersion)
  }

  /** 2. Index document "2" from a `java.util.Map`. */
  def createIndexByMap(): Unit = {
    val sourceMap = new util.HashMap[String, String]()
    sourceMap.put("name", "朝花夕拾")
    sourceMap.put("author", "鲁迅")
    sourceMap.put("date", "2009-4-5")
    val response: IndexResponse = client.prepareIndex(index, `type`, "2")
      .setSource(sourceMap)
      .get()
    println(response.getVersion)
  }

  /** 3. Index document "3" from a JavaBean, serialized to JSON by org.json's bean constructor. */
  def createIndexByBean(): Unit = {
    val book: Book = new Book("斗破苍穹", "天蚕土豆", "2012-2-6")
    val json = new JSONObject(book)
    val response: IndexResponse = client.prepareIndex(index, `type`, "3")
      .setSource(json.toString, XContentType.JSON)
      .get()
    println(response.getVersion)
  }

  /** 4. Index document "4" from an `XContentBuilder`. */
  def createIndexByXContentBuilder(): Unit = {
    val builder: XContentBuilder = JsonXContent.contentBuilder()
      .startObject()
      .field("name", "西游记")
      .field("author", "吴承恩")
      .field("version", "1.0")
      .endObject()
    val response: IndexResponse = client.prepareIndex(index, `type`, "4")
      .setSource(builder)
      .get()
    println(response.getVersion)
  }
}

3.删除数据 & 更新数据 &批量处理

  小编这里提供了删除数据,更新数据,批量操作。

import java.util

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.update.{UpdateRequestBuilder, UpdateResponse}
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject

/** Delete, partial-update and bulk-index examples against `library/books`. */
object ElasticsearchCRUD {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      testDelete()
      // testUpdate()
      // testBulk()
    } finally {
      // Release the client even if a request throws.
      ElasticSearchUtil.close(client)
    }
  }

  /** Deletes document "2" and prints the version reported by the delete response. */
  def testDelete(): Unit = {
    val response: DeleteResponse = client.prepareDelete(index, `type`, "2").get()
    println("version:" + response.getVersion)
  }

  /** Partially updates document "4", setting its `version` field to "3.0". */
  def testUpdate(): Unit = {
    val builder: XContentBuilder = JsonXContent.contentBuilder()
      .startObject()
      .field("version", "3.0")
      .endObject()
    val response: UpdateResponse = client.prepareUpdate(index, `type`, "4")
      .setDoc(builder)
      .get()
    println("version:" + response.getVersion)
  }

  /**
    * Indexes documents "7" (from a Map) and "8" (from a JSON string) in one bulk request.
    * It then reports, per item, either the resulting version or the failure message.
    */
  def testBulk(): Unit = {
    val map = new util.HashMap[String, String]()
    map.put("name", "无双")
    map.put("author", "周润发")
    map.put("version", "2")
    val json = new JSONObject
    json.put("name", "红楼梦")
    json.put("author", "曹雪芹")
    json.put("version", "1.0")
    val responses: BulkResponse = client.prepareBulk()
      .add(client.prepareIndex(index, `type`, "7").setSource(map))
      .add(client.prepareIndex(index, `type`, "8").setSource(json.toString(), XContentType.JSON))
      .get()
    // Failures are reported per item, so inspect each one instead of assuming success.
    for (item <- responses.getItems) {
      if (item.isFailed) println("failed:" + item.getId + " " + item.getFailureMessage)
      else println("version:" + item.getVersion)
    }
  }
}

4.全文索引、分页索引、高亮显示

import java.util

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder
import org.elasticsearch.search.{SearchHit, SearchHits}
import org.json.JSONObject

import scala.collection.JavaConversions
import scala.collection.JavaConverters._

/** Full-text, paged and highlighted `match` queries against the `library` index. */
object testSearch {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  /**
    * Tags wrapped around each matched term in highlight fragments.
    * With empty tags the fragments would be returned unmarked.
    */
  private val HighlightPreTag = "<em>"
  private val HighlightPostTag = "</em>"

  def main(args: Array[String]): Unit = {
    try {
      // fullTextSearch()
      // pagingSearch()
      highlightSearch()
    } finally {
      // Release the client even if a search throws.
      ElasticSearchUtil.close(client)
    }
  }

  /**
    * Runs a `match` query on `author` and prints the hit count, the max score and one
    * JSON line per hit. Each line holds `_index`, `_id`, `_type`, `_score` and `_source`.
    */
  def fullTextSearch(): Unit = {
    val response = client.prepareSearch(index) // index to search
      .setSearchType(SearchType.DEFAULT)
      .setQuery(QueryBuilders.matchQuery("author", "天蚕土豆"))
      .get()
    val hits = response.getHits
    println("totals:" + hits.getTotalHits) // number of matching documents
    println("maxScore:" + hits.getMaxScore)
    for (hit <- hits.getHits) {
      // A fresh object per hit, so no state carries over between printed lines.
      val json = new JSONObject()
      val score = hit.getScore
      json.put("_index", hit.getIndex)
      json.put("_id", hit.getId)
      json.put("_type", hit.getType)
      json.put("_score", score)
      json.put("_source", new JSONObject(hit.getSourceAsString))
      println(json.toString())
    }
  }

  /**
    * Paged `match` query on `name`.
    *
    * @param from offset of the first hit to return. For 1-based page `p` with page size
    *             `s`, pass `s * (p - 1)`.
    * @param size maximum number of hits to return
    */
  def pagingSearch(from: Int = 0, size: Int = 10): Unit = {
    val response: SearchResponse = client.prepareSearch(index)
      .setSearchType(SearchType.QUERY_THEN_FETCH)
      .setQuery(QueryBuilders.matchQuery("name", "西游记"))
      .setFrom(from)
      .setSize(size)
      .get()
    val myhits: SearchHits = response.getHits
    val total = myhits.getTotalHits
    println("zzy为您查询出" + total + "记录:")
    val hits: Array[SearchHit] = myhits.getHits
    for (hit <- hits) {
      val map: util.Map[String, AnyRef] = hit.getSourceAsMap
      val author = map.get("author")
      val name = map.get("name")
      val version = map.get("version")
      print(
        s"""
           |author:${author}
           |name:${name}
           |version:${version}
         """.stripMargin)
    }
  }

  /**
    * `match` query on `author` with highlighting of that field. Prints, per hit and per
    * highlighted field, the concatenated highlight fragments on one line.
    */
  def highlightSearch(): Unit = {
    val response = client.prepareSearch(index)
      .setSearchType(SearchType.DEFAULT)
      .setQuery(QueryBuilders.matchQuery("author", "周润发"))
      .highlighter(new HighlightBuilder()
        .field("author") // field to highlight
        .preTags(HighlightPreTag)
        .postTags(HighlightPostTag))
      .get()
    val myHits = response.getHits
    println("zzy为您查询出" + myHits.getTotalHits + "记录:")
    for (hit <- myHits.getHits) {
      // Highlighted text is only available through getHighlightFields, not _source.
      for ((_, highlight) <- hit.getHighlightFields.asScala) {
        val fragments = Option(highlight.getFragments).map(_.mkString).getOrElse("")
        println(fragments)
      }
    }
  }
}

5. 中文分词

(1)错误演示

首先我们先在自己的ES集群中添加一些数据:

# Create the index. Each command must sit on its own line; on a single line everything
# after the first '#' is a shell comment and nothing runs.
curl -H "Content-Type: application/json" -XPUT 'http://192.168.130.131:9200/chinese'
# Add documents
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美国留给伊拉克的是个烂摊子吗"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校车将享最高路权"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}'

#然后使用不同的查询看看效果:

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders

/**
  * Runs a `match` query for "中国" on the `content` field of the `chinese` index and prints
  * the `_source` of every hit. Which documents match depends on the analyzer mapped
  * for `content`.
  */
object ChineseParticipleSearch {
  private val index = "chinese"
  private val `type` = "fulltext"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      val response: SearchResponse = client.prepareSearch(index)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.matchQuery("content", "中国"))
        .get()
      response.getHits.getHits.foreach(hit => println(hit.getSourceAsString))
    } finally {
      // Release the client even if the search throws.
      ElasticSearchUtil.close(client)
    }
  }
}

注意:我们这里使用match查询,查询的是"中国"
看看运行结果:

这里为什么美国也会被查询出来?
这是因为:默认的 standard 分词器会把中文逐字切分,"中国"被拆成"中"和"国"两个词分别检索,而"美国"中也含有"国"字,所以会出现上图中查询结果不准确的情况。
那我们该怎么办呢,我只想查询出来有关中国的内容啊,没关系中文分词帮你解决。

(2)ES配置中文分词

  常见的中文分词插件:IK,庖丁解牛中文分词等等。这里我们使用IK分词。
① 下载: https://github.com/medcl/elasticsearch-analysis-ik 版本对应

② 使用maven对源代码进行编译(在IK_HOME下):(mvn clean install -DskipTests)
③ 把编译后的target/releases下的zip文件拷贝到 ES_HOME/plugins/analysis-ik目录下面,然后解压将其中的plugin-descriptor.properties 和plugin-security.policy文件中的ES的版本改为自己使用的版本
④ 修改ES_HOME/config/elasticsearch.yml文件,添加 index.analysis.analyzer.default.type: ik(仅适用于 ES 5.x 之前的版本;从 ES 5.x 起不允许在 elasticsearch.yml 中配置索引级别的设置,添加后节点将无法启动,应改为在创建索引或设置 mapping 时指定 analyzer)
⑤ 重启es服务
这里小编就有些粗暴了:
#ps -aux|grep elasticsearch
#kill -9 pid
#/ES_HOME/bin/elasticsearch -d 启动

(3)重新测试

第一步: 删除之前的索引库并重新创建(已存在字段的分词器不能修改,只删除文档不会清除原有的 mapping,必须删除整个索引)
# The analyzer of an existing field cannot be changed, and deleting documents does not
# remove the mapping, so drop the whole index and recreate it empty before applying
# the IK mapping.
curl -XDELETE 'http://192.168.130.131:9200/chinese'
curl -XPUT 'http://192.168.130.131:9200/chinese'
第二步: 重新加载数据,并设置为IK分词
#设置为ik分词
# Map the "content" field of chinese/fulltext to the IK analyzer for indexing and search.
curl -X POST \
     -H 'Content-Type:application/json' \
     'http://192.168.130.131:9200/chinese/fulltext/_mapping' \
     -d'
{
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
}'

    # Add the documents again. One command per line; on a single line everything after
    # the first '#' is a shell comment and nothing runs.
    curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美国留给伊拉克的是个烂摊子吗"}'
    curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校车将享最高路权"}'
    curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}'
    curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}'

第三步
重新执行刚刚上面的代码,这里我们看看结果:

6.Elasticsearch On Spark

整合条件:
ES官网:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
maven依赖:https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/6.2.4

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.2.4</version>
    </dependency>
// Reading ES data from Spark (esJsonRDD) and writing it back (saveJsonToEs) needs the
// implicit conversions from org.elasticsearch.spark._
import java.util.Date

import com.zy.es.utils.ElasticSearchUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.spark._

/**
  * Spark + Elasticsearch integration.
  *
  * Reads every document of `library/books` as `(id, json)` pairs through elasticsearch-hadoop,
  * prints them, and writes the JSON sources to `es-spark/book` with the same connector.
  * The target documents get ES-generated ids. The earlier scheme used millisecond
  * timestamps as ids, which collided, so documents handled in the same millisecond
  * overwrote each other.
  */
object EsOnSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("EsOnSpark")
      .setMaster("local[2]")
      .set("es.index.auto.create", "true") // create the target index on write if missing
      .set("es.nodes", "192.168.130.131")  // ES node(s)
      .set("es.port", "9200")              // ES HTTP port
    val sc = new SparkContext(conf)
    try {
      // Cached because the RDD is consumed twice (print + write).
      val esRDD: RDD[(String, String)] = sc.esJsonRDD("library/books").cache() // index/type
      val targetResource = "es-spark/book"

      esRDD.foreach { case (id, json) => println(id + "\t" + json) }
      esRDD.map { case (_, json) => json }.saveJsonToEs(targetResource)
    } finally {
      sc.stop()
    }
  }
}

这里只是小编介绍的一些常见的API操作,大家知道ES最大的优势在于它的查询,后期小编会进一步补充关于ElasticSearch强大的查询功能的API。

0