一、概述

Elasticsearch 是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容。

Elasticsearch 结合Kibana、Logstash、Beats,也就是elastic stack(ELK)。被广泛应用在日志数据分析、实时监控等领域。

Elasticsearch 是Elastic stack的核心,负责存储,搜索,分析数据。

image-20230819143510992

Lucene是一个Java语言的搜索引擎类库,是Apache公司的顶级项目,由DougCutting于1999年研发。

Lucene的优势:

  • 易于扩展
  • 高性能

Lucene的缺点:

  • 只限于Java语言开发
  • 学习曲线陡峭
  • 不支持水平扩展

2004年ShayBanon基于Lucene开发了Compass。

2010年Shay Banon重写了Compass,取名为Elasticsearch

相比与lucene,elasticsearch具备下列优势:

  • 支持分布式,可水平扩展
  • 提供Restful接口,可被任何语言调用

1.1 正向索引和倒排索引

正向索引,做局部内容检索效果较差

传统数据库(如Mysql)采用的就是正向索引,例如给下表(tb_goods)中的ID创建索引。

image-20230819144546645

Elasticsearch 采用倒排索引

文档(Document):每一条数据就是一个文档

词条(term):文档按照语义分成的词语

image-20230819145233569

image-20230819145252756

1.1.1文档

Elasticsearch 是面向文档存储的,可以是数据库中的一条商品数据,一个订单信息。

文档数据会被序列化成json格式后存储在Elasticsearch中。

1.1.2索引和映射

索引(index):是相同类型的文档的集合。

映射(mapping):索引中文文档的字段约束信息,类似于表结构的约束。

image-20230819150545713

image-20230819150721728

image-20230819151604643

二、安装elasticsearch

2.1部署单点es

2.1.1创建网络

因为我们还需要部署kibana容器,因此需要让es和kibana容器互联。这里先创建一个网络:

1
docker network create es-net

2.1.2加载镜像

这里我们采用elasticsearch的7.12.1版本的镜像,这个镜像体积非常大,接近1G。不建议大家自己pull。

课前资料提供了镜像的tar包:

image-20210510165308064

大家将其上传到虚拟机中,然后运行命令加载即可:

1
2
# 导入数据
docker load -i es.tar

同理还有kibana的tar包也需要这样做。

2.1.3运行

运行docker命令,部署单点es:

1
2
3
4
5
6
7
8
9
10
11
docker run -d \
--name es \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
-e "discovery.type=single-node" \
-v es-data:/usr/share/elasticsearch/data \
-v es-plugins:/usr/share/elasticsearch/plugins \
--privileged \
--network es-net \
-p 9200:9200 \
-p 9300:9300 \
elasticsearch:7.12.1

命令解释:

  • -e "cluster.name=es-docker-cluster":设置集群名称
  • -e "http.host=0.0.0.0":监听的地址,可以外网访问
  • -e "ES_JAVA_OPTS=-Xms512m -Xmx512m":内存大小
  • -e "discovery.type=single-node":非集群模式
  • -v es-data:/usr/share/elasticsearch/data:挂载逻辑卷,绑定es的数据目录
  • -v es-logs:/usr/share/elasticsearch/logs:挂载逻辑卷,绑定es的日志目录
  • -v es-plugins:/usr/share/elasticsearch/plugins:挂载逻辑卷,绑定es的插件目录
  • --privileged:授予逻辑卷访问权
  • --network es-net :加入一个名为es-net的网络中
  • -p 9200:9200:端口映射配置

在浏览器中输入:http://192.168.150.101:9200 即可看到elasticsearch的响应结果:

image-20210506101053676

2.2部署kibana

kibana可以给我们提供一个elasticsearch的可视化界面,便于我们学习。

2.2.1部署

运行docker命令,部署kibana

1
2
3
4
5
6
docker run -d \
--name kibana \
-e ELASTICSEARCH_HOSTS=http://es:9200 \
--network=es-net \
-p 5601:5601 \
kibana:7.12.1
  • --network es-net :加入一个名为es-net的网络中,与elasticsearch在同一个网络中
  • -e ELASTICSEARCH_HOSTS=http://es:9200":设置elasticsearch的地址,因为kibana已经与elasticsearch在一个网络,因此可以用容器名直接访问elasticsearch
  • -p 5601:5601:端口映射配置

kibana启动一般比较慢,需要多等待一会,可以通过命令:

1
docker logs -f kibana

查看运行日志,当查看到下面的日志,说明成功:

image-20210109105135812

此时,在浏览器输入地址访问:http://192.168.150.101:5601,即可看到结果

2.2.2DevTools

kibana中提供了一个DevTools界面:

image-20210506102630393

这个界面中可以编写DSL来操作elasticsearch。并且对DSL语句有自动补全功能。

2.3安装IK分词器

2.3.1在线安装ik插件(较慢)

1
2
3
4
5
6
7
8
9
10
# 进入容器内部
docker exec -it elasticsearch /bin/bash

# 在线下载并安装
./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip

#退出
exit
#重启容器
docker restart elasticsearch

2.3.2离线安装ik插件(推荐)

