一、概述
Elasticsearch 是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容。
Elasticsearch 结合Kibana、Logstash、Beats,也就是elastic stack(ELK)。被广泛应用在日志数据分析、实时监控等领域。
Elasticsearch 是Elastic stack的核心,负责存储,搜索,分析数据。

Lucene
是一个Java语言的搜索引擎类库,是Apache公司的顶级项目,由DougCutting于1999年研发。
Lucene的优势:
Lucene的缺点:
- 只限于Java语言开发
- 学习曲线陡峭
- 不支持水平扩展
2004年ShayBanon基于Lucene开发了Compass。
2010年Shay Banon重写了Compass,取名为Elasticsearch
。
相比与lucene,elasticsearch具备下列优势:
- 支持分布式,可水平扩展
- 提供Restful接口,可被任何语言调用
1.1 正向索引和倒排索引
正向索引,做局部内容检索效果较差
传统数据库(如Mysql)采用的就是正向索引,例如给下表(tb_goods)中的ID创建索引。

Elasticsearch
采用倒排索引。
文档(Document):每一条数据就是一个文档。
词条(term):文档按照语义分成的词语。

1.1.1文档
Elasticsearch 是面向文档存储的,可以是数据库中的一条商品数据,一个订单信息。
文档数据会被序列化成json格式后存储在Elasticsearch中。
1.1.2索引和映射
索引(index):是相同类型的文档的集合。
映射(mapping):索引中文文档的字段约束信息,类似于表结构的约束。



二、安装elasticsearch
2.1部署单点es
2.1.1创建网络
因为我们还需要部署kibana容器,因此需要让es和kibana容器互联。这里先创建一个网络:
1
| docker network create es-net
|
2.1.2加载镜像
这里我们采用elasticsearch的7.12.1版本的镜像,这个镜像体积非常大,接近1G。不建议大家自己pull。
课前资料提供了镜像的tar包:

大家将其上传到虚拟机中,然后运行命令加载即可:
同理还有kibana
的tar包也需要这样做。
2.1.3运行
运行docker命令,部署单点es:
1 2 3 4 5 6 7 8 9 10 11
| docker run -d \ --name es \ -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ -e "discovery.type=single-node" \ -v es-data:/usr/share/elasticsearch/data \ -v es-plugins:/usr/share/elasticsearch/plugins \ --privileged \ --network es-net \ -p 9200:9200 \ -p 9300:9300 \ elasticsearch:7.12.1
|
命令解释:
-e "cluster.name=es-docker-cluster"
:设置集群名称
-e "http.host=0.0.0.0"
:监听的地址,可以外网访问
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m"
:内存大小
-e "discovery.type=single-node"
:非集群模式
-v es-data:/usr/share/elasticsearch/data
:挂载逻辑卷,绑定es的数据目录
-v es-logs:/usr/share/elasticsearch/logs
:挂载逻辑卷,绑定es的日志目录
-v es-plugins:/usr/share/elasticsearch/plugins
:挂载逻辑卷,绑定es的插件目录
--privileged
:授予逻辑卷访问权
--network es-net
:加入一个名为es-net的网络中
-p 9200:9200
:端口映射配置
在浏览器中输入:http://192.168.150.101:9200 即可看到elasticsearch的响应结果:

2.2部署kibana
kibana可以给我们提供一个elasticsearch的可视化界面,便于我们学习。
2.2.1部署
运行docker命令,部署kibana
1 2 3 4 5 6
| docker run -d \ --name kibana \ -e ELASTICSEARCH_HOSTS=http://es:9200 \ --network=es-net \ -p 5601:5601 \ kibana:7.12.1
|
--network es-net
:加入一个名为es-net的网络中,与elasticsearch在同一个网络中
-e ELASTICSEARCH_HOSTS=http://es:9200"
:设置elasticsearch的地址,因为kibana已经与elasticsearch在一个网络,因此可以用容器名直接访问elasticsearch
-p 5601:5601
:端口映射配置
kibana启动一般比较慢,需要多等待一会,可以通过命令:
查看运行日志,当查看到下面的日志,说明成功:

此时,在浏览器输入地址访问:http://192.168.150.101:5601,即可看到结果
kibana中提供了一个DevTools界面:

这个界面中可以编写DSL来操作elasticsearch。并且对DSL语句有自动补全功能。
2.3安装IK分词器
2.3.1在线安装ik插件(较慢)
1 2 3 4 5 6 7 8 9 10
| # 进入容器内部 docker exec -it elasticsearch /bin/bash
# 在线下载并安装 ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip
#退出 exit #重启容器 docker restart elasticsearch
|
2.3.2离线安装ik插件(推荐)
1)查看数据卷目录
安装插件需要知道elasticsearch的plugins目录位置,而我们用了数据卷挂载,因此需要查看elasticsearch的数据卷目录,通过下面命令查看:
1
| docker volume inspect es-plugins
|
显示结果:
1 2 3 4 5 6 7 8 9 10 11
| [ { "CreatedAt": "2022-05-06T10:06:34+08:00", "Driver": "local", "Labels": null, "Mountpoint": "/var/lib/docker/volumes/es-plugins/_data", "Name": "es-plugins", "Options": null, "Scope": "local" } ]
|
说明plugins目录被挂载到了:/var/lib/docker/volumes/es-plugins/_data
这个目录中。
2)解压缩分词器安装包
下面我们需要把课前资料中的ik分词器解压缩,重命名为ik

3)上传到es容器的插件数据卷中
也就是/var/lib/docker/volumes/es-plugins/_data
:

4)重启容器
1 2
| # 4、重启容器 docker restart es
|
5)测试:
IK分词器包含两种模式:
ik_smart
:最少切分
ik_max_word
:最细切分
1 2 3 4 5
| GET /_analyze { "analyzer": "ik_max_word", "text": "黑马程序员学习java太棒了" }
|
结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| { "tokens" : [ { "token" : "黑马", "start_offset" : 0, "end_offset" : 2, "type" : "CN_WORD", "position" : 0 }, { "token" : "程序员", "start_offset" : 2, "end_offset" : 5, "type" : "CN_WORD", "position" : 1 }, { "token" : "程序", "start_offset" : 2, "end_offset" : 4, "type" : "CN_WORD", "position" : 2 }, { "token" : "员", "start_offset" : 4, "end_offset" : 5, "type" : "CN_CHAR", "position" : 3 }, { "token" : "学习", "start_offset" : 5, "end_offset" : 7, "type" : "CN_WORD", "position" : 4 }, { "token" : "java", "start_offset" : 7, "end_offset" : 11, "type" : "ENGLISH", "position" : 5 }, { "token" : "太棒了", "start_offset" : 11, "end_offset" : 14, "type" : "CN_WORD", "position" : 6 }, { "token" : "太棒", "start_offset" : 11, "end_offset" : 13, "type" : "CN_WORD", "position" : 7 }, { "token" : "了", "start_offset" : 13, "end_offset" : 14, "type" : "CN_CHAR", "position" : 8 } ] }
|
2.3.3扩展词词典
随着互联网的发展,“造词运动”也越发的频繁。出现了很多新的词语,在原有的词汇列表中并不存在。比如:“奥力给”,“传智播客” 等。
所以我们的词汇也需要不断的更新,IK分词器提供了扩展词汇的功能。
1)打开IK分词器config目录:

2)在IKAnalyzer.cfg.xml配置文件内容添加:
1 2 3 4 5 6 7
| <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 扩展配置</comment> <entry key="ext_dict">ext.dic</entry> </properties>
|
3)新建一个 ext.dic,可以参考config目录下复制一个配置文件进行修改
4)重启elasticsearch
1 2 3 4
| docker restart es
docker logs -f elasticsearch
|

日志中已经成功加载ext.dic配置文件
5)测试效果:
1 2 3 4 5
| GET /_analyze { "analyzer": "ik_max_word", "text": "传智播客Java就业超过90%,奥力给!" }
|
注意当前文件的编码必须是 UTF-8 格式,严禁使用Windows记事本编辑
2.3.4停用词词典
在互联网项目中,在网络间传输的速度很快,所以很多语言是不允许在网络上传递的,如:关于宗教、政治等敏感词语,那么我们在搜索时也应该忽略当前词汇。
IK分词器也提供了强大的停用词功能,让我们在索引时就直接忽略当前的停用词汇表中的内容。
1)IKAnalyzer.cfg.xml配置文件内容添加:
1 2 3 4 5 6 7 8 9
| <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"> <properties> <comment>IK Analyzer 扩展配置</comment> <entry key="ext_dict">ext.dic</entry> <entry key="ext_stopwords">stopword.dic</entry> </properties>
|
3)在 stopword.dic 添加停用词
4)重启elasticsearch
1 2 3 4 5 6
| docker restart elasticsearch docker restart kibana
docker logs -f elasticsearch
|
日志中已经成功加载stopword.dic配置文件
5)测试效果:
1 2 3 4 5
| GET /_analyze { "analyzer": "ik_max_word", "text": "传智播客Java就业率超过95%,习大大都点赞,奥力给!" }
|
注意当前文件的编码必须是 UTF-8 格式,严禁使用Windows记事本编辑
2.4部署es集群
部署es集群可以直接使用docker-compose来完成,不过要求你的Linux虚拟机至少有4G的内存空间
首先编写一个docker-compose文件,内容如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| version: '2.2' services: es01: image: elasticsearch:7.12.1 container_name: es01 environment: - node.name=es01 - cluster.name=es-docker-cluster - discovery.seed_hosts=es02,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data01:/usr/share/elasticsearch/data ports: - 9200:9200 networks: - elastic es02: image: elasticsearch:7.12.1 container_name: es02 environment: - node.name=es02 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es03 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data02:/usr/share/elasticsearch/data ports: - 9201:9200 networks: - elastic es03: image: elasticsearch:7.12.1 container_name: es03 environment: - node.name=es03 - cluster.name=es-docker-cluster - discovery.seed_hosts=es01,es02 - cluster.initial_master_nodes=es01,es02,es03 - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - data03:/usr/share/elasticsearch/data networks: - elastic ports: - 9202:9200 volumes: data01: driver: local data02: driver: local data03: driver: local
networks: elastic: driver: bridge
|
es需要修改一些Linux权限,修改/etc/sysctl.conf
文件
添加下面内容:
然后执行命令,让配置生效
通过docker-compose
启动集群
2.4.1集群状态监控
kibana可以监控es集群,不过新版本需要依赖es的x-pack
功能,配置比较复杂。
这里推荐使用cerebro
来监控es集群状态。


