《Spark官方文檔》Spark Streaming編程指南

杜老師說 2022-01-07 06:10:01 阅读数:379

spark 官方 spark streaming 指南

spark-1.6.1 [原文地址]

Spark Streaming編程指南

概覽

 

Spark Streaming是對核心Spark API的一個擴展,它能够實現對實時數據流的流式處理,並具有很好的可擴展性、高吞吐量和容錯性。Spark Streaming支持從多種數據源提取數據,如:Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP套接字,並且可以提供一些高級API來錶達複雜的處理算法,如:map、reduce、join和window等。最後,Spark Streaming支持將處理完的數據推送到文件系統、數據庫或者實時儀錶盤中展示。實際上,你完全可以將Spark的機器學習(machine learning) 和 圖計算(graph processing)的算法應用於Spark Streaming的數據流當中。

spark streaming-arch

 

下圖展示了Spark Streaming的內部工作原理。Spark Streaming從實時數據流接入數據,再將其劃分為一個個小批量供後續Spark engine處理,所以實際上,Spark Streaming是按一個個小批量來處理數據流的。

spark streaming-flow

 

Spark Streaming為這種持續的數據流提供了的一個高級抽象,即:discretized stream(離散數據流)或者叫DStream。DStream既可以從輸入數據源創建得來,如:Kafka、Flume或者Kinesis,也可以從其他DStream經一些算子操作得到。其實在內部,一個DStream就是包含了一系列RDDs

本文檔將向你展示如何用DStream進行Spark Streaming編程。Spark Streaming支持Scala、Java和Python(始於Spark 1.2),本文檔的示例包括這三種語言。

注意:對Python來說,有一部分API尚不支持,或者是和Scala、Java不同。本文檔中會用高亮形式來注明這部分 Python API。


一個小栗子

在深入Spark Streaming編程細節之前,我們先來看看一個簡單的小栗子以便有個感性認識。假設我們在一個TCP端口上監聽一個數據服務器的數據,並對收到的文本數據中的單詞計數。以下你所需的全部工作:

首先,我們需要導入Spark Streaming的相關class的一些包,以及一些支持StreamingContext隱式轉換的包(這些隱式轉換能給DStream之類的class增加一些有用的方法)。StreamingContext 是Spark Streaming的入口。我們將會創建一個本地 StreamingContext對象,包含兩個執行線程,並將批次間隔設為1秒。

import org.apache.spark._import org.apache.spark.streaming._import org.apache.spark.streaming.StreamingContext._ // 從Spark 1.3之後這行就可以不需要了// 創建一個local StreamingContext,包含2個工作線程,並將批次間隔設為1秒// master至少需要2個CPU核,以避免出現任務餓死的情况val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(1))

利用這個上下文對象(StreamingContext),我們可以創建一個DStream,該DStream代錶從前面的TCP數據源流入的數據流,同時TCP數據源是由主機名(如:hostnam)和端口(如:9999)來描述的。

// 創建一個連接到hostname:port的DStream,如:localhost:9999val lines = ssc.socketTextStream("localhost", 9999)

這裏的 lines 就是從數據server接收到的數據流。其中每一條記錄都是一行文本。接下來,我們就需要把這些文本行按空格分割成單詞。

// 將每一行分割成多個單詞val words = lines.flatMap(_.split(" "))

flatMap 是一種 “一到多”(one-to-many)的映射算子,它可以將源DStream中每一條記錄映射成多條記錄,從而產生一個新的DStream對象。在本例中,lines中的每一行都會被flatMap映射為多個單詞,從而生成新的words DStream對象。然後,我們就能對這些單詞進行計數了。

import org.apache.spark.streaming.StreamingContext._ // Spark 1.3之後不再需要這行// 對每一批次中的單詞進行計數val pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)// 將該DStream產生的RDD的頭十個元素打印到控制臺上wordCounts.print()

words這個DStream對象經過map算子(一到一的映射)轉換為一個包含(word, 1)鍵值對的DStream對象pairs,再對pairs使用reduce算子,得到每個批次中各個單詞的出現頻率。最後,wordCounts.print() 將會每秒(前面設定的批次間隔)打印一些單詞計數到控制臺上。

注意,執行以上代碼後,Spark Streaming只是將計算邏輯設置好,此時並未真正的開始處理數據。要啟動之前的處理邏輯,我們還需要如下調用:

ssc.start() // 啟動流式計算ssc.awaitTermination() // 等待直到計算終止

完整的代碼可以在Spark Streaming的例子 NetworkWordCount 中找到。

如果你已經有一個Spark包(下載在這裏downloaded,自定義構建在這裏built),就可以執行按如下步驟運行這個例子。

首先,你需要運行netcat(Unix-like系統都會有這個小工具),將其作為data server

$ nc -lk 9999

然後,在另一個終端,按如下指令執行這個例子

$ ./bin/run-example streaming.NetworkWordCount localhost 9999

好了,現在你嘗試可以在運行netcat的終端裏敲幾個單詞,你會發現這些單詞以及相應的計數會出現在啟動Spark Streaming例子的終端屏幕上。看上去應該和下面這個示意圖類似:

# TERMINAL 1:# Running Netcat$ nc -lk 9999hello world...
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999...-------------------------------------------Time: 1357008430000 ms-------------------------------------------(hello,1)(world,1)...


基本概念

下面,我們在之前的小栗子基礎上,繼續深入了解一下Spark Streaming的一些基本概念。

鏈接依賴項

和Spark類似,Spark Streaming也能在Maven庫中找到。如果你需要編寫Spark Streaming程序,你就需要將以下依賴加入到你的SBT或Maven工程依賴中。

<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.6.1</version></dependency>

還有,對於從Kafka、Flume以及Kinesis這類數據源提取數據的流式應用來說,還需要額外增加相應的依賴項,下錶列出了各種數據源對應的額外依賴項:

數據源 Maven工件
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10

最新的依賴項信息(包括源代碼和Maven工件)請參考Maven repository


初始化StreamingContext

要初始化任何一個Spark Streaming程序,都需要在入口代碼中創建一個StreamingContext對象。

StreamingContext object can be created from a SparkConf object.

StreamingContext對象需要一個SparkConf對象作為其構造參數。

import org.apache.spark._import org.apache.spark.streaming._val conf = new SparkConf().setAppName(appName).setMaster(master)val ssc = new StreamingContext(conf, Seconds(1))

上面代碼中的 appName 是你給該應用起的名字,這個名字會展示在Spark集群的web UI上。而 master 是Spark, Mesos or YARN cluster URL,如果支持本地測試,你也可以用”local[*]”為其賦值。通常在實際工作中,你不應該將master參數硬編碼到代碼裏,而是應用通過spark-submit的參數來傳遞master的值(launch the application with spark-submit )。不過對本地測試來說,”local[*]”足够了(該值傳給master後,Spark Streaming將在本地進程中,啟動n個線程運行,n與本地系統CPU core數相同)。注意,StreamingContext在內部會創建一個  SparkContext 對象(SparkContext是所有Spark應用的入口,在StreamingContext對象中可以這樣訪問:ssc.sparkContext)。

StreamingContext還有另一個構造參數,即:批次間隔,這個值的大小需要根據應用的具體需求和可用的集群資源來確定。詳見Spark性能調優( Performance Tuning)。

StreamingContext對象也可以通過已有的SparkContext對象來創建,示例如下:

import org.apache.spark.streaming._val sc = ... // 已有的SparkContextval ssc = new StreamingContext(sc, Seconds(1))

context對象創建後,你還需要如下步驟:

  1. 創建DStream對象,並定義好輸入數據源。
  2. 基於數據源DStream定義好計算邏輯和輸出。
  3. 調用streamingContext.start() 啟動接收並處理數據。
  4. 調用streamingContext.awaitTermination() 等待流式處理結束(不管是手動結束,還是發生异常錯誤)
  5. 你可以主動調用 streamingContext.stop() 來手動停止處理流程。
需要關注的重點:
  • 一旦streamingContext啟動,就不能再對其計算邏輯進行添加或修改。
  • 一旦streamingContext被stop掉,就不能restart。
  • 單個JVM虛機同一時間只能包含一個active的StreamingContext。
  • StreamingContext.stop() 也會把關聯的SparkContext對象stop掉,如果不想把SparkContext對象也stop掉,可以將StreamingContext.stop的可選參數 stopSparkContext 設為false。
  • 一個SparkContext對象可以和多個StreamingContext對象關聯,只要先對前一個StreamingContext.stop(sparkContext=false),然後再創建新的StreamingContext對象即可。

