search(9)- elastic4s logback-appender

来源:https://www.cnblogs.com/tiger-xc/archive/2020/05/05/12832651.html
-Advertisement-
Play Games

前面寫了個cassandra-appender,一個基於cassandra的logback插件。正是cassandra的分散式資料庫屬性才合適作為akka-cluster-sharding分散式應用的logger。所以,cassandra-appender核心功能就是對logback消息的存寫部分了 ...


前面寫了個cassandra-appender,一個基於cassandra的logback插件。正是cassandra的分散式資料庫屬性才合適作為akka-cluster-sharding分散式應用的logger。所以,cassandra-appender核心功能就是對logback消息的存寫部分了。同樣,基於ES的logback-appender核心部分就是對ES的存寫過程了。在ES里這個過程還附帶了索引indexing過程。將來對歷史消息的搜索、分析會更加方便。直接看看消息存寫這部分elastic4代碼:

  def writeLog(event: ILoggingEvent)(client: ElasticClient, idx: String)(appName: String, ip: String, hostName: String, default: String) = {

    var content: List[(String,Any)] = List(
      APP_NAME -> appName,
      HOST_IP -> ip,
      HOST_NAME -> hostName,
      LOGGER_NAME -> event.getLoggerName(),
      LEVEL -> event.getLevel().toString,
      THREAD_NAME -> event.getThreadName(),
      LOG_DATE -> logDate,
      LOG_TIME -> logTime
    )

    try {
      val callerData = event.getCallerData()
      if (callerData.nonEmpty) {
        content = content ++ List(
          CLASS_NAME -> callerData.head.getClassName(),
          FILE_NAME -> callerData.head.getFileName(),
          LINE_NUMBER -> callerData.head.getLineNumber().toString,
          METHOD_NAME -> callerData.head.getMethodName()
        )
      }
    } catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")}

    try {
      if (event.getThrowableProxy() != null) {
        val throwableStrs = event.getThrowableProxy().getSuppressed().asInstanceOf[List[IThrowableProxy]]
        val throwableStr = throwableStrs.foldLeft("") { case (b, t) => b + "," + t.getMessage() }
        content = content :+ (THROWABLE_STR -> throwableStr)
      }
    } catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")}

    var logmsgs = event.getMessage()
    try {
      val logMap = fromJson[Map[String,String]](logmsgs)
      logMap.foreach ( m =>  content = content :+ (m._1 -> m._2))
    } catch {
      case e: Throwable =>
        content = content :+ (MESSAGE -> logmsgs)
        try {
          val dftMap = fromJson[Map[String,String]](default)
          dftMap.foreach ( m =>  content = content :+ (m._1 -> m._2))
        } catch {
          case e: Throwable => }
    }

    val newRecord = indexInto(idx)
      .fields(
        content
      ).createOnly(true)

    client.execute(newRecord)   //.await

  }

可以看到,我們先判斷了一下event.getMessage()消息是否是json格式的:如果是正確的json格式,那麼解析成為欄位名和欄位值,否則就直接寫入log_msg欄位 + 一串預設的欄位和值。乾什麼呢?要知道這個elastic-appender是一個通用的logback-plugin,是可以在任何軟體中使用的。因為各種軟體對運行狀態跟蹤目標、方式的要求不同,為了滿足這些要求,那麼通過用戶自定義跟蹤目標欄位的方式應該是一個好的解決方案。從測試例子里可以理解:

  var loggedItems = Map[String,String]()
  loggedItems = loggedItems ++ Map(
    ("app_customer" -> "logback.com"),
    ("app_device" -> "9101"),
    ("log_msg" -> "specific message for elastic ..."))

  log.debug(toJson(loggedItems))


//logback.xml
    <appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
        <host>http://localhost</host>
        <port>9200</port>
        <appName>ESLoggerDemo</appName>
        <defaultFieldValues>{"app_customer":"中心書城","app_device":"9013"}</defaultFieldValues>
        <indexName>applog</indexName>
    </appender>

上面代碼里定義了app_customer,app_device,log_msg這幾個自定義欄位和值。這樣做的意思是:logback只定義了log.info(msg)里msg一個欄位。如果存放在資料庫里我們只能在msg一個欄位里進行分類、查詢了。但既然已經使用了資料庫作為存儲我們更希望用更多的欄位來代表一條消息,如用戶號,機器號,店號等等。這樣跟蹤起來方便很多。所以,對於內部的用戶可以要求把因應特殊需要額外增加的欄位-值加密成json,然後傳遞給ElasticAppender去處理。對於應用中引用三方軟體所產生的logback-msg,我們可沒辦法要求他們按照這個格式來傳遞消息,但仍然會存進ES,所以就用logback.xml中defaultFieldValaues定義的預設欄位-值來填寫這些額外的信息了。

這一篇我們主要討論一下這個特別的elastic-appender,它的使用方法。那麼先重覆一下logback的工作原理:

首先認識一下logback:感覺需要重點瞭解的logging運作核心應該是消息等級level的操作。消息等級是指logback根據不同的消息等級來篩選需要記錄的消息。logback支持下麵幾個消息等級,按照各自記錄動作覆蓋面由弱到強排列,包括:

TRACE -> DEBUG -> INFO -> WARN -> ERROR 分別對應記錄函數 trace(msg),debug(msg),info(msg),warn(msg),error(msg)

logback按消息等級進行記錄篩選的規則如下:

假設記錄函數為p,某個class的消息等級level為q:當p>=q時選擇記錄消息。換言之調用函數error(msg)時logback會記錄所有等級消息,反之trace(msg)只能記錄TRACE級別的消息。logback手冊中如下表示:

                TRACE       DEBUG    INFO      WARN       ERROR      OFF
trace()          YES          NO      NO         NO         NO        NO
debug()          YES         YES      NO         NO         NO        NO
info()           YES         YES     YES         NO         NO        NO
warn()           YES         YES     YES        YES         NO        NO
error()          YES         YES     YES        YES        YES        NO

logback中每個類的預設消息等級可以按照類型繼承樹結構繼承。當一個子類沒有定義消息等級時,它繼承對上父類的消息等級,即:X.Y.Z中Z的預設消息等級從Y繼承。

再看看下麵logback.xml例子:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
            </Pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <!-- path to your log file, where you want to store logs -->
        <file>~/logback.log</file>
        <append>false</append>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="cassandraLogger" class="com.datatech.logback.CassandraAppender">
        <appName>POCServer</appName>
        <defaultFieldValues>{"app_customer":"999999","app_device":"9999"}</defaultFieldValues>
        <keyspaceName>applog</keyspaceName>
        <columnFamily>txnlog</columnFamily>
    </appender>

    <appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
        <host>http://localhost</host>
        <port>9200</port>
        <appName>ESLoggerDemo</appName>
        <defaultFieldValues>{"app_customer":"中心書城","app_device":"9013"}</defaultFieldValues>
        <indexName>applog</indexName>
    </appender>

    <logger name="com.datatech" level="info"
            additivity="false">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </logger>

    <logger name="com.datatech.sdp" level="info"
            additivity="false">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </logger>

    <root level="info">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </root>

    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
</configuration>

上面配置文件中定義了包括STDOUT,FILE,cassandraLoggeer,elasticLogger幾個appender。首先,不同level可以使用不同的appender。cassandraLogger,elasticLogger是我們自定義的appender。在elasticLogger段落里定義了ES終端連接參數如host,port。在ElasticAppender類源碼中的elastic終端連接和關閉如下:

override def start(): Unit = {
    if(! _hosts.isEmpty) {
      connectES()
      super.start()
    }
  }

  override def stop(): Unit = {
    if(optESClient.isDefined) {
      (optESClient.get).close()
      optESClient = None
    }
    super.stop()
  }

  def connectES(): Unit = {
    try {
      val url = _hosts + ":" + _port.toString
      val esjava =  JavaClient(ElasticProperties(url))
      val client = ElasticClient(esjava)
      optESClient = Some(client)
    } catch {
      case e: Throwable =>
        optESClient = None
    }
  }

註意,假如host在logback.xml里定義了那麼在ElasticAppender實例化時系統會自動直接連接,否則需要手工調用logger.start()來連接ES。xml文件里的屬性是通過getter來獲取的,如下:

 private var _hosts: String = ""
  def setHost(host: String): Unit = _hosts = host
  def getHost : String = _hosts

  private var _port: Int = 9200
  def setPort(port: Int): Unit = _port = port

  private var _idxname: String = "applog"
  def setIndexName(indexName: String): Unit = _idxname = indexName

  private var _username: String = ""
  def setUsername(username: String): Unit = _username = username

  private var _password: String = ""
  def setPassword(password: String): Unit = _password = password

  private var _defaultFieldValues: String = ""
  def setDefaultFieldValues(defaultFieldValues: String) = _defaultFieldValues = defaultFieldValues

下麵是ElasticAppender的使用示範:(先把logback_persist.jar放入lib目錄)

import scala.concurrent.ExecutionContext.Implicits.global
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import ch.qos.logback.classic.Logger
import ch.qos.logback.core.{ConsoleAppender, FileAppender}
import com.datatech.logback.{CassandraAppender,ElasticAppender, JsonConverter}
import ch.qos.logback.classic.spi.ILoggingEvent
import org.slf4j.LoggerFactory
import ch.qos.logback.classic.LoggerContext
import java.time._
import java.time.format._
import java.util.Locale

object ElasticAppenderDemo extends App with JsonConverter {
  val log: Logger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger]
  val elasticAppender = log.getAppender("elasticLogger").asInstanceOf[ElasticAppender]
  val stdoutAppender = log.getAppender("STDOUT").asInstanceOf[ConsoleAppender[ILoggingEvent]]
  val fileAppender = log.getAppender("FILE").asInstanceOf[FileAppender[ILoggingEvent]]
  val cassAppender = log.getAppender("cassandraLogger").asInstanceOf[CassandraAppender]

  //stop other appenders
  if (stdoutAppender != null)
    stdoutAppender.stop()
  if (fileAppender != null)
    fileAppender.stop()
  if (cassAppender != null)
    cassAppender.stop()

  //check if host not set in logback.xml
  if(elasticAppender != null) {
    if (elasticAppender.getHost.isEmpty) {
      elasticAppender.setHost("http://localhost")
      elasticAppender.setPort(9200)
      elasticAppender.start()
    }
  }

  val  dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
  val now = LocalDateTime.now.format(dateTimeFormatter)

  (1 to 100).foreach { idx =>
    log.info(s"************this is a info message $idx ")
  }

  log.debug("***********debugging message here ..." + now)

  log.debug(toJson(loggedItems))


  //stop the logger

  val loggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
  loggerContext.stop()

}

