search(4)- elastic4s-ElasticDsl

来源:https://www.cnblogs.com/tiger-xc/archive/2020/04/16/12716181.html
-Advertisement-
Play Games

上次分析了一下elastic4s的運算框架。本來計劃接著開始實質的函數調用示範,不過看過了Elastic4s的所有使用說明文檔後感覺還是走的快了一點。主要原因是elasticsearch在7.0後有了很多重點調整改變,elastic4s雖然一直在源代碼方面緊跟ES的變化,但使用文件卻一直未能更新,所 ...


   上次分析了一下elastic4s的運算框架。本來計劃接著開始實質的函數調用示範,不過看過了Elastic4s的所有使用說明文檔後感覺還是走的快了一點。主要原因是elasticsearch在7.0後有了很多重點調整改變,elastic4s雖然一直在源代碼方面緊跟ES的變化,但使用文件卻一直未能更新,所以從說明文檔中學習elastic4s的使用方法是不可能的,必須從源碼中摸索。花了些時間過了一次elastic4s的源碼,感覺這個工具庫以後還是挺有用的:一是通過編程方式產生json請求比較靈活,而且可以通過compiler來保證json語句的正確性。二是對搜索結果的處理方面:由於返回的搜索結果是一堆又長又亂的複雜json,不敢想象自己要如何正確的解析這些json, 然後才能調用到正確的結果,但elastic4s提供了一套很完善的response類,使用起來可能會很方便。實際上elastic4s的編程模式和scala語言運用還是值得學習的。既然這樣,我想可能用elastic4s做一套完整的示範,包括:索引創建、索引維護、搜索、聚合統計等,對瞭解和掌握elastic4s可能大有幫助。在這之前,我們還是再回顧一下elastic4s的運算原理:elastic4s的功能其實很簡單:通過dsl語句組合產生json請求,然後發送給ES-rest終端, 對返回的json結果進行處理,篩選出目標答案。

上篇我們討論過elastic4s的基本運算框架:

  client.execute(
      createIndex("company")
      .shards(2).replicas(3)
  )

...

  val bookschema = putMapping("books")
      .as(
        KeywordField("isbn"),
        textField("title"),
        doubleField("price")
      )

  client.execute(
    bookschema
  )

...

  val futAccts = client.execute(
    search("bank").termQuery("city" -> "dante")
  )
  futAccts.onComplete{
    case Success(esresp) =>
      esresp.result.hits.hits.foreach(h =>println(h.sourceAsMap))
    case Failure(err) => println(s"search error: ${err.getMessage}")
  }

實際上execute(T)的T代表elastic4s支持的所有ES操作類型。這種方法實現有賴於scala的typeclass模式。我們先看看execute函數定義:

  // Executes the given request type T, and returns an effect of Response[U]
  // where U is particular to the request type.
  // For example a search request will return a Response[SearchResponse].
  def execute[T, U, F[_]](t: T)(implicit
                                executor: Executor[F],
                                functor: Functor[F],
                                handler: Handler[T, U],
                                manifest: Manifest[U]): F[Response[U]] = {
    val request = handler.build(t)
    val f = executor.exec(client, request)
    functor.map(f) { resp =>
      handler.responseHandler.handle(resp) match {
        case Right(u) => RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u)
        case Left(error) => RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error)
      }
    }
  }

上篇提過Handler[T,U]是個typeclass, 代表對不同類型T的json構建方法。elastic4s提供了這個T類型的操作方法,如下:

trait ElasticDsl
    extends ElasticApi
    with Logging
    with ElasticImplicits
    with BulkHandlers
    with CatHandlers
    with CountHandlers
    with ClusterHandlers
    with DeleteHandlers
    with ExistsHandlers
    with ExplainHandlers
    with GetHandlers
    with IndexHandlers
    with IndexAdminHandlers
    with IndexAliasHandlers
    with IndexStatsHandlers
    with IndexTemplateHandlers
    with LocksHandlers
    with MappingHandlers
    with NodesHandlers
    with ReindexHandlers
    with RoleAdminHandlers
    with RoleHandlers
    with RolloverHandlers
    with SearchHandlers
    with SearchTemplateHandlers
    with SearchScrollHandlers
    with SettingsHandlers
    with SnapshotHandlers
    with UpdateHandlers
    with TaskHandlers
    with TermVectorHandlers
    with UserAdminHandlers
    with UserHandlers
    with ValidateHandlers {

  implicit class RichRequest[T](t: T) {
    def request(implicit handler: Handler[T, _]): ElasticRequest = handler.build(t)
    def show(implicit handler: Handler[T, _]): String            = Show[ElasticRequest].show(handler.build(t))
  }
}