離散數據流 (DStreams)

離散數據流(DStream)是Spark Streaming最基本的抽象。它代錶了一種連續的數據流,要麼從某種數據源提取數據,要麼從其他數據流映射轉換而來。DStream內部是由一系列連續的RDD組成的,每個RDD都是不可變、分布式的數據集(詳見Spark編程指南 – Spark Programming Guide)。每個RDD都包含了特定時間間隔內的一批數據,如下圖所示:

spark streaming-dstream

 

任何作用於DStream的算子,其實都會被轉化為對其內部RDD的操作。例如,在前面的例子中,我們將 lines 這個DStream轉成words DStream對象,其實作用於lines上的flatMap算子,會施加於lines中的每個RDD上,並生成新的對應的RDD,而這些新生成的RDD對象就組成了words這個DStream對象。其過程如下圖所示:

spark streaming-dstream-ops

 

底層的RDD轉換仍然是由Spark引擎來計算。DStream的算子將這些細節隱藏了起來,並為開發者提供了更為方便的高級API。後續會詳細討論這些高級算子。


輸入DStream和接收器

輸入DStream代錶從某種流式數據源流入的數據流。在之前的例子裏,lines 對象就是輸入DStream,它代錶從netcat server收到的數據流。每個輸入DStream(除文件數據流外)都和一個接收器(Receiver – Scala docJava doc)相關聯,而接收器則是專門從數據源拉取數據到內存中的對象。

Spark Streaming主要提供兩種內建的流式數據源:

  • 基礎數據源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系統,套接字連接或者Akka actor。
  • 高級數據源(Advanced sources): 需要依賴額外工具類的源,如:Kafka、Flume、Kinesis、Twitter等數據源。這些數據源都需要增加額外的依賴,詳見依賴鏈接(linking)這一節。

本節中,我們將會從每種數據源中挑幾個繼續深入討論。

注意,如果你需要同時從多個數據源拉取數據,那麼你就需要創建多個DStream對象(詳見後續的性能調優這一小節)。多個DStream對象其實也就同時創建了多個數據流接收器。但是請注意,Spark的worker/executor 都是長期運行的,因此它們都會各自占用一個分配給Spark Streaming應用的CPU。所以,在運行Spark Streaming應用的時候,需要注意分配足够的CPU core(本地運行時,需要足够的線程)來處理接收到的數據,同時還要足够的CPU core來運行這些接收器。

要點
  • 如果本地運行Spark Streaming應用,記得不能將master設為”local” 或 “local[1]”。這兩個值都只會在本地啟動一個線程。而如果此時你使用一個包含接收器(如:套接字、Kafka、Flume等)的輸入DStream,那麼這一個線程只能用於運行這個接收器,而處理數據的邏輯就沒有線程來執行了。因此,本地運行時,一定要將master設為”local[n]”,其中 n > 接收器的個數(有關master的詳情請參考Spark Properties)。
  • 將Spark Streaming應用置於集群中運行時,同樣,分配給該應用的CPU core數必須大於接收器的總數。否則,該應用就只會接收數據,而不會處理數據。

基礎數據源

前面的小栗子中,我們已經看到,使用ssc.socketTextStream(…) 可以從一個TCP連接中接收文本數據。而除了TCP套接字外,StreamingContext API 還支持從文件或者Akka actor中拉取數據。

  • 文件數據流(File Streams): 可以從任何兼容HDFS API(包括:HDFS、S3、NFS等)的文件系統,創建方式如下:
     streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

    Spark Streaming將監視該dataDirectory目錄,並處理該目錄下任何新建的文件(目前還不支持嵌套目錄)。注意:

    • 各個文件數據格式必須一致。
    • dataDirectory中的文件必須通過moving或者renaming來創建。
    • 一旦文件move進dataDirectory之後,就不能再改動。所以如果這個文件後續還有寫入,這些新寫入的數據不會被讀取。

    對於簡單的文本文件,更簡單的方式是調用 streamingContext.textFileStream(dataDirectory)。

    另外,文件數據流不是基於接收器的,所以不需要為其單獨分配一個CPU core。

    Python API fileStream目前暫時不可用,Python目前只支持textFileStream。

  • 基於自定義Actor的數據流(Streams based on Custom Actors): DStream可以由Akka actor創建得到,只需調用 streamingContext.actorStream(actorProps, actor-name)。詳見自定義接收器(Custom Receiver Guide)。actorStream暫時不支持Python API。
  • RDD隊列數據流(Queue of RDDs as a Stream): 如果需要測試Spark Streaming應用,你可以創建一個基於一批RDD的DStream對象,只需調用 streamingContext.queueStream(queueOfRDDs)。RDD會被一個個依次推入隊列,而DStream則會依次以數據流形式處理這些RDD的數據。

關於套接字、文件以及Akka actor數據流更詳細信息,請參考相關文檔:StreamingContext for Scala,JavaStreamingContext for Java, and StreamingContext for Python。

高級數據源

Python API 自 Spark 1.6.1 起,Kafka、Kinesis、Flume和MQTT這些數據源將支持Python。

使用這類數據源需要依賴一些額外的代碼庫,有些依賴還挺複雜的(如:Kafka、Flume)。因此為了减少依賴項版本沖突問題,各個數據源DStream的相關功能被分割到不同的代碼包中,只有用到的時候才需要鏈接打包進來。例如,如果你需要使用Twitter的tweets作為數據源,你需要以下步驟:

  1. Linking: 將spark-streaming-twitter_2.10工件加入到SBT/Maven項目依賴中。
  2. Programming: 導入TwitterUtils class,然後調用 TwitterUtils.createStream 創建一個DStream,具體代碼見下放。
  3. Deploying: 生成一個uber Jar包,並包含其所有依賴項(包括 spark-streaming-twitter_2.10及其自身的依賴樹),再部署這個Jar包。部署詳情請參考部署這一節(Deploying section)。
import org.apache.spark.streaming.twitter._TwitterUtils.createStream(ssc, None)

注意,高級數據源在spark-shell中不可用,因此不能用spark-shell來測試基於高級數據源的應用。如果真有需要的話,你需要自行下載相應數據源的Maven工件及其依賴項,並將這些Jar包部署到spark-shell的classpath中。

下面列舉了一些高級數據源:

自定義數據源

Python API 自定義數據源目前還不支持Python。

輸入DStream也可以用自定義的方式創建。你需要做的只是實現一個自定義的接收器(receiver),以便從自定義的數據源接收數據,然後將數據推入Spark中。詳情請參考自定義接收器指南(Custom Receiver Guide)。

接收器可靠性

從可靠性角度來劃分,大致有兩種數據源。其中,像Kafka、Flume這樣的數據源,它們支持對所傳輸的數據進行確認。系統收到這類可靠數據源過來的數據,然後發出確認信息,這樣就能够確保任何失敗情况下,都不會丟數據。因此我們可以將接收器也相應地分為兩類:

  1. 可靠接收器(Reliable Receiver) – 可靠接收器會在成功接收並保存好Spark數據副本後,向可靠數據源發送確認信息。
  2. 可靠接收器(Unreliable Receiver) – 不可靠接收器不會發送任何確認信息。不過這種接收器常用語於不支持確認的數據源,或者不想引入數據確認的複雜性的數據源。

自定義接收器指南(Custom Receiver Guide)中詳細討論了如何寫一個可靠接收器。


DStream支持的transformation算子

和RDD類似,DStream也支持從輸入DStream經過各種transformation算子映射成新的DStream。DStream支持很多RDD上常見的transformation算子,一些常用的見下錶:

Transformation算子 用途
map(func) 返回會一個新的DStream,並將源DStream中每個元素通過func映射為新的元素
flatMap(func) 和map類似,不過每個輸入元素不再是映射為一個輸出,而是映射為0到多個輸出
filter(func) 返回一個新的DStream,並包含源DStream中被func選中(func返回true)的元素
repartition(numPartitions) 更改DStream的並行度(增加或减少分區數)
union(otherStream) 返回新的DStream,包含源DStream和otherDStream元素的並集
count() 返回一個包含單元素RDDs的DStream,其中每個元素是源DStream中各個RDD中的元素個數
reduce(func) 返回一個包含單元素RDDs的DStream,其中每個元素是通過源RDD中各個RDD的元素經func(func輸入兩個參數並返回一個同類型結果數據)聚合得到的結果。func必須滿足結合律,以便支持並行計算。
countByValue() 如果源DStream包含的元素類型為K,那麼該算子返回新的DStream包含元素為(K, Long)鍵值對,其中K為源DStream各個元素,而Long為該元素出現的次數。
reduceByKey(func, [numTasks]) 如果源DStream 包含的元素為 (K, V) 鍵值對,則該算子返回一個新的也包含(K, V)鍵值對的DStream,其中V是由func聚合得到的。注意:默認情况下,該算子使用Spark的默認並發任務數(本地模式為2,集群模式下由spark.default.parallelism 决定)。你可以通過可選參數numTasks來指定並發任務個數。
join(otherStream, [numTasks]) 如果源DStream包含元素為(K, V),同時otherDStream包含元素為(K, W)鍵值對,則該算子返回一個新的DStream,其中源DStream和otherDStream中每個K都對應一個 (K, (V, W))鍵值對元素。
cogroup(otherStream, [numTasks]) 如果源DStream包含元素為(K, V),同時otherDStream包含元素為(K, W)鍵值對,則該算子返回一個新的DStream,其中每個元素類型為包含(K, Seq[V], Seq[W])的tuple。
transform(func) 返回一個新的DStream,其包含的RDD為源RDD經過func操作後得到的結果。利用該算子可以對DStream施加任意的操作。
updateStateByKey(func) 返回一個包含新”狀態”的DStream。源DStream中每個key及其對應的values會作為func的輸入,而func可以用於對每個key的“狀態”數據作任意的更新操作。

下面我們會挑幾個transformation算子深入討論一下。

updateStateByKey算子

updateStateByKey 算子支持維護一個任意的狀態。要實現這一點,只需要兩步:

  1. 定義狀態 – 狀態數據可以是任意類型。
  2. 定義狀態更新函數 – 定義好一個函數,其輸入為數據流之前的狀態和新的數據流數據,且可其更新步驟1中定義的輸入數據流的狀態。

在每一個批次數據到達後,Spark都會調用狀態更新函數,來更新所有已有key(不管key是否存在於本批次中)的狀態。如果狀態更新函數返回None,則對應的鍵值對會被删除。

舉例如下。假設你需要維護一個流式應用,統計數據流中每個單詞的出現次數。這裏將各個單詞的出現次數這個整型數定義為狀態。我們接下來定義狀態更新函數如下:

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = { val newCount = ... // 將新的計數值和之前的狀態值相加,得到新的計數值 Some(newCount)}

該狀態更新函數可以作用於一個包括(word, 1) 鍵值對的DStream上(見本文開頭的小栗子)。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

該狀態更新函數會為每個單詞調用一次,且相應的newValues是一個包含很多個”1″的數組(這些1來自於(word,1)鍵值對),而runningCount包含之前該單詞的計數。本例的完整代碼請參考 StatefulNetworkWordCount.scala

注意,調用updateStateByKey前需要配置檢查點目錄,後續對此有詳細的討論,見檢查點(checkpointing)這節。

transform算子

transform算子(及其變體transformWith)可以支持任意的RDD到RDD的映射操作。也就是說,你可以用tranform算子來包裝任何DStream API所不支持的RDD算子。例如,將DStream每個批次中的RDD和另一個Dataset進行關聯(join)操作,這個功能DStream API並沒有直接支持。不過你可以用transform來實現這個功能,可見transform其實為DStream提供了非常强大的功能支持。比如說,你可以用事先算好的垃圾信息,對DStream進行實時過濾。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 包含垃圾信息的RDDval cleanedDStream = wordCounts.transform(rdd => { rdd.join(spamInfoRDD).filter(...) // 將DStream中的RDD和spamInfoRDD關聯,並實時過濾垃圾數據 ...})

注意,這裏transform包含的算子,其調用時間間隔和批次間隔是相同的。所以你可以基於時間改變對RDD的操作,如:在不同批次,調用不同的RDD算子,設置不同的RDD分區或者廣播變量等。

基於窗口(window)的算子

Spark Streaming同樣也提供基於時間窗口的計算,也就是說,你可以對某一個滑動時間窗內的數據施加特定tranformation算子。如下圖所示:

spark streaming-dstream-window

 

如上圖所示,每次窗口滑動時,源DStream中落入窗口的RDDs就會被合並成新的windowed DStream。在上圖的例子中,這個操作會施加於3個RDD單元,而滑動距離是2個RDD單元。由此可以得出任何窗口相關操作都需要指定一下兩個參數:

  • (窗口長度)window length – 窗口覆蓋的時間長度(上圖中為3)
  • (滑動距離)sliding interval – 窗口啟動的時間間隔(上圖中為2)

注意,這兩個參數都必須是DStream批次間隔(上圖中為1)的整數倍.

下面咱們舉個栗子。假設,你需要擴展前面的那個小栗子,你需要每隔10秒統計一下前30秒內的單詞計數。為此,我們需要在包含(word, 1)鍵值對的DStream上,對最近30秒的數據調用reduceByKey算子。不過這些都可以簡單地用一個 reduceByKeyAndWindow搞定。

// 每隔10秒歸約一次最近30秒的數據val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

以下列出了常用的窗口算子。所有這些算子都有前面提到的那兩個參數 – 窗口長度 和 滑動距離。

Transformation窗口算子 用途
window(windowLengthslideInterval) 將源DStream窗口化,並返回轉化後的DStream
countByWindow(windowLength,slideInterval) 返回數據流在一個滑動窗口內的元素個數
reduceByWindow(funcwindowLength,slideInterval) 基於數據流在一個滑動窗口內的元素,用func做聚合,返回一個單元素數據流。func必須滿足結合律,以便支持並行計算。
reduceByKeyAndWindow(func,windowLengthslideInterval, [numTasks]) 基於(K, V)鍵值對DStream,將一個滑動窗口內的數據進行聚合,返回一個新的包含(K,V)鍵值對的DStream,其中每個value都是各個key經過func聚合後的結果。
注意:如果不指定numTasks,其值將使用Spark的默認並行任務數(本地模式下為2,集群模式下由 spark.default.parallelism决定)。當然,你也可以通過numTasks來指定任務個數。
reduceByKeyAndWindow(funcinvFunc,windowLength,slideInterval, [numTasks]) 和前面的reduceByKeyAndWindow() 類似,只是這個版本會用之前滑動窗口計算結果,遞增地計算每個窗口的歸約結果。當新的數據進入窗口時,這些values會被輸入func做歸約計算,而這些數據離開窗口時,對應的這些values又會被輸入 invFunc 做”反歸約”計算。舉個簡單的例子,就是把新進入窗口數據中各個單詞個數“增加”到各個單詞統計結果上,同時把離開窗口數據中各個單詞的統計個數從相應的統計結果中“减掉”。不過,你的自己定義好”反歸約”函數,即:該算子不僅有歸約函數(見參數func),還得有一個對應的”反歸約”函數(見參數中的 invFunc)。和前面的reduceByKeyAndWindow() 類似,該算子也有一個可選參數numTasks來指定並行任務數。注意,這個算子需要配置好檢查點(checkpointing)才能用。
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基於包含(K, V)鍵值對的DStream,返回新的包含(K, Long)鍵值對的DStream。其中的Long value都是滑動窗口內key出現次數的計數。
和前面的reduceByKeyAndWindow() 類似,該算子也有一個可選參數numTasks來指定並行任務數。

Join相關算子

最後,值得一提的是,你在Spark Streaming中做各種關聯(join)操作非常簡單。

流-流(Stream-stream)關聯

一個數據流可以和另一個數據流直接關聯。

val stream1: DStream[String, String] = ...val stream2: DStream[String, String] = ...val joinedStream = stream1.join(stream2)

上面代碼中,stream1的每個批次中的RDD會和stream2相應批次中的RDD進行join。同樣,你可以類似地使用 leftOuterJoin, rightOuterJoin, fullOuterJoin 等。此外,你還可以基於窗口來join不同的數據流,其實現也很簡單,如下;)

val windowedStream1 = stream1.window(Seconds(20))val windowedStream2 = stream2.window(Minutes(1))val joinedStream = windowedStream1.join(windowedStream2)
流-數據集(stream-dataset)關聯

其實這種情况已經在前面的DStream.transform算子中介紹過了,這裏再舉個基於滑動窗口的例子。

val dataset: RDD[String, String] = ...val windowedStream = stream.window(Seconds(20))...val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

實際上,在上面代碼裏,你可以動態地該錶join的數據集(dataset)。傳給tranform算子的操作函數會在每個批次重新求值,所以每次該函數都會用最新的dataset值,所以不同批次間你可以改變dataset的值。

完整的DStream transformation算子列錶見API文檔。Scala請參考 DStream 和 PairDStreamFunctions. Java請參考 JavaDStream 和 JavaPairDStream. Python見 DStream


DStream輸出算子

輸出算子可以將DStream的數據推送到外部系統,如:數據庫或者文件系統。因為輸出算子會將最終完成轉換的數據輸出到外部系統,因此只有輸出算子調用時,才會真正觸發DStream transformation算子的真正執行(這一點類似於RDD 的action算子)。目前所支持的輸出算子如下錶:

輸出算子 用途
print() 在驅動器(driver)節點上打印DStream每個批次中的頭十個元素。
Python API 對應的Python API為 pprint()
saveAsTextFiles(prefix, [suffix]) 將DStream的內容保存到文本文件。
每個批次一個文件,各文件命名規則為 “prefix-TIME_IN_MS[.suffix]”
saveAsObjectFiles(prefix, [suffix]) 將DStream內容以序列化Java對象的形式保存到順序文件中。
每個批次一個文件,各文件命名規則為 “prefix-TIME_IN_MS[.suffix]”Python API 暫不支持Python
saveAsHadoopFiles(prefix, [suffix]) 將DStream內容保存到Hadoop文件中。
每個批次一個文件,各文件命名規則為 “prefix-TIME_IN_MS[.suffix]”Python API 暫不支持Python
foreachRDD(func) 這是最通用的輸出算子了,該算子接收一個函數func,func將作用於DStream的每個RDD上。
func應該實現將每個RDD的數據推到外部系統中,比如:保存到文件或者寫到數據庫中。
注意,func函數是在streaming應用的驅動器進程中執行的,所以如果其中包含RDD的action算子,就會觸發對DStream中RDDs的實際計算過程。

使用foreachRDD的設計模式

DStream.foreachRDD是一個非常强大的原生工具函數,用戶可以基於此算子將DStream數據推送到外部系統中。不過用戶需要了解如何正確而高效地使用這個工具。以下列舉了一些常見的錯誤。

通常,對外部系統寫入數據需要一些連接對象(如:遠程server的TCP連接),以便發送數據給遠程系統。因此,開發人員可能會不經意地在Spark驅動器(driver)進程中創建一個連接對象,然後又試圖在Spark worker節點上使用這個連接。如下例所示:

dstream.foreachRDD { rdd => val connection = createNewConnection() // 這行在驅動器(driver)進程執行 rdd.foreach { record => connection.send(record) // 而這行將在worker節點上執行 }}

這段代碼是錯誤的,因為它需要把連接對象序列化,再從驅動器節點發送到worker節點。而這些連接對象通常都是不能跨節點(機器)傳遞的。比如,連接對象通常都不能序列化,或者在另一個進程中反序列化後再次初始化(連接對象通常都需要初始化,因此從驅動節點發到worker節點後可能需要重新初始化)等。解决此類錯誤的辦法就是在worker節點上創建連接對象。

然而,有些開發人員可能會走到另一個極端 – 為每條記錄都創建一個連接對象,例如:

dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() }}

一般來說,連接對象是有時間和資源開銷限制的。因此,對每條記錄都進行一次連接對象的創建和銷毀會增加很多不必要的開銷,同時也大大减小了系統的吞吐量。一個比較好的解决方案是使用 rdd.foreachPartition – 為RDD的每個分區創建一個單獨的連接對象,示例如下:

dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() }}

