elasticsearch的写操作是原子性的,可以通过如下两种方式实现es写操作的乐观锁。
基于_version
==version_type
在elasticsearch6.x被移除,故该方法不适用于6.x版本==,详见https://www.elastic.co/guide/en/elasticsearch/reference/6.7/docs-update.html
1 2 3 4 5 6
| PUT /user/user_type/1 { "name": "tom", "age": 18, "doc_version": 1 // 自定义的文档版本号,用于乐观锁 }
|
只更新低版本的记录
1 2 3 4 5 6
| PUT /user/user_type/1?version=2&version_type=external_gt { "name": "tom", "age": 18, "doc_version": 2 }
|
忽略版本号强制更新
1 2 3 4 5 6
| PUT /user/user_type/1?version=3&version_type=force { "name": "tom", "age": 18, "doc_version": 3 }
|
基于script
1 2 3 4 5 6
| PUT /user/user_type/1 { "name": "tom", "age": 18, "doc_version": 1 }
|
只更新es中doc_version
版本低的记录
1 2 3 4 5 6 7 8 9 10 11 12
| POST /user/user_type/1/_update { "script": { "source": "if(params.doc_force == true || ctx._source.doc_version < params.doc_version){for(entry in params.entrySet()){if (entry.getKey() != 'ctx') ctx._source[entry.getKey()] = entry.getValue();}}else{ctx.op = 'none'}", "lang": "painless", "params": { "name": "tom", "age": 19, "doc_version": 19 } } }
|
忽略自定义版本号doc_version
强制更新记录
1 2 3 4 5 6 7 8 9 10 11 12 13
| POST /user/user_type/1/_update { "script": { "source": "if(params.doc_force == true || ctx._source.doc_version < params.doc_version){for(entry in params.entrySet()){if (entry.getKey() != 'ctx') ctx._source[entry.getKey()] = entry.getValue();}}else{ctx.op = 'none'}", "lang": "painless", "params": { "name": "tom", "age": 19, "doc_version": 19, "doc_force": true } } }
|
注:读取script
中的params
的值时,需要过滤掉params.ctx
,原因是es的painless
脚本会自动向params
中添加ctx
,如果不过滤,则上述的更新语法会报如下错误:
1 2 3 4 5 6 7 8 9 10 11 12 13
| { "error": { "root_cause": [ { "type": "remote_transport_exception", "reason": "[0l2AYvx][127.0.0.1:9300][indices:data/write/update[s]]" } ], "type": "illegal_argument_exception", "reason": "Iterable object is self-referencing itself" }, "status": 400 }
|
详见https://github.com/elastic/elasticsearch/pull/32096。