在流式架構中,特征由在線預估服務在 serving 時 dump 對應的快照并發(fā)送到消息隊列中。標簽則來自實時行為采集服務,通過日志上報等方法采集得到。在線樣本生成服務消費兩個數據流,通過關聯得到完整的樣本,并發(fā)送到下游的流式訓練服務中進行模型訓練,完成樣本數據的消費。

批式架構是流式架構的補充,批式架構在訂閱流式數據的同時,還會加入批式的特征或者批式生成的標簽。比如風控反作弊或者廣告類的業(yè)務,會有批式生產的數據,并使用批式的樣本生成模塊生成樣本,進而被模型訓練組件消費。

流式和批式數據流架構中,還有元數據服務,元數據服務記錄了特征的相關元數據,流式批式數據流都會訪問元數據服務獲取 meta 信息。因此,我們對于批式的特征存儲有若干種特定的訪問 pattern。

讀方面有以下讀數據 pattern:大范圍的按天批式讀取,關注吞吐指標;秒級的點查;高效的謂詞下推查詢能力;存在基于主鍵/外建的 join。

在寫方面需支持以下能力:基于主鍵的 upsert;針對部分 cell 的插入與更新;針對行/列/cell 的刪除;基于外鍵的 upsert。

在這樣的背景下,我們了解 Hudi 在機器學習離線數據流中的若干應用場景。

2.離線樣本存儲與迭代

我們希望設計的樣本離線存儲方案能夠適用于多種場景,主要包含以下三類情況。

第一,模型的重新訓練,回放流式訓練的過程,迭代/糾偏模型等等。

第二,樣本的數據迭代,增加修改或者刪除對應的特征/標簽,并重新訓練模型。

第三,樣本的 OLAP 查詢,用于日常 debug 等。

為了能夠支持以上的場景的樣本存儲與迭代,我們提出的存儲方案整體架構設計如下。在邏輯建模上,構建樣本存儲和構建特定 pattern 的 Hive 表非常類似,樣本包含主鍵、分區(qū)鍵、內部元數據列等功能性 column,然后包含若干特征列和若干標簽列。在物理架構上,通過流式和批式生產/采集的特征數據和標簽數據通過多個作業(yè)混合 upsert 的方式寫入 Hudi,更新位于 KV 存儲的索引信息,并將實際的數據寫入 HDFS 中。由于 Hudi 基于主鍵/外鍵 upsert 的特性,數據會被自然地拼接在一起,形成完整的包含特征和標簽的樣本數據,供消費使用。

在對離線特征進行調研時,我們需要面臨以下挑戰(zhàn):基于 HDFS 這種不可變的文件存儲,如何實現低成本低讀寫放大的數據修改。在沒有使用數據湖之前,用戶做離線特征調研之前需要復制樣本,修改并另存一份。其中消耗了巨大的計算和存儲資源,伴隨樣本量的增大,這樣的方案將消耗數個 EB 的存儲,使得迭代變得不可能。

我們基于 Hudi 實現了 ColumnFamily 的能力。這個方案受到了經典 BigTable 存儲 Apache HBase 的啟發(fā),將 IO pattern 不同的數據使用不同的文件進行存儲,以減少不必要的讀寫放大。原理是將同一個 FileGroup 的不同列數據存儲在不同的文件中,在讀時進行合并。這種方法會將新增列的數據單獨進行文件存儲,發(fā)生修改或者新增成本很低。

我們通過為調研特征列賦予單獨的 CF 的方式來減少讀寫放大,其他列復用線上的特征所在的 CF。這樣資源的使用量只會和新增特征相關。這種方式極大得減少了迭代所需的存儲使用,并且不會引入任何 shuffle 操作。

上文介紹了離線樣本的存儲與迭代方案,接下來我們進一步為大家介紹在線樣本生成時的流批一體生成方案,討論其如何降低在線存儲的使用成本。

3. 流批一體的樣本生成

在線樣本生成服務中,我們使用 KV 或者 BigTable 類存儲來滿足樣本拼接的需求,比如 RocksDB 等。這類存儲點查性能好,延遲低,但是存儲成本也較高。如果在數據有明顯的冷熱分層的情況下,這類存儲本身并不能很好的滿足這樣的存儲需求。Hudi 是一個具有 KV 語義的離線存儲,存儲成本較低,我們將冷數據存在 Hudi 上的方式來降低在線存儲的使用成本,并通過統(tǒng)一的讀寫接口來屏蔽差異。這一架構也受到了目前市面的多種 HSAP 系統(tǒng)的啟發(fā)。