這樣一來,連接對象的創建開銷就攤到很多條記錄上了。

最後,還有一個更優化的辦法,就是在多個RDD批次之間複用連接對象。開發者可以維護一個靜態連接池來保存連接對象,以便在不同批次的多個RDD之間共享同一組連接對象,示例如下:

dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool 是一個靜態的、懶惰初始化的連接池 val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 將連接返還給連接池,以便後續複用之 }}

注意,連接池中的連接應該是懶惰創建的,並且有確定的超時時間,超時後自動銷毀。這個實現應該是目前發送數據最高效的實現方式。

其他要點:
  • DStream的轉化執行也是懶惰的,需要輸出算子來觸發,這一點和RDD的懶惰執行由action算子觸發很類似。特別地,DStream輸出算子中包含的RDD action算子會强制觸發對所接收數據的處理。因此,如果你的Streaming應用中沒有輸出算子,或者你用了dstream.foreachRDD(func)卻沒有在func中調用RDD action算子,那麼這個應用只會接收數據,而不會處理數據,接收到的數據最後只是被簡單地丟弃掉了。
  • 默認地,輸出算子只能一次執行一個,且按照它們在應用程序代碼中定義的順序執行。

累加器和廣播變量

首先需要注意的是,累加器(Accumulators)和廣播變量(Broadcast variables)是無法從Spark Streaming的檢查點中恢複回來的。所以如果你開啟了檢查點功能,並同時在使用累加器和廣播變量,那麼你最好是使用懶惰實例化的單例模式,因為這樣累加器和廣播變量才能在驅動器(driver)故障恢複後重新實例化。代碼示例如下:

object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance }}object DroppedWordsCounter { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext): Accumulator[Long] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.accumulator(0L, "WordsInBlacklistCounter") } } } instance }}wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // 獲取現有或注册新的blacklist廣播變量 val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // 獲取現有或注册新的 droppedWordsCounter 累加器 val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // 基於blacklist來過濾詞,並將過濾掉的詞的個數累加到 droppedWordsCounter 中 val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter += count false } else { true } }.collect() val output = "Counts at time " + time + " " + counts})

這裏有完整代碼:source code


DataFrame和SQL相關算子

在Streaming應用中可以調用DataFrames and SQL來處理流式數據。開發者可以用通過StreamingContext中的SparkContext對象來創建一個SQLContext,並且,開發者需要確保一旦驅動器(driver)故障恢複後,該SQLContext對象能重新創建出來。同樣,你還是可以使用懶惰創建的單例模式來實例化SQLContext,如下面的代碼所示,這裏我們將最開始的那個小栗子做了一些修改,使用DataFrame和SQL來統計單詞計數。其實就是,將每個RDD都轉化成一個DataFrame,然後注册成臨時錶,再用SQL查詢這些臨時錶。