三、DSL
3.1 _cat
1 2 3 4 5 6 7 8
| # 查看所有节点 GET /_cat/nodes # 查看es健康状况 GET /_cat/health # 查看主节点 GET /_cat/master # 查看所有索引 GET /_cat/indices
|
3.2 操作索引库
3.2.1 mapping属性
mapping是对库中文档的约束,常见的mapping属性包括:
3.2.2 创建索引库
ES中通过操作Result请求操作索引库、文档。请求内容用DSL语句来表示。创建索引库和mapping的DSL语法如下:
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| PUT /heima2 { "mappings":{ "properties":{ "info":{ "type":"text", "analyzer":"ik_smart" }, "email":{ "type": "keyword", "index": false }, "name": { "type":"object", "properties":{ "firstname":{ "type":"keyword" }, "lastname":{ "type":"keyword" } } } } } }
|
3.2.3 查看和删除索引库
查看索引库语法:
删除索引库语法:
3.2.4 修改索引库
索引库和mapping一旦创建无法修改,但是可以添加新的字段。语法如下:
1 2 3 4 5 6 7 8
| PUT /索引库名/_mapping { "properties":{ "新字段名":{ "type":"integer" } } }
|
3.3 操作文档
3.3.1 新增文档
新增文档语法如下:
1 2 3 4 5 6 7 8 9 10
| POST /索引库名/_doc/文档id { "字段1":"值1", "字段2":"值2", "字段3": { "子属性1":"值3", "子属性2":"值4" }, }
|
3.3.2 查询文档
3.3.3 删除文档
3.3.4 修改文档
方式1:全量修改,会删除旧文档,添加新文档。文档存在就是修改,不存在就是新增。
1 2 3 4 5
| PUT /索引库名/_doc/文档id { "字段1":"值1", "字段2":"值2", }
|

方式2:增量修改,修改指定字段值。
1 2 3 4 5 6
| POST /索引库名/_update/文档id { "doc":{ "字段名":"新的值", } }
|
3.4 批量操作
语法格式如下:

1 2 3 4 5
| POST /_bulk {"index": {"_index":"heima", "_id": "3"}} {"info": "黑马程序员C++讲师", "email": "ww@itcast.cn", "name":{"firstName": "五", "lastName":"王"}} {"index": {"_index":"heima", "_id": "4"}} {"info": "黑马程序员前端讲师", "email": "zhangsan@itcast.cn", "name":{"firstName": "三", "lastName":"张"}}
|
1 2 3
| POST /_bulk {"delete":{"_index":"heima", "_id": "3"}} {"delete":{"_index":"heima", "_id": "4"}}
|
1 2 3 4 5
| POST /_bulk {"update":{"_id":"3","_index":"heima"}} {"doc":{"email":"666@qq.com"}} {"update":{"_id":"4","_index":"heima"}} {"doc":{"email":"777@qq.com"}}
|
四、RestClient
ES提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。

