操作ES有很多种方式,而官方推荐的是使用Java REST Client。本例使用的ES版本:7.6.2。
ES提供了两个JAVA REST client 版本
Java Low Level REST Client:低级别的REST客户端,通过HTTP与集群交互,用户需自己编组请求JSON串,并自己解析响应JSON串。兼容所有ES版本。
Java High Level REST Client:高级别的REST客户端,基于低级别的REST客户端,增加了编组请求JSON串、解析响应JSON串等相关API。使用的客户端版本需要与ES服务端的版本保持一致,否则会有版本问题。
Java Low Level REST Client 说明
特点、maven 引入、使用介绍: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html API doc:https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-client/6.2.4/index.html
Java High Level REST Client 说明
从6.0.0开始加入的,目的是以java面向对象的方式来进行请求、响应处理。 每个API 支持 同步/异步 两种方式,同步方法直接返回一个结果对象。异步的方法以async为后缀,通过listener参数来通知结果。 高级java REST 客户端依赖Elasticsearch core project
兼容性说明:依赖 Java 1.8 和 Elasticsearch core project;请使用与服务端ES版本一致的客户端版本。
官方文档
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-low.html
一、Maven依赖
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.6.2</version>
</dependency>
二、配置
/**
 * Spring configuration that supplies the Elasticsearch high-level REST client.
 *
 * <p>Overrides {@code elasticsearchClient()} from {@code AbstractElasticsearchConfiguration}
 * and returns a client configured to connect to {@code localhost:9200}.
 */
@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {

    /**
     * Creates the client from a {@code ClientConfiguration} that targets {@code localhost:9200}.
     *
     * @return the {@code RestHighLevelClient} obtained via {@code RestClients.create(...).rest()}
     */
    @Override
    public RestHighLevelClient elasticsearchClient() {
        return RestClients.create(
                        ClientConfiguration.builder()
                                .connectedTo("localhost:9200")
                                .build())
                .rest();
    }
}
三、操作Elasticsearch
1.创建索引
package com
.example
;
import org
.apache
.logging
.log4j
.LogManager
;
import org
.apache
.logging
.log4j
.Logger
;
import org
.elasticsearch
.action
.admin
.indices
.alias
.Alias
;
import org
.elasticsearch
.action
.admin
.indices
.create
.CreateIndexRequest
;
import org
.elasticsearch
.action
.admin
.indices
.create
.CreateIndexResponse
;
import org
.elasticsearch
.client
.RequestOptions
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import org
.elasticsearch
.common
.settings
.Settings
;
import org
.elasticsearch
.common
.xcontent
.XContentType
;
import org
.junit
.jupiter
.api
.Test
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.boot
.test
.context
.SpringBootTest
;
import java
.io
.IOException
;
@SpringBootTest
public class IndexCreateTest {
@Autowired
private RestHighLevelClient restHighLevelClient
;
private static Logger logger
= LogManager
.getRootLogger();
@Test
public void indexCreate() {
try {
CreateIndexRequest request
= new CreateIndexRequest("twitter");
request
.settings(Settings
.builder()
.put("analysis.analyzer.default.tokenizer", "ik_max_word")
);
request
.mapping("_doc",
"{\n" +
" \"properties\":{\n" +
" \"id\":{\n" +
" \"type\":\"long\",\n" +
" \"store\":true\n" +
" },\n" +
" \"username\":{\n" +
" \"type\":\"text\",\n" +
" \"store\":true,\n" +
" \"analyzer\":\"ik_max_word\"\n" +
" },\n" +
" \"email\":{\n" +
" \"type\":\"text\",\n" +
" \"store\":true,\n" +
" \"analyzer\":\"ik_max_word\"\n" +
" }\n" +
" }\n" +
"}",
XContentType
.JSON
);
request
.alias(new Alias("账户信息"));
CreateIndexResponse createIndexResponse
= restHighLevelClient
.indices()
.create(request
, RequestOptions
.DEFAULT
);
boolean acknowledged
= createIndexResponse
.isAcknowledged();
boolean shardsAcknowledged
= createIndexResponse
.isShardsAcknowledged();
logger
.info("acknowledged = " + acknowledged
);
logger
.info("shardsAcknowledged = " + shardsAcknowledged
);
} catch (IOException e
) {
logger
.error(e
);
}
}
}
2.修改type的mapping
/**
 * Sends a put-mapping request for the {@code _doc} type of the {@code twitter} index.
 * The mapping source declares the fields id, username, email and age.
 * An {@link IOException} raised while sending is printed to standard output.
 */