1)查看数据卷目录

安装插件需要知道elasticsearch的plugins目录位置,而我们用了数据卷挂载,因此需要查看elasticsearch的数据卷目录,通过下面命令查看:

1
docker volume inspect es-plugins

显示结果:

1
2
3
4
5
6
7
8
9
10
11
[
{
"CreatedAt": "2022-05-06T10:06:34+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/es-plugins/_data",
"Name": "es-plugins",
"Options": null,
"Scope": "local"
}
]

说明plugins目录被挂载到了:/var/lib/docker/volumes/es-plugins/_data 这个目录中。

2)解压缩分词器安装包

下面我们需要把课前资料中的ik分词器解压缩,重命名为ik

image-20210506110249144

3)上传到es容器的插件数据卷中

也就是/var/lib/docker/volumes/es-plugins/_data

image-20210506110704293

4)重启容器

1
2
# 4、重启容器
docker restart es
1
2
# 查看es日志
docker logs -f es

5)测试:

IK分词器包含两种模式:

  • ik_smart:最少切分

  • ik_max_word:最细切分

1
2
3
4
5
GET /_analyze
{
"analyzer": "ik_max_word",
"text": "黑马程序员学习java太棒了"
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
{
"tokens" : [
{
"token" : "黑马",
"start_offset" : 0,
"end_offset" : 2,
"type" : "CN_WORD",
"position" : 0
},
{
"token" : "程序员",
"start_offset" : 2,
"end_offset" : 5,
"type" : "CN_WORD",
"position" : 1
},
{
"token" : "程序",
"start_offset" : 2,
"end_offset" : 4,
"type" : "CN_WORD",
"position" : 2
},
{
"token" : "员",
"start_offset" : 4,
"end_offset" : 5,
"type" : "CN_CHAR",
"position" : 3
},
{
"token" : "学习",
"start_offset" : 5,
"end_offset" : 7,
"type" : "CN_WORD",
"position" : 4
},
{
"token" : "java",
"start_offset" : 7,
"end_offset" : 11,
"type" : "ENGLISH",
"position" : 5
},
{
"token" : "太棒了",
"start_offset" : 11,
"end_offset" : 14,
"type" : "CN_WORD",
"position" : 6
},
{
"token" : "太棒",
"start_offset" : 11,
"end_offset" : 13,
"type" : "CN_WORD",
"position" : 7
},
{
"token" : "了",
"start_offset" : 13,
"end_offset" : 14,
"type" : "CN_CHAR",
"position" : 8
}
]
}

2.3.3扩展词词典

随着互联网的发展,“造词运动”也越发的频繁。出现了很多新的词语,在原有的词汇列表中并不存在。比如:“奥力给”,“传智播客” 等。

所以我们的词汇也需要不断的更新,IK分词器提供了扩展词汇的功能。

1)打开IK分词器config目录:

image-20210506112225508

2)在IKAnalyzer.cfg.xml配置文件内容添加:

1
2
3
4
5
6
7
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 *** 添加扩展词典-->
<entry key="ext_dict">ext.dic</entry>
</properties>

3)新建一个 ext.dic,可以参考config目录下复制一个配置文件进行修改

1
2
传智播客
奥力给

4)重启elasticsearch

1
2
3
4
docker restart es

# 查看 日志
docker logs -f elasticsearch

image-20201115230900504

日志中已经成功加载ext.dic配置文件

5)测试效果:

1
2
3
4
5
GET /_analyze
{
"analyzer": "ik_max_word",
"text": "传智播客Java就业超过90%,奥力给!"
}

注意当前文件的编码必须是 UTF-8 格式,严禁使用Windows记事本编辑

2.3.4停用词词典

在互联网项目中,在网络间传输的速度很快,所以很多语言是不允许在网络上传递的,如:关于宗教、政治等敏感词语,那么我们在搜索时也应该忽略当前词汇。

IK分词器也提供了强大的停用词功能,让我们在索引时就直接忽略当前的停用词汇表中的内容。

1)IKAnalyzer.cfg.xml配置文件内容添加:

1
2
3
4
5
6
7
8
9
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Analyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典-->
<entry key="ext_dict">ext.dic</entry>
<!--用户可以在这里配置自己的扩展停止词字典 *** 添加停用词词典-->
<entry key="ext_stopwords">stopword.dic</entry>
</properties>

3)在 stopword.dic 添加停用词

1
习大大

4)重启elasticsearch

1
2
3
4
5
6
# 重启服务
docker restart elasticsearch
docker restart kibana

# 查看 日志
docker logs -f elasticsearch

日志中已经成功加载stopword.dic配置文件

5)测试效果:

1
2
3
4
5
GET /_analyze
{
"analyzer": "ik_max_word",
"text": "传智播客Java就业率超过95%,习大大都点赞,奥力给!"
}

注意当前文件的编码必须是 UTF-8 格式,严禁使用Windows记事本编辑

2.4部署es集群

部署es集群可以直接使用docker-compose来完成,不过要求你的Linux虚拟机至少有4G的内存空间