在Appender實例化時getAppender("elasticLogger")中這個elasticLogger是xml文件中appender段落名稱。如果host,port沒在xml文件中定義的話可以手工用setter setHost,setPort在程式里設置。loggerContext.stop()一次性關閉所有appender,包括它們連接的資料庫。也可以用elasticAppender.stop()來關閉獨立的appender。

我們可以用elastic4自定義一個表結構mapping, 如下:

    val esjava = JavaClient(ElasticProperties("http://localhost:9200"))
    val client = ElasticClient(esjava)


    //刪除索引
    val rspExists = client.execute(indexExists("applog")).await
    if (rspExists.result.exists)
      client.execute(deleteIndex("applog")).await

    //構建索引
    val idxCreate = client.execute(createIndex("applog")
      .shards(1).replicas(1)).await
    //創建表結構
    if(idxCreate.isSuccess) {
      val applogMapping = client.execute(
        putMapping("applog").fields(
          textField("class_name"),
          textField("file_name"),
          ipField("host_ip"),
          textField("host_name"),
          keywordField("level"),
          keywordField("line_number"),
          keywordField("logger_name"),
          keywordField("method_name"),
          keywordField("thread_name"),
          textField("throwable_str_rep"),
          dateField("log_date").format("basic_date").ignoreMalformed(true),
          dateField("log_time").format("basic_date_time").ignoreMalformed(true),
          textField("log_msg"),
          keywordField("app_name"),
          keywordField("app_customer"),
          keywordField("app_device")
        )).await
      if(applogMapping.isSuccess)
        println(s"mapping successfully created.")
      else
        println(s"mapping creation error: ${applogMapping.error.reason}")
    } else {
      println(s"index creation error: ${idxCreate.error.reason}")
    }
    client.close()

依賴引用在build.sbt里:

name := "logback-persist-demo"

version := "0.1"

scalaVersion := "2.12.9"

val elastic4sVersion = "7.6.0"

libraryDependencies ++= Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0",

  "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion,

  // for the default http client
  "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion,

  "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
  "org.typelevel" %% "cats-core" % "2.0.0-M1",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7"
)

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 需要準備的環境: (1)python3.8 (2)pycharm (3)截取網路請求信息的工具,有很多,百度一種隨便用即可。 第一:首先通過python的sqlalchemy模塊,來新建一個表。 第二:通過python中的request模塊介面的形式調取數據。 思路:(1)先獲取所有城市信息:需要用 ...
  • WAE : Concrete syntax WAE : Abstract syntax parse : sexp WAE subst : WAE symbol number WAE interp : WAE number ...
  • 一、用於數據分析、科學計算與可視化的擴展模塊主要有:numpy、scipy、pandas、SymPy、matplotlib、Traits、TraitsUI、Chaco、TVTK、Mayavi、VPython、OpenCV。 1.numpy模塊:科學計算包,支持N維數組運算、處理大型矩陣、成熟的廣播函 ...
  • Numpy 定義:NumPy(Numerical Python) 是 Python 語言的一個擴展程式庫,支持大量的維度數組與矩陣運算,此外也針對數組運算提供大量的數學函數庫。它主要用於數組計算,包括: 一個強大的N維數組對象 ndarray 廣播功能函數 整合 C/C++/Fortran 代碼的工 ...
  • 其實單調隊列就是一種隊列內的元素有單調性的隊列,因為其單調性所以經常會被用來維護區間最值或者降低DP的維數已達到降維來減少空間及時間的目的。 每一個答案只與當前下標的前m個有關,所以可以用單調隊列維護前m的個最小值, 考慮如何實現該維護的過程?? 顯然當前下標$X$的$m$個以前的元素(即下標小於$ ...
  • 基礎演算法 [TOC] 演算法基礎 演算法的定義和特征 演算法設計的要求 演算法的時間複雜度 二分查找 冒泡排序 選擇排序 python 代碼 將亂序中的最大值找出,跟最後一個元素交換位置 def sort(alist): max_index = 0 最大值的下標 for i in range(1,len(a ...
  • 我的LeetCode:https://leetcode cn.com/u/ituring/ 我的LeetCode刷題源碼[GitHub]:https://github.com/izhoujie/Algorithmcii LeetCode 面試題30. 包含min函數的棧 與以下題目相同 前往:Lee ...
  • 我的LeetCode:https://leetcode cn.com/u/ituring/ 我的LeetCode刷題源碼[GitHub]:https://github.com/izhoujie/Algorithmcii LeetCode 155. 最小棧 題目 設計一個支持 push ,pop ,t ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...