前面寫了個cassandra-appender,一個基於cassandra的logback插件。正是cassandra的分散式資料庫屬性才合適作為akka-cluster-sharding分散式應用的logger。所以,cassandra-appender核心功能就是對logback消息的存寫部分了 ...
前面寫了個cassandra-appender,一個基於cassandra的logback插件。正是cassandra的分散式資料庫屬性才合適作為akka-cluster-sharding分散式應用的logger。所以,cassandra-appender核心功能就是對logback消息的存寫部分了。同樣,基於ES的logback-appender核心部分就是對ES的存寫過程了。在ES里這個過程還附帶了索引indexing過程。將來對歷史消息的搜索、分析會更加方便。直接看看消息存寫這部分elastic4代碼:
def writeLog(event: ILoggingEvent)(client: ElasticClient, idx: String)(appName: String, ip: String, hostName: String, default: String) = {
var content: List[(String,Any)] = List(
APP_NAME -> appName,
HOST_IP -> ip,
HOST_NAME -> hostName,
LOGGER_NAME -> event.getLoggerName(),
LEVEL -> event.getLevel().toString,
THREAD_NAME -> event.getThreadName(),
LOG_DATE -> logDate,
LOG_TIME -> logTime
)
try {
val callerData = event.getCallerData()
if (callerData.nonEmpty) {
content = content ++ List(
CLASS_NAME -> callerData.head.getClassName(),
FILE_NAME -> callerData.head.getFileName(),
LINE_NUMBER -> callerData.head.getLineNumber().toString,
METHOD_NAME -> callerData.head.getMethodName()
)
}
} catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")}
try {
if (event.getThrowableProxy() != null) {
val throwableStrs = event.getThrowableProxy().getSuppressed().asInstanceOf[List[IThrowableProxy]]
val throwableStr = throwableStrs.foldLeft("") { case (b, t) => b + "," + t.getMessage() }
content = content :+ (THROWABLE_STR -> throwableStr)
}
} catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")}
var logmsgs = event.getMessage()
try {
val logMap = fromJson[Map[String,String]](logmsgs)
logMap.foreach ( m => content = content :+ (m._1 -> m._2))
} catch {
case e: Throwable =>
content = content :+ (MESSAGE -> logmsgs)
try {
val dftMap = fromJson[Map[String,String]](default)
dftMap.foreach ( m => content = content :+ (m._1 -> m._2))
} catch {
case e: Throwable => }
}
val newRecord = indexInto(idx)
.fields(
content
).createOnly(true)
client.execute(newRecord) //.await
}
可以看到,我們先判斷了一下event.getMessage()消息是否是json格式的:如果是正確的json格式,那麼解析成為欄位名和欄位值,否則就直接寫入log_msg欄位 + 一串預設的欄位和值。乾什麼呢?要知道這個elastic-appender是一個通用的logback-plugin,是可以在任何軟體中使用的。因為各種軟體對運行狀態跟蹤目標、方式的要求不同,為了滿足這些要求,那麼通過用戶自定義跟蹤目標欄位的方式應該是一個好的解決方案。從測試例子里可以理解:
var loggedItems = Map[String,String]()
loggedItems = loggedItems ++ Map(
("app_customer" -> "logback.com"),
("app_device" -> "9101"),
("log_msg" -> "specific message for elastic ..."))
log.debug(toJson(loggedItems))
//logback.xml
<appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
<host>http://localhost</host>
<port>9200</port>
<appName>ESLoggerDemo</appName>
<defaultFieldValues>{"app_customer":"中心書城","app_device":"9013"}</defaultFieldValues>
<indexName>applog</indexName>
</appender>
上面代碼里定義了app_customer,app_device,log_msg這幾個自定義欄位和值。這樣做的意思是:logback只定義了log.info(msg)里msg一個欄位。如果存放在資料庫里我們只能在msg一個欄位里進行分類、查詢了。但既然已經使用了資料庫作為存儲我們更希望用更多的欄位來代表一條消息,如用戶號,機器號,店號等等。這樣跟蹤起來方便很多。所以,對於內部的用戶可以要求把因應特殊需要額外增加的欄位-值加密成json,然後傳遞給ElasticAppender去處理。對於應用中引用三方軟體所產生的logback-msg,我們可沒辦法要求他們按照這個格式來傳遞消息,但仍然會存進ES,所以就用logback.xml中defaultFieldValaues定義的預設欄位-值來填寫這些額外的信息了。
這一篇我們主要討論一下這個特別的elastic-appender,它的使用方法。那麼先重覆一下logback的工作原理:
首先認識一下logback:感覺需要重點瞭解的logging運作核心應該是消息等級level的操作。消息等級是指logback根據不同的消息等級來篩選需要記錄的消息。logback支持下麵幾個消息等級,按照各自記錄動作覆蓋面由弱到強排列,包括:
TRACE -> DEBUG -> INFO -> WARN -> ERROR 分別對應記錄函數 trace(msg),debug(msg),info(msg),warn(msg),error(msg)
logback按消息等級進行記錄篩選的規則如下:
假設記錄函數為p,某個class的消息等級level為q:當p>=q時選擇記錄消息。換言之調用函數error(msg)時logback會記錄所有等級消息,反之trace(msg)只能記錄TRACE級別的消息。logback手冊中如下表示:
TRACE DEBUG INFO WARN ERROR OFF
trace() YES NO NO NO NO NO
debug() YES YES NO NO NO NO
info() YES YES YES NO NO NO
warn() YES YES YES YES NO NO
error() YES YES YES YES YES NO
logback中每個類的預設消息等級可以按照類型繼承樹結構繼承。當一個子類沒有定義消息等級時,它繼承對上父類的消息等級,即:X.Y.Z中Z的預設消息等級從Y繼承。
再看看下麵logback.xml例子:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<Pattern>
%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<!-- path to your log file, where you want to store logs -->
<file>~/logback.log</file>
<append>false</append>
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="cassandraLogger" class="com.datatech.logback.CassandraAppender">
<appName>POCServer</appName>
<defaultFieldValues>{"app_customer":"999999","app_device":"9999"}</defaultFieldValues>
<keyspaceName>applog</keyspaceName>
<columnFamily>txnlog</columnFamily>
</appender>
<appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
<host>http://localhost</host>
<port>9200</port>
<appName>ESLoggerDemo</appName>
<defaultFieldValues>{"app_customer":"中心書城","app_device":"9013"}</defaultFieldValues>
<indexName>applog</indexName>
</appender>
<logger name="com.datatech" level="info"
additivity="false">
<appender-ref ref="cassandraLogger" />
<appender-ref ref="elasticLogger" />
<appender-ref ref="STDOUT" />
</logger>
<logger name="com.datatech.sdp" level="info"
additivity="false">
<appender-ref ref="cassandraLogger" />
<appender-ref ref="elasticLogger" />
<appender-ref ref="STDOUT" />
</logger>
<root level="info">
<appender-ref ref="cassandraLogger" />
<appender-ref ref="elasticLogger" />
<appender-ref ref="STDOUT" />
</root>
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
</configuration>
上面配置文件中定義了包括STDOUT,FILE,cassandraLoggeer,elasticLogger幾個appender。首先,不同level可以使用不同的appender。cassandraLogger,elasticLogger是我們自定義的appender。在elasticLogger段落里定義了ES終端連接參數如host,port。在ElasticAppender類源碼中的elastic終端連接和關閉如下:
override def start(): Unit = {
if(! _hosts.isEmpty) {
connectES()
super.start()
}
}
override def stop(): Unit = {
if(optESClient.isDefined) {
(optESClient.get).close()
optESClient = None
}
super.stop()
}
def connectES(): Unit = {
try {
val url = _hosts + ":" + _port.toString
val esjava = JavaClient(ElasticProperties(url))
val client = ElasticClient(esjava)
optESClient = Some(client)
} catch {
case e: Throwable =>
optESClient = None
}
}
註意,假如host在logback.xml里定義了那麼在ElasticAppender實例化時系統會自動直接連接,否則需要手工調用logger.start()來連接ES。xml文件里的屬性是通過getter來獲取的,如下:
private var _hosts: String = "" def setHost(host: String): Unit = _hosts = host def getHost : String = _hosts private var _port: Int = 9200 def setPort(port: Int): Unit = _port = port private var _idxname: String = "applog" def setIndexName(indexName: String): Unit = _idxname = indexName private var _username: String = "" def setUsername(username: String): Unit = _username = username private var _password: String = "" def setPassword(password: String): Unit = _password = password private var _defaultFieldValues: String = "" def setDefaultFieldValues(defaultFieldValues: String) = _defaultFieldValues = defaultFieldValues
下麵是ElasticAppender的使用示範:(先把logback_persist.jar放入lib目錄)
import scala.concurrent.ExecutionContext.Implicits.global
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties}
import ch.qos.logback.classic.Logger
import ch.qos.logback.core.{ConsoleAppender, FileAppender}
import com.datatech.logback.{CassandraAppender,ElasticAppender, JsonConverter}
import ch.qos.logback.classic.spi.ILoggingEvent
import org.slf4j.LoggerFactory
import ch.qos.logback.classic.LoggerContext
import java.time._
import java.time.format._
import java.util.Locale
object ElasticAppenderDemo extends App with JsonConverter {
val log: Logger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger]
val elasticAppender = log.getAppender("elasticLogger").asInstanceOf[ElasticAppender]
val stdoutAppender = log.getAppender("STDOUT").asInstanceOf[ConsoleAppender[ILoggingEvent]]
val fileAppender = log.getAppender("FILE").asInstanceOf[FileAppender[ILoggingEvent]]
val cassAppender = log.getAppender("cassandraLogger").asInstanceOf[CassandraAppender]
//stop other appenders
if (stdoutAppender != null)
stdoutAppender.stop()
if (fileAppender != null)
fileAppender.stop()
if (cassAppender != null)
cassAppender.stop()
//check if host not set in logback.xml
if(elasticAppender != null) {
if (elasticAppender.getHost.isEmpty) {
elasticAppender.setHost("http://localhost")
elasticAppender.setPort(9200)
elasticAppender.start()
}
}
val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
val now = LocalDateTime.now.format(dateTimeFormatter)
(1 to 100).foreach { idx =>
log.info(s"************this is a info message $idx ")
}
log.debug("***********debugging message here ..." + now)
log.debug(toJson(loggedItems))
//stop the logger
val loggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
loggerContext.stop()
}
在Appender實例化時getAppender("elasticLogger")中這個elasticLogger是xml文件中appender段落名稱。如果host,port沒在xml文件中定義的話可以手工用setter setHost,setPort在程式里設置。loggerContext.stop()一次性關閉所有appender,包括它們連接的資料庫。也可以用elasticAppender.stop()來關閉獨立的appender。
我們可以用elastic4自定義一個表結構mapping, 如下:
val esjava = JavaClient(ElasticProperties("http://localhost:9200"))
val client = ElasticClient(esjava)
//刪除索引
val rspExists = client.execute(indexExists("applog")).await
if (rspExists.result.exists)
client.execute(deleteIndex("applog")).await
//構建索引
val idxCreate = client.execute(createIndex("applog")
.shards(1).replicas(1)).await
//創建表結構
if(idxCreate.isSuccess) {
val applogMapping = client.execute(
putMapping("applog").fields(
textField("class_name"),
textField("file_name"),
ipField("host_ip"),
textField("host_name"),
keywordField("level"),
keywordField("line_number"),
keywordField("logger_name"),
keywordField("method_name"),
keywordField("thread_name"),
textField("throwable_str_rep"),
dateField("log_date").format("basic_date").ignoreMalformed(true),
dateField("log_time").format("basic_date_time").ignoreMalformed(true),
textField("log_msg"),
keywordField("app_name"),
keywordField("app_customer"),
keywordField("app_device")
)).await
if(applogMapping.isSuccess)
println(s"mapping successfully created.")
else
println(s"mapping creation error: ${applogMapping.error.reason}")
} else {
println(s"index creation error: ${idxCreate.error.reason}")
}
client.close()
依賴引用在build.sbt里:
name := "logback-persist-demo" version := "0.1" scalaVersion := "2.12.9" val elastic4sVersion = "7.6.0" libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0", "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0", "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion, // for the default http client "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion, "ch.qos.logback" % "logback-classic" % "1.2.3", "org.typelevel" %% "cats-core" % "2.0.0-M1", "org.json4s" %% "json4s-native" % "3.6.1", "org.json4s" %% "json4s-jackson" % "3.6.7", "org.json4s" %% "json4s-ext" % "3.6.7" )