摘要:本文主要介紹了FusionInsight Flink組件的基本原理、Flink任務提交的常見問題、以及最佳實踐FAQ。

本文分享自華為雲社區《FusionInsight HD Flink組件基本原理和常見問題解析》,作者:FI小粉絲 。

Flink是一個批處理和流處理結合的統一計算框架,其核心是一個提供數據分發以及並行化計算的流數據處理引擎。

它的最大亮點是流處理,是業界最頂級的開源流處理引擎。

Flink最適合的應用場景是低時延的數據處理(Data Processing)場景:高並發pipeline處理數據,時延毫秒級,且兼具可靠性。

本文主要介紹了FusionInsight Flink組件的基本原理、Flink任務提交的常見問題、以及最佳實踐FAQ。

基本概念

基本原理

簡介

Flink是一個批處理和流處理結合的統一計算框架,其核心是一個提供了數據分發以及並行化計算的流數據處理引擎。它的最大亮點是流處理,是業界最頂級的開源流處理引擎。

Flink最適合的應用場景是低時延的數據處理(Data Processing)場景:高並發pipeline處理數據,時延毫秒級,且兼具可靠性。

Flink技術棧如圖所示:

Flink在當前版本中重點構建如下特性,其他特性繼承開源社區,不做增强,具體請參考:https://ci.apache.org/projects/flink/flink-docs-release-1.4/

  • DataStream
  • Checkpoint
  • Stream SQL
  • 窗口
  • Job Pipeline
  • 配置錶

架構

Flink架構如圖所示。

Flink整個系統包含三個部分:

  • Client

Flink Client主要給用戶提供向Flink系統提交用戶任務(流式作業)的能力。

  • TaskManager

Flink系統的業務執行節點,執行具體的用戶任務。TaskManager可以有多個,各個TaskManager都平等。

  • JobManager

Flink系統的管理節點,管理所有的TaskManager,並决策用戶任務在哪些Taskmanager執行。JobManager在HA模式下可以有多個,但只有一個主JobManager。

Flink系統提供的關鍵能力:

  • 低時延

提供ms級時延的處理能力。

  • Exactly Once

提供异步快照機制,保證所有數據真正只處理一次。

  • HA

JobManager支持主備模式,保證無單點故障。

  • 水平擴展能力

TaskManager支持手動水平擴展。

原理

  • Stream & Transformation & Operator

用戶實現的Flink程序是由Stream和Transformation這兩個基本構建塊組成。

    1. Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。
    2. 當一個Flink程序被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。

下圖為一個由Flink程序映射為Streaming Dataflow的示意圖。

上圖中“FlinkKafkaConsumer”是一個Source Operator,Map、KeyBy、TimeWindow、Apply是Transformation Operator,RollingSink是一個Sink Operator。

  • Pipeline Dataflow

在Flink中,程序是並行和分布式的方式運行。一個Stream可以被分成多個Stream分區(Stream Partitions),一個Operator可以被分成多個Operator Subtask。

Flink內部有一個優化的功能,根據上下遊算子的緊密程度來進行優化。

    1. 緊密度低的算子則不能進行優化,而是將每一個Operator Subtask放在不同的線程中獨立執行。一個Operator的並行度,等於Operator Subtask的個數,一個Stream的並行度(分區總數)等於生成它的Operator的並行度。如下圖所示。

Operator

    1. 緊密度高的算子可以進行優化,優化後可以將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的線程中執行,如下圖所示。

Operator chain

    • 上圖中上半部分錶示的是將Source和map兩個緊密度高的算子優化後串成一個Operator Chain,實際上一個Operator Chain就是一個大的Operator的概念。圖中的Operator Chain錶示一個Operator,keyBy錶示一個Operator,Sink錶示一個Operator,它們通過Stream連接,而每個Operator在運行時對應一個Task,也就是說圖中的上半部分有3個Operator對應的是3個Task。
    • 上圖中下半部分是上半部分的一個並行版本,對每一個Task都並行化為多個Subtask,這裏只是演示了2個並行度,sink算子是1個並行度。

