上篇介紹了kafka at-least-once消費模式。kafka消費模式以commit-offset的時間節點代錶不同的消費模式,分別是:at-least-once, at-most-once, exactly-once。上篇介紹的at-least-once消費模式是通過kafka自身的auto-commit實現的。事後想了想,這個應該算是at-most-once模式,因為消費過程不會影響auto-commit,kafka在每個設定的間隔都會自動進行offset-commit。如果這個間隔够短,比整個消費過程短,那麼在完成消費過程前就已經保存了offset,所以是at-most-once模式。不過,如果確定這個間隔一定大於消費過程,那麼又變成了at-least-once模式。具體能實現什麼消費模式並不能明確,因為auto-commit是無法從外部進行控制的。看來實現正真意義上的at-least-once消費模式還必須取得offset-commit的控制權才行。

alpakka-kafka提供了一種CommittableSource:

 def committableSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[CommittableMessage[K, V], Control] {...}

從這個CommittableSource輸出的元素是CommittableMessage[K,V]:

 final case class CommittableMessage[K, V](
record: ConsumerRecord[K, V],
committableOffset: CommittableOffset
)

這個CommittableMessage除原始消息之外還提供了CommittableOffset。通過Flow或Sink都可以進行offset-commit。alpakka-kafka提供了Committer,通過Committer.sink, Committer.Flow幫助實現offset-commit,Committer.flow如下:

 Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1) { msg =>
updateStock.map(_ => msg.committableOffset)
}
.via(Committer.flow(committerDefaults.withMaxBatch(1)))
.to(Sink.seq)
.run()

或Committer.sink:

 Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1) { msg =>
updateStock.map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.left)
.run()

下面是一個具體的at-least-once示範:

 val committerSettings = CommitterSettings(sys).withMaxBatch(commitMaxBatch)
val stkTxns = new DocToStkTxns(trace)
val curStk = new CurStk(trace)
val pcmTxns = new PcmTxns(trace) val commitableSource = Consumer
.committableSource(consumerSettings, subscription) def start =
(1 to numReaders).toList.map { _ =>
RestartSource
.onFailuresWithBackoff(restartSource) { () => commitableSource }
// .viaMat(KillSwitches.single)(Keep.right)
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtLeastOnceReaderGroup-msg: ${msg.record}")(Messages.MachineId("", ""))
}
_ <- stkTxns.docToStkTxns(msg.record.value())
pmsg <- FastFuture.successful {
log.step(s"AtLeastOnceReaderGroup-docToStkTxns: ${msg.record}")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
curstks <- curStk.updateStk(msg.record.value())
pmsg <- FastFuture.successful {
log.step(s"AtLeastOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
pcm <- pcmTxns.writePcmTxn(msg.record.value())
pmsg <- FastFuture.successful {
log.step(s"AtLeastOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- pcmTxns.updatePcm(msg.record.value())
} yield "Completed"
FastFuture.successful(msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(Keep.left)
.run()
}

消費過程其它部分的設計考慮和實現,如多線程、异常處理等可參考上篇討論。

對於at-most-once消費模式的實現,alpakka-kafka提供了atMostOnceSource:

 def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {...}

下面是用這個Source實現at-most-once的示範:

 val atmostonceSource = Consumer
.atMostOnceSource(consumerSettings, subscription) def start =
(1 to numReaders).toList.map { _ =>
RestartSource
.onFailuresWithBackoff(restartSource) { () => atmostonceSource }
// .viaMat(KillSwitches.single)(Keep.right)
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-msg: $msg")(Messages.MachineId("", ""))
}
_ <- stkTxns.docToStkTxns(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-docToStkTxns: $msg")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: msg: $msg")(Messages.MachineId("", ""))
}
curstks <- curStk.updateStk(msg.value())
pmsg<- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-writePcmTxn: msg: $msg")(Messages.MachineId("", ""))
}
pcm <- pcmTxns.writePcmTxn(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updatePcm: msg: $msg")(Messages.MachineId("", ""))
}
_ <- pcmTxns.updatePcm(msg.value())
_ <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: updatePcm-$msg")(Messages.MachineId("", ""))
}
} yield "Completed"
}
.toMat(Sink.seq)(Keep.left)
.run()
}

由於offset-commit和消息消費是兩個獨立的過程,無論如何努力都無法保證只讀一次,必須把這兩個過程合並成一個才有可能實現。所以,exactly-once可以通過數據庫系統的事務處理transaction-processing來實現,就是把offset-commit和數據更新兩個動作放到同一個事務transaction裏,通過事務處理的ACID原子特性保證兩個動作同進同退的一致性。這也意味著這個exactly-once消費模式必須在一個提供事務處理功能的數據庫系統裏實現,也代錶kafka-offset必須和其它交易數據一起存放在同一種數據庫裏。mongodb4.0以上支持事務處理,可以用來作示範。

首先,先研究一下exactly-once模式的框架:

 val mergedSource = Consumer