/** streaming應用中調用DataFrame算子 */val words: DStream[String] = ...words.foreachRDD { rdd => // 獲得SQLContext單例 val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // 將RDD[String] 轉為 DataFrame val wordsDataFrame = rdd.toDF("word") // DataFrame注册為臨時錶 wordsDataFrame.registerTempTable("words") // 再用SQL語句查詢,並打印出來 val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show()}

See the full source code.

這裏有完整代碼:source code

你也可以在其他線程裏執行SQL查詢(异步查詢,即:執行SQL查詢的線程和運行StreamingContext的線程不同)。不過這種情况下,你需要確保查詢的時候 StreamingContext 沒有把所需的數據丟弃掉,否則StreamingContext有可能已將老的RDD數據丟弃掉了,那麼异步查詢的SQL語句也可能無法得到查詢結果。舉個栗子,如果你需要查詢上一個批次的數據,但是你的SQL查詢可能要執行5分鐘,那麼你就需要StreamingContext至少保留最近5分鐘的數據:streamingContext.remember(Minutes(5)) (這是Scala為例,其他語言差不多)

更多DataFrame和SQL的文檔見這裏: DataFrames and SQL


MLlib算子

MLlib 提供了很多機器學習算法。首先,你需要關注的是流式計算相關的機器學習算法(如:Streaming Linear RegressionStreaming KMeans),這些流式算法可以在流式數據上一邊學習訓練模型,一邊用最新的模型處理數據。除此以外,對更多的機器學習算法而言,你需要離線訓練這些模型,然後將訓練好的模型用於在線的流式數據。詳見MLlib


緩存/持久化

和RDD類似,DStream也支持將數據持久化到內存中。只需要調用 DStream的persist() 方法,該方法內部會自動調用DStream中每個RDD的persist方法進而將數據持久化到內存中。這對於可能需要計算很多次的DStream非常有用(例如:對於同一個批數據調用多個算子)。對於基於滑動窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有狀態的算子,如:updateStateByKey,數據持久化就更重要了。因此,滑動窗口算子產生的DStream對象默認會自動持久化到內存中(不需要開發者調用persist)。

對於從網絡接收數據的輸入數據流(如:Kafka、Flume、socket等),默認的持久化級別會將數據持久化到兩個不同的節點上互為備份副本,以便支持容錯。

注意,與RDD不同的是,DStream的默認持久化級別是將數據序列化到內存中。進一步的討論見性能調優這一小節。關於持久化級別(或者存儲級別)的更詳細說明見Spark編程指南(Spark Programming Guide)。


檢查點

一般來說Streaming 應用都需要7*24小時長期運行,所以必須對一些與業務邏輯無關的故障有很好的容錯(如:系統故障、JVM崩潰等)。對於這些可能性,Spark Streaming 必須在檢查點保存足够的信息到一些可容錯的外部存儲系統中,以便能够隨時從故障中恢複回來。所以,檢查點需要保存以下兩種數據:

  • 元數據檢查點(Metadata checkpointing) – 保存流式計算邏輯的定義信息到外部可容錯存儲系統(如:HDFS)。主要用途是用於在故障後回複應用程序本身(後續詳談)。元數包括:
    • Configuration – 創建Streaming應用程序的配置信息。
    • DStream operations – 定義流式處理邏輯的DStream操作信息。
    • Incomplete batches – 已經排隊但未處理完的批次信息。
  • 數據檢查點(Data checkpointing) – 將生成的RDD保存到可靠的存儲中。這對一些需要跨批次組合數據或者有狀態的算子來說很有必要。在這種轉換算子中,往往新生成的RDD是依賴於前幾個批次的RDD,因此隨著時間的推移,有可能產生很長的依賴鏈條。為了避免在恢複數據的時候需要恢複整個依賴鏈條上所有的數據,檢查點需要周期性地保存一些中間RDD狀態信息,以斬斷無限制增長的依賴鏈條和恢複時間。

總之,元數據檢查點主要是為了恢複驅動器節點上的故障,而數據或RDD檢查點是為了支持對有狀態轉換操作的恢複。

何時啟用檢查點

如果有以下情况出現,你就必須啟用檢查點了:

  • 使用了有狀態的轉換算子(Usage of stateful transformations) – 不管是用了 updateStateByKey 還是用了 reduceByKeyAndWindow(有”反歸約”函數的那個版本),你都必須配置檢查點目錄來周期性地保存RDD檢查點。
  • 支持驅動器故障中恢複(Recovering from failures of the driver running the application) – 這時候需要元數據檢查點以便恢複流式處理的進度信息。

注意,一些簡單的流式應用,如果沒有用到前面所說的有狀態轉換算子,則完全可以不開啟檢查點。不過這樣的話,驅動器(driver)故障恢複後,有可能會丟失部分數據(有些已經接收但還未處理的數據可能會丟失)。不過通常這點丟失時可接受的,很多Spark Streaming應用也是這樣運行的。對非Hadoop環境的支持未來還會繼續改進。

如何配置檢查點

檢查點的啟用,只需要設置好保存檢查點信息的檢查點目錄即可,一般會會將這個目錄設為一些可容錯的、可靠性較高的文件系統(如:HDFS、S3等)。開發者只需要調用 streamingContext.checkpoint(checkpointDirectory)。設置好檢查點,你就可以使用前面提到的有狀態轉換算子了。另外,如果你需要你的應用能够支持從驅動器故障中恢複,你可能需要重寫部分代碼,實現以下行為:

  • 如果程序是首次啟動,就需要new一個新的StreamingContext,並定義好所有的數據流處理,然後調用StreamingContext.start()。
  • 如果程序是故障後重啟,就需要從檢查點目錄中的數據中重新構建StreamingContext對象。
 

不過這個行為可以用StreamingContext.getOrCreate來實現,示例如下:

// 首次創建StreamingContext並定義好數據流處理邏輯def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // 新建一個StreamingContext對象 val lines = ssc.socketTextStream(...) // 創建DStreams ... ssc.checkpoint(checkpointDirectory) // 設置好檢查點目錄 ssc}// 創建新的StreamingContext對象,或者從檢查點構造一個val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)// 無論是否是首次啟動都需要設置的工作在這裏context. ...// 啟動StreamingContext對象context.start()context.awaitTermination()

如果 checkpointDirectory 目錄存在,則context對象會從檢查點數據重新構建出來。如果該目錄不存在(如:首次運行),則 functionToCreateContext 函數會被調用,創建一個新的StreamingContext對象並定義好DStream數據流。完整的示例請參見RecoverableNetworkWordCount,這個例子會將網絡數據中的單詞計數統計結果添加到一個文件中。

除了使用getOrCreate之外,開發者還需要確保驅動器進程能在故障後重啟。這一點只能由應用的部署環境基礎設施來保證。進一步的討論見部署(Deployment)這一節。

另外需要注意的是,RDD檢查點會增加額外的保存數據的開銷。這可能會導致數據流的處理時間變長。因此,你必須仔細的調整檢查點間隔時間。如果批次間隔太小(比如:1秒),那麼對每個批次保存檢查點數據將大大减小吞吐量。另一方面,檢查點保存過於頻繁又會導致血統信息和任務個數的增加,這同樣會影響系統性能。對於需要RDD檢查點的有狀態轉換算子,默認的間隔是批次間隔的整數倍,且最小10秒。開發人員可以這樣來自定義這個間隔:dstream.checkpoint(checkpointInterval)。一般推薦設為批次間隔時間的5~10倍。


部署應用

本節中將主要討論一下如何部署Spark Streaming應用。

前提條件

要運行一個Spark Streaming 應用,你首先需要具備以下條件:

  • 集群以及集群管理器 – 這是一般Spark應用的基本要求,詳見 deployment guide
  • 給Spark應用打個JAR包 – 你需要將你的應用打成一個JAR包。如果使用spark-submit 提交應用,那麼你不需要提供Spark和Spark Streaming的相關JAR包。但是,如果你使用了高級數據源(advanced sources – 如:Kafka、Flume、Twitter等),那麼你需要將這些高級數據源相關的JAR包及其依賴一起打包並部署。例如,如果你使用了TwitterUtils,那麼就必須將spark-streaming-twitter_2.10及其相關依賴都打到應用的JAR包中。
  • 為執行器(executor)預留足够的內存 – 執行器必須配置預留好足够的內存,因為接受到的數據都得存在內存裏。注意,如果某些窗口長度達到10分鐘,那也就是說你的系統必須知道保留10分鐘的數據在內存裏。可見,到底預留多少內存是取决於你的應用處理邏輯的。
  • 配置檢查點 – 如果你的流式應用需要檢查點,那麼你需要配置一個Hadoop API兼容的可容錯存儲目錄作為檢查點目錄,流式應用的信息會寫入這個目錄,故障恢複時會用到這個目錄下的數據。詳見前面的檢查點小節。
  • 配置驅動程序自動重啟 – 流式應用自動恢複的前提就是,部署基礎設施能够監控驅動器進程,並且能够在其故障時,自動重啟之。不同的集群管理器有不同的工具來實現這一功能:
    • Spark獨立部署 – Spark獨立部署集群可以支持將Spark應用的驅動器提交到集群的某個worker節點上運行。同時,Spark的集群管理器可以對該驅動器進程進行監控,一旦驅動器退出且返回非0值,或者因worker節點原始失敗,Spark集群管理器將自動重啟這個驅動器。詳見Spark獨立部署指南(Spark Standalone guide)。
    • YARN – YARN支持和獨立部署類似的重啟機制。詳細請參考YARN的文檔。
    • Mesos – Mesos上需要用Marathon來實現這一功能。
  • 配置WAL(write ahead log)- 從Spark 1.2起,我們引入了write ahead log來提高容錯性。如果啟用這個功能,則所有接收到的數據都會以write ahead log形式寫入配置好的檢查點目錄中。這樣就能確保數據零丟失(容錯語義有詳細的討論)。用戶只需將 spark.streaming.receiver.writeAheadLog 設為true。不過,這同樣可能會導致接收器的吞吐量下降。不過你可以啟動多個接收器並行接收數據,從而提昇整體的吞吐量(more receivers in parallel)。另外,建議在啟用WAL後禁用掉接收數據多副本功能,因為WAL其實已經是存儲在一個多副本存儲系統中了。你只需要把存儲級別設為 StorageLevel.MEMORY_AND_DISK_SER。如果是使用S3(或者其他不支持flushing的文件系統)存儲WAL,一定要記得啟用這兩個標識:spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。更詳細請參考: Spark Streaming Configuration
  • 設置好最大接收速率 – 如果集群可用資源不足以跟上接收數據的速度,那麼可以在接收器設置一下最大接收速率,即:每秒接收記錄的條數。相關的主要配置有:spark.streaming.receiver.maxRate,如果使用Kafka Direct API 還需要設置 spark.streaming.kafka.maxRatePerPartition。從Spark 1.5起,我們引入了backpressure的概念來動態地根據集群處理速度,評估並調整該接收速率。用戶只需將 spark.streaming.backpressure.enabled設為true即可啟用該功能。

昇級應用代碼

昇級Spark Streaming應用程序代碼,可以使用以下兩種方式:

  • 新的Streaming程序和老的並行跑一段時間,新程序完成初始化以後,再關閉老的。注意,這種方式適用於能同時發送數據到多個目標的數據源(即:數據源同時將數據發給新老兩個Streaming應用程序)。
  • 老程序能够優雅地退出(參考  StreamingContext.stop(...) or JavaStreamingContext.stop(...) ),即:確保所收到的數據都已經處理完畢後再退出。然後再啟動新的Streaming程序,而新程序將接著在老程序退出點上繼續拉取數據。注意,這種方式需要數據源支持數據緩存(或者叫數據堆積,如:Kafka、Flume),因為在新舊程序交接的這個空檔時間,數據需要在數據源處緩存。目前還不能支持從檢查點重啟,因為檢查點存儲的信息包含老程序中的序列化對象信息,在新程序中將其反序列化可能會出錯。這種情况下,只能要麼指定一個新的檢查點目錄,要麼删除老的檢查點目錄。

應用監控

除了Spark自身的監控能力(monitoring capabilities)之外,對Spark Streaming還有一些額外的監控功能可用。如果實例化了StreamingContext,那麼你可以在Spark web UI上看到多出了一個Streaming tab頁,上面顯示了正在運行的接收器(是否活躍,接收記錄的條數,失敗信息等)和處理完的批次信息(批次處理時間,查詢延時等)。這些信息都可以用來監控streaming應用。

web UI上有兩個度量特別重要:

  • 批次處理耗時(Processing Time) – 處理單個批次耗時
  • 批次調度延時(Scheduling Delay) -各批次在隊列中等待時間(等待上一個批次處理完)

如果批次處理耗時一直比批次間隔時間大,或者批次調度延時持續上昇,就意味著系統處理速度跟不上數據接收速度。這時候你就得考慮一下怎麼把批次處理時間降下來(reducing)。

Spark Streaming程序的處理進度可以用StreamingListener接口來監聽,這個接口可以監聽到接收器的狀態和處理時間。不過需要注意的是,這是一個developer API接口,換句話說這個接口未來很可能會變動(可能會增加更多度量信息)。



性能調優

要獲得Spark Streaming應用的最佳性能需要一點點調優工作。本節將深入解釋一些能够改進Streaming應用性能的配置和參數。總體上來說,你需要考慮這兩方面的事情:

  1. 提高集群資源利用率,减少單批次處理耗時。
  2. 設置合適的批次大小,以便使數據處理速度能跟上數據接收速度。

减少批次處理時間

有不少優化手段都可以减少Spark對每個批次的處理時間。細節將在優化指南(Tuning Guide)中詳談。這裏僅列舉一些最重要的。

數據接收並發度

跨網絡接收數據(如:從Kafka、Flume、socket等接收數據)需要在Spark中序列化並存儲數據。

如果接收數據的過程是系統瓶頸,那麼可以考慮增加數據接收的並行度。注意,每個輸入DStream只包含一個單獨的接收器(receiver,運行約worker節點),每個接收器單獨接收一路數據流。所以,配置多個輸入DStream就能從數據源的不同分區分別接收多個數據流。例如,可以將從Kafka拉取兩個topic的數據流分成兩個Kafka輸入數據流,每個數據流拉取其中一個topic的數據,這樣一來會同時有兩個接收器並行地接收數據,因而增加了總體的吞吐量。同時,另一方面我們又可以把這些DStream數據流合並成一個,然後可以在合並後的DStream上使用任何可用的transformation算子。示例代碼如下:

val numStreams = 5val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }val unifiedStream = streamingContext.union(kafkaStreams)unifiedStream.print()

