按照計劃,這篇開始嘗試用elastic4s來做一系列索引管理和搜索操作示範。前面提過,elastic4s的主要功能之一是通過組合Dsl語句形成json請求。那麼我們先試試組合一些Dsl語句,再想辦法產生出json請求文本,然後在kibana控制臺中驗證它的正確性。 首先看看elastic4s提供的一 ...
按照計劃,這篇開始嘗試用elastic4s來做一系列索引管理和搜索操作示範。前面提過,elastic4s的主要功能之一是通過組合Dsl語句形成json請求。那麼我們先試試組合一些Dsl語句,再想辦法產生出json請求文本,然後在kibana控制臺中驗證它的正確性。
首先看看elastic4s提供的一個show函數:
def show(implicit handler: Handler[T, _]): String = Show[ElasticRequest].show(handler.build(t))
又見到了這個Handler[T, _],作為show的一個隱式參數。前面說過這個Handler[T, _]是個代表構建T類型json請求的typeclass。具體構建函數就是上面的這個build(t)函數。我們先看看CreateIndexRequest類型的show函數示範:
val jsCreate = createIndex("company")
.shards(1).replicas(1).show
println(jsCreate)
產生了json如下:
PUT:/company?
StringEntity({"settings":{"index":{"number_of_shards":1,"number_of_replicas":1}}},Some(application/json))
在kibana里是如下表達的:
PUT /company
{
"settings":{
"index":{
"number_of_shards":1,
"number_of_replicas":1
}
}
}
可能是歷史原因吧,elastic4s與ES7.6還有很多不相容的地方,或者說是elastic4s還有許多沒來得及更新的地方。具體有問題的語句或參數都可以通過把json body放在kibana里進行驗證,如果elastic4s還有地方沒有完成覆蓋ES7.6功能的話,我們可以把一個正確的ES7.6 json腳本直接通過source傳人到操作類型中去:
val js =
"""
|{
| "settings":{
| "index":{
| "number_of_shards":1,
| "number_of_replicas":1
| }
| }
|}
|""".stripMargin
val createFromJs = (createIndex("company").source(js)).show
println(createFromJs)
下麵的例子是一套完整的索引創建過程:先刪除同名稱索引、創建索引、構建mapping:
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.akka._
import akka.actor._
import com.sksamuel.elastic4s.requests.mappings.{KeywordField, MappingDefinition, NestedField, SearchAsYouTypeField}
import scala.concurrent.ExecutionContext.Implicits.global
object Lesson03 extends App {
import com.sksamuel.elastic4s.ElasticDsl._
//akka客戶端
private implicit lazy val system: ActorSystem = ActorSystem()
val akkaClient = AkkaHttpClient(AkkaHttpClientSettings(List("localhost:9200")))
val client = ElasticClient(akkaClient)
//刪除索引
val idxDelete = client.execute(deleteIndex("company")).await
//構建索引
val idxCreate = client.execute(createIndex("company")
.shards(1).replicas(1)).await
//創建表結構
if(idxCreate.isSuccess) {
val compMapping = client.execute(
putMapping("company").fields(
KeywordField("code"),
SearchAsYouTypeField("name")
.fielddata(true)
.fields(KeywordField("keyword")),
textField("biztype"),
NestedField("addr").fields(
textField("district"),
textField("address"),
KeywordField("zipcode")
),
dateField("regdate")
.ignoreMalformed(true)
.format("strict_date_optional_time||epoch_millis"),
textField("contact")
)).await
if(compMapping.isSuccess)
println(s"mapping successfully created.")
else
println(s"mapping creation error: ${compMapping.error.reason}")
} else {
println(s"index creation error: ${idxCreate.error.reason}")
}
system.terminate()
client.close()
}
以上代碼有幾個地方值得提一下:
1、這上面使用了一個基於akka-stream的客戶端。優點是響應式標準相容,用隊列queue來緩衝密集請求
2、在刪除索引前為甚麼不先檢查一下同名索引是否存在?elastic4s ExistApi還是ES7以前版本,不能用
3、client.execute(...)返回Future, 為什麼不用for-yield?試過了,一是deleteIndex,createIndex返回結果與實際刪除、構建操作可能有些延遲,createIndex會返回索引已經存在錯誤, mapping會出現索引不存在錯誤。