object ElasticDsl extends ElasticDsl

所有的操作api在這裡:

trait ElasticApi
  extends AliasesApi
    with ElasticImplicits
    with AggregationApi
    with AnalyzerApi
    with BulkApi
    with CatsApi
    with CreateIndexApi
    with ClearRolesCacheApi
    with ClusterApi
    with CollapseApi
    with CountApi
    with CreateRoleApi
    with CreateUserApi
    with DeleteApi
    with DeleteIndexApi
    with DeleteRoleApi
    with DeleteUserApi
    with DynamicTemplateApi
    with ExistsApi
    with ExplainApi
    with ForceMergeApi
    with GetApi
    with HighlightApi
    with IndexApi
    with IndexAdminApi
    with IndexRecoveryApi
    with IndexTemplateApi
    with LocksApi
    with MappingApi
    with NodesApi
    with NormalizerApi
    with QueryApi
    with PipelineAggregationApi
    with ReindexApi
    with RoleApi
    with ScriptApi
    with ScoreApi
    with ScrollApi
    with SearchApi
    with SearchTemplateApi
    with SettingsApi
    with SnapshotApi
    with SortApi
    with SuggestionApi
    with TaskApi
    with TermVectorApi
    with TokenizerApi
    with TokenFilterApi
    with TypesApi
    with UpdateApi
    with UserAdminApi
    with UserApi
    with ValidateApi {

  implicit class RichFuture[T](future: Future[T]) {
    def await(implicit duration: Duration = 60.seconds): T = Await.result(future, duration)
  }
}

object ElasticApi extends ElasticApi

通過 import ElasticDsl._  ,所有類型的api,handler操作方法都有了。下麵是例子里的api方法:

trait CreateIndexApi {

  def createIndex(name: String): CreateIndexRequest = CreateIndexRequest(name)
...
}

trait MappingApi {
...
  def putMapping(indexes: Indexes): PutMappingRequest =  PutMappingRequest(IndexesAndType(indexes))
}

trait SearchApi {
  def search(index: String): SearchRequest = search(Indexes(index))
...
}

CreateIndexRequest, PutMappingRequest,SearchRequest這幾個類型都提供了handler隱式實例:

trait IndexAdminHandlers {
...
  implicit object CreateIndexHandler extends Handler[CreateIndexRequest, CreateIndexResponse] {

    override def responseHandler: ResponseHandler[CreateIndexResponse] = new ResponseHandler[CreateIndexResponse] {
      override def handle(response: HttpResponse): Either[ElasticError, CreateIndexResponse] =
        response.statusCode match {
          case 200 | 201 => Right(ResponseHandler.fromResponse[CreateIndexResponse](response))
          case 400 | 500 => Left(ElasticError.parse(response))
          case _         => sys.error(response.toString)
        }
    }

    override def build(request: CreateIndexRequest): ElasticRequest = {

      val endpoint = "/" + URLEncoder.encode(request.name, "UTF-8")

      val params = scala.collection.mutable.Map.empty[String, Any]
      request.waitForActiveShards.foreach(params.put("wait_for_active_shards", _))
      request.includeTypeName.foreach(params.put("include_type_name", _))

      val body   = CreateIndexContentBuilder(request).string()
      val entity = HttpEntity(body, "application/json")

      ElasticRequest("PUT", endpoint, params.toMap, entity)
    }
  }

}

...