另一個可以考慮優化的參數就是接收器的阻塞間隔,該參數由配置參數(configuration parameter)spark.streaming.blockInterval决定。大多數接收器都會將數據合並成一個個數據塊,然後再保存到spark內存中。對於map類算子來說,每個批次中數據塊的個數將會决定處理這批數據並行任務的個數,每個接收器每批次數據處理任務數約等於 (批次間隔 / 數據塊間隔)。例如,對於2秒的批次間隔,如果數據塊間隔為200ms,則創建的並發任務數為10。如果任務數太少(少於單機cpu core個數),則資源利用不够充分。如需增加這個任務數,對於給定的批次間隔來說,只需要减少數據塊間隔即可。不過,我們還是建議數據塊間隔至少要50ms,否則任務的啟動開銷占比就太高了。

另一個切分接收數據流的方法是,顯示地將輸入數據流劃分為多個分區(使用 inputStream.repartition(<number of partitions>))。該操作會在處理前,將數據散開重新分發到集群中多個節點上。

數據處理並發度

在計算各個階段(stage)中,任何一個階段的並發任務數不足都有可能造成集群資源利用率低。例如,對於reduce類的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默認的並發任務數是由 spark.default.parallelism 决定的。你既可以修改這個默認值(spark.default.parallelism),也可以通過參數指定這個並發數量(見PairDStreamFunctions)。

數據序列化

調整數據的序列化格式可以大大减少數據序列化的開銷。在spark Streaming中主要有兩種類型的數據需要序列化:

  • 輸入數據: 默認地,接收器收到的數據是以 StorageLevel.MEMORY_AND_DISK_SER_2 的存儲級別存儲到執行器(executor)內存中的。也就是說,收到的數據會被序列化以减少GC開銷,同時保存兩個副本以容錯。同時,數據會優先保存在內存裏,當內存不足時才吐出到磁盤上。很明顯,這個過程中會有數據序列化的開銷 – 接收器首先將收到的數據反序列化,然後再以spark所配置指定的格式來序列化數據。
  • Streaming算子所生產的持久化的RDDs: Streaming計算所生成的RDD可能會持久化到內存中。例如,基於窗口的算子會將數據持久化到內存,因為窗口數據可能會多次處理。所不同的是,spark core默認用 StorageLevel.MEMORY_ONLY 級別持久化RDD數據,而spark streaming默認使用StorageLevel.MEMORY_ONLY_SER 級別持久化接收到的數據,以便盡量减少GC開銷。

不管是上面哪一種數據,都可以使用Kryo序列化來减少CPU和內存開銷,詳見Spark Tuning Guide。另,對於Kryo,你可以考慮這些優化:注册自定義類型,禁用對象引用跟踪(詳見Configuration Guide)。

在一些特定的場景下,如果數據量不是很大,那麼你可以考慮不用序列化格式,不過你需要注意的是取消序列化是否會導致大量的GC開銷。例如,如果你的批次間隔比較短(幾秒)並且沒有使用基於窗口的算子,這種情况下你可以考慮禁用序列化格式。這樣可以减少序列化的CPU開銷以優化性能,同時GC的增長也不多。