@Test
public void putMappingRequest() {
    final String mappingJson =
            "{\n" +
            " \"properties\":{\n" +
            " \"id\":{\n" +
            " \"type\":\"long\",\n" +
            " \"store\":true\n" +
            " },\n" +
            " \"username\":{\n" +
            " \"type\":\"text\",\n" +
            " \"store\":true,\n" +
            " \"analyzer\":\"ik_max_word\"\n" +
            " },\n" +
            " \"email\":{\n" +
            " \"type\":\"text\",\n" +
            " \"store\":true,\n" +
            " \"analyzer\":\"ik_max_word\"\n" +
            " },\n" +
            " \"age\":{\n" +
            " \"type\":\"long\",\n" +
            " \"store\":true\n" +
            " }\n" +
            " }\n" +
            "}";
    try {
        PutMappingRequest mappingRequest = new PutMappingRequest("twitter")
                .type("_doc")
                .source(mappingJson, XContentType.JSON);
        restHighLevelClient.indices().putMapping(mappingRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        System.out.println(e);
    }
}
3.往索引里面放入文档数据
package com
.example
;
import org
.apache
.logging
.log4j
.LogManager
;
import org
.apache
.logging
.log4j
.Logger
;
import org
.elasticsearch
.ElasticsearchException
;
import org
.elasticsearch
.action
.DocWriteResponse
;
import org
.elasticsearch
.action
.index
.IndexRequest
;
import org
.elasticsearch
.action
.index
.IndexResponse
;
import org
.elasticsearch
.action
.support
.replication
.ReplicationResponse
;
import org
.elasticsearch
.client
.RequestOptions
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import org
.elasticsearch
.common
.xcontent
.XContentType
;
import org
.elasticsearch
.rest
.RestStatus
;
import org
.junit
.jupiter
.api
.Test
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.boot
.test
.context
.SpringBootTest
;
import java
.io
.IOException
;
/**
 * Integration test that indexes one JSON document with id {@code 1} into the
 * {@code twitter} index and reports the outcome.
 */
@SpringBootTest
public class IndexDocumentTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private static Logger logger = LogManager.getRootLogger();

    /**
     * Builds the index request, sends it synchronously and reports whether the
     * document was created or updated, plus any per-shard replication failures.
     *
     * <p>An {@link ElasticsearchException} from the server is reported on standard
     * output and ends the test method; an {@link IOException} is logged through the
     * class logger. Neither is rethrown.
     */
    @Test
    public void indexDocument() {
        try {
            // Type-less request: the typed (index, type, id) constructor is deprecated in 7.x.
            IndexRequest request = new IndexRequest("twitter").id("1");
            String jsonString = "{" +
                    "\"id\":\"1\"," +
                    "\"username\":\"JonssonYan\"," +
                    "\"email\":\"yz808@outlook.com\"" +
                    "}";
            request.source(jsonString, XContentType.JSON);

            IndexResponse indexResponse;
            try {
                indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.CONFLICT) {
                    System.out.println("冲突了,请在此写冲突处理逻辑!" + e.getDetailedMessage());
                }
                System.out.println("索引异常");
                return; // no response to inspect
            }

            reportResult(indexResponse);
        } catch (IOException e) {
            logger.error(e);
        }
    }

    /** Prints the write result and the reason of every failed shard copy. */
    private void reportResult(IndexResponse indexResponse) {
        DocWriteResponse.Result result = indexResponse.getResult();
        if (result == DocWriteResponse.Result.CREATED) {
            System.out.println("新增文档成功,处理逻辑代码写到这里。");
        } else if (result == DocWriteResponse.Result.UPDATED) {
            System.out.println("修改文档成功,处理逻辑代码写到这里。");
        }

        ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            // Placeholder: fewer shard copies succeeded than exist in total.
            // Intentionally no action here; add handling when needed.
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                System.out.println("副本失败原因:" + failure.reason());
            }
        }
    }
}
4.获取文档数据
package com
.example
;
import org
.apache
.logging
.log4j
.LogManager
;
import org
.apache
.logging
.log4j
.Logger
;
import org
.elasticsearch
.ElasticsearchException
;
import org
.elasticsearch
.action
.get
.GetRequest
;
import org
.elasticsearch
.action
.get
.GetResponse
;
import org
.elasticsearch
.client
.RequestOptions
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import org
.elasticsearch
.common
.Strings
;
import org
.elasticsearch
.rest
.RestStatus
;
import org
.elasticsearch
.search
.fetch
.subphase
.FetchSourceContext
;
import org
.junit
.jupiter
.api
.Test
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.boot
.test
.context
.SpringBootTest
;
import java
.io
.IOException
;
import java
.util
.Map
;
/**
 * Integration test that fetches document {@code 1} from the {@code twitter} index
 * and logs its coordinates and source.
 */