首先编写一个docker-compose文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
version: '2.2'
services:
es01:
# 镜像
image: elasticsearch:7.12.1
# 容器名称
container_name: es01
environment:
# 节点名称
- node.name=es01
# 集群名称,如果集群名称一样,es会自动将他们组装成一个集群
- cluster.name=es-docker-cluster
# 集群中的另外两个节点,容器内可以用容器名互联,不用ip地址。
- discovery.seed_hosts=es02,es03
# 初始化的主节点,这三个节点都是参与选举的主节点
- cluster.initial_master_nodes=es01,es02,es03
# 设置堆内存大小,最小内存。
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data01:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- elastic
es02:
image: elasticsearch:7.12.1
container_name: es02
environment:
- node.name=es02
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es03
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data02:/usr/share/elasticsearch/data
ports:
- 9201:9200
networks:
- elastic
es03:
image: elasticsearch:7.12.1
container_name: es03
environment:
- node.name=es03
- cluster.name=es-docker-cluster
- discovery.seed_hosts=es01,es02
- cluster.initial_master_nodes=es01,es02,es03
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- data03:/usr/share/elasticsearch/data
networks:
- elastic
ports:
- 9202:9200
volumes:
data01:
driver: local
data02:
driver: local
data03:
driver: local

networks:
elastic:
driver: bridge

es需要修改一些Linux权限,修改/etc/sysctl.conf文件

1
vi /etc/sysctl.conf

添加下面内容:

1
vm.max_map_count=262144

然后执行命令,让配置生效

1
sysctl -p

通过docker-compose启动集群

1
docker-compose up -d

2.4.1集群状态监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态。

1
http://localhost:9000/

image-20230823221910498

image-20230823221920325

三、DSL

3.1 _cat

1
2
3
4
5
6
7
8
# 查看所有节点
GET /_cat/nodes
# 查看es健康状况
GET /_cat/health
# 查看主节点
GET /_cat/master
# 查看所有索引
GET /_cat/indices

3.2 操作索引库

3.2.1 mapping属性

mapping是对库中文档的约束,常见的mapping属性包括:

  • type:字段数据类型,常见的类型有:

    • 字符串:text(可分词的文本)、keyword(精确值,例如,品牌,国家)
    • 数值:long、integer、short、byte、double、float
    • 布尔:boolean
    • 日期:date
    • 对象:object
  • index:是否创建索引,默认为true。该字段是否需要进行搜索和排序就需要设置为true。

  • analyzer:使用哪种分词器。

  • properties:该字段的子字段。

3.2.2 创建索引库

ES中通过操作Result请求操作索引库、文档。请求内容用DSL语句来表示。创建索引库和mapping的DSL语法如下:

image-20230819223020969 示例:image-20230819223105402

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
PUT /heima2
{
"mappings":{
"properties":{
"info":{
"type":"text",
"analyzer":"ik_smart"
},
"email":{
"type": "keyword",
"index": false
},
"name": {
"type":"object",
"properties":{
"firstname":{
"type":"keyword"
},
"lastname":{
"type":"keyword"
}
}
}
}
}
}

3.2.3 查看和删除索引库

查看索引库语法:

1
GET /索引库名称

删除索引库语法:

1
DELETE /索引库名称

3.2.4 修改索引库

索引库和mapping一旦创建无法修改,但是可以添加新的字段。语法如下:

1
2
3
4
5
6
7
8
PUT /索引库名/_mapping
{
"properties":{
"新字段名":{
"type":"integer"
}
}
}

3.3 操作文档

3.3.1 新增文档

新增文档语法如下:

1
2
3
4
5
6
7
8
9
10
POST /索引库名/_doc/文档id
{
"字段1":"值1",
"字段2":"值2",
"字段3": {
"子属性1":"值3",
"子属性2":"值4"
},
// ...
}

image-20230819230534780

3.3.2 查询文档

1
GET /heima2/_doc/1

3.3.3 删除文档

1
DELETE /heima2/_doc/1

3.3.4 修改文档

方式1:全量修改,会删除旧文档,添加新文档。文档存在就是修改,不存在就是新增

1
2
3
4
5
PUT /索引库名/_doc/文档id
{
"字段1":"值1",
"字段2":"值2",
}

image-20230819231358779

方式2:增量修改,修改指定字段值。

1
2
3
4
5
6
POST /索引库名/_update/文档id
{
"doc":{
"字段名":"新的值",
}
}

3.4 批量操作

语法格式如下:

image-20250610232452611

  • 批量新增
1
2
3
4
5
POST /_bulk
{"index": {"_index":"heima", "_id": "3"}}
{"info": "黑马程序员C++讲师", "email": "ww@itcast.cn", "name":{"firstName": "五", "lastName":"王"}}
{"index": {"_index":"heima", "_id": "4"}}
{"info": "黑马程序员前端讲师", "email": "zhangsan@itcast.cn", "name":{"firstName": "三", "lastName":"张"}}
  • 批量删除
1
2
3
POST /_bulk
{"delete":{"_index":"heima", "_id": "3"}}
{"delete":{"_index":"heima", "_id": "4"}}
  • 批量更新
