筆者將整份報告按四大模塊梳理為:
第一部分,流計算平臺的發(fā)展歷程——從2014年到現(xiàn)在,4年多的發(fā)展歷程中,蘇寧經(jīng)歷storm->spark streaming->flink的轉變,目前還在轉變中。形成storm(4000~虛機節(jié)點),flink&spark streaming(200+物理節(jié)點,on yarn模式)的規(guī)模,同時介紹了各引擎發(fā)展過程中的問題以及解決路徑;
第二部分,storm及spark streaming的缺點,從兼顧吞吐量和延時、高效的狀態(tài)管理、Exactly-Once的保證及Event-Time等要點闡述了蘇寧選擇flink的理由;
第三部分,蘇寧基于flink框架所做的具體工作。(1)平臺層功能豐富:sql語法豐富(distinct,流表join),算子自動擴縮容,connector(mysql, hbase,kafka1.0)以及sink降速(2)工具層:統(tǒng)一日志收集及展示、平臺層和業(yè)務層的統(tǒng)一監(jiān)控管理平臺(3)服務層:Dlink 一站式開發(fā)平臺;
第四部分是在數(shù)據(jù)集成、機器學習和CEP等方面,談談蘇寧對未來的展望。
目前,陳豐主要負責蘇寧易購集團大數(shù)據(jù)流計算平臺建設,包括Storm、SparkStreaming、Flink等組件,經(jīng)歷了流計算從組件化到平臺服務化到智能化的發(fā)展過程。對于大數(shù)據(jù)開源框架有較為豐富的經(jīng)驗,在分布式計算架構設計和系統(tǒng)優(yōu)化方面有自己的思考和領悟:
既然說到前世今生,首先介紹一下流計算平臺在蘇寧的整個發(fā)展歷程,怎么從Storm到目前很火的Flink,以及它的現(xiàn)狀,談談整體的架構以及它的整體集群規(guī)模。2018年上半年,蘇寧把主要精力都投向了Flink。
首先看一下平臺的發(fā)展歷程。
最早2014年蘇寧上線了第一個Storm的大屏展示任務,同年Storm整體的孵化平臺上線。到了2015年因為對于SQL開發(fā)的需求蘇寧還是比較多的,蘇寧自研了一套基于安踏做SQL的平臺。2016年基于吞吐量的上線,有了spark streaming,同年考慮到性能和流計算的痛點,把目光投向了Flink。到了2018年,F(xiàn)link是蘇寧流計算基礎平臺重要的目標項目,將業(yè)務推到Flink上做,比如說Flink的開發(fā)平臺、管理平臺等等一系列配套的業(yè)務上線。
再看流計算在蘇寧的配套。Storm2014年就用了,整體規(guī)模和占比比較多50%,物理機1000多,虛擬機4000+,任務數(shù)1500+。蘇寧做Flink起步較晚,但調研時間比較長,目前占比占到15%,計劃未來1-2年都會把流計算底層平臺所有的都投入到Flink上。
為什么選擇Flink?從蘇寧業(yè)務層面來看,首先Storm和2.0的spark streaming都使用的是processing time,它處理的時間遠晚于數(shù)據(jù)產生的時間,產生大量的數(shù)據(jù)再1或2小時堆積后,數(shù)據(jù)是錯誤的,沒辦法接受的。第二個就是容錯能力,Storm只能做到 Exacly once。第三個就是中間狀態(tài)的維護,Storm維護不提供state的東西,做中間狀態(tài)的維護只能依靠第三方來做,那么業(yè)務開發(fā)的時候成本相對高一些,會寫很多的代碼,效果也不是很好,因為它用第三方組件的時候,有可能出現(xiàn)一致性問題,或重啟后計算結果不準確等等。從蘇寧的平臺來看,兩者都沒有辦法兼顧高吞吐、低延時,兩個性能互補,但不能兼顧。
調研階段,對Flink的各個優(yōu)勢做過簡單的列表,F(xiàn)link是一個設計的比較優(yōu)雅的流計算框架,它能兼顧到低延時和高吞吐,同時支持Exacly once。
談談在功能擴展、服務平臺開發(fā)以及運行時管理系統(tǒng)方面的經(jīng)驗分享。
首先說一下功能擴展。Flink sql從它出來就比較火,為什么,因為很簡單,SQL對于程序員來說非常熟悉,開發(fā)成本非常低,同時由于SQL是一個統(tǒng)一的標準,它的遷移成本非常低的,如果今天用了subeg SQL,明天出的新的組件,可以非常輕松的遷移到其它的組件上,它是通用的語法。所以蘇寧FlinkSQL上做了一系列的語法擴展。另外Connectors,可以打通不同組件的聯(lián)系。
最后結合業(yè)務痛點,聊一下在運行時的它的算子動態(tài)擴容縮容,以及Checkpoint動態(tài)調整,我們怎么實現(xiàn)怎么把它做出來的。
(1)首先看一下語法擴展。因為我們從StormSQL開始就做了純SQL的開發(fā),純SQL開發(fā)起碼要支持DDL和DML,但是Flink社區(qū)明確的講述它不會做DDL的事情,這件事由我們自己做出來。然后是DML語言,對于電商領域來說很典型的事情就是統(tǒng)計UV,對于這種聚合也做了大量支持,支持on? group by,over? window,group by window。同時Flink版本有它的局限性,它的流數(shù)據(jù)和靜態(tài)數(shù)據(jù)是沒有辦法去做互相操作的,然后最后說一個batch window,后面會具體的說說。
count distinct,這個當時基于0.003做的,當時社區(qū)沒有提供,我個人認為是由于整體代碼的抽象上的問題,它沒有去做指導,只能1.7去實現(xiàn)了,方式和現(xiàn)代社區(qū)幾乎是基本上一致的。
多介紹一下Approx count distinct,它的目的其實和count distinct語法是一樣,也是去重復的結構,但是它的目的是用較小的計算精度的誤差換取巨大的計算資源的節(jié)省,比如說內存。同時這個語法符合Calcite標準的,也就是說是通用的語法,我們可以遷移到其它的引擎上。
這邊只能粗略的講,看它怎么工作的。首先一條SQL進來進入Calcite做語法解析、變換。然后轉到Data program的時候,我們做定制化的基數(shù)和函數(shù),基數(shù)和函數(shù)不擴展講了,因為其實涉及的算法不少,我們實現(xiàn)了一系列的基數(shù)的函數(shù),讓用戶選擇相對應的精度,然后對應它的資源消耗,讓用戶自己去做選擇。然后回到我們Data program這一層,進而轉化成用戶選擇的基礎方程。到了下一層,每條數(shù)據(jù)進來的時候將進行累加,輸出時可以把數(shù)據(jù)向下一層的sink進行觸發(fā)計算,可以提供相對完美的容錯能力。
(2)另外一個SQL Batch window,這個是蘇寧特色的一個名詞,我們看一下它業(yè)務需求的case,它需要統(tǒng)計每日PV、UV,我們在線計算要求延時盡可能低,不可能等到每天結束的時候零點再看到結果,這個不能接受的。業(yè)務的需求是每秒都能實時的檢測到PV、UV的變化,這個從開始到第一秒第二秒第三秒都能看到結果,這個是業(yè)務能夠接受的case,這個是輸出的頻率,這個頻率是可以定制化的。直到這個窗口的結束,我們的結果會被reset,重新開始被計算,這個是蘇寧常用的Batch window。
怎么用SQL實現(xiàn)Batch window?這點不難,但怎么體現(xiàn)到SQL語法,又不能破壞標準的SQL語法呢?滑動窗口它是可以做到很短的輸出,但是不能固定窗口,窗口滑動到下一秒。第三條就是定制trigger。第四條就是Cascading window,它的窗口是固定的,10點到11點,數(shù)據(jù)沒有超出的時候是不會被滑動的,同時做到及早的輸出,但是它的問題是每條數(shù)據(jù)都輸出,不能控制輸出頻率的。第二點如果TPS非常高,一秒一萬,一秒十萬DB吃不下,會造成瓶頸。
所以我們怎么實現(xiàn)蘇寧的Batch window?我們用最后講的這個,加上DDL,定義輸出的間隔,然后再使用我們自己實現(xiàn)的Periodical sink,它主要的目的是把每一條進來的數(shù)據(jù)都進行緩存,并且能夠根據(jù)輸出的頻率和數(shù)量的閥值進行定量的輸出,整個鏈路進行的數(shù)據(jù)都會觸發(fā)計算,每個數(shù)據(jù)出來之后進行緩存,舊值被新值覆蓋,直到task輸出的時候,首先滿足定性定量的輸出,第二個不會對于下層造成太多的壓力,因為定點定時輸出,TBS只有2000左右。這個時間我覺得還是有進一步擴展或者是優(yōu)化的空間,比如說其實這個的話只在sink層面解決了需求,如果我們在Batch window里面不把數(shù)據(jù)進行一條條處理,而是進行批處理,我覺得計算的效果效率會真正提升,這個可能我們后面會去做這件事情。
剛才非常簡單的舉了幾個例子,說了一下SQL的擴展。說一下Connectors。這個我不會多說,因為內容不多,我會舉例子來說一說。
HBase Sink實現(xiàn)兩種模式,主要是考慮它的容錯性,現(xiàn)在不會只滿足于端到端正的容錯,我們還希望它能做到Flink和組件之間的容錯。于是我們針對不同的業(yè)務場景做了冪的插入模式,一種是mini? wbatch,容錯,有可能會Failover,要求Failover后業(yè)務重發(fā)的數(shù)據(jù)與Fail前完全一致,同時我要求table是單版本的,這么一個sink。同時考慮到效率和實時性,我們也做了兩種寫入模式,一個是one by one的同步寫入,效率比較高。還有mini batch,異步寫入的,它的演時比較高,但是可以做到定時和定量。
剛才講的是冪的插入模式,現(xiàn)在講非冪等插入模式。Failover后寫HBase結果與fail之前不同使用的WAL機制。我們用Checkpoint時,將mini batch寫入外部文件系統(tǒng)。Checkpoint完成,將mini batch寫HBase。
下面來說一說業(yè)務上也經(jīng)常面對的這么一個問題,就是擴容縮容的問題。我們看一下流程的分析,首先業(yè)務開發(fā)兩種模式,一種是寫SQL,還有一種是寫Flink SQL,還有一種是用Datastream API而進行開發(fā)。上線之后發(fā)現(xiàn)并行不夠,需要擴容,擴容的話對于SQL來說,我能做到的是什么,我可以用原生的Flink提供的去進行工作,把鏈路上的節(jié)點都進行擴容或者縮容,同時對于重新打包發(fā)布,然后再去重新上線的那些,這兩種開發(fā)模式都有問題,SQL的開發(fā)面對雙十一零點的大促,我們需要改代碼,并且還需要有高的延遲,業(yè)務才能上線,這個我們不能接受??傮w對于SQL開發(fā)它的擴容是任務級別的,而對于Datastream成本太高了。
我們做了Operator,我們一開始考慮是從wrong time考慮這個事情的。如果說我們要從這一層做的話,首先對元碼改動比較多,第二個任務相對比較復雜,我們需要重新生成不同的job,同時還要有我們自己運行時的管理服務系統(tǒng),我們會把某個需要去RESCALE的 job拿過來進行修改,再把提交新的JOB graph,做真正擴縮容的事情。這邊著重說一下這個DO RESCALE會再領任務,資源不會釋放的,資源部釋放意味著響應的時間非??欤覀円沧鲞^實驗,基本上到達秒級別甚至百毫秒級別做到擴容縮容,這個就是我們的解決方案。
剛才介紹了基礎的組建的擴展或者優(yōu)化,現(xiàn)在來聊聊平臺服務化。首先看一下流計算平臺的架構,從左往右看,這邊是數(shù)據(jù)元,底層進來之后有Storm,然后是Flink streaming和spark streaming,上面有我們運行時管理系統(tǒng),主要的作用是對業(yè)務進行監(jiān)控、運維、報警一系列的事情。再往上一層是自己開發(fā)的一個開發(fā)者平臺或者工具層,對于Storm來說有Storm SQL LIBRO,還有Stream SQL 還有,可視化流程開發(fā),Datastream。再網(wǎng)上就是支持我們的業(yè)務,體育、易購、風控、物流、BI等業(yè)務層。
我是做平臺的,主要介紹一下平臺層,也是工具層,下面運維的這么一個系統(tǒng)。
平臺服務首先是Stream SQL開發(fā)平臺,還有就是這個可視化流程開發(fā)平臺。
我們的Stream SQL是元數(shù)據(jù)處理,通過拖拉拽動態(tài)的生成我們的語句,可以支持整個的流程開發(fā),從編寫到測試到業(yè)務上線,都可以這個平臺去做,業(yè)務完全不用寫代碼,直接寫SQL,在上面做就行了。
第二個可視化流程開發(fā),把功能拽上來,建立它們之間的關系就可以了,同樣可以做到流程生命周期的事情,都能涵蓋。
最后任務提交,我們對于這個Flink底層的元碼也做了修改,也是覺得它很多的關于Checkpoint很多的行為要通過代碼體現(xiàn)的,我們覺得這個非常不靈活,所以我們對于底層做了相應的修改,只需要在提交的時候對于進行配置,就能做到動態(tài)的去設置和修改它的相對應的行為,只要一鍵提交就可以了。
下面看一下運行時管理。運行時管理主要解決了一下這些事情。解決了Flink運行時以及歷史日志的問題,我們做平臺的時候,F(xiàn)link的運行時日志可以通過原生的UI看的,但是在使用過程中去做歷史日志就相當有問題了,它往往要通過YARN日志查看,所以業(yè)務用的時候非常頭痛。針對這一點我們提供了統(tǒng)一的日志解決方案,同時還有一些子代的Metric的查詢,我們也弄出來做了統(tǒng)計和展示。同時我們也把一些比較重要的事件也從我們的APP里截出來,比如說交互啟停的動作做了展示和通集。其次就是剛才描述的運行時的運行調整,比如說調整Operator并行度,還有在線調整。最后還有告警。
日志查看,通過任務名查,也可以通過關鍵字搜索。Metrics監(jiān)控也是類似的,可以卡時間范圍,也可以不同維度查詢,并且做了一系列的聚合,為用戶提供相對有效的信息,提供給用戶比較有用的信息。
對于事件的接觸我們也做了相對的統(tǒng)計,左邊可以看到備壓等等一系列事件的統(tǒng)計,我們可以統(tǒng)計Checkpoint成功率,以及Checkpoint它的打下分布等等一些事情。動態(tài)修改Checkpoint并行度。
最后簡單的展望一下未來,2019工作計劃。首先我們可能會考慮一下做機器學習,據(jù)官方所稱,對于迭代計算, Flink應該是比spark還要快的,看有沒有辦法實現(xiàn)流處理的機器學習的算法模型。第二點就是去做通用的數(shù)據(jù)集成,因為Flink首先實時計算,同時它也提供了很多sink或souser,把組件連接起來。第三個就是智能動態(tài)擴容,現(xiàn)在的擴容都是手動的,如果有可能的話可以用STM做一些算法。最后一個是CEP的事情。