.plainPartitionedManualOffsetSource(consumerSettings,subscription,
loadOffsets)
.flatMapMerge(maxReaders, _._2)
.async.mapAsync(1) { msg =>
for {
cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
pmsg <- FastFuture.successful {
log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.mapAsync(1) { msg =>
for {
curstks <- curStk.updateStk(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.toMat(Sink.seq)(Keep.left)
.run()
}
}

在上面的例子裏使用了plainPartitionedManualOffsetSource:

def plainPartitionedManualOffsetSource[K, V](
settings: ConsumerSettings[K, V],
subscription: AutoSubscription,
getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]],
onRevoke: Set[TopicPartition] => Unit = _ => ()
): Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), Control] = {...}

getOffsetsOnAssign提供指定partition的offset(從數據庫裏讀出指定partition的offset值),如下:

 private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition,Long]] = {
offsetStore.getOffsets(partitions)
} def getOffsets(partitions: Set[TopicPartition])(
implicit ec: ExecutionContext) = {
log.step(s"OffsetStore-getOffsets: ($partitions)")(Messages.MachineId("", "")) def getOffset(tp: TopicPartition) = {
val query = and(equal(KfkModels.SCHEMA.TOPIC, tp.topic()),
equal(KfkModels.SCHEMA.PARTITION,tp.partition()))
def offset: Future[Seq[Document]] = colOffset.find(query).toFuture()
for {
docs <- offset
ofs <- FastFuture.successful(if(docs.isEmpty) None
else Some(Offsets.fromDocument(docs.head)))
} yield ofs
}
val listFut = partitions.toList.map(getOffset)
val futList: Future[List[Option[KfkModels.Offsets]]] = FastFuture.sequence(listFut)
futList.map { oofs =>
oofs.foldRight(Map[TopicPartition,Long]()){(oof,m) =>
oof match {
case None => m
case ofs => m + (new TopicPartition(ofs.get.topic,ofs.get.partition) -> ofs.get.offset)
}
}
}
}

注意loadOffset的函數類型:  Set[TopicPartition] => Future[Map[TopicPartition, Long]],返回的是個Map[partition,offset]。