1
2
3
4
5
POST /_bulk
{"update":{"_id":"3","_index":"heima"}}
{"doc":{"email":"666@qq.com"}}
{"update":{"_id":"4","_index":"heima"}}
{"doc":{"email":"777@qq.com"}}

四、RestClient

ES提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。

image-20230819232637919

4.1 JavaRestClient操作索引库

4.2 操作索引库

4.2.1 初始化JavaRestClient

  1. 引入es的RestHighLevelClient依赖:
1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.12.1</version>
</dependency>
  1. 因为springboot默认的ES版本是7.6.2,我们要覆盖默认的ES版本。
1
2
3
4
<properties>
<java.version>1.8</java.version>
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>
  1. 初始化RestHighLevelClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class HotelIndexTest {

private RestHighLevelClient client;

@Test
public void test(){
System.out.println(client);
}

@BeforeEach()
void setUp(){
this.client = new RestHighLevelClient(RestClient.builder(
HttpHost.create("http://192.168.16.128:9200")
));
}

@AfterEach()
void close(){
try {
this.client.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

4.2.2 数据结构分析

mapping要考虑的问题:

字段名;数据类型;是否参与搜索;是否分词;如果分词,分词器是什么?

以商品为例:

image-20250611203359876

image-20250611204642917

DSL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 商品映射
PUT /items
{
"mappings": {
"properties": {
"id":{
"type":"keyword"
},
"name":{
"type": "text",
"analyzer": "ik_smart"
},
"price":{
"type": "integer"
},
"image":{
"type": "keyword",
"index": false
},
"category":{
"type": "keyword"
},
"brand":{
"type": "keyword"
},
"sold":{
"type": "integer"
},
"comment_count":{
"type": "integer",
"index": false
},
"isAD":{
"type": "boolean"
},
"updateTime":{
"type": "date"
}
}
}
}

4.2.3 创建、删除、查询索引库

image-20230820161234313

client.indices():返回的对象中包含索引库操作的多有方法

create()创建

delete()删除

get()查询

创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static MAPPING_TEMPLATE = "{
"mappings": {
"properties": {
"id":{
"type":"keyword"
},
"name":{
"type": "text",
"analyzer": "ik_smart"
},
"price":{
"type": "integer"
},
"image":{
"type": "keyword",
"index": false
},
"category":{
"type": "keyword"
},
"brand":{
"type": "keyword"
},
"sold":{
"type": "integer"
},
"comment_count":{
"type": "integer",
"index": false
},
"isAD":{
"type": "boolean"
},
"updateTime":{
"type": "date"
}
}
}
}"
@Test
void createIndex() throws IOException {
// 1.创建request对象
CreateIndexRequest request = new CreateIndexRequest("items");
// 2.准备请求参数
request.source(MAPPING_TEMPLATE, XContentType.JSON);
// 3.发送请求
client.indices().create(request, RequestOptions.DEFAULT);
}

删除

1
2
3
4
5
6
7
@Test
void deleteIndex() throws IOException {
// 1.创建request对象
DeleteIndexRequest request = new DeleteIndexRequest("items");
// 2.发送请求
client.indices().delete(request, RequestOptions.DEFAULT);
}

查询

1
2
3
4
5
6
7
8
9
@Test
void getIndex() throws IOException {
// 1.创建request对象
GetIndexRequest request = new GetIndexRequest("items");
// 2.发送请求,exists,判断索引库是否存在
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
Map<String, MappingMetadata> mappings = response.getMappings();
System.out.println(mappings.toString());
}

4.3 操作文档

4.3.1 创建文档

1
2
3
4
5
6
7
8
9
10
11
12
@Test
void testAddDoc() throws IOException {
// 1.查询文档数据
Item item = itemService.getById(317578L);
ItemDoc itemDoc = BeanUtil.copyProperties(item, ItemDoc.class);
// 2.准备Request。id不存在就是新增,存在就是修改
IndexRequest request = new IndexRequest("items").id(itemDoc.getId());
// 3.准备请求参数
request.source(JSONUtil.toJsonStr(itemDoc), XContentType.JSON);
// 4.发送请求
client.index(request, RequestOptions.DEFAULT);
}

4.3.2 查询文档

1
2
3
4
5
6
7
8
9
10
@Test
public void testGetDocument() throws IOException {
// 1.创建request对象
GetRequest request = new GetRequest("items","317578");
// 2.发送请求,得到结果
GetResponse response = client.get(request, RequestOptions.DEFAULT);
// 3.解析结果
String json= response.getSourceAsString();
System.out.println(json);
}

4.3.3 修改文档

方式一:全量更新。与添加一样

方式二:局部更新。

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testUpdateDocument() throws IOException {
// 1.创建request对象
UpdateRequest request = new UpdateRequest("items", "317578");
// 2.准备请求参数
request.doc(
"price","755"
);
// 3.更新文档
client.update(request, RequestOptions.DEFAULT);
}

4.3.4 删除文档

1
2
3
4
5
6
7
@Test
public void testDeleteDocument() throws IOException {
// 1.创建request对象
DeleteRequest request = new DeleteRequest("items", "317578");
// 3.删除文档
client.delete(request, RequestOptions.DEFAULT);
}

4.3.5 批量导入文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
void testBatch() throws IOException {
int pageNo = 1,pageSize = 500;
// 1.获取数据
while (true){
Page<Item> page = itemService.lambdaQuery()
.eq(Item::getStatus, 1)
.page(Page.of(pageNo, pageSize));
List<Item> records = page.getRecords();
if (records == null || records.isEmpty()){
return;
}
List<ItemDoc> itemDocs = BeanUtil.copyToList(records, ItemDoc.class);
// 2.构建request
BulkRequest request = new BulkRequest();
for (ItemDoc itemDoc : itemDocs) {
// 2.1 构建批量新增IndexRequest
IndexRequest items = new IndexRequest("items").id(itemDoc.getId());
items.source(JSONUtil.toJsonStr(itemDoc), XContentType.JSON);
// 2.2 将新增的IndexRequest添加到批量请求中
request.add(items);
}
// 3.发送请求
client.bulk(request, RequestOptions.DEFAULT);
// 4.翻页
pageNo++;
}
}

查询

1
2
3
GET /items/_search
# 统计数量
GET /items/_count

五、DSL查询文档语法

搜索完整语法:

image-20250612222551304

DSL查询可以分为两大类:

  • 叶子查询:一般是在特定的字段里面查询特定值,属于简单查询,很少单独使用。
  • 复合查询:以逻辑方式组合多个叶子查询或者更改叶子查询的行为方式。

在查询后,还可以对查询的结果做处理,包括

  • 排序:按照一个或多个字段值做排序。
  • 分页:根据from和size做分页。
  • 高亮:对搜索结果中的关键字添加特殊的样式,使其更新醒目。
  • 聚合:对搜索结果做数据统计以形成报表。

ES提供了基于JSON的DSL来定义查询。常见的查询包括。

  • 查询所有:查询所有数据,一般测试用。match_all

叶子查询

  • 全文检索查询:利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
    • match:根据一个字段查
    • multi_match:根据多个字段查询,参与查询的字段越多,查询性能越差。
  • 精确查询:根据精确词条值查找数据,一般是查找keyword、数值、日期、boolean等类型字段。
    • ids
    • range
    • term
  • 地理(geo)查询:用搜索地理位置,搜索方式很多。例如:
    • geo_distance
    • geo_bounding_box

复合查询

  • 复合查询:可以将上述各种查询条件组合起来,合并查询条件。例如:
    • bool
    • function_score

查询基本语法:

1
2
3
4
5
6
7
8
GET /indexName/_search
{
"query":{
"查询类型":{
"查询条件":"条件值"
}
}
}

5.1 查询所有

1
2
3
4
5
6
7
GET /hotel/_search
{
"query":{
"match_all":{
}
}
}

查询结果

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"took":"4",
"timed_out":false,
"_shards":{},
"hits":{
"total":{
"value":10000,
"relation":"gte"
},
"max_score":1.0,
"hits":{}
}
}
  • took:请求耗时。
  • timed_out:是否超时。
  • _shards:分片。
  • hits:数据。
    • total:总条数。上述表示大于等于10000条
    • hits:数据,和查询单条数据格式一致。

5.2 全文检索查询

  • match:根据一个字段查
  • multi_match:根据多个字段查询,参与查询的字段越多,查询性能越差。

会对用户输入内容分词,常用于搜索框搜索。

image-20230820212020722

1
2
3
4
5
6
7
8
9
# match查询,copy_to放到一个字段中的
GET /hotel/_search
{
"query": {
"match": {
"all": "外滩如家"
}
}
}
1
2
3
4
5
6
7
8
9
10
# multi_match查询
GET /hotel/_search
{
"query": {
"multi_match": {
"query": "外滩如家",
"fields":["name","brand","business"]
}
}
}

查询符合的条件越多,数据越靠前,比如说:==外滩如家==会比只有外滩或者只有如家的靠前。

5.3 精确查询

不会对用户输入的搜索条件在分词,而是作为一个词条,与搜索的字段内容精确值匹配。因此推荐查找keyword、数值、日期、boolean类型的数据

term:根据词条的精确值查询。

range:根据值的范围查询。

1
2
3
4
5
6
7
8
9
10
11
# term查询
GET /hotel/_search
{
"query": {
"term": {
"city": {
"value": "北京"
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
# range
GET /hotel/_search
{
"query": {
"range": {
"price": {
"gte": 100,
"lte": 300
}
}
}
}

5.4 复合查询

bool

布尔查询是一个或多个查询字句的组合。子查询的组合方式有:

  • must:必须匹配每个子查询,类似“与”
  • should:选择匹配子查询,类似“或”
  • must_not:必须不匹配,不参与算分,类似“非”
  • filter:必须匹配,不参与算分。类似“与”

基本上,用户输入关键字搜索的才需要参与算分。

image-20230820231725072

function_score

复合查询在简单查询的基础上,将简单查询结合起来,实现更复杂的搜索逻辑。

  • function scope:算分函数查询,可以控制文档相关性算分,控制文档排名。例如百度竞价。
  • 相关性算分:当我们利用match查询时,文档结果会根据搜索词条的关联度进行打分,返回结果按照分值排序。
image-20230820222823444

使用Function Scope Query,可以修改文档的相关性算分,根据得到的算分排序。(有老板给钱了,自己的数据查出来要放到前面)。

image-20230820224436759

5.5 地理查询

  • geo_bounding_box:查询geo_point值落在某个矩形范围内的所有文档
image-20230820220557155

image-20230820220543846

  • geo_distance:查询某个指定中心点小于某个距离值的所有文档

image-20230820220808409

image-20230820220820030

1
2
3
4
5
6
7
8
9
GET /hotel/_search
{
"query": {
"geo_distance":{
"distance": "5km",
"location": "31.21,121.5"
}
}
}

六、搜索结果处理

6.1 排序

image-20230821144956357 image-20230821144938595

对酒店数据按照用户评价降序排序,评价相同的安装价格升序排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"score": {"order": "desc"}
},
{
"price": "asc"
}
]
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"location": {
"lat": 31.034,
"lon": 121.612
},
"order": "asc",
"unit": "km"
}
}
]
}