日志介紹

日志描述

日志存儲路徑:

  • Executor運行日志:“${BIGDATA_DATA_HOME}/hadoop/data${i}/nm/containerlogs/application_${appid}/container_{$contid}”

運行中的任務日志存儲在以上路徑中,運行結束後會基於Yarn的配置確定是否匯聚到HDFS目錄中。

  • 其他日志:“/var/log/Bigdata/flink/flinkResource”

日志歸檔規則:

  • Executor日志默認50MB滾動存儲一次,最多保留100個文件,不壓縮。
  • 日志大小和壓縮文件保留個數可以在FusionInsight Manager界面中配置。

Flink日志列錶

日志級別

Flink中提供了如下錶所示的日志級別。日志級別優先級從高到低分別是ERROR、WARN、INFO、DEBUG。程序會打印高於或等於所設置級別的日志,設置的日志等級越高,打印出來的日志就越少。

日志級別

如果您需要修改日志級別,請執行如下操作:

  1. 登錄FusionInsight Manager系統。
  2. 選擇“服務管理 > Flink > 服務配置”。
  3. “參數類別”下拉框中選擇“全部”。
  4. 左邊菜單欄中選擇所需修改的角色所對應的日志菜單。
  5. 選擇所需修改的日志級別。
  6. 單擊“保存配置”,在彈出窗口中單擊“確定”使配置生效。

配置完成後立即生效,不需要重啟服務。

日志格式

常見故障

1. Flink對接kafka-寫入數據傾斜,部分分區沒有寫入數據

問題現象與背景

使用FlinkKafkaProducer進行數據生產,數據只寫到了kafka的部分分區中,其它的分區沒有數據寫入

原因分析

  • 可能原因1:Flink寫kafka使用的機制與原生接口的寫入方式是有差別的,在默認情况下,Flink使用了”並行度編號+分區數量”取模計算的結果作為topic的分區編號。那麼會有以下兩種場景:

    1. 並行度%分區數量=0,錶示並行度是kafkatopic分區數的一倍或者多倍,數據的寫入每個分區數據量是均衡的。
    2. 並行度%分區數量≠0,那麼數據量勢必會在個別分區上的數據量產生傾斜。
  • 可能原因2:在業務代碼的部分算子中使用了keyby()方法,由於現網中的數據流中,每個key值所屬的數據量不一致(就是說某些key的數據量會非常大,有些又非常小)導致每個並行度中輸出的數據流量不一致。從而出現數據傾斜。

解决辦法

原因一:

方法1,調整kafka的分區數跟flink的並行度保持一致,即要求kafka的分區數與flink寫kafka的sink並行度保持强一致性。這種做法的優勢在於每個並行度僅需要跟每個kafka分區所在的 broker保持一個常鏈接即可。能够節省每個並發線程與分區之間調度的時間。

方法2,flink寫kafka的sink的分區策略寫成隨機寫入模式,如下圖,這樣數據會隨即寫入topic的分區中,但是會有一部分時間損耗在線程向尋址,推薦使用方法1。

原因二:

需要調整業務側對key值的選取,例如:可以將key調整為“key+隨機數”的方式,保證Flink的keyby()算子中每個處理並行度中的數據是均衡的。

2. Flink任務的日志目錄增長過快,導致磁盤寫滿

問題現象

集群告警磁盤使用率超過閾值,經過排查發現是taskmanager.out文件過大導致

原因分析

代碼中存在大量的print模塊,導致taskmanager.out文件被寫入大量的日志信息,taskmanager.out 一般是,業務代碼加入了 .print的代碼,需要在代碼中排查是否有類似於以下的代碼邏輯:

或者類似於這樣的打印:

如果包含,日志信息會持續打印到taskmanager.out裏面。

