(1)優(yōu)點(diǎn):
·離線和小時級任務(wù)各自獨(dú)立
·代碼邏輯復(fù)用性高,減少開發(fā)成本
·可以使用離線數(shù)據(jù)覆蓋小時級數(shù)據(jù),進(jìn)行數(shù)據(jù)修復(fù)
(2)缺點(diǎn):
·小時級數(shù)據(jù)的延遲性還是很高,已無法滿足業(yè)務(wù)對數(shù)據(jù)時效性的要求
·MapReduce不適合分鐘級頻次的任務(wù)調(diào)度,主要是MapReduce任務(wù)啟動慢,另外會過高的頻次會產(chǎn)生很多小文件,影響HDFS的穩(wěn)定性,以及SQL on Hadoop系統(tǒng)的查詢速度
·批量數(shù)據(jù)處理每次運(yùn)行對資源要求高,尤其是當(dāng)凌晨Hadoop資源緊張時,任務(wù)經(jīng)常無法得到調(diào)度,延遲嚴(yán)重
2.基于Flink+Kudu
為了解決上面基于MapReduce小時級任務(wù)的問題,我們采用了流式處理系統(tǒng)Flink和支持增量更新的存儲系統(tǒng)Kudu。
如上圖所示,實(shí)時的日志數(shù)據(jù)通過Flume采集到Kafka,實(shí)時的業(yè)務(wù)數(shù)據(jù)通過canal實(shí)時同步數(shù)據(jù)庫的binlog再轉(zhuǎn)發(fā)到Kafka中,F(xiàn)link再實(shí)時消費(fèi)Kafka中的數(shù)據(jù)寫入Kudu中。
在使用Flink+Kudu的實(shí)踐中,我們遇到了如下幾個問題:
·Flink基于stream語義,做復(fù)雜指標(biāo)計(jì)算非常復(fù)雜,門檻高,開發(fā)效率不高,數(shù)據(jù)倉庫更多使用批處理SQL
·Kudu+Impala聚合查詢效率不高,查詢響應(yīng)時間不能滿足業(yè)務(wù)多維分析要求
·使用Kudu需要依賴Impala、Hive等整個Hadoop組件,維護(hù)成本太高
·Kudu社區(qū)不活躍,遇到問題很難找到相關(guān)解決方案,使用過程中遇到過宕機(jī)等各類疑難問題
3.基于DorisDB
基于上面方案的問題,我們開始對實(shí)時數(shù)倉進(jìn)行調(diào)研,包括DorisDB、ClickHouse、Kylin等系統(tǒng),考慮到查詢性能、社區(qū)發(fā)展、運(yùn)維成本等多種因素,我們最后選擇DorisDB作為我們的實(shí)時數(shù)倉,各系統(tǒng)的對比總結(jié)如下:
我們也深入考慮過ClickHouse,對于教育場景,一個學(xué)員要關(guān)聯(lián)的數(shù)據(jù)維度多,包括課堂、服務(wù)、訂單、教研等。在每個主題我們都會建設(shè)靈活且易用的星型數(shù)據(jù)模型。當(dāng)業(yè)務(wù)想進(jìn)行個性化自助分析時,僅需要關(guān)聯(lián)相關(guān)表即可。但如果直接構(gòu)建明細(xì)大寬表,隨著業(yè)務(wù)不斷調(diào)整,經(jīng)常需要重構(gòu)開發(fā)。這種情況下,ClickHouse的join能力弱,無法滿足需求,而DorisDB強(qiáng)悍的Join能力,就成了我們應(yīng)對業(yè)務(wù)變化的利器。而且DorisDB支持CBO(基于成本統(tǒng)計(jì)的優(yōu)化器),具備復(fù)雜查詢的優(yōu)化能力,從而可以快速的進(jìn)行復(fù)雜實(shí)時微批處理任務(wù),可以幫助我們更好的進(jìn)行實(shí)時指標(biāo)構(gòu)建。
最終選擇DorisDB的原因:
·使用DorisDB可以讓我們像開發(fā)離線Hive任務(wù)一樣進(jìn)行實(shí)時數(shù)倉的開發(fā),避免了復(fù)雜的Flink stream語義,同時也能在功能上對齊離線指標(biāo),保證指標(biāo)豐富性的基礎(chǔ)上完成指標(biāo)定義口徑的一致,并且可以保證分鐘級的數(shù)據(jù)可見性。
·大寬表和星型模型的查詢性能都很好,可以靈活高效的滿足各類業(yè)務(wù)分析要求。
·DorisDB簡單易用,運(yùn)維管理成本低。
三、基于DorisDB的實(shí)時數(shù)倉架構(gòu)
1.系統(tǒng)搭建
整個系統(tǒng),除了DorisDB集群之外,我們還搭建了下面兩個配套系統(tǒng)
·調(diào)度:使用Airflow,進(jìn)行DAG任務(wù)調(diào)度
·監(jiān)控:使用grafana+prometheus,采集DorisDB信息并進(jìn)行實(shí)時監(jiān)控
2.實(shí)時數(shù)倉總體架構(gòu)
基于DorisDB的實(shí)時數(shù)倉總體架構(gòu),主要包括下面三個部分:
(1)數(shù)據(jù)源:業(yè)務(wù)數(shù)據(jù)(使用Flink實(shí)時同步mysql的binlog日志,寫入到Kafka)、日志數(shù)據(jù)(包括H5小程序、APP、直播ipad客戶端等埋點(diǎn)采集的各類日志數(shù)據(jù),通過Flume寫入到Kafka中)
(2)數(shù)據(jù)存儲:
·采用DorisDB的Routine Load直接消費(fèi)Kafka中的日志和業(yè)務(wù)數(shù)據(jù)
·使用DorisDB的Broker Load將Hadoop中的DWD、DWS、ADS等數(shù)據(jù)導(dǎo)入到DorisDB中
·對于Flink等流式處理下系統(tǒng),使用DorisDB的Stream Load方式實(shí)時將數(shù)據(jù)導(dǎo)入DorisDB
(3)數(shù)據(jù)應(yīng)用:
·使用DataX可以將DorisDB數(shù)據(jù)導(dǎo)出到MySQL中
·使用DorisDB的Export可以將DorisDB中的數(shù)據(jù)導(dǎo)出到HDFS中
·DorisDB完全兼容Mysql協(xié)議,BI或業(yè)務(wù)系統(tǒng)可以使用Mysql Connector直接連接DorisDB進(jìn)行使用
3.實(shí)時數(shù)倉數(shù)據(jù)處理流程
在實(shí)時數(shù)倉內(nèi)部,也是按照傳統(tǒng)離線數(shù)倉的方式,對數(shù)據(jù)處理進(jìn)行分層處理:
·ODS層,設(shè)置DorisDB的Routine Load間隔30秒消費(fèi)一次Kafka數(shù),寫入到ODS表中
·DWD層,按業(yè)務(wù)分析的需要建模DWD表,通過Airflow間隔5分鐘,將ODS表中過去5分鐘的增量數(shù)據(jù)寫入到DWD表中
·DWS層,對DWD表中的維度進(jìn)行輕度或中度匯總,可以加快上層查詢速度
·BI層,通過自研的一個指標(biāo)定義工具,分析人員可以快速的基于DWS構(gòu)建報表,也可以衍生出一些復(fù)合指標(biāo)進(jìn)行二次加工。分析師也可以將取數(shù)口徑中的SQL做臨時修改,生成一個復(fù)雜跨主題查詢SQL,來應(yīng)對一些Adhoc需求場景。
四、DorisDB實(shí)時數(shù)倉具體應(yīng)用
在好未來,為保證課堂上課數(shù)據(jù)、訂單數(shù)據(jù)的實(shí)時分析要求,使用DorisDB支撐了課堂、訂單等分析業(yè)務(wù)。下面以課堂、訂單場景為例,從數(shù)據(jù)同步、數(shù)據(jù)加工等幾個步驟拆解DorisDB在好未來應(yīng)用場景的落地方案。
1.實(shí)時數(shù)據(jù)同步
在好未來,采用flink采集業(yè)務(wù)庫的binlog數(shù)據(jù),然后寫入到kafka中,DorisDB只需要消費(fèi)kafka對應(yīng)的topic數(shù)據(jù)即可,整體流程如下圖:
2.實(shí)時數(shù)倉數(shù)據(jù)處理
DorisDB內(nèi)部的實(shí)時數(shù)據(jù)加工處理主要有如下操作:
·縮短計(jì)算鏈路的長度,實(shí)時部分最多計(jì)算2層。dwd或dws層
·增量計(jì)算,采用DorisDB的UNIQUE KEY模型,相當(dāng)于(insert+update),因此只計(jì)算增量部分即可
·采用時間分區(qū),多副本策略。既為了數(shù)據(jù)安全,又能避免鎖表
·離線表結(jié)構(gòu)與實(shí)時表結(jié)構(gòu),保持一樣,這樣就可以用離線修復(fù)T+1數(shù)據(jù)
3.DAG任務(wù)調(diào)度
為了使DorisDB能在airflow上執(zhí)行,我們封裝了airflow調(diào)用DorisDB執(zhí)行sql的算子,以便DorisDB的加工邏輯在airflow上被定時調(diào)度。
DorisDB任務(wù)執(zhí)行狀態(tài)的檢查,由于不像T+1,只需要判斷昨天任務(wù)是否執(zhí)行就行了,實(shí)時檢查需要滿足以下條件:
·檢查輪詢間隔,需要根據(jù)不同的調(diào)度間隔,適當(dāng)調(diào)整
·檢查輪詢總時長,不能超過(調(diào)度間隔時長-10秒)
·檢查的范圍,最小需要大于調(diào)度間隔,最大小于2倍的調(diào)度間隔
根據(jù)以上的實(shí)時調(diào)度檢查條件,我們封裝了基于DorisDB的實(shí)時調(diào)度的任務(wù)檢查airflow算子,方便使用。
4.實(shí)時數(shù)據(jù)生產(chǎn)預(yù)警
為了監(jiān)控DorisDB的實(shí)時數(shù)據(jù)生產(chǎn)情況,我們設(shè)置了三種預(yù)警:
(1)檢查DorisDB消費(fèi)Kafka的任務(wù),是否停掉了,如果停掉自動重啟,重啟3次依然失敗,再發(fā)通知,人為干預(yù)
(2)檢查常規(guī)任務(wù)的執(zhí)行,如果執(zhí)行報錯,就發(fā)通知。
(3)檢查數(shù)據(jù)源與DorisDB實(shí)時數(shù)倉ods層表,schema的對比,如果出現(xiàn)schema變更,就發(fā)通知人為干預(yù)。這樣我們就能在白天實(shí)時了解schema的變更情況,不必要等到調(diào)度報錯才發(fā)現(xiàn),而且不影響線上數(shù)據(jù)產(chǎn)出。
五、DorisDB使用效果
1.提升業(yè)務(wù)收益
DorisDB在眾多場景給業(yè)務(wù)帶來了直接收益,尤其是DorisDB的實(shí)時數(shù)據(jù)與算法模型相結(jié)合的場景。比如教育的獲客、轉(zhuǎn)化、用戶續(xù)報等業(yè)務(wù),之前模型需要特征數(shù)據(jù)都是前一天的,所以模型也相對滯后。而我們通過大量數(shù)據(jù)分析得出結(jié)論:是當(dāng)日行為和跟進(jìn)數(shù)據(jù),是最有價值的特征數(shù)據(jù),這樣模型效果較好。特別是意向用戶識別模型,成為線索當(dāng)天的歷史積累數(shù)據(jù)的特征和前一天的歷史積累數(shù)據(jù)的特征,分別訓(xùn)練模型后,線上實(shí)際預(yù)測效果相差2-3個百分點(diǎn),AUC 0.752和AUC 0.721的差別,所以,當(dāng)天的特征模型效果特別明顯。
2.降低使用成本
·用簡單的SQL語義替代Stream語義完成實(shí)時數(shù)倉的開發(fā),大大降低了開發(fā)的復(fù)雜度和時間成本,同時能夠保證和離線指標(biāo)的一致性。
·結(jié)合使用寬表模型和星型模型,寬表和物化視圖可以保證報表性能和并發(fā)能力,星型模型可以保證系統(tǒng)的查詢靈活性,在一套系統(tǒng)中滿足不同場景的分析需求。另外,明細(xì)表查詢我們通過DorisDB外表的方式暴露查詢,提升了查詢的速度,大大降低了業(yè)務(wù)方的成本。DorisDB的分布式Join能力非常強(qiáng),原來一些需要查詢多個Index在從內(nèi)存中計(jì)算的邏輯可以直接下推到DorisDB中,降低了原有方案的復(fù)雜度,提升了查詢服務(wù)的穩(wěn)定性,加快了響應(yīng)時間。
·BI報表遷移成本低,我們前期BI可視化是基于Mysql構(gòu)建的,某些看板不斷優(yōu)化和豐富需求后,加上多維度靈活條件篩選,每次加載超級慢,業(yè)務(wù)無法接受,當(dāng)同樣數(shù)據(jù)同步到DorisDB上后,我們僅需要修改數(shù)據(jù)源鏈接信息,SQL邏輯不用修改(這個超級爽,遷移成本超級低),查詢性能直接提升10倍以上。
·運(yùn)維成本低,相對其他大數(shù)據(jù)組件來說,DorisDB只需要部署一種即可滿足各類數(shù)據(jù)分析需求,不需要其他軟件輔助,而且部署運(yùn)維簡單。
未來展望
DorisDB作為新一代MPP數(shù)據(jù)庫的引領(lǐng)者,當(dāng)前在多種場景下性能都非常優(yōu)秀,幫助我們非常好的重構(gòu)了實(shí)時數(shù)倉。目前DorisDB高效的支持了實(shí)時指標(biāo)的計(jì)算,以及業(yè)務(wù)方在實(shí)時場景下的數(shù)據(jù)靈活探查和多維分析需求。DorisDB在集團(tuán)內(nèi)部各個業(yè)務(wù)線的應(yīng)用越來越多,我們也將推動實(shí)時和離線數(shù)據(jù)分析進(jìn)行統(tǒng)一,為業(yè)務(wù)分析提供更好的支撐。后繼我們將分享更多DorisDB的成功實(shí)踐。最后,感謝鼎石科技的大力支持!