6.2 分页

es默认情况下只返回top10的数据。而如果要查询更多的数据,就需要修改分页参数了。

通过修改fromsize来控制返回的分页结果。

image-20230821151539608

ES是分布式的,所以会面临深度分页问题。

如果搜索页数过深,或者结果集(from+size)越大,对内存和CPU的消耗也越高。因此ES设定结果集查询的上限是10000

深度分页问题

image-20250612215906283

查询的页码越深,从每个分片查询的数据量越多,性能越差。

针对深度分页,ES提供了两种解决方案:

  • search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页的数据。推荐

    • 优点:没有查询上限-支持深度分页
    • 缺点:只能向后逐页查询
    • 场景:数据迁移,手机滚动查询
  • scroll(弃用)

6.3 高亮

高亮就是在搜索结果中把关键字突出显示。前置和后置标签都是em,可以不加。

语法:

image-20230821153448307
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 默认情况下,搜索字段必须和高亮字段一致,否则不会高亮
# "require_field_match": "false",名称不同也会高亮
GET /hotel/_search
{
"query": {
"match": {
"all": "如家"
}
},
"highlight": {
"fields": {
"name": {
"require_field_match": "false"
}
}
}
}

七、RestClient查询文档

数据搜索的Java代码分为两部分:

  • 构建并发起请求
  • 解析查询结果