4.1 JavaRestClient操作索引库
4.2 操作索引库
4.2.1 初始化JavaRestClient
- 引入es的RestHighLevelClient依赖:
1 2 3 4 5
| <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.12.1</version> </dependency>
|
- 因为springboot默认的ES版本是7.6.2,我们要覆盖默认的ES版本。
1 2 3 4
| <properties> <java.version>1.8</java.version> <elasticsearch.version>7.12.1</elasticsearch.version> </properties>
|
- 初始化RestHighLevelClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class HotelIndexTest {
private RestHighLevelClient client;
@Test public void test(){ System.out.println(client); }
@BeforeEach() void setUp(){ this.client = new RestHighLevelClient(RestClient.builder( HttpHost.create("http://192.168.16.128:9200") )); }
@AfterEach() void close(){ try { this.client.close(); } catch (IOException e) { throw new RuntimeException(e); } } }
|
4.2.2 数据结构分析
mapping要考虑的问题:
字段名;数据类型;是否参与搜索;是否分词;如果分词,分词器是什么?
以商品为例:


DSL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| # 商品映射 PUT /items { "mappings": { "properties": { "id":{ "type":"keyword" }, "name":{ "type": "text", "analyzer": "ik_smart" }, "price":{ "type": "integer" }, "image":{ "type": "keyword", "index": false }, "category":{ "type": "keyword" }, "brand":{ "type": "keyword" }, "sold":{ "type": "integer" }, "comment_count":{ "type": "integer", "index": false }, "isAD":{ "type": "boolean" }, "updateTime":{ "type": "date" } } } }
|
4.2.3 创建、删除、查询索引库

client.indices():返回的对象中包含索引库操作的多有方法
create()创建
delete()删除
get()查询
创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| static MAPPING_TEMPLATE = "{ "mappings": { "properties": { "id":{ "type":"keyword" }, "name":{ "type": "text", "analyzer": "ik_smart" }, "price":{ "type": "integer" }, "image":{ "type": "keyword", "index": false }, "category":{ "type": "keyword" }, "brand":{ "type": "keyword" }, "sold":{ "type": "integer" }, "comment_count":{ "type": "integer", "index": false }, "isAD":{ "type": "boolean" }, "updateTime":{ "type": "date" } } } }" @Test void createIndex() throws IOException { CreateIndexRequest request = new CreateIndexRequest("items"); request.source(MAPPING_TEMPLATE, XContentType.JSON); client.indices().create(request, RequestOptions.DEFAULT); }
|
删除
1 2 3 4 5 6 7
| @Test void deleteIndex() throws IOException { DeleteIndexRequest request = new DeleteIndexRequest("items"); client.indices().delete(request, RequestOptions.DEFAULT); }
|
查询
1 2 3 4 5 6 7 8 9
| @Test void getIndex() throws IOException { GetIndexRequest request = new GetIndexRequest("items"); GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT); Map<String, MappingMetadata> mappings = response.getMappings(); System.out.println(mappings.toString()); }
|
4.3 操作文档
4.3.1 创建文档
1 2 3 4 5 6 7 8 9 10 11 12
| @Test void testAddDoc() throws IOException { Item item = itemService.getById(317578L); ItemDoc itemDoc = BeanUtil.copyProperties(item, ItemDoc.class); IndexRequest request = new IndexRequest("items").id(itemDoc.getId()); request.source(JSONUtil.toJsonStr(itemDoc), XContentType.JSON); client.index(request, RequestOptions.DEFAULT); }
|
4.3.2 查询文档
1 2 3 4 5 6 7 8 9 10
| @Test public void testGetDocument() throws IOException { GetRequest request = new GetRequest("items","317578"); GetResponse response = client.get(request, RequestOptions.DEFAULT); String json= response.getSourceAsString(); System.out.println(json); }
|
4.3.3 修改文档
方式一:全量更新。与添加一样
方式二:局部更新。
1 2 3 4 5 6 7 8 9 10 11
| @Test public void testUpdateDocument() throws IOException { UpdateRequest request = new UpdateRequest("items", "317578"); request.doc( "price","755" ); client.update(request, RequestOptions.DEFAULT); }
|
4.3.4 删除文档
1 2 3 4 5 6 7
| @Test public void testDeleteDocument() throws IOException { DeleteRequest request = new DeleteRequest("items", "317578"); client.delete(request, RequestOptions.DEFAULT); }
|
4.3.5 批量导入文档
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Test void testBatch() throws IOException { int pageNo = 1,pageSize = 500; while (true){ Page<Item> page = itemService.lambdaQuery() .eq(Item::getStatus, 1) .page(Page.of(pageNo, pageSize)); List<Item> records = page.getRecords(); if (records == null || records.isEmpty()){ return; } List<ItemDoc> itemDocs = BeanUtil.copyToList(records, ItemDoc.class); BulkRequest request = new BulkRequest(); for (ItemDoc itemDoc : itemDocs) { IndexRequest items = new IndexRequest("items").id(itemDoc.getId()); items.source(JSONUtil.toJsonStr(itemDoc), XContentType.JSON); request.add(items); } client.bulk(request, RequestOptions.DEFAULT); pageNo++; } }
|
查询
1 2 3
| GET /items/_search # 统计数量 GET /items/_count
|
五、DSL查询文档语法
搜索完整语法:
DSL查询可以分为两大类:
- 叶子查询:一般是在特定的字段里面查询特定值,属于简单查询,很少单独使用。
- 复合查询:以逻辑方式组合多个叶子查询或者更改叶子查询的行为方式。
在查询后,还可以对查询的结果做处理,包括
- 排序:按照一个或多个字段值做排序。
- 分页:根据from和size做分页。
- 高亮:对搜索结果中的关键字添加特殊的样式,使其更新醒目。
- 聚合:对搜索结果做数据统计以形成报表。
ES提供了基于JSON的DSL来定义查询。常见的查询包括。
- 查询所有:查询所有数据,一般测试用。
match_all
叶子查询
- 全文检索查询:利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
match
:根据一个字段查
multi_match
:根据多个字段查询,参与查询的字段越多,查询性能越差。
- 精确查询:根据精确词条值查找数据,一般是查找keyword、数值、日期、boolean等类型字段。
- 地理(geo)查询:用搜索地理位置,搜索方式很多。例如:
geo_distance
geo_bounding_box
复合查询
- 复合查询:可以将上述各种查询条件组合起来,合并查询条件。例如:
查询基本语法:
1 2 3 4 5 6 7 8
| GET /indexName/_search { "query":{ "查询类型":{ "查询条件":"条件值" } } }
|
5.1 查询所有
1 2 3 4 5 6 7
| GET /hotel/_search { "query":{ "match_all":{ } } }
|
查询结果
1 2 3 4 5 6 7 8 9 10 11 12 13
| { "took":"4", "timed_out":false, "_shards":{}, "hits":{ "total":{ "value":10000, "relation":"gte" }, "max_score":1.0, "hits":{} } }
|
- took:请求耗时。
- timed_out:是否超时。
- _shards:分片。
- hits:数据。
- total:总条数。上述表示大于等于10000条
- hits:数据,和查询单条数据格式一致。
5.2 全文检索查询
- match:根据一个字段查
- multi_match:根据多个字段查询,参与查询的字段越多,查询性能越差。
会对用户输入内容分词,常用于搜索框搜索。