@SpringBootTest
public class GetDocumentTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private static Logger logger = LogManager.getRootLogger();

    /**
     * Sends a synchronous get request restricted to the fields id, username and email,
     * then logs the index, type, id and source string when the document exists.
     *
     * <p>An {@link ElasticsearchException} is logged (with extra messages for the
     * NOT_FOUND and CONFLICT statuses) and ends the test method; an {@link IOException}
     * is logged through the class logger. Neither is rethrown.
     */
    @Test
    public void getDocument() {
        try {
            // Type-less request: the typed (index, type, id) constructor is deprecated in 7.x.
            GetRequest request = new GetRequest("twitter", "1");

            // Source filtering: fetch _source, include three fields, exclude none.
            String[] includes = new String[]{"id", "username", "email"};
            String[] excludes = Strings.EMPTY_ARRAY;
            request.fetchSourceContext(new FetchSourceContext(true, includes, excludes));

            GetResponse getResponse;
            try {
                getResponse = restHighLevelClient.get(request, RequestOptions.DEFAULT);
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    logger.info("没有找到该id的文档");
                }
                if (e.status() == RestStatus.CONFLICT) {
                    logger.info("获取时版本冲突了,请在此写冲突处理逻辑!");
                }
                logger.info("获取文档异常" + e);
                return; // no response to inspect
            }

            if (!getResponse.isExists()) {
                logger.info("没有找到该id的文档");
                return;
            }

            logger.info("index:" + getResponse.getIndex()
                    + " type:" + getResponse.getType()
                    + " id:" + getResponse.getId());
            logger.info(getResponse.getSourceAsString());
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
5.搜索数据
package com
.example
;
import org
.apache
.logging
.log4j
.LogManager
;
import org
.apache
.logging
.log4j
.Logger
;
import org
.apache
.lucene
.search
.TotalHits
;
import org
.elasticsearch
.action
.search
.SearchRequest
;
import org
.elasticsearch
.action
.search
.SearchResponse
;
import org
.elasticsearch
.action
.search
.ShardSearchFailure
;
import org
.elasticsearch
.client
.RequestOptions
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import org
.elasticsearch
.common
.unit
.TimeValue
;
import org
.elasticsearch
.index
.query
.QueryBuilders
;
import org
.elasticsearch
.rest
.RestStatus
;
import org
.elasticsearch
.search
.SearchHit
;
import org
.elasticsearch
.search
.SearchHits
;
import org
.elasticsearch
.search
.builder
.SearchSourceBuilder
;
import org
.junit
.jupiter
.api
.Test
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.boot
.test
.context
.SpringBootTest
;
import java
.io
.IOException
;
import java
.util
.Map
;
import java
.util
.concurrent
.TimeUnit
;
/**
 * Integration test that runs a term query on the {@code email} field of the
 * {@code twitter} index and logs every hit.
 */
@SpringBootTest
public class SearchTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private static Logger logger = LogManager.getRootLogger();

    /**
     * Executes the search synchronously (first 10 hits, 60 second timeout), logs any
     * shard failure reported in the response and then logs index, type, id and source
     * of each hit. An {@link IOException} is logged through the class logger and not rethrown.
     */
    @Test
    public void search() {
        try {
            // No searchRequest.types(...): specifying types on search requests is deprecated in 7.x.
            SearchRequest searchRequest = new SearchRequest("twitter");

            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(QueryBuilders.termQuery("email", "yz808@outlook.com"));
            sourceBuilder.from(0);
            sourceBuilder.size(10);
            sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
            searchRequest.source(sourceBuilder);

            SearchResponse searchResponse =
                    restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

            // Surface partial failures instead of silently iterating over them.
            for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
                logger.warn("shard search failure: " + failure);
            }

            for (SearchHit hit : searchResponse.getHits().getHits()) {
                logger.info("index:" + hit.getIndex()
                        + " type:" + hit.getType()
                        + " id:" + hit.getId());
                logger.info(hit.getSourceAsString());
            }
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
6.高亮
package com
.example
;
import org
.apache
.logging
.log4j
.LogManager
;
import org
.apache
.logging
.log4j
.Logger
;
import org
.apache
.lucene
.search
.TotalHits
;
import org
.elasticsearch
.action
.search
.SearchRequest
;
import org
.elasticsearch
.action
.search
.SearchResponse
;
import org
.elasticsearch
.client
.RequestOptions
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import org
.elasticsearch
.common
.text
.Text
;
import org
.elasticsearch
.index
.query
.QueryBuilder
;
import org
.elasticsearch
.index
.query
.QueryBuilders
;
import org
.elasticsearch
.rest
.RestStatus
;
import org
.elasticsearch
.search
.SearchHit
;
import org
.elasticsearch
.search
.SearchHits
;
import org
.elasticsearch
.search
.builder
.SearchSourceBuilder
;
import org
.elasticsearch
.search
.fetch
.subphase
.highlight
.HighlightBuilder
;
import org
.elasticsearch
.search
.fetch
.subphase
.highlight
.HighlightField
;
import org
.junit
.jupiter
.api
.Test
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.boot
.test
.context
.SpringBootTest
;
import java
.io
.IOException
;
import java
.util
.Map
;
/**
 * Integration test that runs a match query on {@code username} against the
 * {@code twitter} index with highlighting enabled for {@code username} and {@code email}.
 */
@SpringBootTest
public class HighlightTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private static Logger logger = LogManager.getRootLogger();

    /**
     * Executes the search synchronously and, when the response status is OK, logs for
     * every hit its coordinates, its source map and the first highlight fragment of the
     * {@code username} and {@code email} fields (when present).
     * An {@link IOException} is logged through the class logger and not rethrown.
     */
    @Test
    public void highlight() {
        try {
            SearchRequest searchRequest = new SearchRequest("twitter");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("username", "JonssonYan");
            sourceBuilder.query(matchQueryBuilder);

            // Highlight both fields, wrapping matches in <strong> tags.
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder
                    .requireFieldMatch(false).field("username").field("email")
                    .preTags("<strong>").postTags("</strong>");
            sourceBuilder.highlighter(highlightBuilder);
            searchRequest.source(sourceBuilder);

            SearchResponse searchResponse =
                    restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            if (!RestStatus.OK.equals(searchResponse.status())) {
                return;
            }

            for (SearchHit hit : searchResponse.getHits().getHits()) {
                logger.info("index:" + hit.getIndex()
                        + " type:" + hit.getType()
                        + " id:" + hit.getId());
                logger.info("sourceMap : " + hit.getSourceAsMap());

                Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                logFirstFragment(highlightFields, "username");
                logFirstFragment(highlightFields, "email");
            }
        } catch (IOException e) {
            logger.error(e);
        }
    }

    /**
     * Logs the first highlight fragment of {@code fieldName}, if there is one.
     * Does nothing when the field has no highlight entry or its fragment array is
     * null or empty, so an empty array can no longer cause an index-out-of-bounds error.
     */
    private void logFirstFragment(Map<String, HighlightField> highlightFields, String fieldName) {
        HighlightField highlight = highlightFields.get(fieldName);
        if (highlight == null) {
            return;
        }
        Text[] fragments = highlight.fragments();
        if (fragments == null || fragments.length == 0) {
            return;
        }
        logger.info(fieldName + " highlight : " + fragments[0].string());
    }
}
7.查询建议
词项建议拼写检查,检查用户的拼写是否错误,如果有错给用户推荐正确的词,appel->apple
package com
.example
;
import org
.apache
.logging
.log4j
.LogManager
;
import org
.apache
.logging
.log4j
.Logger
;
import org
.elasticsearch
.action
.search
.SearchRequest
;
import org
.elasticsearch
.action
.search
.SearchResponse
;
import org
.elasticsearch
.client
.RequestOptions
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import org
.elasticsearch
.rest
.RestStatus
;
import org
.elasticsearch
.search
.builder
.SearchSourceBuilder
;
import org
.elasticsearch
.search
.suggest
.Suggest
;
import org
.elasticsearch
.search
.suggest
.SuggestBuilder
;
import org
.elasticsearch
.search
.suggest
.SuggestBuilders
;
import org
.elasticsearch
.search
.suggest
.SuggestionBuilder
;
import org
.elasticsearch
.search
.suggest
.term
.TermSuggestion
;
import org
.junit
.jupiter
.api
.Test
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.boot
.test
.context
.SpringBootTest
;
import java
.io
.IOException
;
/**
 * Integration test that asks the {@code twitter} index for term suggestions
 * (spell-check style) on the {@code username} field.
 */
@SpringBootTest
public class SuggestTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    private static Logger logger = LogManager.getRootLogger();

    /**
     * Sends a suggest-only search (size 0) for the text {@code JonssonYa} and, when the
     * response status is OK, logs every suggestion entry with its options.
     *
     * <p>Returns quietly when the response carries no suggest section or no suggestion
     * named {@code suggest_user}. An {@link IOException} is logged through the class
     * logger and not rethrown.
     */
    @Test
    public void termSuggest() {
        try {
            SearchRequest searchRequest = new SearchRequest("twitter");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.size(0); // only the suggestions are of interest, not the hits

            // Wildcard type instead of the raw SuggestionBuilder type.
            SuggestionBuilder<?> termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("username").text("JonssonYa");
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);
            searchRequest.source(sourceBuilder);

            SearchResponse searchResponse =
                    restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            if (!RestStatus.OK.equals(searchResponse.status())) {
                return;
            }

            Suggest suggest = searchResponse.getSuggest();
            if (suggest == null) {
                return; // response has no suggest section
            }
            TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
            if (termSuggestion == null) {
                return; // named suggestion missing from the response
            }

            for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
                logger.info("text: " + entry.getText().string());
                for (TermSuggestion.Entry.Option option : entry) {
                    logger.info(" suggest option : " + option.getText().string());
                }
            }
        } catch (IOException e) {
            // Use the class logger rather than printStackTrace().
            logger.error("term suggest request failed", e);
        }
    }
}
自动补全:根据用户的输入联想到可能的词或者短语。
8.聚合分析
package com
.example
;
import org
.apache
.logging
.log4j
.LogManager
;
import org
.apache
.logging
.log4j
.Logger
;
import org
.elasticsearch
.action
.search
.SearchRequest
;
import org
.elasticsearch
.action
.search
.SearchResponse
;
import org
.elasticsearch
.client
.RequestOptions
;
import org
.elasticsearch
.client
.RestHighLevelClient
;
import org
.elasticsearch
.rest
.RestStatus
;
import org
.elasticsearch
.search
.aggregations
.AggregationBuilders
;
import org
.elasticsearch
.search
.aggregations
.Aggregations
;
import org
.elasticsearch
.search
.aggregations
.BucketOrder
;
import org
.elasticsearch
.search
.aggregations
.bucket
.terms
.Terms
;
import org
.elasticsearch
.search
.aggregations
.bucket
.terms
.TermsAggregationBuilder
;
import org
.elasticsearch
.search
.aggregations
.metrics
.Avg
;
import org
.elasticsearch
.search
.builder
.SearchSourceBuilder
;
import org
.junit
.jupiter
.api
.Test
;
import org
.springframework
.beans
.factory
.annotation
.Autowired
;
import org
.springframework
.boot
.test
.context
.SpringBootTest
;
import java
.io
.IOException
;
/**
 * Integration test that runs a terms aggregation on {@code age} over the
 * {@code twitter} index, with an average of {@code balance} computed per bucket.
 */