image-20250612223230084

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
void testMatchAll() throws IOException {
SearchRequest request = new SearchRequest("hotel");
request.source().query(QueryBuilders.matchAllQuery());
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 获取response里面的数据
long total = response.getHits().getTotalHits().value;
System.out.println("总共有多少条数据:"+total);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(sourceAsString, HotelDoc.class);
System.out.println(hotelDoc);
}
}

RestApI中构建DSL都是通过request里面的source()方法实现,包含查询,排序,分页,高亮等所有功能。

在JavaRestAPI中,所有类型的query查询条件都是通过QueryBuilders构建的。

image-20250612230306524

7.1 全文检索查询

image-20230821174837681image-20230821174902941

7.2 精确查询

image-20230821175444829

image-20230821175457053

7.3 复合查询

image-20230821175548038

image-20230821175559586

1
2
// 把boolQauery加到query()中。
request.source().query(boolQuery);

7.4 排序和分页

1
2
3
4
5
request.source()
.query(QueryBuilders.matchAllQuery())
.from(0)
.size(10)
.sort("price", SortOrder.DESC);

7.5 高亮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Test
void testMatchAll() throws IOException {
// 1. 构建request
SearchRequest request = new SearchRequest("hotel");
// 1.1 设置查询条件,并设置高亮字段
request.source()
.query(QueryBuilders.matchQuery("all","如家"))
.highlighter(new HighlightBuilder().field("name")
.requireFieldMatch(false));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);

long total = response.getHits().getTotalHits().value;
System.out.println("总共有多少条数据:"+total);
// 2. 解析数据
// 2.1 获取外层的hits
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
HotelDoc hotelDoc = JSON.parseObject(sourceAsString, HotelDoc.class);
System.out.println(hotelDoc);
// 3.处理高亮
Map<String, HighlightField> files = hit.getHighlightFields();
if (!CollectionUtils.isEmpty(files)){
// 3.1根据名称取到高亮结果数组
HighlightField highlightField = files.get("name");
if (highlightField != null){
// 3.2根据高亮结果取值
String name = highlightField.getFragments()[0].string();
System.out.println(name);
}
}
}
}

image-20230821181851436

八、数据聚合

聚合可以实现对文档数据的统计,分析和运算。聚合常见的有三类。

  • 桶(Bucket)聚合:用来对文档做分租。(对数据进行分组统计)
    • TermAggregation:按照文档字段值分组。
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等。(对分组的内容做最小值,最大值,平均值等)
    • AVG:求平均值
    • MAX:求最大值
    • MIN:求最小值
    • Stats:同时求,max,min,avg,sum等。
  • 管道(pipeline)聚合:其他聚合的结果为基础做聚合。