解决方案

將上圖紅框中的代碼去掉,或者輸出到日志文件中。

3. 任務啟動失敗,報資源不足:Could not allocate all requires slots within timeout of xxxx ms

問題現象

任務啟動一段時間後報錯,例如如下日志,需要60個資源實際上只有54個。

原因分析

Flink任務在啟動過程中的資源使用是先增長在下降到當前值的,實際在啟動過程中需要的資源量等於每個算子並行度之和。等到任務開始運行後,Flink會對資源進行合並。

例如如下算子,在啟動過程中需要“1+6+6+5+3=21”個資源。

但是運行穩定後會降低到6。這個是Flink的機制。假如任務在啟動過程中不滿足21個資源的啟動資源量,任務就會出現NoResourceAvailableException的异常。

解决方案

减少任務的啟動並發,或者將其它任務kill掉再啟動Flink任務。

4. 算子的部分節點產生背壓,其它節點正常

問題現象

業務運行一段時間以後,算子的部分節點出現背壓。

原因分析

通過Flink原生頁面排查這個並發的算子所在的節點,通過上圖我們能够看出是异常算子的第44個並發。通過前臺頁面能够查看並確認第44並發所在的節點,例如下圖:

通過查找這個節點在taskmanager列錶,例如下圖比特置:

整理taskmanager在每個nodemanager節點的數量發現,背壓節點啟動的taskmanager數量過多。

經過排查,該yarn集群資源相對比較緊張,每個節點啟動的taskmanager數量不一致,如果部分節點啟動的較多容易出現數據傾斜。

解决方案

建議一個節點啟動多個slot。避免多個taskmanager出現在一個nodemanager節點上。啟動方式見:slot優化

FAQ

Flink如何加載其它目錄的jar包

需求描述

Flink業務一般在運行過程中默認加載的jar包路徑為:“xxx/Flink/flink/lib”的目錄下,如果添加其它路徑的jar包會報錯,如何添加其它外部依賴。

實現方案

創建一個外部的lib目錄,將部分依賴包放到外部lib目錄下,如下圖:

修改啟動脚本的參數配置脚本,config.sh將jar包路徑傳給環境變量中。

此時正常啟動任務即可,不需要加其它參數。

HDFS上也能看到第三方jar的目錄。

如何收集任務taskmanager的jstack和pstree信息

需求描述

在任務運行過程中我們通常需要對taskmanager的進程進行查詢和處理,例如:打jstack,jmap等操作,做這些操作的過程中需要獲取任務的taskmanager信息。

實現方案

獲取一個nodemanager節點上面所有taskmanager的進程信息的方法如下:

ps -ef | grep taskmanager | grep -v grep | grep -v "bash -c"

其中紅框中的內容就是taskmanager的進程號,如果一個節點上面存在多個taskmanager那麼這個地方會有多個進程號。獲取到進程號後我們可以針對這個進程號收集jstack或者pstree信息。

收集jstack

1.通過上面流程獲取到進程信息,然後從中獲取進程ID和application id,如上圖中進程id為“30047 applicationid為application_1623239745860_0001”。

2.從FI前臺界面獲取這個進程的啟動用戶。如下圖為flinkuser。

3.在對應的nodemanager節點後臺切換到這個用戶,人機用戶機機用戶即可。

4. 進入到節點所在的jdk目錄下

5. 給taskmanager進程打jstack。

不同用戶提交的taskmanager只能由提交任務的用戶打jstack。

收集pstree信息

使用pstree –p PID 的方式能够獲取taskmanager的pstree信息,這個地方提供一個收集脚本。內容如下:

#!/bin/bash
searchPID() {
local pids=`ps -ef | grep taskmanager | grep -v grep | grep -v "bash -c" | grep -v taskmanagerSearch.sh | awk '{print $2}'`;
time=$(date "+%Y-%m-%d %H:%M:%S")
echo "checktime is --------------------- $time" >> /var/log/Bigdata/taskManagerTree.log
for i in $pids
do
local treeNum=$(pstree -p $i | wc -l)
echo "$i 's pstree num is $treeNum" >> /var/log/Bigdata/taskManagerTree.log
done
}
searchPID

該脚本的功能為獲取節點上所有taskmanager pstree的數量,打印結果如下:

slot優化

需求描述

Slot可以認為是taskmanager上面一塊獨立分配的資源,是taskmanager並行執行的能力的體現。Taskmanager中有兩種使用slot的方法:

  • 一個taskmanager中設置了一個slot。
  • 一個taskmanager中設置了多個slot。

每個task slot 錶示TaskManager 擁有資源的一個固定大小的子集。假如一個taskManager 有三個slot,那麼它會將其管理的內存分成三份給各個slot。資源slot化意味著一個subtask 將不需要跟來自其他job 的subtask 競爭被管理的內存,取而代之的是它將擁有一定數量的內存儲備。需要注意的是,這裏不會涉及到CPU 的隔離,slot 目前僅用來隔離task 的受管理的內存。通過調整task slot 的數量,允許用戶定義subtask 之間隔離的方式。如果一個TaskManager 一個slot,那將意味著每個task group運行在獨立的JVM 中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager 多個slot 意味著更多的subtask 可以共享同一個JVM。而在同一個JVM 進程中的task 將共享TCP 連接(基於多路複用)和心跳消息。它們也可能共享數據集和數據結構。因此,對於資源密集型任務(尤其是對cpu使用較為密集的)不建議使用單個taskmanager中創建多個slot使用,否則容易導致taskmanager心跳超時,出現任務失敗。如果需要設置單taskmanager多slot,參考如下操作。

單taskmanager多slot的設置方法

方式一:在配置文件中配置taskmanager.numberOfTaskSlots,通過修改提交任務的客戶端配置文件中的配置flink-conf.yaml配置,如下圖:將該值設置為需要調整的數值即可。

方式二:啟動命令的過程中使用-ys命令傳入,例如以下命令:

./flink run -m yarn-cluster -p 1 -ys 3 ../examples/streaming/WindowJoin.jar

在啟動後在一個taskmanager中會啟動3個slot。

單taskmanager多slot需要優化哪些參數

設置單taskmanager多slot需要優化以下參數

點擊關注,第一時間了解華為雲新鮮技術~