1 2 3 4 5 6 7 8 9
| # match查询,copy_to放到一个字段中的 GET /hotel/_search { "query": { "match": { "all": "外滩如家" } } }
|
1 2 3 4 5 6 7 8 9 10
| # multi_match查询 GET /hotel/_search { "query": { "multi_match": { "query": "外滩如家", "fields":["name","brand","business"] } } }
|
查询符合的条件越多,数据越靠前,比如说:==外滩如家==会比只有外滩或者只有如家的靠前。
5.3 精确查询
不会对用户输入的搜索条件在分词,而是作为一个词条,与搜索的字段内容精确值匹配。因此推荐查找keyword、数值、日期、boolean类型的数据
term:根据词条的精确值查询。
range:根据值的范围查询。
1 2 3 4 5 6 7 8 9 10 11
| # term查询 GET /hotel/_search { "query": { "term": { "city": { "value": "北京" } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| # range GET /hotel/_search { "query": { "range": { "price": { "gte": 100, "lte": 300 } } } }
|
5.4 复合查询
bool
布尔查询是一个或多个查询字句的组合。子查询的组合方式有:
- must:必须匹配每个子查询,类似“与”
- should:选择匹配子查询,类似“或”
- must_not:必须不匹配,不参与算分,类似“非”
- filter:必须匹配,不参与算分。类似“与”
基本上,用户输入关键字搜索的才需要参与算分。

function_score
复合查询在简单查询的基础上,将简单查询结合起来,实现更复杂的搜索逻辑。
- function scope:算分函数查询,可以控制文档相关性算分,控制文档排名。例如百度竞价。
- 相关性算分:当我们利用match查询时,文档结果会根据搜索词条的关联度进行打分,返回结果按照分值排序。
使用Function Scope Query,可以修改文档的相关性算分,根据得到的算分排序。(有老板给钱了,自己的数据查出来要放到前面)。

5.5 地理查询
- geo_bounding_box:查询geo_point值落在某个矩形范围内的所有文档

- geo_distance:查询某个指定中心点小于某个距离值的所有文档


