Flink 在 58 同城的應用與實踐

Flink 中文社區 2021-09-19 01:31:27 阅读数:655

flink 同城
▼ 關注「 Flink 中文社區 」,獲取更多技術幹貨 
Flink 中文社區
Flink 中文社區
Apache Flink 官微,Flink PMC 維護
239篇原創內容
公眾號
摘要:本文整理自 58 同城實時計算平臺負責人馮海濤在 Flink Forward Asia 2020 分享的議題《Flink 58 同城應用與實踐》,內容包括:

  1. 實時計算平臺架
  2. 實時 SQL 建設
  3. Storm 遷移 Flink  實踐
  4. 一站式實時計算平臺
  5. 後續規劃

Tips: 「閱讀原文」即可回顧作者原版分享視頻~ 

圖片 GitHub 地址 圖片
歡迎大家給 Flink 點贊送 star~

圖片


一、實時計算平臺架構


實時計算平臺的定比特是為 58 集團海量數據提供高效、穩定的實時計算一站式服務。一站式服務主要分為三個方向:


  • 第一個方向是實時數據存儲,主要負責為線上業務接入提供高速度的實時存儲能力;


  • 第二是實時數據計算,主要為海量數據的處理提供分布式計算框架;


  • 第三是實時數據分發,主要負責將計算後的數據分發到後續的實時存儲,供上層應用。


圖片