trait MappingHandlers {
...
 implicit object PutMappingHandler extends Handler[PutMappingRequest, PutMappingResponse] {

    override def build(request: PutMappingRequest): ElasticRequest = {

      val endpoint = s"/${request.indexesAndType.indexes.mkString(",")}/_mapping${request.indexesAndType.`type`.map("/" + _).getOrElse("")}"

      val params = scala.collection.mutable.Map.empty[String, Any]
      request.updateAllTypes.foreach(params.put("update_all_types", _))
      request.ignoreUnavailable.foreach(params.put("ignore_unavailable", _))
      request.allowNoIndices.foreach(params.put("allow_no_indices", _))
      request.expandWildcards.foreach(params.put("expand_wildcards", _))
      request.includeTypeName.foreach(params.put("include_type_name", _))

      val body   = PutMappingBuilderFn(request).string()
      val entity = HttpEntity(body, "application/json")

      ElasticRequest("PUT", endpoint, params.toMap, entity)
    }
  }

}
...
trait SearchHandlers {
...
  implicit object SearchHandler extends Handler[SearchRequest, SearchResponse] {

    override def build(request: SearchRequest): ElasticRequest = {

      val endpoint =
        if (request.indexes.values.isEmpty)
          "/_all/_search"
        else
          "/" + request.indexes.values
            .map(URLEncoder.encode(_, "UTF-8"))
            .mkString(",") + "/_search"

      val params = scala.collection.mutable.Map.empty[String, String]
      request.requestCache.map(_.toString).foreach(params.put("request_cache", _))
      request.searchType
        .filter(_ != SearchType.DEFAULT)
        .map(SearchTypeHttpParameters.convert)
        .foreach(params.put("search_type", _))
      request.routing.map(_.toString).foreach(params.put("routing", _))
      request.pref.foreach(params.put("preference", _))
      request.keepAlive.foreach(params.put("scroll", _))
      request.allowPartialSearchResults.map(_.toString).foreach(params.put("allow_partial_search_results", _))
      request.batchedReduceSize.map(_.toString).foreach(params.put("batched_reduce_size", _))

      request.indicesOptions.foreach { opts =>
        IndicesOptionsParams(opts).foreach { case (key, value) => params.put(key, value) }
      }

      request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _))

      val body = request.source.getOrElse(SearchBodyBuilderFn(request).string())
      ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, "application/json"))
    }
  }

}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 代碼 @Slf4j public class StringCompareDemo { public static void compare() { String a = "1"; String b = "1"; log.info("\nString a = \"1\";\n" + "String b ...
  • 最近與同事討論時,提到Go語言的可變參數,之前沒有總結過相關知識點,今天我們介紹一下Go語言的可變參數。 可變參數(Variable Parameters):參數數量可變的函數稱之為可變參數函數,主要是在使用語法糖(syntactic sugar)。最經典的例子就是fmt.Printf()和類似的函 ...
  • 前言: 準備了體體面面的自我介紹,敗在了技術深度上;又或者技術知識背得完完全全,卻輸在了面試技巧,看看這個,一定要看到最後 115個Java面試題: 1. 什麼是Java虛擬機?為什麼Java被稱作是無關的編程語言? 2. JDK和JRE的區別是什麼? 3. static關鍵字是什麼意思?Java中 ...
  • 命令行啟動服務的方式,在後端使用非常廣泛,如果有寫過C語言的同學相信不難理解這一點!在C語言中,我們可以根據argc和argv來獲取和解析命令行的參數,從而通過不同的參數調取不同的方法,同時也可以用Usage來列印幫助信息了。 那麼開始今天的話題之前,我們回顧一下在C語言中是如何解析傳遞的參數的。 ...
  • 在Asp.net的WEBform中,上傳文件與下載文件處理是很簡單的事情,如果轉為ASP.NET MVC呢?那就沒有那麼容易了,難少少,也不是很難,一起來看下本文吧。本文主要講如何在Asp.net MVC中上傳文件,然後如何再從伺服器中把上傳過的文件下載下來.在Web Forms中,當你把一個Fil ...
  • SolrCloud SolrCloud(solr 雲)是Solr提供的分散式搜索方案,當你需要大規模,容錯,分散式索引和檢索能力時使用 SolrCloud。當一個系統的索引數據量少的時候是不需要使用SolrCloud的,當索引量很大,搜索請求併發很高,這時需要使用SolrCloud來滿足這些需求。 ...
  • 如果你參加過一些大廠面試,肯定會遇到一些開放性的問題: 1、寫一段程式,讓其運行時的表現為觸發了5次Young GC、3次Full GC、然後3次Young GC; 2、如果一個Java進程突然消失了,你會怎麼去排查這種問題? 3、給了一段Spring載入Bean的代碼片段,闡述一下具體的執行流程? ...
  • 前言: 常常一些核心技術等我碰到的時候才發現自己忘得差不多了,甘心安於現狀,等自己跟別人有了差距之後才想起來要學習,我太難了,永遠不要停下自己學習的腳步,比你厲害的人真的有很多,今天給大家分享的是一份283頁的Java核心知識點(PDF)特別詳細,有幸得此寶典,這麼詳細的核心知識點怎能獨吞呢,分享給 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...