1 2 3 4 5 6 7 8 9
| GET /hotel/_search { "query": { "geo_distance":{ "distance": "5km", "location": "31.21,121.5" } } }
|
六、搜索结果处理
6.1 排序
对酒店数据按照用户评价降序排序,评价相同的安装价格升序排序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| GET /hotel/_search { "query": { "match_all": {} }, "sort": [ { "score": {"order": "desc"} }, { "price": "asc" } ] }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| GET /hotel/_search { "query": { "match_all": {} }, "sort": [ { "_geo_distance": { "location": { "lat": 31.034, "lon": 121.612 }, "order": "asc", "unit": "km" } } ] }
|
6.2 分页
es默认情况下只返回top10的数据。而如果要查询更多的数据,就需要修改分页参数了。
通过修改from
,size
来控制返回的分页结果。
ES是分布式的,所以会面临深度分页问题。
如果搜索页数过深,或者结果集(from+size)越大,对内存和CPU的消耗也越高。因此ES设定结果集查询的上限是10000
。
深度分页问题

查询的页码越深,从每个分片查询的数据量越多,性能越差。
针对深度分页,ES提供了两种解决方案:
6.3 高亮
高亮就是在搜索结果中把关键字突出显示。前置和后置标签都是em,可以不加。
语法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| # 默认情况下,搜索字段必须和高亮字段一致,否则不会高亮 # "require_field_match": "false",名称不同也会高亮 GET /hotel/_search { "query": { "match": { "all": "如家" } }, "highlight": { "fields": { "name": { "require_field_match": "false" } } } }
|
七、RestClient查询文档
数据搜索的Java代码分为两部分:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Test void testMatchAll() throws IOException { SearchRequest request = new SearchRequest("hotel"); request.source().query(QueryBuilders.matchAllQuery()); SearchResponse response = client.search(request, RequestOptions.DEFAULT); long total = response.getHits().getTotalHits().value; System.out.println("总共有多少条数据:"+total); SearchHit[] hits = response.getHits().getHits(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); HotelDoc hotelDoc = JSON.parseObject(sourceAsString, HotelDoc.class); System.out.println(hotelDoc); } }
|
RestApI中构建DSL都是通过request里面的source()
方法实现,包含查询,排序,分页,高亮等所有功能。
在JavaRestAPI中,所有类型的query查询条件都是通过QueryBuilders
构建的。

7.1 全文检索查询

7.2 精确查询

7.3 复合查询

1 2
| request.source().query(boolQuery);
|
7.4 排序和分页
1 2 3 4 5
| request.source() .query(QueryBuilders.matchAllQuery()) .from(0) .size(10) .sort("price", SortOrder.DESC);
|
7.5 高亮
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Test void testMatchAll() throws IOException { SearchRequest request = new SearchRequest("hotel"); request.source() .query(QueryBuilders.matchQuery("all","如家")) .highlighter(new HighlightBuilder().field("name") .requireFieldMatch(false)); SearchResponse response = client.search(request, RequestOptions.DEFAULT); long total = response.getHits().getTotalHits().value; System.out.println("总共有多少条数据:"+total); SearchHit[] hits = response.getHits().getHits(); for (SearchHit hit : hits) { String sourceAsString = hit.getSourceAsString(); HotelDoc hotelDoc = JSON.parseObject(sourceAsString, HotelDoc.class); System.out.println(hotelDoc); Map<String, HighlightField> files = hit.getHighlightFields(); if (!CollectionUtils.isEmpty(files)){ HighlightField highlightField = files.get("name"); if (highlightField != null){ String name = highlightField.getFragments()[0].string(); System.out.println(name); } } } }
|

八、数据聚合
聚合可以实现对文档数据的统计,分析和运算。聚合常见的有三类。
- 桶(Bucket)聚合:用来对文档做分租。(对数据进行分组统计)
- TermAggregation:按照文档字段值分组。
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等。(对分组的内容做最小值,最大值,平均值等)
- AVG:求平均值
- MAX:求最大值
- MIN:求最小值
- Stats:同时求,max,min,avg,sum等。
- 管道(pipeline)聚合:其他聚合的结果为基础做聚合。
参与聚合的类型必须不能是Text
。可以是:
8.1 DSL实现Bucket聚合
现在,我们要统计所有数据中的==酒店品牌有几种==,此时可以根据酒店品牌的名称做聚合。
聚合三要素:
语法:

DSL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| # 相当于对brand做分组统计 GET /hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 20 } } } }
GET /hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "order": { "_count": "asc" }, "size": 20 } } } }
|
搜索结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| { "aggregations" : { "brandAgg" : { "doc_count_error_upper_bound" : 0, "sum_other_doc_count" : 0, "buckets" : [ { "key" : "7天酒店", "doc_count" : 30 }, { "key" : "如家", "doc_count" : 30 }, ] } } }
|
默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| # 限定聚合范围,价格在200元以内的的品牌的酒店数量 GET /hotel/_search { "query": { "range": { "price": { "lte": 200 } } }, "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "order": { "_count": "asc" # 对返回结果中的count进行排序 }, "size": 20 } } } }
|
8.2 DSL实现Metrics聚合
例如,要求获取每个品牌的用户评分的min,max,avg等值。在根据score的avg进行倒序排序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| GET /hotel/_search { "size": 0, "aggs": { "brandAgg": { "terms": { "field": "brand", "size": 20, "order": { "scopeAgg.avg": "desc" } }, "aggs": { "scopeAgg": { "stats": { "field": "score" } } } } } }
|
8.3 RestClient实现聚合
8.3.1 Bucket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Test public void testAggregation() throws IOException { SearchRequest request = new SearchRequest("hotel"); request.source() .size(0) .aggregation(AggregationBuilders.terms("brandAgg") .field("brand").size(10) .order(BucketOrder.count(false)));
SearchResponse response = client.search(request, RequestOptions.DEFAULT); Aggregations aggregations = response.getAggregations(); Terms terms = aggregations.get("brandAgg"); List<? extends Terms.Bucket> buckets = terms.getBuckets(); buckets.forEach(bucket -> { String brandName = bucket.getKeyAsString(); System.out.println(brandName); }); }
|
8.3.2 Metrics
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Test public void testMetrics() throws IOException { SearchRequest request = new SearchRequest("hotel"); request.source() .size(0) .aggregation(AggregationBuilders.terms("brandAgg") .field("brand").size(20) .order(BucketOrder.aggregation("scopeAgg","avg",false)) .subAggregation(AggregationBuilders.stats("scopeAgg").field("score")));
SearchResponse response = client.search(request, RequestOptions.DEFAULT); System.out.println(response); Aggregations aggregations = response.getAggregations(); Terms terms = aggregations.get("brandAgg"); List<? extends Terms.Bucket> buckets = terms.getBuckets(); buckets.forEach(bucket -> { String brandName = bucket.getKeyAsString(); System.out.println(brandName); }); }
|
九、自动补全
要实现根据拼音字母做补全,就必须对文档安装拼音分词。在GitHub上恰好有es的拼音分词插件。
安装拼音分词器。
- 解压
- 上传到虚拟机中,es的plugin目录
- 重启es
- 测试
9.1 自定义分词器