@SpringBootTest
public class AggregationTest {

    private static Logger logger = LogManager.getRootLogger();

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Sends an aggregation-only search (size 0) and, when the response status is OK,
     * logs the {@code by_age} aggregation summary followed by one section per bucket.
     * An {@link IOException} is logged through the class logger and not rethrown.
     */
    @Test
    public void aggregation() {
        try {
            SearchSourceBuilder source = new SearchSourceBuilder();
            source.size(0);

            // Buckets per age, ordered ascending by the sub-aggregation "average_balance".
            TermsAggregationBuilder byAge = AggregationBuilders.terms("by_age")
                    .field("age")
                    .order(BucketOrder.aggregation("average_balance", true));
            byAge.subAggregation(AggregationBuilders.avg("average_balance").field("balance"));
            source.aggregation(byAge);

            SearchRequest request = new SearchRequest("twitter");
            request.source(source);

            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            if (!RestStatus.OK.equals(response.status())) {
                return;
            }

            Aggregations results = response.getAggregations();
            Terms ageTerms = results.get("by_age");
            logger.info("aggregation by_age 结果");
            logger.info("docCountError: " + ageTerms.getDocCountError());
            logger.info("sumOfOtherDocCounts: " + ageTerms.getSumOfOtherDocCounts());
            logger.info("------------------------------------");

            for (Terms.Bucket bucket : ageTerms.getBuckets()) {
                logger.info("key: " + bucket.getKeyAsNumber());
                logger.info("docCount: " + bucket.getDocCount());
                logger.info("docCountError: " + bucket.getDocCountError());
                Avg avgBalance = bucket.getAggregations().get("average_balance");
                logger.info("average_balance: " + avgBalance.getValue());
                logger.info("------------------------------------");
            }
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
各种查询对应的QueryBuilder:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-query-builders.html
各种聚合对应的AggregationBuilder:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-aggregation-builders.html