為了能夠讓 Hudi 支持更好的點查,我們復用了寫時的 HBase 索引。點查請求會先訪問 HBase 索引找到數據所在文件,然后根據文件進行點查。整體端到端的延遲可以做到秒級。適合存儲數據量大,qps 較低的場景。

4. 功能與優(yōu)化

在使用 Hudi 滿足諸多業(yè)務需求的過程中,我們也對其內核做了一些改造,以更好得服務我們的業(yè)務場景。

4.1 Local Sort

我們支持了單文件內的主鍵排序。排序是較為常見的查詢性能優(yōu)化手段。通過對主鍵的排序,享受以下收益

● CF 在讀時,多 CF 合并使用 Sort Merge 的方式,內存使用更低。

● Compaction 時支持 Sort Merge。不會觸發(fā) spill,內存使用低。我們之前使用 SSD 隊列來做 Compaction 以保證性能,現在可以使用一些廉價的資源(比如無盤的潮汐資源)來進行 Compaction。

● 在流批一體的樣本生成中,由于主鍵是排好序的,我們點查時基于主鍵的謂詞下推效果非常好。提升了點查性能。

4.2 Bulkload 并發(fā)寫

并發(fā)寫一直是 Hudi 的比較大的挑戰(zhàn)。我們的業(yè)務場景中會發(fā)生行級別/列級別的寫沖突,這種沖突無法通過樂觀鎖來避免?;跈C器學習對于數據沖突的解決需求,我們之前就支持了 MVCC 的沖突解決方式。更進一步得,為了能夠讓 Hudi 支持并發(fā)讀寫,我們參考 HBase 支持了 Bulkload 的功能來解決并發(fā)寫需求。所有寫數據都會寫成功,并由數據內部的 mvcc 來決定數據沖突。

我們首先將數據文件生成到一個臨時緩沖區(qū),每個緩沖區(qū)對應一個 commit 請求,多個寫臨時緩沖區(qū)的請求可以并發(fā)進行。當數據完整寫入臨時緩沖區(qū)之后,我們有一個常駐的任務會接收數據 load 的請求,將數據從緩沖區(qū)中通過文件移動的方式 load 進 Hudi,并生成對應的 commit 信息。多個 load 請求是線性進行的,由 Hudi Timeline 的表鎖保證,但是每個 load 請求中只涉及文件的移動,所以 load 請求執(zhí)行時間是秒級,這樣就實現了大吞吐的數據多并發(fā)寫和最終一致性。

4.3 Compaction Service

關于 Compaction,Hudi 社區(qū)提供了若干 Compaction 的開箱即用的策略。但是業(yè)務側的需求非常靈活多變,無法歸類到一種開箱即用的策略上。因此我們提供了 Compaction Service 這樣的組件用來處理用戶的 Compaction 請求,允許用戶主動觸發(fā)一次 Compaction,并可指定 Compaction 的數據范圍,資源使用等等。用戶也可以選擇按照時間周期性觸發(fā) Compaction,以達到自動化數據生效的效果。

在底層我們針對 Compaction 的業(yè)務場景做了冷熱隊列分層,根據不同的 SLA 的 Compaction 任務,會選擇對應的隊列資源來執(zhí)行。用來降低 Compaction 的整體成本。比如每天天級別的數據生效是一個高保障的 Compaction 任務,會有獨占隊列來執(zhí)行。但是進行歷史數據的單次修復觸發(fā)的 Compaction,對執(zhí)行時間不敏感,會被調度到低優(yōu)先級隊列以較低成本完成。

針對數據湖的樣本存儲與生成問題,我們搭建了適用于多種場景的存儲方案架構,實現了批流一體的樣本生成,并且通過對 Hudi 內核進行一定的改造,實現更加滿足實際業(yè)務需求的功能設計。

以上就是字節(jié)跳動在 Hudi 的實踐,目前均已通過火山引擎 湖倉一體分析服務 LAS 產品對外服務,歡迎對這方面有需求、感興趣的用戶都可以積極地來體驗一下我們的 LAS 湖倉一體分析服務 。

湖倉一體分析服務 LAS(Lakehouse Analytics Service)是面向湖倉一體架構的 Serverless 數據處理分析服務,提供字節(jié)跳動最佳實踐的一站式 EB 級海量數據存儲計算和交互分析能力,兼容 Spark、Presto 生態(tài),幫助企業(yè)輕松構建智能實時湖倉。

分享到

xiesc

相關推薦