場景 k12線上教育公司的業務場景中,有一些業務場景需要實時統計和分析,如分析線上上課老師數量、學生數量,實時銷售額,課堂崩潰率等,需要實時反應上課的質量問題,以便於對整個公司的業務情況有大致的瞭解。 方案對比 對比了很多解決方案,如下幾種,列出來供參考。 設計方案 實時處理採用Flink SQL, ...
場景
k12線上教育公司的業務場景中,有一些業務場景需要實時統計和分析,如分析線上上課老師數量、學生數量,實時銷售額,課堂崩潰率等,需要實時反應上課的質量問題,以便於對整個公司的業務情況有大致的瞭解。
方案對比
對比了很多解決方案,如下幾種,列出來供參考。
方案 | 實時入庫 | SQL支持度 |
---|---|---|
Spark+CarbonData | 支持 | Spark SQL語法豐富 |
Kylin | 不支持 | 支持join |
Flink+Druid | 支持 | 0.15以前不支持SQL,不支持join |
- 上一篇文章所示,使用Spark+CarbonData也是一種解決方案,但是他的缺點也是比較明顯,如不能和Flink進行結合,因為我們整個的大數據規劃的大致方向是,Spark用來作為離線計算,Flink作為實時計算,並且這兩個大方向短時間內不會改變;
- Kylin一直是老牌OLAP引擎,但是有個缺點無法滿足我們的需求,就是在技術選型的那個時間點kylin還不支持實時入庫(後續2.0版本支持實時入庫),所以就選擇了放棄;
- 使用Flink+Druid方式實現,這個時間選擇這個方案,簡直是順應潮流呀,Flink現在如日中天,各大廠都在使用,Druid是OLAP的新貴,關於它的文章也有很多,我也不贅述太多。有興趣的可以看下這篇文章,我的博客其它文章也有最新版本的安裝教程,實操方案哦。
設計方案
實時處理採用Flink SQL,實時入庫Druid方式採用 druid-kafka-indexing-service,另一種方式入庫方式,Tranquility,這種方式測試下來問題多多,放棄了。數據流向如下圖。
場景舉例
實時計算課堂連接掉線率。此事件包含兩個埋點上報,進入教室和掉線分別上報數據。druid設計的欄位
flink的處理
將上報的數據進行解析,上報使用的是json格式,需要解析出所需要的欄位然後發送到kafka。欄位包含如下
sysTime,DateTime格式
pt,格式yyyy-MM-dd
eventId,事件類型(enterRoom|disconnect)
lessonId,課程ID
Druid處理
啟動Druid Supervisor,消費Kafka里的數據,使用預聚合,配置如下
{ "type": "kafka", "dataSchema": { "dataSource": "sac_core_analyze_v1", "parser": { "parseSpec": { "dimensionsSpec": { "spatialDimensions": [], "dimensions": [ "eventId", "pt" ] }, "format": "json", "timestampSpec": { "column": "sysTime", "format": "auto" } }, "type": "string" }, "metricsSpec": [ { "filter": { "type": "selector", "dimension": "msg_type", "value": "disconnect" }, "aggregator": { "name": "lesson_offline_molecule_id", "type": "cardinality", "fields": ["lesson_id"] }, "type": "filtered" }, { "filter": { "type": "selector", "dimension": "msg_type", "value": "enterRoom" }, "aggregator": { "name": "lesson_offline_denominator_id", "type": "cardinality", "fields": ["lesson_id"] }, "type": "filtered" } ], "granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": { "type": "none" }, "rollup": true, "intervals": null }, "transformSpec": { "filter": null, "transforms": [] } }, "tuningConfig": { "type": "kafka", "maxRowsInMemory": 1000000, "maxBytesInMemory": 0, "maxRowsPerSegment": 5000000, "maxTotalRows": null, "intermediatePersistPeriod": "PT10M", "basePersistDirectory": "/tmp/1564535441619-2", "maxPendingPersists": 0, "indexSpec": { "bitmap": { "type": "concise" }, "dimensionCompression": "lz4", "metricCompression": "lz4", "longEncoding": "longs" }, "buildV9Directly": true, "reportParseExceptions": false, "handoffConditionTimeout": 0, "resetOffsetAutomatically": false, "segmentWriteOutMediumFactory": null, "workerThreads": null, "chatThreads": null, "chatRetries": 8, "httpTimeout": "PT10S", "shutdownTimeout": "PT80S", "offsetFetchPeriod": "PT30S", "intermediateHandoffPeriod": "P2147483647D", "logParseExceptions": false, "maxParseExceptions": 2147483647, "maxSavedParseExceptions": 0, "skipSequenceNumberAvailabilityCheck": false }, "ioConfig": { "topic": "sac_druid_analyze_v2", "replicas": 2, "taskCount": 1, "taskDuration": "PT600S", "consumerProperties": { "bootstrap.servers": "bd-prod-kafka01:9092,bd-prod-kafka02:9092,bd-prod-kafka03:9092" }, "pollTimeout": 100, "startDelay": "PT5S", "period": "PT30S", "useEarliestOffset": false, "completionTimeout": "PT1200S", "lateMessageRejectionPeriod": null, "earlyMessageRejectionPeriod": null, "stream": "sac_druid_analyze_v2", "useEarliestSequenceNumber": false }, "context": null, "suspended": false }View Code
最重要的配置是metricsSpec,他主要定義了預聚合的欄位和條件。
數據查詢
數據格式如下
pt | eventId | lesson_offline_molecule_id | lesson_offline_denominator_id |
---|---|---|---|
2019-08-09 | enterRoom | "AQAAAAAAAA==" | "AQAAAAAAAA==" |
2019-08-09 | disconnect | "AQAAAAAAAA==" | "AQAAAAAAAA==" |
結果可以按照這樣的SQL出
SELECT pt,CAST(APPROX_COUNT_DISTINCT(lesson_offline_molecule_id) AS DOUBLE)/CAST(APPROX_COUNT_DISTINCT(lesson_offline_denominator_id) AS DOUBLE) from sac_core_analyze_v1 group by pt
可以使用Druid的介面查詢結果,肥腸的方便~