参与聚合的类型必须不能是Text。可以是:

  • keyword
  • 数值
  • 日期
  • 布尔

8.1 DSL实现Bucket聚合

现在,我们要统计所有数据中的==酒店品牌有几种==,此时可以根据酒店品牌的名称做聚合。

聚合三要素:

  • 名称
  • 类型
  • 字段

语法:

image-20250612235037208

DSL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 相当于对brand做分组统计
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}

// 自定义排序规则
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc"
},
"size": 20
}
}
}
}

搜索结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
// ...
"aggregations" : {
"brandAgg" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
// 下面是所有品牌,以及该品牌的酒店数量
{
"key" : "7天酒店",
"doc_count" : 30
},
{
"key" : "如家",
"doc_count" : 30
},
// ...
]
}
}
}

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 限定聚合范围,价格在200元以内的的品牌的酒店数量
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc" # 对返回结果中的count进行排序
},
"size": 20
}
}
}
}

8.2 DSL实现Metrics聚合

image-20250613001012818

例如,要求获取每个品牌的用户评分的min,max,avg等值。在根据score的avg进行倒序排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20,
"order": {
"scopeAgg.avg": "desc"
}
},
"aggs": {
"scopeAgg": {
"stats": {
"field": "score"
}
}
}
}
}
}

8.3 RestClient实现聚合

8.3.1 Bucket

image-20230821220129424
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void testAggregation() throws IOException {
// 1. 准备request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备DSL
request.source()
.size(0)
// 聚合,排序
.aggregation(AggregationBuilders.terms("brandAgg")
.field("brand").size(10)
.order(BucketOrder.count(false)));

// 3. 发出请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4. 解析结果
Aggregations aggregations = response.getAggregations();
Terms terms = aggregations.get("brandAgg");
List<? extends Terms.Bucket> buckets = terms.getBuckets();
// 遍历
buckets.forEach(bucket -> {
// 获取品牌信息
String brandName = bucket.getKeyAsString();
System.out.println(brandName);
});
}

8.3.2 Metrics

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void testMetrics() throws IOException {
// 1. 准备request
SearchRequest request = new SearchRequest("hotel");
// 2. 准备DSL
request.source()
.size(0)
// 聚合,排序
.aggregation(AggregationBuilders.terms("brandAgg")
.field("brand").size(20)
.order(BucketOrder.aggregation("scopeAgg","avg",false))
.subAggregation(AggregationBuilders.stats("scopeAgg").field("score")));


// 3. 发出请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
System.out.println(response);
// 4. 解析结果
Aggregations aggregations = response.getAggregations();
Terms terms = aggregations.get("brandAgg");
List<? extends Terms.Bucket> buckets = terms.getBuckets();
// 遍历
buckets.forEach(bucket -> {
// 获取品牌信息
String brandName = bucket.getKeyAsString();
System.out.println(brandName);
});
}

九、自动补全

要实现根据拼音字母做补全,就必须对文档安装拼音分词。在GitHub上恰好有es的拼音分词插件。

安装拼音分词器。

  1. 解压
  2. 上传到虚拟机中,es的plugin目录
  3. 重启es
  4. 测试

9.1 自定义分词器

image-20230821235645663

image-20230821235819266

我们可以在创建索引库时,通过settings来配置自定义的analyzer(分词器)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 自定义分词器
PUT /test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name":{
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}

image-20230822001024437

9.2 自动补全

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束。

  • 参与补全查询的字段必须是Completion类型
  • 字段的内容一般是用来补全的多个词条形成的数组。

image-20230822132229802 image-20230822132601416

查询语法如下:

image-20230822133130496

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 自动补全索引库
PUT test2
{
"mappings": {
"properties": {
"title":{
"type": "completion"
}
}
}
}
# 示例数据
POST test2/_doc
{
"title":["Sony","WH-10010"]
}
POST test2/_doc
{
"title":["SK-II","PITERA"]
}
POST test2/_doc
{
"title":["Nintendo","swich"]
}

GET /test2/_search
{
"query": {
"match_all": {}
}
}
# 自动补全查询
# "skip_duplicates":true, // 跳过重复的
# "size":10 // 获取前10条
GET /test2/_search
{
"suggest": {
"title_suggest": {
"text": "s",
"completion": {
"field": "title",
"skip_duplicates":true,
"size":10
}
}
}
}