任務啟動開銷

如果每秒啟動的任務數過多(比如每秒50個以上),那麼將任務發送給slave節點的開銷會明顯增加,那麼你也就很難達到亞秒級(sub-second)的延遲。不過以下兩個方法可以减少任務的啟動開銷:

  • 任務序列化(Task Serialization): 使用Kryo來序列化任務,以减少任務本身的大小,從而提高發送任務的速度。任務的序列化格式是由 spark.closure.serializer 屬性决定的。不過,目前還不支持閉包序列化,未來的版本可能會增加對此的支持。
  • 執行模式(Execution mode): Spark獨立部署或者Mesos粗粒度模式下任務的啟動時間比Mesos細粒度模式下的任務啟動時間要短。詳見Running on Mesos guide

這些調整有可能能够减少100ms的批次處理時間,這也使得亞秒級的批次間隔成為可能。


設置合適的批次間隔

要想streaming應用在集群上穩定運行,那麼系統處理數據的速度必須能跟上其接收數據的速度。換句話說,批次數據的處理速度應該和其生成速度一樣快。對於特定的應用來說,可以從其對應的監控(monitoring)頁面上觀察驗證,頁面上顯示的處理耗時應該要小於批次間隔時間。

根據spark streaming計算的性質,在一定的集群資源限制下,批次間隔的值會極大地影響系統的數據處理能力。例如,在WordCountNetwork示例中,對於特定的數據速率,一個系統可能能够在批次間隔為2秒時跟上數據接收速度,但如果把批次間隔改為500毫秒系統可能就處理不過來了。所以,批次間隔需要謹慎設置,以確保生產系統能够處理得過來。

要找出適合的批次間隔,你可以從一個比較保守的批次間隔值(如5~10秒)開始測試。要驗證系統是否能跟上當前的數據接收速率,你可能需要檢查一下端到端的批次處理延遲(可以看看Spark驅動器log4j日志中的Total delay,也可以用StreamingListener接口來檢測)。如果這個延遲能保持和批次間隔差不多,那麼系統基本就是穩定的。否則,如果這個延遲持久在增長,也就是說系統跟不上數據接收速度,那也就意味著系統不穩定。一旦系統文檔下來後,你就可以嘗試提高數據接收速度,或者减少批次間隔值。不過需要注意,瞬間的延遲增長可以只是暫時的,只要這個延遲後續會自動降下來就沒有問題(如:降到小於批次間隔值)


內存調優

Spark應用內存占用和GC調優已經在調優指南(Tuning Guide)中有詳細的討論。牆裂建議你讀一讀那篇文檔。本節中,我們只是討論一下幾個專門用於Spark Streaming的調優參數。

Spark Streaming應用在集群中占用的內存量嚴重依賴於具體所使用的tranformation算子。例如,如果想要用一個窗口算子操縱最近10分鐘的數據,那麼你的集群至少需要在內存裏保留10分鐘的數據;另一個例子是updateStateByKey,如果key很多的話,相對應的保存的key的state也會很多,而這些都需要占用內存。而如果你的應用只是做一個簡單的 “映射-過濾-存儲”(map-filter-store)操作的話,那需要的內存就很少了。

一般情况下,streaming接收器接收到的數據會以 StorageLevel.MEMORY_AND_DISK_SER_2 這個存儲級別存到spark中,也就是說,如果內存裝不下,數據將被吐到磁盤上。數據吐到磁盤上會大大降低streaming應用的性能,因此還是建議根據你的應用處理的數據量,提供充足的內存。最好就是,一邊小規模地放大內存,再觀察評估,然後再放大,再評估。

另一個內存調優的方向就是垃圾回收。因為streaming應用往往都需要低延遲,所以肯定不希望出現大量的或耗時較長的JVM垃圾回收暫停。