我们可以在创建索引库时,通过settings来配置自定义的analyzer(分词器)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| PUT /test { "settings": { "analysis": { "analyzer": { "my_analyzer": { "tokenizer": "ik_max_word", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "name":{ "type": "text", "analyzer": "my_analyzer", "search_analyzer": "ik_smart" } } } }
|

9.2 自动补全
elasticsearch提供了Completion Suggester
查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束。
- 参与补全查询的字段必须是
Completion
类型
- 字段的内容一般是用来补全的多个词条形成的数组。

查询语法如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| # 自动补全索引库 PUT test2 { "mappings": { "properties": { "title":{ "type": "completion" } } } } # 示例数据 POST test2/_doc { "title":["Sony","WH-10010"] } POST test2/_doc { "title":["SK-II","PITERA"] } POST test2/_doc { "title":["Nintendo","swich"] }
GET /test2/_search { "query": { "match_all": {} } } # 自动补全查询 # "skip_duplicates":true, # "size":10 GET /test2/_search { "suggest": { "title_suggest": { "text": "s", "completion": { "field": "title", "skip_duplicates":true, "size":10 } } } }
|
9.3 酒店数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
| PUT /hotel { "settings": { "analysis": { "analyzer": { "text_anlyzer": { "tokenizer": "ik_max_word", "filter": "py" }, "completion_analyzer": { "tokenizer": "keyword", "filter": "py" } }, "filter": { "py": { "type": "pinyin", "keep_full_pinyin": false, "keep_joined_full_pinyin": true, "keep_original": true, "limit_first_letter_length": 16, "remove_duplicated_term": true, "none_chinese_pinyin_tokenize": false } } } }, "mappings": { "properties": { "id":{ "type": "keyword" }, "name":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart", "copy_to": "all" }, "address":{ "type": "keyword", "index": false }, "price":{ "type": "integer" }, "score":{ "type": "integer" }, "brand":{ "type": "keyword", "copy_to": "all" }, "city":{ "type": "keyword" }, "starName":{ "type": "keyword" }, "business":{ "type": "keyword", "copy_to": "all" }, "location":{ "type": "geo_point" }, "pic":{ "type": "keyword", "index": false }, "all":{ "type": "text", "analyzer": "text_anlyzer", "search_analyzer": "ik_smart" }, "suggestion":{ "type": "completion", "analyzer": "completion_analyzer" } } } }
|
9.4 自动补全代码
请求解析:

结果解析:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Test void testSuggest() throws IOException { SearchRequest request = new SearchRequest("hotel"); request.source().suggest(new SuggestBuilder().addSuggestion( "suggestions", SuggestBuilders.completionSuggestion("suggestion") .prefix("hc") .skipDuplicates(true) .size(10) )); SearchResponse response = client.search(request, RequestOptions.DEFAULT); Suggest suggest = response.getSuggest(); CompletionSuggestion suggestion = suggest.getSuggestion("suggestions"); List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions(); options.forEach(option -> { String text = option.getText().toString(); System.out.println(text); }); }
|
十、数据同步
es中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,es也必须发生改变,这个就是es与mysql之间的数据同步。




利用MQ来实现Mysql和es的数据同步
当酒店数据发生增、删、改时,要求对easticsearch中的数据也要完成相同的操作。
durable:设置持久化后,将会存到硬盘中。服务器重启也不删除。
autoDelete:当所有的与此连接的客户端都断开时,该交换器会被删除
10.1 消费者
- 导入
spring-boot-starter-amqp
依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
- 添加配置
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.16.128 port: 5672 username: itcast password: 123321 virtual-host: /
|
- 编写交换机、队列之间的绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class MqConstants {
public final static String HOTEL_EXCHANGE = "hotel.topic";
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
public final static String HOTEL_INSERT_KEY = "hotel.insert";
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Configuration public class MqConfig {
@Bean public TopicExchange topicExchange(){ return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false); }
@Bean public Queue insertQueue(){ return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true); }
@Bean public Queue deleteQueue(){ return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true); }
@Bean public Binding insertQueueBinding(){ return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY); }
@Bean public Binding deleteQueueBinding(){ return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY); }
}
|
- 编写监听器实现业务功能
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Component public class HotelListener {
@Autowired private IHotelService hotelService;
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE) public void listenHotelInsertOrUpdate(Long id){ hotelService.insertById(id); }
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE) public void listenHotelDelete(Long id){ hotelService.deleteById(id); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Override public void insertById(Long id) { try { Hotel hotel = getById(id); HotelDoc hotelDoc = new HotelDoc(hotel); IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString()); request.source(JSON.toJSONString(hotelDoc), XContentType.JSON); client.index(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
@Override public void deleteById(Long id) { try { DeleteRequest request = new DeleteRequest("hotel").id(id.toString()); client.delete(request, RequestOptions.DEFAULT); } catch (IOException e) { throw new RuntimeException(e); } }
|
10.2 提供者
- 导入
spring-boot-starter-amqp
依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
- 添加配置
1 2 3 4 5 6 7
| spring: rabbitmq: host: 192.168.16.128 port: 5672 username: itcast password: 123321 virtual-host: /
|
- 在新增、修改、删除的时候发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class MqConstants {
public final static String HOTEL_EXCHANGE = "hotel.topic";
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
public final static String HOTEL_INSERT_KEY = "hotel.insert";
public final static String HOTEL_DELETE_KEY = "hotel.delete";
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @PostMapping public void saveHotel(@RequestBody Hotel hotel){ hotelService.save(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); }
@PutMapping() public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空"); } hotelService.updateById(hotel); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId()); }
@DeleteMapping("/{id}") public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id); rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id); }
|
十一、es集群
单机的es做数据存储,必然面临两个问题:海量的数据存储问题、单点故障。
- 海量数据存储问题:将索引库从逻辑上拆分为N个分片,存储多个节点。
- 单点故障问题:将分片数据在不同的节点备份。

11.1 搭建es集群
我们计划利用3个docker容器模拟3个es节点。
4.部署es集群
11.2 集群职责及脑裂
es中集群节点有不同的职责划分:

es中的每个节点都有自己不同的职责,因此建议部署集群角色时,每个节点都有独立的角色。

11.2.1 ES集群中的脑裂问题
默认情况下,每个节点都是master eligible
节点,因此一旦master节点宕机,其他候选节点会选举一个成为主节点。当主节点与其他节点发生网络故障时,可能发生脑裂问题。



为了避免脑裂,需要要求选票超过(eligible节点数量+1)/2
才能当选为主,因此eligible节点数量最好是奇数。对应配项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题。
11.3 es节点的分布式存储
当新增文档时,应该保存到不同的分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?


当查询没有id时(查询全部时):

11.4 es集群的故障转移
集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。