9.3 酒店数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// 酒店数据索引库
PUT /hotel
{
// 自定义分词器
"settings": {
"analysis": {
"analyzer": {
// 定义了两个分词器
"text_anlyzer": {
// 切割成词条
"tokenizer": "ik_max_word",
// 分词后变成拼音
"filter": "py"
},
"completion_analyzer": {
// 不分词
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"id":{
"type": "keyword"
},
"name":{
"type": "text",
// 使用自定义分词器
"analyzer": "text_anlyzer",
// 搜索时使用ik_smart分词器
"search_analyzer": "ik_smart",
"copy_to": "all"
},
"address":{
"type": "keyword",
"index": false
},
"price":{
"type": "integer"
},
"score":{
"type": "integer"
},
"brand":{
"type": "keyword",
"copy_to": "all"
},
"city":{
"type": "keyword"
},
"starName":{
"type": "keyword"
},
"business":{
"type": "keyword",
"copy_to": "all"
},
"location":{
"type": "geo_point"
},
"pic":{
"type": "keyword",
"index": false
},
"all":{
"type": "text",
"analyzer": "text_anlyzer",
"search_analyzer": "ik_smart"
},
"suggestion":{
"type": "completion",
"analyzer": "completion_analyzer"
}
}
}
}

9.4 自动补全代码

请求解析:

image-20230822210109532

结果解析:

image-20230822211143863

image-20230822211202035
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
void testSuggest() throws IOException {
// 1.准备request
SearchRequest request = new SearchRequest("hotel");
// 2.准备DSL
request.source().suggest(new SuggestBuilder().addSuggestion(
"suggestions",
SuggestBuilders.completionSuggestion("suggestion")
.prefix("hc")
.skipDuplicates(true) //是否要跳过重复的
.size(10)
));
// 3.发起请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析结果
Suggest suggest = response.getSuggest();
CompletionSuggestion suggestion = suggest.getSuggestion("suggestions");
List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
options.forEach(option -> {
String text = option.getText().toString();
System.out.println(text);
});
}

十、数据同步

es中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,es也必须发生改变,这个就是es与mysql之间的数据同步。

image-20230822214012518

image-20230822214312927

image-20230822214613909

image-20230822214808199

利用MQ来实现Mysql和es的数据同步

当酒店数据发生增、删、改时,要求对easticsearch中的数据也要完成相同的操作。

durable:设置持久化后,将会存到硬盘中。服务器重启也不删除。

autoDelete:当所有的与此连接的客户端都断开时,该交换器会被删除

10.1 消费者

  1. 导入spring-boot-starter-amqp依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 添加配置
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.16.128
port: 5672
username: itcast
password: 123321
virtual-host: /
  1. 编写交换机、队列之间的绑定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MqConstants {

/***
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/***
* 监听新增和修改队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/***
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/***
* 新增或修改routingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/***
* 删除routingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Configuration
public class MqConfig {

/**
* 定义交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
// 默认也是true和false
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}

/**
* 声明插入的队列
* @return
*/
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}

/**
* 声明删除的队列
* @return
*/
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}

/**
* 新增和修改的队列与交换机绑定
* @return
*/
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}

/**
* 删除的队列与交换机绑定
* @return
*/
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}

}
  1. 编写监听器实现业务功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
public class HotelListener {

@Autowired
private IHotelService hotelService;

/**
* 监听酒店新增或修改的业务
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}

/**
* 监听酒店删除的业务
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public void insertById(Long id) {
try {
// 根据id查询酒店数据
Hotel hotel = getById(id);
HotelDoc hotelDoc = new HotelDoc(hotel);
// 1.准备request
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
// 2.准备DSL
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
// 3.发送请求
client.index(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void deleteById(Long id) {
try {
// 1.准备Request
DeleteRequest request = new DeleteRequest("hotel").id(id.toString());
// 2.准备请求
client.delete(request, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

10.2 提供者

  1. 导入spring-boot-starter-amqp依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 添加配置
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 192.168.16.128
port: 5672
username: itcast
password: 123321
virtual-host: /
  1. 在新增、修改、删除的时候发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MqConstants {

/***
* 交换机
*/
public final static String HOTEL_EXCHANGE = "hotel.topic";
/***
* 监听新增和修改队列
*/
public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
/***
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
/***
* 新增或修改routingKey
*/
public final static String HOTEL_INSERT_KEY = "hotel.insert";
/***
* 删除routingKey
*/
public final static String HOTEL_DELETE_KEY = "hotel.delete";

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}

@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}

@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}

十一、es集群

单机的es做数据存储,必然面临两个问题:海量的数据存储问题、单点故障。

  • 海量数据存储问题:将索引库从逻辑上拆分为N个分片,存储多个节点。
  • 单点故障问题:将分片数据在不同的节点备份。

image-20230823174053911

11.1 搭建es集群

我们计划利用3个docker容器模拟3个es节点。

4.部署es集群

11.2 集群职责及脑裂

es中集群节点有不同的职责划分:

image-20230823223333485

es中的每个节点都有自己不同的职责,因此建议部署集群角色时,每个节点都有独立的角色。

image-20230823224339406

11.2.1 ES集群中的脑裂问题

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其他候选节点会选举一个成为主节点。当主节点与其他节点发生网络故障时,可能发生脑裂问题。

image-20230823224748430

image-20230823224802097

image-20230823224812712

为了避免脑裂,需要要求选票超过(eligible节点数量+1)/2才能当选为主,因此eligible节点数量最好是奇数。对应配项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题。

11.3 es节点的分布式存储

当新增文档时,应该保存到不同的分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?

image-20230823230526248

image-20230823231220936

当查询没有id时(查询全部时):

image-20230823231410612

11.4 es集群的故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移

image-20230823232156347

image-20230823232209014

image-20230823232303765