以下是一些能够幫助你减少內存占用和GC開銷的參數或手段:

  • DStream持久化級別(Persistence Level of DStreams): 前面數據序列化(Data Serialization)這小節已經提到過,默認streaming的輸入RDD會被持久化成序列化的字節流。相對於非序列化數據,這樣可以减少內存占用和GC開銷。如果啟用Kryo序列化,還能進一步减少序列化數據大小和內存占用量。如果你還需要進一步减少內存占用的話,可以開啟數據壓縮(通過spark.rdd.compress這個配置設定),只不過數據壓縮會增加CPU消耗。
  • 清除老數據(Clearing old data): 默認情况下,所有的輸入數據以及DStream的transformation算子產生的持久化RDD都是自動清理的。Spark Streaming會根據所使用的transformation算子來清理老數據。例如,你用了一個窗口操作處理最近10分鐘的數據,那麼Spark Streaming會保留至少10分鐘的數據,並且會主動把更早的數據都删掉。當然,你可以設置 streamingContext.remember 以保留更長時間段的數據(比如:你可能會需要交互式地查詢更老的數據)。
  • CMS垃圾回收器(CMS Garbage Collector): 為了盡量减少GC暫停的時間,我們牆裂建議使用CMS垃圾回收器(concurrent mark-and-sweep GC)。雖然CMS GC會稍微降低系統的總體吞吐量,但我們仍建議使用它,因為CMS GC能使批次處理的時間保持在一個比較恒定的水平上。最後,你需要確保在驅動器(通過spark-submit中的–driver-java-options設置)和執行器(使用spark.executor.extraJavaOptions配置參數)上都設置了CMS GC。
  • 其他提示: 如果還想進一步减少GC開銷,以下是更進一步的可以嘗試的手段:
    • 配合Tachyon使用堆外內存來持久化RDD。詳見Spark編程指南(Spark Programming Guide
    • 使用更多但是更小的執行器進程。這樣GC壓力就會分散到更多的JVM堆中。


容錯語義

本節中,我們將討論Spark Streaming應用在出現失敗時的具體行為。

背景

要理解Spark Streaming所提供的容錯語義,我們首先需要回憶一下Spark RDD所提供的基本容錯語義。

  1. RDD是不可變的,可重算的,分布式數據集。每個RDD都記錄了其創建算子的血統信息,其中每個算子都以可容錯的數據集作為輸入數據。
  2. 如果RDD的某個分區因為節點失效而丟失,則該分區可以根據RDD的血統信息以及相應的原始輸入數據集重新計算出來。
  3. 假定所有RDD transformation算子計算過程都是確定性的,那麼通過這些算子得到的最終RDD總是包含相同的數據,而與Spark集群的是否故障無關。

Spark主要操作一些可容錯文件系統的數據,如:HDFS或S3。因此,所有從這些可容錯數據源產生的RDD也是可容錯的。然而,對於Spark Streaming並非如此,因為多數情况下Streaming需要從網絡遠端接收數據,這回導致Streaming的數據源並不可靠(尤其是對於使用了fileStream的應用)。要實現RDD相同的容錯屬性,數據接收就必須用多個不同worker節點上的Spark執行器來實現(默認副本因子是2)。因此一旦出現故障,系統需要恢複兩種數據:

  1. 接收並保存了副本的數據 – 數據不會因為單個worker節點故障而丟失,因為有副本!
  2. 接收但尚未保存副本數據 – 因為數據並沒有副本,所以一旦故障,只能從數據源重新獲取。

此外,還有兩種可能的故障類型需要考慮:

  1. Worker節點故障 – 任何運行執行器的worker節點一旦故障,節點上內存中的數據都會丟失。如果這些節點上有接收器在運行,那麼其包含的緩存數據也會丟失。
  2. Driver節點故障 – 如果Spark Streaming的驅動節點故障,那麼很顯然SparkContext對象就沒了,所有執行器及其內存數據也會丟失。

有了以上這些基本知識,下面我們就進一步了解一下Spark Streaming的容錯語義。

定義

流式系統的可靠度語義可以據此來分類:單條記錄在系統中被處理的次數保證。一個流式系統可能提供保證必定是以下三種之一(不管系統是否出現故障):

  1. 至多一次(At most once): 每條記錄要麼被處理一次,要麼就沒有處理。
  2. 至少一次(At least once): 每條記錄至少被處理過一次(一次或多次)。這種保證能確保沒有數據丟失,比“至多一次”要强。但有可能出現數據重複。
  3. 精確一次(Exactly once): 每條記錄都精確地只被處理一次 – 也就是說,既沒有數據丟失,也不會出現數據重複。這是三種保證中最强的一種。

基礎語義

任何流式處理系統一般都會包含以下三個數據處理步驟:

  1. 數據接收(Receiving the data): 從數據源拉取數據。
  2. 數據轉換(Transforming the data): 將接收到的數據進行轉換(使用DStream和RDD transformation算子)。
  3. 數據推送(Pushing out the data): 將轉換後最終數據推送到外部文件系統,數據庫或其他展示系統。

如果Streaming應用需要做到端到端的“精確一次”的保證,那麼就必須在以上三個步驟中各自都保證精確一次:即,每條記錄必須,只接收一次、處理一次、推送一次。下面讓我們在Spark Streaming的上下文環境中來理解一下這三個步驟的語義:

  1. 數據接收: 不同數據源提供的保證不同,下一節再詳細討論。
  2. 數據轉換: 所有的數據都會被“精確一次”處理,這要歸功於RDD提供的保障。即使出現故障,只要數據源還能訪問,最終所轉換得到的RDD總是包含相同的內容。
  3. 數據推送: 輸出操作默認保證“至少一次”的語義,是否能“精確一次”還要看所使用的輸出算子(是否幂等)以及下遊系統(是否支持事務)。不過用戶也可以開發自己的事務機制來實現“精確一次”語義。這個後續會有詳細討論。

接收數據語義

不同的輸入源提供不同的數據可靠性級別,從“至少一次”到“精確一次”。

從文件接收數據

如果所有的輸入數據都來源於可容錯的文件系統,如HDFS,那麼Spark Streaming就能在任何故障中恢複並處理所有的數據。這種情况下就能保證精確一次語義,也就是說不管出現什麼故障,所有的數據總是精確地只處理一次,不多也不少。

基於接收器接收數據

對於基於接收器的輸入源,容錯語義將同時依賴於故障場景和接收器類型。前面也已經提到過,spark Streaming主要有兩種類型的接收器:

  1. 可靠接收器 – 這類接收器會在數據接收並保存好副本後,向可靠數據源發送確認信息。這類接收器故障時,是不會給緩存的(已接收但尚未保存副本)數據發送確認信息。因此,一旦接收器重啟,沒有收到確認的數據,會重新從數據源再獲取一遍,所以即使有故障也不會丟數據。
  2. 不可靠接收器 – 這類接收器不會發送確認信息,因此一旦worker和driver出現故障,就有可能會丟失數據。

對於不同的接收器,我們可以獲得如下不同的語義。如果一個worker節點故障了,對於可靠接收器來書,不會有數據丟失。而對於不可靠接收器,緩存的(接收但尚未保存副本)數據可能會丟失。如果driver節點故障了,除了接收到的數據之外,其他的已經接收且已經保存了內存副本的數據都會丟失,這將會影響有狀態算子的計算結果。

為了避免丟失已經收到且保存副本的數,從 spark 1.2 開始引入了WAL(write ahead logs),以便將這些數據寫入到可容錯的存儲中。只要你使用可靠接收器,同時啟用WAL(write ahead logs enabled),那麼久再也不用為數據丟失而擔心了。並且這時候,還能提供“至少一次”的語義保證。

下錶總結了故障情况下的各種語義:

部署場景 Worker 故障 Driver 故障
Spark 1.1及以前版本 或者
Spark 1.2及以後版本,且未開啟WAL
若使用不可靠接收器,則可能丟失緩存(已接收但尚未保存副本)數據;
若使用可靠接收器,則沒有數據丟失,且提供至少一次處理語義
若使用不可靠接收器,則緩存數據和已保存數據都可能丟失;
若使用可靠接收器,則沒有緩存數據丟失,但已保存數據可能丟失,且不提供語義保證
Spark 1.2及以後版本,並啟用WAL 若使用可靠接收器,則沒有數據丟失,且提供至少一次語義保證 若使用可靠接收器和文件,則無數據丟失,且提供至少一次語義保證

從Kafka Direct API接收數據

從Spark 1.3開始,我們引入Kafka Direct API,該API能為Kafka數據源提供“精確一次”語義保證。有了這個輸入API,再加上輸出算子的“精確一次”保證,你就能真正實現端到端的“精確一次”語義保證。(改功能截止Spark 1.6.1還是實驗性的)更詳細的說明見:Kafka Integration Guide

輸出算子的語義

輸出算子(如 foreachRDD)提供“至少一次”語義保證,也就是說,如果worker故障,單條輸出數據可能會被多次寫入外部實體中。不過這對於文件系統來說是可以接受的(使用saveAs***Files 多次保存文件會覆蓋之前的),所以我們需要一些額外的工作來實現“精確一次”語義。主要有兩種實現方式:

  • 幂等更新(Idempotent updates): 就是說多次操作,產生的結果相同。例如,多次調用saveAs***Files保存的文件總是包含相同的數據。
  • 事務更新(Transactional updates): 所有的更新都是事務性的,這樣一來就能保證更新的原子性。以下是一種實現方式:
    • 用批次時間(在foreachRDD中可用)和分區索引創建一個唯一標識,該標識代錶流式應用中唯一的一個數據塊。
    • 基於這個標識建立更新事務,並使用數據塊數據更新外部系統。也就是說,如果該標識未被提交,則原子地將標識代錶的數據更新到外部系統。否則,就認為該標識已經被提交,直接忽略之。
      dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // 使用uniqueId作為事務的唯一標識,基於uniqueId實現partitionIterator所指向數據的原子事務提交 }}


遷移指南 – 從0.9.1及以下昇級到1.x

在Spark 0.9.1和Spark 1.0之間,有一些API接口變更,變更目的是為了保障未來版本API的穩定。本節將詳細說明一下從已有版本遷移昇級到1.0所需的工作。

輸入DStream(Input DStreams): 所有創建輸入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值不再是DStream(對Java來說是JavaDStream),而是 InputDStream / ReceiverInputDStream(對Java來說是JavaInputDStream / JavaPairInputDStream /JavaReceiverInputDStream / JavaPairReceiverInputDStream)。這樣才能確保特定輸入流的功能能够在未來持續增加到這些class中,而不會打破二進制兼容性。注意,已有的Spark Streaming應用應該不需要任何代碼修改(新的返回類型都是DStream的子類),只不過需要基於Spark 1.0重新編譯一把。

定制網絡接收器(Custom Network Receivers): 自從Spark Streaming發布以來,Scala就能基於NetworkReceiver來定制網絡接收器。但由於錯誤處理和匯報API方便的限制,該類型不能在Java中使用。所以Spark 1.0開始,用 Receiver 來替換掉這個NetworkReceiver,主要的好處如下:

  • 該類型新增了stop和restart方法,便於控制接收器的生命周期。詳見custom receiver guide
  • 定制接收器用Scala和Java都能實現。

為了將已有的基於NetworkReceiver的自定義接收器遷移到Receiver上來,你需要如下工作:

  • 首先你的自定義接收器類型需要從 org.apache.spark.streaming.receiver.Receiver繼承,而不再是org.apache.spark.streaming.dstream.NetworkReceiver。
  • 原先,我們需要在自定義接收器中創建一個BlockGenerator來保存接收到的數據。你必須顯示的實現onStart() 和 onStop() 方法。而在新的Receiver class中,這些都不需要了,你只需要調用它的store系列的方法就能將數據保存到Spark中。所以你接下來需要做的遷移工作就是,删除BlockGenerator對象(這個類型在Spark 1.0之後也沒有了~),然後用store(…)方法來保存接收到的數據。

基於Actor的接收器(Actor-based Receivers): 從actor class繼承後,並實現了org.apache.spark.streaming.receiver.Receiver 後,即可從Akka Actors中獲取數據。獲取數據的類被重命名為  org.apache.spark.streaming.receiver.ActorHelper ,而保存數據的pushBlocks(…)方法也被重命名為 store(…)。其他org.apache.spark.streaming.receivers包中的工具類也被移到  org.apache.spark.streaming.receiver 包下並重命名,新的類名應該比之前更加清晰。



下一步

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: 《Spark官方文檔》Spark Streaming編程指南

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201070610003041.html