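elastic4s is a third-party Scala client library for Elasticsearch ("Elastic4s is a concise, idiomatic, reactive, type safe Scala client for Elasticsearch."). Scala users can build ES requests in code with the DSL that elastic4s provides. Unlike requests written by hand as JSON strings, requests written in the DSL go through the compiler, so syntax errors and many semantic (type) errors surface at compile time. In general, an elastic4s program flows in a fairly direct and simple way: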
client.execute {
  indexInto("books").fields("title" -> "重慶火鍋的十種吃法", "content" -> "在這部書里描述了火鍋的各種烹飪方式")
}.await
val response = client.execute {
  search("books").matchQuery("title", "火鍋")
}.await
...
...
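An ES operation goes from building the request to actually running it entirely inside the execute(T) function. Note that in the example above the type T can be either IndexRequest or SearchRequest: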
def indexInto(index: Index): IndexRequest
...
def search(index: String): SearchRequest
In fact, the T in execute(T) stands for every ES operation type that elastic4s supports. This relies on Scala's typeclass pattern. Let's look at the definition of execute first:
// Executes the given request type T, and returns an effect of Response[U]
// where U is particular to the request type.
// For example a search request will return a Response[SearchResponse].
def execute[T, U, F[_]](t: T)(implicit
    executor: Executor[F],
    functor: Functor[F],
    handler: Handler[T, U],
    manifest: Manifest[U]): F[Response[U]] = {
  val request = handler.build(t)
  val f = executor.exec(client, request)
  functor.map(f) { resp =>
    handler.responseHandler.handle(resp) match {
      case Right(u) => RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u)
      case Left(error) => RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error)
    }
  }
}
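The signature above is polymorphic in the effect type F: the executor decides how the call is run, and the functor is used to map the raw response into a success or failure value. The following is a minimal, library-free sketch of that shape, not elastic4s code. Every name in it (RawResponse, MiniResult, MiniSuccess, MiniFailure, MiniFunctor, MiniExecutor, fakeCall) is a hypothetical stand-in, and the "HTTP call" is faked.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object EffectSketch {
  // Hypothetical raw HTTP response: a status code plus a body.
  final case class RawResponse(status: Int, body: String)

  // Hypothetical result wrappers, loosely mirroring RequestSuccess / RequestFailure.
  sealed trait MiniResult[+U]
  final case class MiniSuccess[U](status: Int, value: U) extends MiniResult[U]
  final case class MiniFailure(status: Int, error: String) extends MiniResult[Nothing]

  // Typeclass: how to map over a value inside the effect F.
  trait MiniFunctor[F[_]] { def map[A, B](fa: F[A])(f: A => B): F[B] }

  // Typeclass: how to run a request inside the effect F.
  trait MiniExecutor[F[_]] { def exec(endpoint: String): F[RawResponse] }

  // Assumption: stands in for a real HTTP round trip.
  private def fakeCall(endpoint: String): RawResponse =
    if (endpoint.startsWith("/")) RawResponse(200, s"ok:$endpoint")
    else RawResponse(400, "bad endpoint")

  implicit val tryFunctor: MiniFunctor[Try] = new MiniFunctor[Try] {
    def map[A, B](fa: Try[A])(f: A => B): Try[B] = fa.map(f)
  }
  implicit val tryExecutor: MiniExecutor[Try] = new MiniExecutor[Try] {
    def exec(endpoint: String): Try[RawResponse] = Try(fakeCall(endpoint))
  }
  implicit def futureFunctor(implicit ec: ExecutionContext): MiniFunctor[Future] =
    new MiniFunctor[Future] {
      def map[A, B](fa: Future[A])(f: A => B): Future[B] = fa.map(f)
    }
  implicit def futureExecutor(implicit ec: ExecutionContext): MiniExecutor[Future] =
    new MiniExecutor[Future] {
      def exec(endpoint: String): Future[RawResponse] = Future(fakeCall(endpoint))
    }

  // Same structure as the real execute: run in F, then map the raw response.
  def execute[F[_]](endpoint: String)(implicit
      executor: MiniExecutor[F],
      functor: MiniFunctor[F]): F[MiniResult[String]] =
    functor.map[RawResponse, MiniResult[String]](executor.exec(endpoint)) { resp =>
      if (resp.status == 200) MiniSuccess(resp.status, resp.body)
      else MiniFailure(resp.status, resp.body)
    }
}
```

In this sketch the caller picks the effect explicitly, for example `EffectSketch.execute[Try]("/books/_search")` for a synchronous result or `EffectSketch.execute[Future]("/books/_search")` with an ExecutionContext in scope. The result-mapping logic is written once and reused for both.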
One of the more important jobs of execute is building the request, which is done by handler.build(t). handler: Handler[T, U] is an implicit parameter, and Handler is a typeclass:
/**
  * A [[Handler]] is a typeclass used by the [[ElasticClient]] in order to
  * create [[ElasticRequest]] instances which are sent to the elasticsearch
  * server, as well as returning a [[ResponseHandler]] which handles the
  * response from the server.
  *
  * @tparam T the type of the request object handled by this handler
  * @tparam U the type of the response object returned by this handler
  */
abstract class Handler[T, U: Manifest] extends Logging {
  def responseHandler: ResponseHandler[U] = ResponseHandler.default[U]
  def build(t: T): ElasticRequest
}
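This abstract class has two methods. One of them, build(t: T): ElasticRequest, is abstract and must be implemented when an instance is created. In execute(T), handler is an implicit parameter: if a Handler[T, U] instance can be found in implicit scope at the call site, it is passed in as handler, and handler.build(t) can then be called to build the request. This T is the same T that was passed to execute(T). As noted above, T can be any ES operation type. The request types themselves do not extend Handler. Instead, each operation type needs its own implicit Handler[T, U] instance, and that instance must implement build(t: T) to produce the ElasticRequest the operation requires. Here are the implicit instances for the two operation types in the example: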
implicit object IndexHandler extends Handler[IndexRequest, IndexResponse] {
  override def responseHandler: ResponseHandler[IndexResponse] = new ResponseHandler[IndexResponse] {
    override def handle(response: HttpResponse): Either[ElasticError, IndexResponse] = response.statusCode match {
      case 201 | 200 => Right(ResponseHandler.fromResponse[IndexResponse](response))
      case 400 | 401 | 403 | 409 | 500 => Left(ElasticError.parse(response))
      case _ => sys.error(response.toString)
    }
  }
  override def build(request: IndexRequest): ElasticRequest = {
    val (method, endpoint) = request.id match {
      case Some(id) =>
        "PUT" -> s"/${URLEncoder.encode(request.index.name, StandardCharsets.UTF_8.name())}/_doc/${URLEncoder.encode(id.toString, StandardCharsets.UTF_8.name())}"
      case None =>
        "POST" -> s"/${URLEncoder.encode(request.index.name, StandardCharsets.UTF_8.name())}/_doc"
    }
    val params = scala.collection.mutable.Map.empty[String, String]
    request.createOnly.foreach(
      createOnly =>
        if (createOnly)
          params.put("op_type", "create")
    )
    request.routing.foreach(params.put("routing", _))
    request.parent.foreach(params.put("parent", _))
    request.timeout.foreach(params.put("timeout", _))
    request.pipeline.foreach(params.put("pipeline", _))
    request.refresh.map(RefreshPolicyHttpValue.apply).foreach(params.put("refresh", _))
    request.version.map(_.toString).foreach(params.put("version", _))
    request.ifPrimaryTerm.map(_.toString).foreach(params.put("if_primary_term", _))
    request.ifSeqNo.map(_.toString).foreach(params.put("if_seq_no", _))
    request.versionType.map(VersionTypeHttpString.apply).foreach(params.put("version_type", _))
    val body = IndexContentBuilder(request)
    val entity = ByteArrayEntity(body.getBytes, Some("application/json"))
    logger.debug(s"Endpoint=$endpoint")
    ElasticRequest(method, endpoint, params.toMap, entity)
  }
}
...
implicit object SearchHandler extends Handler[SearchRequest, SearchResponse] {
  override def build(request: SearchRequest): ElasticRequest = {
    val endpoint =
      if (request.indexes.values.isEmpty)
        "/_all/_search"
      else
        "/" + request.indexes.values
          .map(URLEncoder.encode(_, "UTF-8"))
          .mkString(",") + "/_search"
    val params = scala.collection.mutable.Map.empty[String, String]
    request.requestCache.map(_.toString).foreach(params.put("request_cache", _))
    request.searchType
      .filter(_ != SearchType.DEFAULT)
      .map(SearchTypeHttpParameters.convert)
      .foreach(params.put("search_type", _))
    request.routing.map(_.toString).foreach(params.put("routing", _))
    request.pref.foreach(params.put("preference", _))
    request.keepAlive.foreach(params.put("scroll", _))
    request.allowPartialSearchResults.map(_.toString).foreach(params.put("allow_partial_search_results", _))
    request.batchedReduceSize.map(_.toString).foreach(params.put("batched_reduce_size", _))
    request.indicesOptions.foreach { opts =>
      IndicesOptionsParams(opts).foreach { case (key, value) => params.put(key, value) }
    }
    request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _))
    val body = request.source.getOrElse(SearchBodyBuilderFn(request).string())
    ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, "application/json"))
  }
}
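IndexHandler and SearchHandler above are the implicit Handler[T, U] instances for the index and search operations. Each build(t: T) function takes the T argument passed in and constructs a request in the format that operation requires.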
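To make the mechanism easier to see in isolation, here is a minimal, library-free sketch of the same typeclass pattern. It is not elastic4s code: MiniRequest, MiniHandler, IndexDoc, SearchDocs and the two handler objects are hypothetical stand-ins, and only the endpoint-building logic loosely follows the handlers shown above.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object HandlerSketch {
  // What every handler must produce: an HTTP method, an endpoint and a body.
  final case class MiniRequest(method: String, endpoint: String, body: String)

  // Two unrelated request types; neither one extends the typeclass.
  final case class IndexDoc(index: String, id: Option[String], json: String)
  final case class SearchDocs(indexes: Seq[String], json: String)

  // The typeclass: knows how to turn a T into a MiniRequest.
  trait MiniHandler[T] {
    def build(t: T): MiniRequest
  }

  private def enc(s: String): String =
    URLEncoder.encode(s, StandardCharsets.UTF_8.name())

  // Instance for IndexDoc: PUT with an id, POST without one.
  implicit object IndexDocHandler extends MiniHandler[IndexDoc] {
    def build(r: IndexDoc): MiniRequest = r.id match {
      case Some(id) => MiniRequest("PUT", s"/${enc(r.index)}/_doc/${enc(id)}", r.json)
      case None     => MiniRequest("POST", s"/${enc(r.index)}/_doc", r.json)
    }
  }

  // Instance for SearchDocs: comma-separated indexes, or /_all when empty.
  implicit object SearchDocsHandler extends MiniHandler[SearchDocs] {
    def build(r: SearchDocs): MiniRequest = {
      val endpoint =
        if (r.indexes.isEmpty) "/_all/_search"
        else r.indexes.map(enc).mkString("/", ",", "/_search")
      MiniRequest("POST", endpoint, r.json)
    }
  }

  // The compiler selects the handler from the static type of `t`.
  def execute[T](t: T)(implicit handler: MiniHandler[T]): MiniRequest =
    handler.build(t)
}
```

After `import HandlerSketch._`, a call such as `execute(IndexDoc("books", Some("1"), "{}"))` resolves IndexDocHandler, while `execute(SearchDocs(Seq("books"), "{}"))` resolves SearchDocsHandler. Passing a type with no instance in scope fails at compile time, which is the property the typeclass approach is chosen for.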
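I keep feeling that not every kind of request is well suited to the DSL. JSON with several levels of nested logical conditions, for example, may be awkward to express in it (this is a personal concern). So there should be a way to embed JSON text directly into the request entity. elastic4s provides a source: Option[String] field on request types such as IndexRequest, SearchRequest and BulkRequest to accept JSON text: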
case class IndexRequest(index: Index,
                        ...
                        source: Option[String] = None)
    extends BulkCompatibleRequest {
  ...
  def source(json: String): IndexRequest = copy(source = json.some)
  ...
}
case class SearchRequest(indexes: Indexes,
                         ...
                         source: Option[String] = None,
                         ...
                         typedKeys: Option[Boolean] = None) {
  ...
  /**
    * Sets the source of the request as a json string. Note, if you use this method
    * any other body-level settings will be ignored.
    *
    * HTTP query-parameter settings can still be used, eg limit, routing, search type etc.
    *
    * Unlike rawQuery, source is parsed at the "root" level
    * Query must be valid json beginning with '{' and ending with '}'.
    * Field names must be double quoted.
    *
    * NOTE: This method only works with the HTTP client.
    *
    * Example:
    * {{{
    * search in "*" limit 5 source {
    * """{ "query": { "prefix": { "bands": { "prefix": "coldplay", "boost": 5.0, "rewrite": "yes" } } } }"""
    * } searchType SearchType.Scan
    * }}}
    */
  def source(json: String): SearchRequest = copy(source = json.some)
  ...
}
Now we can use JSON text directly:
val json =
  """
    |{
    |  "query" : {
    |    "match" : {"title" : "火鍋"}
    |  }
    |}
    |""".stripMargin
val response = client.execute {
  search("books").source(json) // .matchQuery("title", "火鍋")
}.await
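The reason the raw JSON wins over anything set through the DSL is the line `request.source.getOrElse(SearchBodyBuilderFn(request).string())` in SearchHandler: the DSL body is only rendered when no source was supplied. Below is a small, library-free sketch of that precedence rule. MiniSearch, rawJson, dslBody and body are hypothetical names, and the JSON rendering is deliberately naive (no escaping).

```scala
object SourceSketch {
  // Hypothetical, simplified stand-in for a search request.
  final case class MiniSearch(index: String,
                              matchField: Option[(String, String)] = None,
                              rawJson: Option[String] = None) {
    // DSL-style setter: records a match query on one field.
    def matchQuery(field: String, value: String): MiniSearch =
      copy(matchField = Some(field -> value))

    // Raw setter: stores the JSON text as given.
    def source(json: String): MiniSearch = copy(rawJson = Some(json))
  }

  // Stand-in for the DSL body builder: renders the match query, or match_all.
  def dslBody(r: MiniSearch): String = r.matchField match {
    case Some((f, v)) => s"""{"query":{"match":{"$f":"$v"}}}"""
    case None         => """{"query":{"match_all":{}}}"""
  }

  // Same shape as the getOrElse in SearchHandler: raw JSON first, DSL as fallback.
  def body(r: MiniSearch): String = r.rawJson.getOrElse(dslBody(r))
}
```

With this sketch, `body(MiniSearch("books").matchQuery("title", "hotpot"))` renders the DSL query, while adding `.source(...)` to the same request makes `body` return the supplied JSON unchanged and ignore the match query. This matches the scaladoc note that other body-level settings are ignored once source is used.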