平臺建設主要分為兩個部分:

  • 第一部分是基礎能力建設,目前主要包括 Kafka 集群、storm 集群、 Flink 集群、SparkStreaming 集群。


  • 另一部分是平臺化建設,主要是包括兩點:


    • 第一個是數據分發,我們的數據分發是基於 Kafka Connect 打造的一個平臺,目標是實現异構數據源的集成與分發。在實際使用數據場景過程中,經常需要將不同的數據源匯聚到一起進行計算分析。


      傳統方式可能需要針對不同的存儲采用不同的數據同步方案。我們的數據分發是通過提供一套完整的架構,實現不同數據源的集成和分發。


    • 第二個是我們基於 Flink 打造的一站式實時計算平臺,後文會有詳細的介紹。


    圖片


    上圖是我們的實時計算平臺的架構。

    • 實時數據接入這部分,我們采用的是 Kafka,binlog 提供 canal 和 debezium 兩種方式進行接入。


    • 業務日志這部分,我們主要采用 flume 進行線上業務的 log 的采集。


    • 實時計算引擎這部分,根據開源社區發展以及用戶的需求,從最早的 Storm 到後來引入 SparkStreaming,以及現在主流的 Flink。


    • 實時存儲這部分,為了滿足多元化的實時需求,我們支持 Kafka、Druid、Hbase、ES、ClickHouse。


    • 同時在計算架構之上,我們建設了一些管理平臺,比如集群管理,它主要負責集群的擴容,穩定性的管理。


    • 另一個是 Nightfury,主要負責集群治理,包括數據接入、權限治理、資源管理等等。


    圖片


    我們在業務發展過程中,引入了 Flink 計算框架。首先從業務來說,58 是一個一站式生活服務平臺,包含很多業務線。隨著業務的發展,數據量越來越大,場景越來越豐富,需要一個更加强大的計算框架來滿足用戶的需求。

    • 第一個場景是實時 ETL,主要是針對原始日志進行信息轉化,結構化處理,運用於後續計算,需要高吞吐低延遲的計算能力。


    • 第二塊是實時數倉,它作為離線數倉的一個補充,主要是提昇一些實時指標的時效性。第三種場景是實時監控,它需要比較靈活的時間窗口支持。


    • 最後一種場景是實時數據流分析,比如說,數據亂序的處理、中間狀態的管理、Exactly once 語義保障。


    我們前期基於 Storm 和 SparkStreaming 構建的計算集群在很大程度上並不能滿足這些場景需求。於是對 Flink 進行了調研,發現 Flink 不論是在計算性能,還是流數據特性支持上,都體現出了非常大的優勢。因此,我們决定采用 Flink 作為主流的計算框架。

    圖片


    上圖是我們 Flink 集群的建設情况。Flink 作為實時計算框架,經常需要 7×24 小時的可用性。我們在建設底層集群的時候,需要考慮高可用的架構。

    • 首先在部署模式上,主要是采用 Flink On YARN,實現集群的高可用。


    • 在底層的 HDFS 上,采用 HDFS federation 機制,既可以避免離線集群的抖動對實時這邊造成影響,同時也减少了維護的 HDFS 數量。


    • 在集群隔離上,主要是采用 Node Labe 機制,就可以實現把重要業務運行在一些指定節點上。同時在這個基礎之上,引入了 Cgroup,對 CPU 進行隔離,避免任務間的 CPU 搶占。


    • 在管理層面,不同的業務提交到不同的隊列進行管理,避免業務間的資源搶占。


    • 在計算場景上,根據不同的計算場景,比如說計算型、IO 型,會提交到不同的節點,從而提昇整個集群的資源利用率。


    Flink 計算框架在 58 經曆了大概兩年多的發展。目前我們的集群有 900 多臺機器,2000 多個實時任務,每天處理大概 2.5 萬億的實時數據,數據量峰值達到了 3000 萬每秒。
     

    二、實時 SQL 建設


    1. 實時 SQL 演進


    SQL 編程具有低門檻、自動優化、版本統一等特點。同時 Flink SQL 作為實時數倉的主要工具,是我們在建設 Flink 平臺時考慮的一個主要方向。

    我們最早上線的 Flink 是基於 1.6 版本的,當時這個版本只支持 DML,我們在當時的版本基礎上進行了一些擴展,主要是在 DDL 語法上的擴展支持。在用戶使用層面,為了簡化 DDL 的定義,也通過一個配置化的方式來實現自動生成 DDL。在開發的時候,提供可視化開發的功能和在線調試的功能。

    隨著社區的開源,我們將 Flink SQL 切換到了社區版本,之後也昇級相關的版本,以及合並比較多的社區版本特性,比如說 Blink 相關、批流合一、對 Hive 的支持。
    最後針對 Flink SQL 這塊的實時數倉,也做了一些數倉化的工作,主要包括元數據管理、血緣關系、數倉分層、權限管理等等。

    圖片


    2. 存儲擴展


    關於存儲擴展這一塊,最開始我們是基於 Flink 自己實現的一套 DDL。隨著社區開源,切換到社區的 Flink SQL 版本,然後在上面做了一些擴展,主要有幾個方面:

    • 第一,打通了主流存儲和內部的實時存儲。比如說,在源錶上支持了內部的 wmb,它是一個分布式消息隊列。在維錶上支持這種 redis,內部的 wtable。在結果錶上支持了 ClickHouse,redis,以及我們內部的 wtable;


    • 第二,定制 format 支持。因為在實際業務中,很多數據格式並不是標准的,沒法通過 DDL 來定義一個錶。我們提供了一種通用的方式,可以采用一個字段來代錶一條日志,讓用戶可以通過 udf 去自定義,並解析一條日志。


    • 最後,在 source 和 sink DDL 定義基礎上,增加了並發度的設置。這樣用戶就可以更靈活地控制任務的並發。


    圖片


    3. 性能優化


    關於性能優化,主要是兩方面:

    • 第一個是對 Blink 特性的引進,Blink 提供了大量的特性,比如通過 mini batch 的處理方式,提高任務的吞吐。通過 local global 兩階段聚合,緩解數據熱點問題。還有通過 emit,增强窗口的功能。把這些功能集成到我們的計算平臺,用戶通過一些按鈕可以直接打開。


    • 另一個是對异步 lO 的應用。在實時數倉化建設過程中,維錶之間的關聯是比較大的應用場景,經常因為維錶的性能導致整個任務的吞吐不高。因此我們增加了一個异步 IO 的機制,主要有兩種實現:


      • 一種針對目標存儲支持异步 client,直接基於异步 client 來實現。比如 MySQL 和 redis。


      • 另一種不支持异步 client 的,我們就借助現成的機制來模擬,同時在這個基礎之上增加了一套緩存的機制,避免所有的數據直接查詢到目標存儲,减少目標存儲的壓力。同時在緩存基礎上,也增加 LRU 機制,更加靈活的控制整個緩存。


        同樣,數據寫入這一塊遇到大並發量寫入的時候,盡量提高並發來解决寫入性的問題,這樣就會導致整個任務的 CPU 利用率比較低,所以就采用單並發度多線程的寫入機制,它的實現是在 sink 算子裏面增加一個 buffer,數據流入到 sink 之後會首先寫入到 buffer,然後會啟動多線程機制去消費這個 buffer,最終寫到存儲裏面。


    圖片


    4. 數倉化建設


    實時數倉作為 Flink 的一個比較典型的應用場景,相較於離線數倉它可能存在一些平臺化不完善的方面:

    • 首先,元數據管理功能不完善;


    • 然後,Flink SQL 這一塊,對於每個任務我們都可能需要重新定義一個數據錶。並且由於數據沒有分層的概念,導致任務比較獨立,烟囪式開發,數據和資源使用率比較低下;


    • 另外,也缺乏數據血緣信息。


    為了提昇實時數倉建設的效率,我們提供了面向數倉化實時 SQL 能力,在數倉設計,任務開發,平臺化管理方面全面對齊離線數倉的建設模式。

    圖片


    ■ 4.1 數倉化


    數倉化主要是參考離線數倉的模型,對我們實時數倉這一塊進行模型建設。

    比如說,最原始的數據會進入ODS 層,經過一些清洗落入到行為明細層,之後會拆分到具體的主題明細層,然後再將一些相關的維錶信息進行計算,再到匯總層,最終提供給最上層的應用,包括一些實時報錶,Ad-hoc 查詢等。

    圖片


    ■ 4.2 數倉平臺


    實時數倉目前主要還是基於這種 Lambda 架構來進行平臺化的建設。

    • 首先,在元數據管理這一塊,Flink 默認采用內存對元數據進行管理,我們就采用了 HiveCatalog 機制對庫錶進行持久化;


    • 同時我們在數據庫的權限管理上,借助 Hive ACL 來進行權限管理;


    • 有了元數據持久化之後,就可以提供全局的元數據檢索;


    • 同時任務模式就可以由傳統的 DDL+DML 簡化為 DML;


    • 最後,我們也做了血緣關系,主要是在 Flink SQL 提交過程中,自動發現 SQL 任務血緣依賴關系。


    圖片


    三、Storm 遷移 Flink 實踐


    1. Flink 與 Storm 對比


    Flink 相對於 Storm 來說,有比較多的優勢:

    • 在數據保障上,Flink 支持 Exactly once 語義,在吞吐量、資源管理、狀態管理,用戶越來越多的基於 Flink 進行開發;


    • 而 Storm 對用戶來說,編程模型簡單,開發成本高,流式計算特性缺乏,吞吐低無法滿足性能。在平臺側,獨立集群多、運維困難、任務缺少平臺化管理、用戶體驗差。


    因此我們决定遷移到 Flink。

    圖片


    2. Flink-Storm 工具


    在 Storm 遷移到 Flink 的時候,如果讓用戶重新基於 Flink 進行邏輯開發,可能需要比較大的工作量。因此我們對 Flink 進行了調研,發現有個 Flink-Storm 工具。它實現了將 Storm Topology 轉到 Flink Topology。比如說,把 spout 轉換到 Flink 的 source function,把 bolt 轉換到 Transform 和 sink function。

    在使用的過程中我們也發現一些問題,Flink-Storm 工具無法支持 Yarn 模式, 缺少 Storm 引擎功能,最後還有一個比較大的問題,我們的 storm 在發展過程中維護了很多版本,但是 Flink-Storm 工具只支持基於一個版本進行開發。於是,我們做了一些改進。

    圖片


    3. 對 Flink-Storm 的改進


    ■ 3.1 消息保障


    Storm 有三個特點:

    • 第一,ack 機制;


    • 第二,依賴 zookeeper;


    • 第三,at least once 語義保障。


    我們做了四點改進:

    • 第一,Flink-Storm 去掉 ack 支持;


    • 第二,KafkaSpout 實現 CheckpointListener;


    • 第三,KafkaSpout 實現 CheckpointedFunction;


    • 第四,Flink-Storm 打開 checkpoint。


    圖片


    ■ 3.2 對 Storm 定時器的支持


    在早期版本裏面其實是沒有窗口機制的,我們借助 Storm 定時機制來實現窗口計算。它的機制是這樣的,Storm 引擎會定時向 bolt 裏面發送一個系統信號,用戶就可以通過這個系統信號進行一個切分,模擬窗口操作。

    同樣,Flink 也沒有這樣一個定時器的機制,於是我們就考慮從 Flink-Storm 層面來實現,改造了 BoltWrapper 類,它作為 bolt 類的一個封裝,實現機制跟 bolt 是一樣的,包括 5 點:

    • 初始化 open 方式啟動异步線程;


    • 模擬構造 tick 的 StreamRecord;


    • 調用 processeElement 函數發送 tuple;


    • 頻率由外部參數全局控制;


    • close 中關閉線程。


    圖片


    ■ 3.3 Storm on Yarn


    Storm on yarn 並不是直接提交到 YARN 集群,它只是提交到 local 或者 stand alone 的模式。Flink on yarn 主要是提供了 ClusterClient 這樣一個代理,實現方式 有三個步驟:

    1. 初始化 YarnClusterConfiguration Flink 配置 執行 jar 包 / 資源配置 加載 classpath;


    2. 啟動 yarn client;


    3. 複用 Flink on yarn 機制 deploy 轉換後的 jobGraph。


    圖片


    4. 任務遷移


    在完善上述的一些改進之後,遷移就比較容易了。首先我們會把改造後的版本打包,上傳到公司的私服上。然後用戶在他的工程裏面只需要引入 jar 包。在代碼這一塊,只需要將原來基於 storm 的提交方式改造成基於 Flink 的提交方式,邏輯是完全不用動的。在任務部署模式這一塊,也提供了 Flink 提交的模式,這樣一個脚本可以實現 Flink Perjob 模式。

    圖片


    總結一下,除了一些比較極端的複雜情况,基本上做到了無縫遷移所有的任務。遷移到 Flink 之後,大部分任務的延遲都降低到毫秒級別,整個吞吐提昇 3~5 倍。同時,整體資源節省了大概 40%,約等於 80 臺機器。完成了 5 個 storm 集群完全下線,實現了任務平臺化管理。

    圖片


    四、一站式實時計算平臺


    1. Wstream 平臺


    我們為了提昇管理效率而打造了 Wstream 平臺,它構建在底層引擎和上層應用之間,對用戶可以屏蔽底層的集群信息,比如跨機房多集群的一些信息。

    • 在任務接入方式上,支持 Flink Jar,Flink SQL,Flink-Storm,PyFlink 這 4 種方式,來滿足多元化的用戶需求;


    • 在產品功能上,主要支持了任務管理、任務的創建、啟動删除等;


    • 另外,為了更好的讓用戶管理自己的任務和對任務進行問題定比特,我們也提供了一個監控告警和任務診斷的系統;


    • 針對數倉,提供了一些數倉平臺化的功能,包括權限管理、血緣關系等等;


    • 針對 Flink SQL 也提供了調試探查的功能。


    用戶可以在 Wstream 平臺之上很好的去構建他們的應用。

    圖片


    2. 狀態管理


    狀態作為 Flink 一個比較重要的特性,在實際場景中有大量的應用。用戶在使用平臺的時候,沒法跟底層的 Flink 工具進行交互,於是我們就將底層的一些能力進行了集成。

    • 在任務保存方面,支持 Checkpoint,Savepoint,Cancel With Savepoint。


    • 在容錯方面,支持 allowNonRestoredState,跳過無法恢複的狀態。


    • 在分析方面,支持 Queryable State 實時查詢,基於離線的 State Processor 的分析方式,我們會幫用戶把這個狀態下載進行分析。


    對於整個任務狀態管理來說,我們通過 jobgraph 設置定向到指定 Hdfs 目錄,進行統一目錄管理。在狀態小文件這塊,控制並發度,jobgraph 優化,checkpoint 間隔時間,保留版本數量。

    圖片


    3. SQL 調試


    針對 Flink SQL,我們也提供了一些調試功能。這裏主要包括兩塊:

    • 第一,語法層面的功能包括:


      • 智能提示;

      • 語法校驗;

      • 轉換 graph 邏輯校驗。


    • 第二,邏輯層面的功能包括:


      • 模擬輸入,DataGen 自定義數據源;

      • 結果輸出,Print 重定向到標准輸出。


    這樣我們可以更方便的對整個業務邏輯進行調試。

    圖片


    4. 任務監控


    關於任務監控,對於 Flink 實時計算任務來說,我們主要關心的是任務的穩定性、性能方面、以及業務邏輯是否符合預期。對於如何監控這些指標,主要包括 4 個層面:

    • 第一個是 Flink 自帶的 Flink-metrics,提供大量的信息,比如流量信息、狀態信息、反壓、檢查點、CPU、網絡等等;


    • 第二個是 yarn 層面,提供運行時長、任務狀態;


    • 第三,從 kafka 層面提供消息堆積;


    • 最後,通過用戶自定義的一些 metrics,我們可以了解業務邏輯是否符合預期。


    圖片


    5. 監控體系


    為了采集這些指標,我們也基於 Prometheus 搭建了一套監控體系。對於所有的 Flink 任務,會實時將 metrics 推到 pushgateway,然後會將收集到的指標推到 Prometheus,這一塊我們主要是采用的 federation 的機制。所有子節點負責指標采集,之後匯聚到一個中心節點,由中心節點統一對外提供服務。最終可以實現整個指標的計算和告警。

    圖片


    6. 監控告警


    有了上面這些指標之後,我們在告警這一塊就可以比較方便。針對實時計算比較關注的任務穩定性方面,我們可以從 Topic 消息消費堆積、任務計算 qps 波動、Flink task Restart、Flink Checkpoint failed、任務失敗、延遲等信息來觀察整個任務的運行情况。

    圖片


    7. 指標可視化


    在指標可視化這一塊,主要是兩個層面:

    • 第一個層面是 Job 層面,這一塊主要是把一些比較核心的指標匯聚到我們的實時計算平臺。比如說,qps 信息、輸入輸出的信息、延遲的信息等等;


    • 對於更底層的 task 級別的 metrics,通過 Grafana 可以了解具體的一些task信息,比如流量信息、反壓信息等。


    圖片


    五、後續規劃


    我們的後續規劃,主要包括 4 個方面:


    • 第一個是社區比較流行的批流合一。因為我們當前這個實時架構大部分還是基於 Lambda 架構,這種架構會帶來很大的維護工作量,所以我們也希望借助批流合一的能力來簡化架構;


    • 第二個是資源調優,因為作為流式計算來說,缺少一些動態資源管理的機制,因此我們也希望有手段來進行這樣一些調優;


    • 第三個是智能監控,我們當前的監控和告警是事後的,希望有某種方式在任務出現問題之前進行預警;


    • 最後是擁抱社區的新能力,包括對新場景的探索。


    圖片




    熱點推薦






    更多 Flink 相關技術問題,可掃碼加入社區釘釘交流群~
    圖片

     圖片  戳我,回顧作者分享視頻!

    版权声明:本文为[Flink 中文社區]所创,转载请带上原文链接,感谢。 https://gsmany.com/2021/09/20210919013126587L.html