帶你認識FusionInsight Flink:既能批處理,又能流處理的更多相關文章

  1. Superior Scheduler:帶你了解FusionInsight MRS的超級調度器

    摘要:Superior Scheduler是一個專門為Hadoop YARN分布式資源管理系統設計的調度引擎,是針對企業客戶融合資源池,多租戶的業務訴求而設計的高性能企業級調度器. 本文分享自華為雲社 ...

  2. 一文帶你了解 Flink Forward 柏林站全部重點內容

    前言 2019.10.7~9號,隨著70周年國慶活動的順利閉幕,Flink Forward 也照例在他們的發源地柏林舉辦了第五届大會.雖然還沒有拿到具體的數據,不過從培訓門票已經在會前銷售一空的這樣的 ...

  3. Flink 剖析

    1.概述 在如今數據爆炸的時代,企業的數據量與日俱增,大數據產品層出不窮.今天給大家分享一款產品—— Apache Flink,目前,已是 Apache 頂級項目之一.那麼,接下來,筆者為大家介紹Fl ...

  4. Apache Flink

    Flink 剖析 1.概述 在如今數據爆炸的時代,企業的數據量與日俱增,大數據產品層出不窮.今天給大家分享一款產品—— Apache Flink,目前,已是 Apache 頂級項目之一.那麼,接下來, ...

  5. Flink 核心技術淺析(整理版)

    1. Flink簡介 Apache Flink是一個面向分布式數據流處理和批量數據處理的開源計算平臺,它能够基於同一個Flink流執行引擎(streaming dataflow engine),提供支 ...

  6. 【轉帖】Flink 核心技術淺析(整理版)

    Flink 核心技術淺析(整理版) https://www.cnblogs.com/swordfall/p/10612404.html 分類: Flink undefined 1. Flink簡介 A ...

  7. Apache Flink 為什麼能够成為新一代大數據計算引擎?

    眾所周知,Apache Flink(以下簡稱 Flink)最早誕生於歐洲,2014 年由其創始團隊捐贈給 Apache 基金會.如同其他誕生之初的項目,它新鮮,它開源,它適應了快速轉的世界中更重視的速 ...

  8. [原創] Win7全自動精簡批處理_絕對原創,絕對給力_感謝無憂給了我一年的潜水

    2011htpcfans發錶於 2012-5-5 http://bbs.wuyou.net/forum.php?mod=viewthread&tid=210043 @echo 全自動/手動精簡 ...

  9. Apache Flink Quickstart

    Apache Flink 是新一代的基於 Kappa 架構的流處理框架,近期底層部署結構基於 FLIP-6 做了大規模的調整,我們來看一下在新的版本(1.6-SNAPSHOT)下怎樣從源碼快速編譯執行 ...

  10. 大數據框架對比:Hadoop、Storm、Samza、Spark和Flink

    轉自:https://www.cnblogs.com/reed/p/7730329.html 今天看到一篇講得比較清晰的框架對比,這幾個框架的選擇對於初學分布式運算的人來說確實有點迷茫,相信看完這篇文 ...

隨機推薦

  1. sql數據庫獲取錶名稱和錶列名

    select * from sysobjects where xtype='u' SELECT COLUMN_NAME,DATA_TYPE FROM INFORMATION_SCHEMA.column ...

  2. .net字符串Gzip壓縮和base64string轉換:

    class Program { static void Main(string[] args) { //要壓縮的字符串 string data = "13800138000,驗證碼:1234 ...

  3. Windows7下Blend for Visual Studio 2012使用問題

    目前開發的系統裏很多控件樣式和動畫比較複雜,應該是之前同事用Blend做的,這種神器不用太浪費了,自己也准備試試. 系統環境Windows7+Visual Studio 2012 1.Windows7 ...

  4. Sublime Text使用教程【轉】

    本文轉載自:http://lucida.me/blog/sublime-text-complete-guide/ 摘要(Abstract) 本文系統全面的介紹了 Sublime Text,旨在成為最優 ...

  5. maven為不同環境打包(hibernate)-超越昨天的自己系列(6)

    超越昨天的自己系列(6) 使用ibatis開發中,耗在dao層的開發時間,調試時間,差錯時間,以及適應修改需求的時間太長,導致項目看起來就添删改查,卻特別費力.   在項目性能要求不高的情况下,開始尋 ...

  6. bootstrap API地址

    http://wenzhixin.net.cn/p/bootstrap-table/docs/examples.html#pagination-table

  7. oracle賦值問題(將同一錶中某一字段賦值給另外一個字段的語句)

    將同一錶中某一字段賦值給另外一個字段的語句update jxc_ckmx ckmx1 set ckmx1.ddsl = (select ckmx2.sl from jxc_ckmx ckmx2 whe ...

  8. oracle 分析函數中 keep關鍵字的使用

    語法 min | max(column1) keep (dense_rank first | last order by column2) over (partion by column3); 另外f ...

  9. nginx安裝目錄詳解(針對centos)

  10. 論文閱讀:Review of Visual Saliency Detection with Comprehensive Information

    這篇文章目前發錶在arxiv,日期:20180309. 這是一篇針對多種綜合性信息的視覺顯著性檢測的綜述文章. 注:有些名詞直接貼原文,是因為不翻譯更容易理解.也不會逐字逐句都翻譯,重要的肯定不會錯過 ...