另外,plainPartitionedManualSource返回Source[...Source[ConsumerRecord[K, V]],要用flatMapMerge打平:

 /**
* Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by merging, where at most `breadth`
* substreams are being consumed at any given time.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*/
def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
map(f).via(new FlattenMerge[T, M](breadth))

參數breadth代錶需合並的source數量。

還有,saveOffset和writeStkTxns在同一個事務處理裏:

 def docToStkTxns(jsonDoc: String, partition: Int, offset: Long, observable: SingleObservable[ClientSession]) = {
val bizDoc = fromJson[BizDoc](jsonDoc)
log.step(s"TxnalDocToStkTxns-docToStkTxns: $bizDoc")(Messages.MachineId("", "")) observable.map(clientSession => {
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
val txns = StkTxns.docToTxns(dbStkTxn,dbVtx,dbVendor,bizDoc,trace)
StkTxns.writeStkTxns(clientSession,colStkTxn,colPcm,txns,trace)
offsetStore.saveOffset(clientSession,partition,offset)
clientSession.commitTransaction()
clientSession
}) }

注意:mongodb的事務處理必須在複制集replica-set上進行。這也很容易理解,在複制集上才方便交易回滾rollback。

完整的exactly-once實現代碼如下:

 private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition,Long]] = {
offsetStore.getOffsets(partitions)
} val mergedSource = Consumer
.plainPartitionedManualOffsetSource(consumerSettings,subscription,
loadOffsets)
.flatMapMerge(maxReaders, _._2) def start = {
(1 to numReaders).toList.map {_ =>
RestartSource
.onFailuresWithBackoff(restartSource) { () => mergedSource }
// .viaMat(KillSwitches.single)(Keep.right)
.async.mapAsync(1) { msg =>
for {
cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
pmsg <- FastFuture.successful {
log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
curstks <- curStk.updateStk(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
pcm <- pcmTxns.writePcmTxn(msg.value())
pmsg <- FastFuture.successful {
log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
msg
}
} yield pmsg
}
.async.mapAsync(1) { msg =>
for {
_ <- pcmTxns.updatePcm(msg.value())
} yield "Completed"
}
.toMat(Sink.seq)(Keep.left)
.run()
}
}

只有第一個异步階段使用了事務處理。也就是說保證了writeStkTxns只執行一次。這個函數的功能主要是把前端產生的交易全部固化。為了避免消費過程中出現异常中斷造成了前端交易的遺失或者重複入賬,必須保證前端交易只固化一次。其它階段的數據處理都是基於已正確固化的交易記錄的。如果出現問題,可以通過重算交易記錄獲取正確的狀態。為了保證平臺運行效率,選擇了不使用事務處理的方式更新數據。

alpakka-kafka(8)-kafka數據消費模式實現的更多相關文章

  1. Spark Streaming消費Kafka Direct方式數據零丟失實現

    使用場景 Spark Streaming實時消費kafka數據的時候,程序停止或者Kafka節點掛掉會導致數據丟失,Spark Streaming也沒有設置CheckPoint(據說比較雞肋,雖然可以 ...

  2. Flume簡介與使用(三)——Kafka Sink消費數據之Kafka安裝

    前面已經介紹了如何利用Thrift Source生產數據,今天介紹如何用Kafka Sink消費數據. 其實之前已經在Flume配置文件裏設置了用Kafka Sink消費數據 agent1.sinks ...

  3. Kafka作為大數據的核心技術,你了解多少?

    Kafka作為大數據最核心的技術,作為一名技術開發人員,如果你不懂,那麼就真的“out”了.DT時代的快速發展離不開kafka,所以了解kafka,應用kafka就成為一種必須. 什麼是kafka?K ...

  4. Spark Streaming和Kafka整合保證數據零丟失

    當我們正確地部署好Spark Streaming,我們就可以使用Spark Streaming提供的零數據丟失機制.為了體驗這個關鍵的特性,你需要滿足以下幾個先决條件: 1.輸入的數據來自可靠的數據源 ...

  5. spark-streaming集成Kafka處理實時數據

    在這篇文章裏,我們模擬了一個場景,實時分析訂單數據,統計實時收益. 場景模擬 我試圖覆蓋工程上最為常用的一個場景: 1)首先,向Kafka裏實時的寫入訂單數據,JSON格式,包含訂單ID-訂單類型-訂 ...

  6. kafka 清除topic數據脚本

    原 kafka 清除topic數據脚本 2018年07月25日 16:57:13 pete1223 閱讀數:1028     #!/bin/sh       param=$1   echo " ...

  7. kafka查看消費數據

    一.如何查看 在老版本中,使用kafka-run-class.sh 脚本進行查看.但是對於最新版本,kafka-run-class.sh 已經不能使用,必須使用另外一個脚本才行,它就是kafka-co ...

  8. 【Kafka】Kafka數據可靠性深度解讀

    轉帖:http://www.infoq.com/cn/articles/depth-interpretation-of-kafka-data-reliability Kafka起初是由LinkedIn ...

  9. Kafka如何保證數據不丟失

    Kafka如何保證數據不丟失 1.生產者數據的不丟失 kafka的ack機制:在kafka發送數據的時候,每次發送消息都會有一個確認反饋機制,確保消息正常的能够被收到,其中狀態有0,1,-1. 如果是 ...

  10. 如果數據需要被多個應用程序消費的話,推薦使用 Kafka,如果數據只是面向 Hadoop 的,可以使用 Flume

    https://www.ibm.com/developerworks/cn/opensource/os-cn-kafka/index.html Kafka 與 Flume 很多功能確實是重複的.以下是 ...

隨機推薦

  1. SYMMETRIC MULTIPROCESSORS

    COMPUTER ORGANIZATION AND ARCHITECTURE DESIGNING FOR PERFORMANCE NINTH EDITION As demands for perfor ...

  2. 即將翻譯 Building The New Financial Times Web App

    <金融時報>這份Web APP 經驗的總結,寫得非常詳細,也提到Web APP制作中常遇到的問題.為麼他們就沒有點透Bug - -! Building The New Financial ...

  3. js獲取瀏覽器語言(ie、ff、chrome)、contextpath

    /js獲取瀏覽器語言(ie.ff.chrome) var language_en_us = "en-us"; var language_zh_cn = "zh-cn&qu ...

  4. Android 中的 Service 全面總結(轉載)

    轉載地址:http://www.cnblogs.com/newcj/archive/2011/05/30/2061370.html 感謝作者 Android 中的 Service 全面總結 1.Ser ...

  5. CSS知識點總結

    1.選擇器 參考鏈接:十分鐘搞定CSS選擇器-Samaritans  CSS選擇器筆記-阮一峰  CSS選擇器-w3school  MDN 參考書籍:<CSS高效開發指南> 2.布局 2. ...

  6. oracle兩種導出導入方式,即imp與impdp之比較

    盡管使用了很多次impexp及impdpexpdp但是使用起來還是會遇到很多問題,現總結如下: 應用:將一個用戶的所有錶結構及索引,觸發器,過程,函數等導入到另一用戶裏 imp/exp 導出用戶錶結構 ...

  7. 編寫android的widget

    以前對這個東西很感興趣,因為確實方便,如今有時間了來做一個例子 首先要定義一個layout(widgetview.xml)和一個配置文件(widgetconfig.xml) <?xml vers ...

  8. HDU -1864最大報銷額(01背包)

    這道題屬於簡單的01背包,但是背包問題還算簡單,就是前面的細節處理的時候要注意,題意大致說了三條限制吧 1. 只有a, b, c 三種類型的發票可以報銷,其它的一律不報銷 2. 物品單項的報銷額不超過 ...

  9. delphi怎麼實現全選的功能

    1. SelectAll 可以實現全選功能 Delphi/Pascal code edit1.SelectAll; // Delphi/Pascal code RichEdit1.SelStart:= ...

  10. MPAndroidChart——餅圖

    MPAndroidChart--餅圖 MPAndroidChart是安卓下的一個開源圖形庫,很多效果,簡單看幾個效果圖 Github地址:https://github.com/PhilJay/MPAn ...