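Preface
The previous article covered the various query types in ES.
This article shows how to run MySQL-style grouping and aggregation queries in ES,
implement pinyin autocompletion for user input,
and synchronize data automatically between MySQL and ES.
I. Grouping and aggregation
Aggregation queries in ES fall into two main categories: metric aggregations and bucket aggregations.
- Metric aggregations: max, min, sum and so on, equivalent to the corresponding aggregate functions in MySQL.
- Bucket aggregations: the equivalent of GROUP BY in MySQL, grouping documents by the value of one field.
Note: a text field cannot be used for grouping. Text fields are analyzed into tokens, so there is no single value to group on; group on a keyword field instead.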
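To make the two categories concrete, here is a minimal in-memory sketch in plain Java. It does not talk to ES; the `Hotel` class, the sample data and the method names are all hypothetical and only mirror what a metric aggregation and a bucket aggregation compute.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class AggregationSketch {
    // Hypothetical in-memory stand-in for a hotel document.
    static final class Hotel {
        final String brand;
        final String specs;
        final int price;

        Hotel(String brand, String specs, int price) {
            this.brand = brand;
            this.specs = specs;
            this.price = price;
        }
    }

    // Metric aggregation: a single number over the matching documents,
    // like SELECT MAX(price) ... WHERE brand = ?
    static OptionalInt maxPrice(List<Hotel> hotels, String brand) {
        return hotels.stream()
                .filter(h -> h.brand.equals(brand))
                .mapToInt(h -> h.price)
                .max();
    }

    // Bucket aggregation: one bucket per distinct value with its document count,
    // like SELECT specs, COUNT(*) ... GROUP BY specs
    static Map<String, Long> countBySpecs(List<Hotel> hotels) {
        return hotels.stream()
                .collect(Collectors.groupingBy(h -> h.specs, TreeMap::new, Collectors.counting()));
    }

    // Made-up sample data.
    static List<Hotel> sample() {
        return Arrays.asList(
                new Hotel("Marriott", "five-star", 900),
                new Hotel("Marriott", "four-star", 500),
                new Hotel("Hilton", "five-star", 700));
    }

    public static void main(String[] args) {
        System.out.println(maxPrice(sample(), "Marriott").getAsInt()); // 900
        System.out.println(countBySpecs(sample()));                    // {five-star=2, four-star=1}
    }
}
```

The metric call returns one value for the whole filtered set, while the bucket call returns one entry per distinct `specs` value, which is the same split the DSL requests in this section make.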
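Metric aggregations correspond to MySQL aggregate functions. The request below finds the highest and lowest hotel prices for the brand 萬豪.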
GET /hotel/_search
{
"query": {
"term": {
"brand": {
"value": "萬豪"
}
}
},
"size": 0,
"aggs": {
"最貴的": {
"max": {
"field": "price"
}
},
"最便宜的": {
"min": {
"field": "price"
}
}
}
}
GET /hotel/_search
{
"size": 0,
"query": {
"term": {
"brand": {
"value": "萬豪"
}
}
},
"aggs": {
"按星級名稱分組": {
"terms": {
"field": "specs",
"size": 20
}
}
}
}
Group all documents in the index by brand and, separately, by star rating:
GET /hotel/_search
{
"size": 0,
"aggs": {
"按品牌分組": {
"terms": {
"field": "brand",
"size": 20
}
},
"按星級分組": {
"terms": {
"field": "specs",
"size": 20
}
}
}
}
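3. Summary
A single ES request can contain multiple aggregations.
4. Implementation
First filter by the search condition, then group the matching documents by brand.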
GET hotel/_search
{
  "size": 0,
  "query": {
    "query_string": {
      "fields": ["name","synopsis","area","address"],
      "query": "三亞 OR 商務"
    }
  },
  "aggs": {
    "hotel_brands": {
      "terms": {
        "field": "brand",
        "size": 100
      }
    }
  }
}
HotelServiceImpl.java
@Override
public Map<String, Object> searchBrandGroupQuery(Integer current, Integer size, Map<String, Object> searchParam) {
    // Build the search request for the hotel index
    SearchRequest searchRequest = new SearchRequest("hotel");
    // Build the request body
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Set the query condition
    if (!StringUtils.isEmpty(searchParam.get("condition"))) {
        QueryBuilder queryBuilder = QueryBuilders.queryStringQuery(searchParam.get("condition").toString())
                .field("name")
                .field("synopsis")
                .field("area")
                .field("address")
                .defaultOperator(Operator.OR);
        searchSourceBuilder.query(queryBuilder);
    }
    // Group by brand
    AggregationBuilder aggregationBuilder = AggregationBuilders.terms("brand_groups")
            .size(200)
            .field("brand");
    searchSourceBuilder.aggregation(aggregationBuilder);
    // Pagination
    searchSourceBuilder.from((current - 1) * size);
    searchSourceBuilder.size(size);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = searchResponse.getHits();
        long totalHits = hits.getTotalHits().value;
        ArrayList<String> groupNameList = new ArrayList<>();
        // Read and process the aggregation result
        Terms brandGroups = searchResponse.getAggregations().get("brand_groups");
        for (Terms.Bucket bucket : brandGroups.getBuckets()) {
            String key = (String) bucket.getKey();
            groupNameList.add(key);
        }
        Map<String, Object> map = new HashMap<>();
        // map.put("list", list);
        map.put("totalResultSize", totalHits);
        map.put("current", current);
        // Total number of pages
        map.put("totalPage", (totalHits + size - 1) / size);
        // Brand group list
        map.put("brandList", groupNameList);
        return map;
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}
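The service method above relies on two small pieces of arithmetic: the offset passed to `from(...)` and the total page count. The sketch below isolates them in plain Java; the class and method names are hypothetical. Note that `from` and `size` only page the hits, while the brand aggregation is computed over every document that matches the query.

```java
final class PagingSketch {
    // Offset of the first hit on a 1-based page, as in from((current - 1) * size).
    static int from(int current, int size) {
        return (current - 1) * size;
    }

    // Ceiling division: the number of pages needed to show totalHits results.
    static long totalPages(long totalHits, int size) {
        return (totalHits + size - 1) / size;
    }

    public static void main(String[] args) {
        System.out.println(from(3, 10));         // 20
        System.out.println(totalPages(101, 10)); // 11
        System.out.println(totalPages(100, 10)); // 10
    }
}
```

Adding `size - 1` before dividing rounds up without floating-point arithmetic, so a partially filled last page still counts as a page.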
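5. Combining grouping and aggregation
When computing statistics, we usually group first and then aggregate within each group.
Example: given a date range entered by the user, compute the total sales of all hotels under each brand. This requires nested (multi-level) aggregation.
- Group by brand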
5.1.
GET hotel/_search
{
"size": 0,
"query": {
"range": {
"createTime": {
"gte": "2015-01-01",
"lte": "2015-12-31"
}
}
},
"aggs": {
"根據品牌分組": {
"terms": {
"field": "brand",
"size": 100
},
"aggs": {
"該品牌總銷量": {
"sum": {
"field": "salesVolume"
}
},
"該品牌銷量平均值": {
"avg": {
"field": "salesVolume"
}
}
}
}
}
}
HotelServiceImpl.java
public List<Map<String, Object>> searchDateHistogram(Map<String, Object> searchParam) {
    // Result list
    List<Map<String, Object>> result = new ArrayList<>();
    // Build the search request
    SearchRequest searchRequest = new SearchRequest("hotel");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Range query over the user-supplied date interval
    RangeQueryBuilder queryBuilder = QueryBuilders.rangeQuery("createTime")
            .gte(searchParam.get("minTime"))
            .lte(searchParam.get("maxTime"))
            .format("yyyy-MM-dd");
    searchSourceBuilder.query(queryBuilder);
    // First-level aggregation: group by brand
    TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("hotel_brand").field("brand").size(100);
    // Second-level aggregation: sum of sales within each brand
    SumAggregationBuilder secondAggregation = AggregationBuilders.sum("hotel_salesVolume").field("salesVolume");
    aggregationBuilder.subAggregation(secondAggregation);
    searchSourceBuilder.aggregation(aggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        // Read and process the aggregation result
        Aggregations aggregations = searchResponse.getAggregations();
        Map<String, Aggregation> aggregationMap = aggregations.asMap();
        Terms terms = (Terms) aggregationMap.get("hotel_brand");
        List<? extends Terms.Bucket> buckets = terms.getBuckets();
        buckets.forEach(bucket -> {
            Map<String, Object> info = new HashMap<>();
            info.put("brand", bucket.getKeyAsString());
            // Read the second-level aggregation
            ParsedSum parsedSum = bucket.getAggregations().get("hotel_salesVolume");
            Integer sumValue = (int) parsedSum.getValue();
            info.put("sumValue", sumValue);
            result.add(info);
        });
        return result;
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}
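As a rough illustration of what the nested aggregation computes, the following plain-Java sketch groups made-up sales records by brand and derives the sum and average per group in one pass. The `Sale` class and the data are hypothetical; in ES the same work is done by a terms aggregation with sum and avg sub-aggregations.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class NestedAggregationSketch {
    // Hypothetical record of one hotel's sales volume.
    static final class Sale {
        final String brand;
        final int salesVolume;

        Sale(String brand, int salesVolume) {
            this.brand = brand;
            this.salesVolume = salesVolume;
        }
    }

    // Outer level: one bucket per brand. Inner level: statistics over salesVolume in that bucket.
    static Map<String, IntSummaryStatistics> salesByBrand(List<Sale> sales) {
        return sales.stream().collect(Collectors.groupingBy(
                s -> s.brand, TreeMap::new, Collectors.summarizingInt(s -> s.salesVolume)));
    }

    // Made-up sample data.
    static List<Sale> sample() {
        return Arrays.asList(
                new Sale("Marriott", 100),
                new Sale("Marriott", 300),
                new Sale("Hilton", 50));
    }

    public static void main(String[] args) {
        salesByBrand(sample()).forEach((brand, stats) ->
                System.out.println(brand + " sum=" + stats.getSum() + " avg=" + stats.getAverage()));
    }
}
```

Each map entry plays the role of one bucket, and the statistics object holds the values that the sub-aggregations would return for that bucket.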
- function score: a scoring-function query that lets you control the relevance score of documents and therefore their ranking
GET hotel/_search
{
"query": {
"match": {
"name": "北京市東城區萬豪"
}
}
}
# Result
[
{
"_score" : 7.060467,
"_source" : {
"name" : "北京市東城區萬豪酒店",
}
},
{
"_score" : 7.060467,
"_source" : {
"name" : "北京市東城區金陵酒店",
}
},
{
"_score" : 7.060467,
"_source" : {
"name" : "北京市東城區華天酒店",
}
}
]
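Early versions of Elasticsearch scored documents with the TF-IDF algorithm.
Since version 5.0, Elasticsearch has used BM25 as its default scoring algorithm.
TF-IDF has a drawback: a document's score keeps growing as the term frequency grows, so a single term can dominate the score. BM25 puts an upper bound on the contribution of a single term, which gives a smoother curve.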
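For reference, the two scoring functions can be written in their common textbook form. This is a sketch of the general idea, not necessarily the exact variant Lucene implements (Lucene adds its own normalization factors). Here $f(t,d)$ is the frequency of term $t$ in document $d$, $N$ the number of documents, $n_t$ the number of documents containing $t$, $|d|$ the document length and $\mathrm{avgdl}$ the average document length; $k_1$ and $b$ are tuning parameters (Elasticsearch defaults to $k_1 = 1.2$, $b = 0.75$).

```latex
% TF-IDF: the score grows without bound as f(t,d) grows
\mathrm{score}_{\text{TF-IDF}}(q,d) = \sum_{t \in q} f(t,d) \cdot \log\frac{N}{n_t}

% BM25: the term-frequency factor saturates
\mathrm{score}_{\text{BM25}}(q,d) = \sum_{t \in q}
  \ln\!\left(1 + \frac{N - n_t + 0.5}{n_t + 0.5}\right) \cdot
  \frac{f(t,d)\,(k_1 + 1)}{f(t,d) + k_1\left(1 - b + b\,\frac{|d|}{\mathrm{avgdl}}\right)}

% Upper bound of the BM25 term-frequency factor
\lim_{f(t,d)\to\infty}
  \frac{f(t,d)\,(k_1 + 1)}{f(t,d) + k_1\left(1 - b + b\,\frac{|d|}{\mathrm{avgdl}}\right)} = k_1 + 1
```

The limit in the last line is the upper bound on a single term's contribution: however often the term occurs, its frequency factor never exceeds $k_1 + 1$.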
# Search across multiple fields and show the relevant results
GET hotel/_search
{
"query": {
"query_string": {
"fields": ["name","synopsis","area","address"],
"query": "北京市萬豪spa三星"
}
}
}
Query result
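2.2. Setting weights
Boosting the whole query at search time multiplies the score of every matching document, so all scores rise together and the relative order stays the same. This cannot implement paid (bid) ranking.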
GET hotel/_search
{
"query": {
"query_string": {
"fields": ["name","synopsis","area","address"],
"query": "北京市萬豪spa三星",
"boost": 50
}
}
}
Query result
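2.2.2. Index settings (static)
Field weights can be configured when the index is created.
This approach is rarely used in practice, because the weights cannot be adjusted as business requirements change:
once an index is created, the mapping of an existing field cannot be modified without deleting and rebuilding the index.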
PUT hotel
{
"mappings": {
"properties": {
"name":{
"type": "text",
"analyzer": "ik_max_word",
"boost": 5
},
"address":{
"type": "text",
"analyzer": "ik_max_word",
"boost": 3
}
}
}
}
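2.2.3. Query settings (dynamic)
In the query below, query holds the main search condition, functions decides which documents receive extra weight, and weight is the factor applied to them.
Suppose x豪 has paid an advertising fee: the score of every hotel whose brand is x豪 is then multiplied by 50.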
GET hotel/_search
{
"query": {
"function_score": {
"query": {
"query_string": {
"fields": ["name","synopsis","area","address"],
"query": "北京市spa三星"
}
},
"functions": [
{
"filter": {
"term": {
"brand": "x豪"
}
},
"weight": 50
}
]
}
}
}
Query result
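The difference between boosting the whole query and weighting only the filtered documents can be sketched in plain Java. The `Hit` class, the scores and the brand names are made up, and the sketch assumes the default behaviour of function_score, where the function value is multiplied with the query score.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FunctionScoreSketch {
    // Hypothetical search hit with its relevance score.
    static final class Hit {
        final String name;
        final String brand;
        final double score;

        Hit(String name, String brand, double score) {
            this.name = name;
            this.brand = brand;
            this.score = score;
        }
    }

    // Query-level boost: every score is multiplied by the same factor.
    static List<Hit> boostAll(List<Hit> hits, double boost) {
        return hits.stream()
                .map(h -> new Hit(h.name, h.brand, h.score * boost))
                .collect(Collectors.toList());
    }

    // Filtered weight: only hits of the given brand are multiplied.
    static List<Hit> weightBrand(List<Hit> hits, String brand, double weight) {
        return hits.stream()
                .map(h -> h.brand.equals(brand) ? new Hit(h.name, h.brand, h.score * weight) : h)
                .collect(Collectors.toList());
    }

    // Hit names ordered by descending score.
    static List<String> ranking(List<Hit> hits) {
        List<Hit> sorted = new ArrayList<>(hits);
        sorted.sort((a, b) -> Double.compare(b.score, a.score));
        return sorted.stream().map(h -> h.name).collect(Collectors.toList());
    }

    // Made-up sample hits.
    static List<Hit> sample() {
        return Arrays.asList(
                new Hit("hotel-a", "other", 7.0),
                new Hit("hotel-b", "x-brand", 5.0),
                new Hit("hotel-c", "other", 3.0));
    }

    public static void main(String[] args) {
        System.out.println(ranking(boostAll(sample(), 50)));               // [hotel-a, hotel-b, hotel-c]
        System.out.println(ranking(weightBrand(sample(), "x-brand", 50))); // [hotel-b, hotel-a, hotel-c]
    }
}
```

A uniform factor leaves the order untouched, while a factor applied only to the advertised brand moves its hotels to the top, which is the effect paid ranking needs.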
3.
GET hotel/_search
{
"query": {
"function_score": {
"query": {
"query_string": {
"fields": [
"name",
"specs",
"area"
],
"query": "北京市萬豪sap三星"
}
},
"functions": [
{
"filter": {
"term": {
"isAd": "1"
}
},
"weight": 100
}
]
}
}
}
HotelServiceImpl.java
public Map<String, Object> searchScoreQuery(Integer current, Integer size, Map<String, Object> searchParam) {
    SearchRequest searchRequest = new SearchRequest("hotel");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Build the main query condition
    QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(searchParam.get("condition").toString())
            .field("name")
            .field("synopsis")
            .field("area")
            .field("address")
            .defaultOperator(Operator.OR);
    // Build the weighting condition: advertised hotels (isAd = 1) get weight 100
    FunctionScoreQueryBuilder.FilterFunctionBuilder[] scoreFunctionBuilder = new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
            new FunctionScoreQueryBuilder.FilterFunctionBuilder(QueryBuilders.termQuery("isAd", 1), ScoreFunctionBuilders.weightFactorFunction(100))
    };
    FunctionScoreQueryBuilder queryBuilder = QueryBuilders.functionScoreQuery(queryStringQueryBuilder, scoreFunctionBuilder);
    searchSourceBuilder.query(queryBuilder);
    // Pagination
    searchSourceBuilder.from((current - 1) * size);
    searchSourceBuilder.size(size);
    searchRequest.source(searchSourceBuilder);
    try {
        // Process the query result
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = searchResponse.getHits();
        long totalHits = hits.getTotalHits().value;
        SearchHit[] searchHits = hits.getHits();
        List<HotelEntity> list = new ArrayList<>();
        for (SearchHit searchHit : searchHits) {
            String sourceAsString = searchHit.getSourceAsString();
            list.add(JSON.parseObject(sourceAsString, HotelEntity.class));
        }
        Map<String, Object> map = new HashMap<>();
        map.put("list", list);
        map.put("totalResultSize", totalHits);
        map.put("current", current);
        // Total number of pages
        map.put("totalPage", (totalHits + size - 1) / size);
        return map;
    } catch (IOException e) {
        e.printStackTrace();
    }
    return null;
}
1.
PUT user
{
"mappings": {
"properties": {
"first_name": {
"type": "text"
},
"last_name": {
"type": "text"
}
}
}
}
# Add data
PUT user/_doc/1
{
"first_name": "John",
"last_name": "Smith"
}
# Query
GET user/_search
{
"query": {
"query_string": {
"fields": ["first_name","last_name"],
"query": "John OR Smith"
}
}
}
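The copy_to parameter copies the values of several fields into one combined field, which simplifies queries.
This is a typical space-for-time trade-off.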
DELETE user
PUT user
{
"mappings": {
"properties": {
"first_name": {
"type": "text",
"copy_to": "full_name"
},
"last_name": {
"type": "text",
"copy_to": "full_name"
},
"full_name": {
"type": "text"
}
}
}
}
PUT user/_doc/1
{
"first_name": "John",
"last_name": "Smith"
}
# Query the combined field as a single field with match
GET user/_search
{
"query": {
"match": {
"full_name": {
"query": "John Smith",
"operator": "and"
}
}
}
}
- copy_to copies the value of one or more fields into another field
- copy_to helps simplify queries
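The effect of copy_to together with a match query using the and operator can be imitated in a few lines of plain Java. This is only a sketch: the whitespace tokenizer and the method names are assumptions standing in for the real analysis chain.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

final class CopyToSketch {
    // Index time: the tokens of every source field end up in one combined field.
    // Tokenization here is a simplistic lowercase whitespace split.
    static Set<String> combinedTokens(String... fieldValues) {
        Set<String> tokens = new HashSet<>();
        for (String value : fieldValues) {
            for (String token : value.toLowerCase(Locale.ROOT).split("\\s+")) {
                if (!token.isEmpty()) {
                    tokens.add(token);
                }
            }
        }
        return tokens;
    }

    // Search time: with operator "and", every query token must be present in the field.
    static boolean matchAll(Set<String> fieldTokens, String query) {
        return fieldTokens.containsAll(combinedTokens(query));
    }

    public static void main(String[] args) {
        Set<String> fullName = combinedTokens("John", "Smith");
        System.out.println(matchAll(fullName, "John Smith")); // true
        System.out.println(matchAll(fullName, "John Doe"));   // false
    }
}
```

The extra set is the space cost; the benefit is that a search only has to look at one field instead of combining conditions over several.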
2.1. Unpack the plugin
2.2. Upload it to the ES plugins directory
[root@zhanggen plugins]# ls
elasticsearch-analysis-ik-7.10.1  elasticsearch-analysis-pinyin-7.10.1
[root@zhanggen plugins]# pwd
/mydata/elasticsearch/plugins
2.3. Restart the ES container
POST /_analyze
{
"text": "張根",
"analyzer": "pinyin"
}
2.5. Test result
{
"tokens" : [
{
"token" : "zhang",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 0
},
{
"token" : "zg",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 0
},
{
"token" : "gen",
"start_offset" : 0,
"end_offset" : 0,
"type" : "word",
"position" : 1
}
]
}
3.1. Declaring a custom analyzer
The syntax for declaring a custom analyzer is as follows:
PUT test
{
"settings": {
"analysis": {
"analyzer": {
"my_analyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}
POST test/_analyze
{
"text": "張根",
"analyzer": "my_analyzer"
}
3.2. Checking the analysis result
{
"tokens" : [
{
"token" : "張",
"start_offset" : 0,
"end_offset" : 1,
"type" : "CN_CHAR",
"position" : 0
},
{
"token" : "zhang",
"start_offset" : 0,
"end_offset" : 1,
"type" : "CN_CHAR",
"position" : 0
},
{
"token" : "z",
"start_offset" : 0,
"end_offset" : 1,
"type" : "CN_CHAR",
"position" : 0
},
{
"token" : "根",
"start_offset" : 1,
"end_offset" : 2,
"type" : "CN_CHAR",
"position" : 1
},
{
"token" : "gen",
"start_offset" : 1,
"end_offset" : 2,
"type" : "CN_CHAR",
"position" : 1
},
{
"token" : "g",
"start_offset" : 1,
"end_offset" : 2,
"type" : "CN_CHAR",
"position" : 1
}
]
}
4.
- A field used for completion queries must be of type completion.
- Its content is usually an array of the terms to be completed.
PUT test
{
  "mappings": {
    "properties": {
      "title": {
        "type": "completion"
      }
    }
  }
}
Then insert the following data
# Sample data
POST test/_doc
{
"title": [
"Sony",
"WH-1000XM3"
]
}
POST test/_doc
{
"title": [
"SK-II",
"PITERA"
]
}
POST test/_doc
{
"title": [
"Nintendo",
"switch"
]
}
The query DSL is as follows
# Autocomplete
GET test/_search
{
"suggest": {
"YOUR_SUGGESTION": {
"text": "s",
"completion": {
"field": "title",
"skip_duplicates":true,
"size":10
}
}
}
}
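To show the idea behind the request above, here is a simplified in-memory sketch of prefix completion in plain Java. It is not how ES implements the completion suggester (which uses its own in-memory data structure and scoring); the class and method names are hypothetical, and matching is a plain case-insensitive prefix test over the sample data from this section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class CompletionSketch {
    // Each inner list is the "title" array of one document.
    static List<String> suggest(List<List<String>> docs, String prefix, int size) {
        String p = prefix.toLowerCase(Locale.ROOT);
        Set<String> seen = new LinkedHashSet<>(); // a set drops duplicates, like skip_duplicates
        for (List<String> inputs : docs) {
            for (String input : inputs) {
                if (input.toLowerCase(Locale.ROOT).startsWith(p)) {
                    seen.add(input);
                }
            }
        }
        List<String> all = new ArrayList<>(seen);
        return new ArrayList<>(all.subList(0, Math.min(size, all.size())));
    }

    // The three sample documents inserted above.
    static List<List<String>> sample() {
        return Arrays.asList(
                Arrays.asList("Sony", "WH-1000XM3"),
                Arrays.asList("SK-II", "PITERA"),
                Arrays.asList("Nintendo", "switch"));
    }

    public static void main(String[] args) {
        System.out.println(suggest(sample(), "s", 10));  // [Sony, SK-II, switch]
        System.out.println(suggest(sample(), "sw", 10)); // [switch]
    }
}
```

Storing several inputs per document is what lets one document be found through any of its terms, here either the brand or the product name.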
- 1. Define the analyzers
- 2. Create the suggest field
# Hotel index
PUT hotel_3
{
"settings": {
"analysis": {
"analyzer": {
"text_anlyzer": {
"tokenizer": "ik_max_word",
"filter": "py"
},
"completion_analyzer": {
"tokenizer": "keyword",
"filter": "py"
}
},
"filter": {
"py": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chinese_pinyin_tokenize": false
}
}
}
},
"mappings": {
"properties": {
"suggest":{
"type": "completion",
"analyzer": "completion_analyzer"
},
"address" : {
"type" : "text",
"analyzer" : "text_anlyzer",
"search_analyzer" : "ik_smart"
},
"area" : {
"type" : "text",
"analyzer" : "text_anlyzer",
"search_analyzer" : "ik_smart"
},
"brand" : {
"type" : "keyword",
"copy_to": "suggest"
},
"createTime" : {
"type" : "date",
"format" : "yyyy-MM-dd"
},
"id" : {
"type" : "long"
},
"imageUrl" : {
"type" : "text"
},
"isAd" : {
"type" : "integer"
},
"name" : {
"type" : "text",
"analyzer" : "text_anlyzer",
"search_analyzer" : "ik_smart",
"copy_to": "suggest"
},
"price" : {
"type" : "integer"
},
"salesVolume" : {
"type" : "integer"
},
"specs" : {
"type" : "keyword"
},
"synopsis" : {
"type" : "text",
"analyzer" : "text_anlyzer",
"search_analyzer" : "ik_smart"
},
"type" : {
"type" : "keyword"
}
}
}
}
# Migrate the data smoothly
POST _reindex?wait_for_completion=false&requests_per_second=200
{
  "source": {
    "index": "hotel_2"
  },
  "dest": {
    "index": "hotel_3"
  }
}
# Check the task status
GET _tasks/_6af5BFpS7mrvRyP6f8xlg:6792
# Repoint the alias
# Detach the alias from the old index
POST _aliases
{
  "actions": [
    {
      "remove": {
        "index": "hotel_2",
        "alias": "hotel"
      }
    }
  ]
}
# Delete the old index
DELETE hotel_2
# Attach the alias to hotel_3
POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "hotel_3",
        "alias": "hotel"
      }
    }
  ]
}
5.3.
Simulate a user typing the pinyin wan
GET hotel/_search
{
"_source": false,
"suggest": {
"my_suggest": {
"text": "wan",
"completion": {
"field": "suggest",
"skip_duplicates":true,
"size":10
}
}
}
}
5.4. Checking the result
The suggestions returned are 萬事達, 萬悅, 萬豪 and 王朝.
{
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 0,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"suggest" : {
"my_suggest" : [
{
"text" : "wan",
"offset" : 0,
"length" : 3,
"options" : [
{
"text" : "萬事達",
"_index" : "hotel_3",
"_type" : "_doc",
"_id" : "AeSfyIEBhlAS7ARu8P7t",
"_score" : 1.0
},
{
"text" : "萬悅",
"_index" : "hotel_3",
"_type" : "_doc",
"_id" : "_uSfyIEBhlAS7ARu8P3t",
"_score" : 1.0
},
{
"text" : "萬豪",
"_index" : "hotel_3",
"_type" : "_doc",
"_id" : "wuSfyIEBhlAS7ARu8P3t",
"_score" : 1.0
},
{
"text" : "王朝",
"_index" : "hotel_3",
"_type" : "_doc",
"_id" : "1eSfyIEBhlAS7ARu8P3t",
"_score" : 1.0
}
]
}
]
}
}
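Why does the input wan also return 王朝? A small plain-Java sketch can illustrate it, under the assumption that each brand is indexed with its joined full pinyin and its first letters. The pinyin strings below are written by hand for illustration and are not produced by the pinyin plugin; the extra brand 希爾頓 is made up to show a non-match.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PinyinPrefixSketch {
    // Brand -> { joined full pinyin, first letters }, hand-written for illustration.
    static Map<String, String[]> index() {
        Map<String, String[]> m = new LinkedHashMap<>();
        m.put("萬事達", new String[]{"wanshida", "wsd"});
        m.put("萬悅", new String[]{"wanyue", "wy"});
        m.put("萬豪", new String[]{"wanhao", "wh"});
        m.put("王朝", new String[]{"wangchao", "wc"});
        m.put("希爾頓", new String[]{"xierdun", "xed"});
        return m;
    }

    // A brand is suggested when any of its indexed forms starts with the input.
    static List<String> suggest(String input) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String[]> entry : index().entrySet()) {
            for (String form : entry.getValue()) {
                if (form.startsWith(input)) {
                    result.add(entry.getKey());
                    break; // one suggestion per brand
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(suggest("wan"));  // the four brands whose pinyin starts with "wan"
        System.out.println(suggest("wang")); // only 王朝
        System.out.println(suggest("wh"));   // only 萬豪, matched through its first letters
    }
}
```

Because the joined pinyin wangchao begins with wan, 王朝 is a valid prefix match even though its first character is not 萬.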
5.5.
HotelServiceImpl.java
public List<String> searchSuggestInfo(String key) {
    // Result list
    List<String> result = new ArrayList<>();
    // Build the search request
    SearchRequest searchRequest = new SearchRequest("hotel");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Build the completion suggestion; document sources are not needed
    searchSourceBuilder.fetchSource(false);
    SuggestBuilder suggestBuilder = new SuggestBuilder();
    CompletionSuggestionBuilder suggest = SuggestBuilders
            .completionSuggestion("suggest")
            .prefix(key)
            .skipDuplicates(true)
            .size(10);
    suggestBuilder.addSuggestion("my_suggest", suggest);
    searchSourceBuilder.suggest(suggestBuilder);
    searchRequest.source(searchSourceBuilder);
    try {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        // Process the completion result
        CompletionSuggestion my_suggest = searchResponse.getSuggest().getSuggestion("my_suggest");
        List<CompletionSuggestion.Entry.Option> options = my_suggest.getOptions();
        for (CompletionSuggestion.Entry.Option option : options) {
            String s = option.getText().string();
            result.add(s);
        }
        return result;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
5.6. Result