ElasticSearch常用操作:文档篇
[TOC]
1 新建文档
1.1 指定id
PUT my_blog/article/1{ "id":1, "title":"elasticsearch", "posttime":"2017-05-01", "content":"elasticsearch is helpfull!"}
返回:
{ "_index": "my_blog", "_type": "article", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true}
版本号会随着文档的更新自动递增。
1.2 不指定id
不指定id,es会自动生成,但是这时只能使用post:
POST my_blog/article{ "id":2, "title":"spark", "posttime":"2017-05-01", "content":"spark is helpfull!"}
返回:
{ "_index": "my_blog", "_type": "article", "_id": "AWagTCv8O1qbT1zqbREV", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true}
2 获取文档
2.1 普通获取
获取存在的文档:
GET my_blog/article/1
返回:
{ "_index": "my_blog", "_type": "article", "_id": "1", "_version": 1, "found": true, "_source": { "id": 1, "title": "elasticsearch", "posttime": "2017-05-01", "content": "elasticsearch is helpfull!" }}
获取不存在的文档:
GET my_blog/article/2
返回:
{ "_index": "my_blog", "_type": "article", "_id": "2", "found": false}
2.2 测试文档是否存在
使用HEAD可以测试文档是否存在:
HEAD my_blog/article/1200 - OKHEAD my_blog/article/2404 - Not Found
2.3 批量获取
不同index不同type:
GET _mget{ "docs":[ { "_index":"my_blog", "_type":"article", "_id":1 }, { "_index":"twitter", "_type":"tweet", "_id":2 } ]}
同一index下不同type:
GET my_blog/_mget{ "docs":[ { "_type":"article", "_id":1 }, { "_type":"essay", "_id":2 } ]}
同一index同一type:
GET my_blog/article/_mget{ "docs":[ {"_id":1}, {"_id":2} ]}
或:
GET my_blog/article/_mget{ "ids":[1,2]}
3 更新文档
es更新文档的原理为:先找到这个文档,删除旧的文档内容执行更新,更新完后再索引最新的文档。
先添加下面一份文档:
PUT test/type1/1{ "counter":1, "tags":["red"]}
3.1 更新文档字段内容
给counter的值增加4:
POST test/type1/1/_update{ "script": { "inline": "ctx._source.counter += params.count", "lang": "painless", "params": { "count":4 } }}
Note1:命令中inline是执行的脚本,ctx是脚本语言中的一个执行对象,painless是es内置的一种脚本语言,params是参数集合;
Note2:ctx对象除了可以访问
_source
之外,还可以访问_index
、_type
、_id
、_version
、_routing
、_parent
等字段;
对tags字段增加一个值:
POST test/type1/1/_update{ "script":{ "inline":"ctx._source.tags.add(params.tag)", "lang":"painless", "params":{ "tag":"blue" } }}
3.2 新增与移除字段
给test/type1/1添加一个字段name:
POST test/type1/1/_update{ "script": { "inline": "ctx._source.name=\"test\"" }}
上面的命令也可以简写为:
{"script":""ctx._source.name=\"test\""}
移除name字段:
POST test/type1/1/_update{ "script": { "inline": "ctx._source.remove(\"name\")" }}
3.3 upsert操作
如果文档不存在,upsert会新建一个文档,文档存在,则正常执行script脚本。如下:
POST test/type1/2/_update{ "script": { "inline": "ctx._source.counter += params.count", "lang": "painless", "params": { "count":4 } }, "upsert": { "counter":1, "tag":["pink"] }}
如果test/type1/2存在,则更新count,如果不存在,则新建一个包含字段counter和tag的文档。
返回:
{ "_index": "test", "_type": "type1", "_id": "2", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }}
3.4 基于doc的更新方式
也可以使用doc的方式来更新字段内容或者添加新的字段:
POST myblog/article/1/_update{ "doc": { "title":"test new title" }}POST myblog/article/1/_update{ "doc": { "new field":"this is a new field" }}
4 查询更新
POST my_blog/_update_by_query{ "script":{ "inline": "ctx._source.content = params.content", "lang": "painless", "params": { "content":"spark is popular" } }, "query":{ "term": { "title": { "value": "spark" } } }}
返回:
{ "took": 33, "timed_out": false, "total": 1, "updated": 1, "deleted": 0, "batches": 1, "version_conflicts": 0, "noops": 0, "retries": { "bulk": 0, "search": 0 }, "throttled_millis": 0, "requests_per_second": -1, "throttled_until_millis": 0, "failures": []}
5 删除文档
DELETE my_blog/article/2
如果在索引文档时指定了路由,删除时也可以增加路由参数:
DELETE my_blog/article/2?routing=user123
Note1:如果删除时路由值不正确,删除会失败;
Note2:当映射的_routing被设定为required且没有指定的路由值时,执行删除操作将抛出路由缺失异常并拒绝该请求;
6 查询删除
POST my_blog/_delete_by_query{ "query":{ "term": { "title": { "value": "mybatis" } } }}
删除一个type下的所有文档:
POST my_blog/article/_delete_by_query{ "query":{ "match_all":{} }}
7 批量操作
7.1 命令格式
使用如下命令:
curl -XPOST 'localhost:9200/indexname/_bulk?prettry' --data-binary @accounts.json
accounts.json文件应该满足下面的格式:
action_and_meta_data行data行
Note1:action_and_meta_data行中,action必须是index、create、update或者delete,metadata需要指明需要被操作文档的
_index
、_type
以及_id
;Note2:data行就是添加的数据,添加文档时就需要data行;
Note3:每一行的结尾处都必须有换行符"\n",最后一行也要有,换行符可以有效地分隔每行;
7.2 添加文档
{"index": {"_index": "my_blog"}, "_type": "article", "_id": "1"}{"title": "blog title"}
或
{"create": {"_index": "my_blog", "_type": "article", "_id": "1"}}{"title": "blog title"}
不写id也是可以的。
7.3 删除文档
{"delete": {"_index": "website", "_type": "blog", "_id": "123"}}
7.4 综合案例
下面内容包含索引文档请求、更新文档请求和删除文档请求:
{"delete": {"_index": "website", "_type": "blog", "_id": "123"}}{"create": {"_index": "website", "_type": "blog", "_id": "123"}}{"title": "blog title"}{"index": {"_index": "website", "_type": "blog"}}{"title": "blog title"}{"update": {"_index": "website", "_type": "blog", "_id": "123"}}{"doc": {"title": "blog title"}}
8 版本控制
初次接触,8.2可以忽略,了解一下8.3的命令操作即可。
es进行文档更新时,首先读取源文档,对原文档进行更新操作,更新操作执行完成以后再重新索引整个文档。
8.1 锁控制
很有可能多个用户同时对同一份文档数据进行修改更新操作,这时就需要进行事务性控制,或者说需要进行并发控制。
8.1.1 悲观锁控制
如果有线程对数据进行修改就对数据进行锁定,其他线程想要访问需要等待当前锁定释放,这样可确保同一时刻最多只有一个线程访问数据。传统的关系型数据库就用到了很多这种锁机制,比如行锁、表锁、读锁、写锁等。
8.1.2 乐观锁控制
对数据资源不会锁定,只有在数据提交操作时检查是否违反数据完整性。Elasticsearch使用的就是乐观锁机制,乐观锁适用于读操作比写操作比较多的应用类型,可省去锁开销,提高吞吐量。
8.2 es的版本控制
既然es使用的是乐观锁,那么如何保证旧的数据不会覆盖新的数据呢?在es中使用_version来进行版本的控制,文档每被更新一次,其就会加1.
es的文档版本控制机制主要有内部版本控制和外部版本控制:
- 内部版本控制机制要求每次操作请求,只有当版本号相等时才能操作成功;
- 外部版本控制要求外部文档版本比内部文档版本高时才能更新成功;
其实不管是请求获取数据还是请求更新数据,都是可以携带版本号的,不管情况多复杂,只需要记住下面两点即可:
- 1.如果只是请求获取数据操作,内部版本控制机制生效,外部版本控制机制不生效,情况如下:
- a.不携带版本号,操作成功:
- b.携带内部版本号,那么一定要跟文档当前的版本号相等;
- c.携带外部版本号,那么一定要跟文档当前的版本号相等;
- 2.如果是更新操作,情况如下:
- a.不携带版本号,操作成功,文档版本号会加1;
- b.携带内部版本号,那么一定要跟文档当前的版本号相等,文档版本号会加1;
- c.携带外部版本号,那么一定要大于文档当前的版本号,文档版本会等于携带的版本号;
也可以从是否携带版本号的角度去思考这个问题:
- 1.不携带版本号,请求获取数据和更新操作都会成功;
- 2.携带内部版本号,不管哪种操作,要求一定要与文档版本号相等;
- 3.携带外部版本号,请求获取数据时版本号需要相等,更新操作时版本号需要大于文档版本号;
至于es为什么这样设计?其实内部版本号是es提供的,但实际上你可能也有自己业务或程序上的需求,就是说你的应用系统本身就维护了一个版本号,或者说你自己想通过某种机制去维护一个版本号,那么就可以使用外部版本号。
8.3 命令操作
GET website/blog/1?version=1PUT website/blog/1?version=5&version_type=external
9 路由机制
9.1 分片位置计算与案例
当索引有多个分片时,索引一个文档,es是如何确定将文档保存到哪一个文档上的呢?假设环境如下:
Master Node:shard0(primary)shard1 shard2(primary)Common Node:shard0shard1(primary)shard2
es的路由机制通过哈希算法,将具有相同哈希值的文档放置到同一个主分片中,方法如下:
shard = hash(routing) % number_of_primary_shards
假如我们添加一份文档,没有指定id,es给我们生成的id是AWagTCv8O1qbT1zqbREV
,套用上面的公式,shard就应该为:
shard = hash("AWagTCv8O1qbT1zqbREV") % 3
当然哈希函数的实现不一定,我们可以在python中调用一下其提供的hash()函数来演示一下上面的计算:
>>> shard = hash("AWagTCv8O1qbT1zqbREV") % 3>>> shard2
显然,该文档会被存储到分片2也就是第3个分片上。
通过上面的介绍可以知道,默认的路由模式可以保证数据平均分布,不过也可以自定义routing值,从而指定文档的存储位置。
9.2 es查询流程与自定义routing值
假如存在一个有50个分片的索引,在集群上执行一次查询的过程如下:
- (1)查询请求首先被集群中的一个节点接收;
- (2)接收到这个请求的节点,将这个查询广播到这个索引的每个分片上;
- (3)每个分片执行完搜索查询并返回结果;
- (4)结果在通道节点上合并、排序并返回给用户;
可以自定义路由值来避免这种广播,下面举一个案例进行说明。
正常我们这样来添加一份文档:
PUT website/article/1{ "title":"My first blog entry", "text":"Just trying this out...", "user":"user123"}
查询时,希望查询user123的所有文章:
GET website/article/_search{ "query":{ "term":{"user":"user123"} }}
显然这样查询,会按照上面的流程去走,也就是请求会被发送到所有的分片上,这时希望优化一下。
将user作为routing来添加文档:
PUT website/article/1?routing=user123{ "title":"My first blog entry", "text":"Just trying this out...", "user":"user123"}
指定routing值后,后面user123发表的文章(文档)都会被存储到同一个分片上,这里假设是分片1,这样一来,我们再查询user123发表的文章时,只需要在search时指定routing值,那么就不会被广播请求的情况了,请求直接到达分片1,查询如下:
GET website/article/_search?routing=user123
Note1:这样也会带来问题,比如user123发表了数十万篇文章,但是其它用户只有很少的文章,显然数据的分配就不均衡了;
Note2:也可以为文档指定多个路由值,路由值之间使用逗号分隔(这样一来,文档就有可能会被分配到多个分片上,至于满足其条件的这几个分片,es是如何选择的,通过什么算法,可以自行研究一下);