alpakka-kafka(8) - Implementing kafka data consumption modes

雪川大蟲 2021-08-15

  The previous article introduced the kafka at-least-once consumption mode. Kafka consumption modes are distinguished by when the offset-commit happens relative to message processing: at-least-once, at-most-once and exactly-once. The at-least-once mode shown in the previous article relied on kafka's own auto-commit. On reflection, that should really count as at-most-once: the consumption process has no influence over auto-commit, and kafka commits the offset automatically at every configured interval. If that interval is short enough, shorter than the processing of a message, the offset is saved before processing completes, which is at-most-once. If, on the other hand, the interval is guaranteed to be longer than the processing time, it becomes at-least-once again. Which mode you actually get is therefore indeterminate, because auto-commit cannot be controlled from the outside. To implement a true at-least-once mode we have to take control of the offset-commit ourselves.

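For reference, this is roughly how auto-commit is switched on through the consumer properties (a minimal sketch; the bootstrap server, group id and interval values below are placeholders, not taken from the project code):

  import akka.actor.ActorSystem
  import akka.kafka.ConsumerSettings
  import org.apache.kafka.clients.consumer.ConsumerConfig
  import org.apache.kafka.common.serialization.StringDeserializer

  implicit val sys: ActorSystem = ActorSystem("demo")

  // auto-commit is driven entirely by these two properties; the stream itself
  // has no say in when the offset is written back to kafka
  val autoCommitSettings: ConsumerSettings[String, String] =
    ConsumerSettings(sys, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("demo-group")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
      .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000")

With enable.auto.commit=true the offset is committed on the configured timer regardless of whether processing of the messages read so far has finished, which is exactly why the resulting semantics are indeterminate.
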
alpakka-kafka provides a CommittableSource:

  def committableSource[K, V](settings: ConsumerSettings[K, V],
                              subscription: Subscription): Source[CommittableMessage[K, V], Control] = {...}

The elements emitted by this CommittableSource are of type CommittableMessage[K, V]:

  final case class CommittableMessage[K, V](
    record: ConsumerRecord[K, V],
    committableOffset: CommittableOffset
  )

Besides the original record, a CommittableMessage carries a CommittableOffset, and the offset-commit can be performed in either a Flow or a Sink. alpakka-kafka provides the Committer helper for this, with Committer.sink and Committer.flow. With Committer.flow:

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      updateStock.map(_ => msg.committableOffset)
    }
    .via(Committer.flow(committerDefaults.withMaxBatch(1)))
    .to(Sink.seq)
    .run()

Or with Committer.sink:

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      updateStock.map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(Keep.left)
    .run()

Below is a concrete at-least-once demonstration:

  val committerSettings = CommitterSettings(sys).withMaxBatch(commitMaxBatch)
  val stkTxns = new DocToStkTxns(trace)
  val curStk = new CurStk(trace)
  val pcmTxns = new PcmTxns(trace)

  val commitableSource = Consumer
    .committableSource(consumerSettings, subscription)

  def start =
    (1 to numReaders).toList.map { _ =>
      RestartSource
        .onFailuresWithBackoff(restartSource) { () => commitableSource }
        // .viaMat(KillSwitches.single)(Keep.right)
        .async.mapAsync(1) { msg =>
          for {
            _ <- FastFuture.successful {
              log.step(s"AtLeastOnceReaderGroup-msg: ${msg.record}")(Messages.MachineId("", ""))
            }
            _ <- stkTxns.docToStkTxns(msg.record.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtLeastOnceReaderGroup-docToStkTxns: ${msg.record}")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            curstks <- curStk.updateStk(msg.record.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtLeastOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            pcm <- pcmTxns.writePcmTxn(msg.record.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtLeastOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        // the committable offset is only emitted after updatePcm completes,
        // so the commit can never overtake the processing of the message
        .async.mapAsync(1) { msg =>
          for {
            _ <- pcmTxns.updatePcm(msg.record.value())
          } yield msg.committableOffset
        }
        .toMat(Committer.sink(committerSettings))(Keep.left)
        .run()
    }

The design considerations and implementation of the other parts of the consumption process, such as multi-threading and exception handling, were covered in the previous article.

For the at-most-once consumption mode, alpakka-kafka provides atMostOnceSource:

 

  def atMostOnceSource[K, V](settings: ConsumerSettings[K, V],
                             subscription: Subscription): Source[ConsumerRecord[K, V], Control] = {...}

Below is a demonstration of at-most-once consumption built with this Source:

  val atmostonceSource = Consumer
    .atMostOnceSource(consumerSettings, subscription)

  def start =
    (1 to numReaders).toList.map { _ =>
      RestartSource
        .onFailuresWithBackoff(restartSource) { () => atmostonceSource }
        // .viaMat(KillSwitches.single)(Keep.right)
        .async.mapAsync(1) { msg =>
          for {
            _ <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-msg: $msg")(Messages.MachineId("", ""))
            }
            _ <- stkTxns.docToStkTxns(msg.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-docToStkTxns: $msg")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            _ <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: msg: $msg")(Messages.MachineId("", ""))
            }
            curstks <- curStk.updateStk(msg.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            _ <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-writePcmTxn: msg: $msg")(Messages.MachineId("", ""))
            }
            pcm <- pcmTxns.writePcmTxn(msg.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            _ <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updatePcm: msg: $msg")(Messages.MachineId("", ""))
            }
            _ <- pcmTxns.updatePcm(msg.value())
            _ <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: updatePcm-$msg")(Messages.MachineId("", ""))
            }
          } yield "Completed"
        }
        .toMat(Sink.seq)(Keep.left)
        .run()
    }

Since offset-commit and message consumption are two independent processes, no amount of effort can guarantee reading exactly once; the two processes have to be merged into one for that to become possible. Exactly-once can therefore be achieved with database transaction-processing: put the offset-commit and the data update into the same transaction, and rely on the ACID atomicity of the transaction to guarantee that the two actions succeed or fail together. This also means that the exactly-once consumption mode must be implemented on a database system that supports transactions, and that the kafka offset must be stored in the same database alongside the other business data. MongoDB 4.0 and above supports transactions, so it is used for the demonstration.

First, let's examine the skeleton of the exactly-once mode:

 

  val mergedSource = Consumer
    .plainPartitionedManualOffsetSource(consumerSettings, subscription, loadOffsets)
    .flatMapMerge(maxReaders, _._2)
    .async.mapAsync(1) { msg =>
      for {
        cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
        pmsg <- FastFuture.successful {
          log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
          msg
        }
      } yield pmsg
    }
    .mapAsync(1) { msg =>
      for {
        curstks <- curStk.updateStk(msg.value())
        pmsg <- FastFuture.successful {
          log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
          msg
        }
      } yield pmsg
    }
    .toMat(Sink.seq)(Keep.left)
    .run()

 

The example above uses plainPartitionedManualOffsetSource:

  def plainPartitionedManualOffsetSource[K, V](
      settings: ConsumerSettings[K, V],
      subscription: AutoSubscription,
      getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]],
      onRevoke: Set[TopicPartition] => Unit = _ => ()
  ): Source[(TopicPartition, Source[ConsumerRecord[K, V], NotUsed]), Control] = {...}

getOffsetsOnAssign supplies the offset for each assigned partition (reading the partition's offset value from the database), as below:

  private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] = {
    offsetStore.getOffsets(partitions)
  }

  def getOffsets(partitions: Set[TopicPartition])(
      implicit ec: ExecutionContext) = {
    log.step(s"OffsetStore-getOffsets: ($partitions)")(Messages.MachineId("", ""))
    def getOffset(tp: TopicPartition) = {
      val query = and(
        equal(KfkModels.SCHEMA.TOPIC, tp.topic()),
        equal(KfkModels.SCHEMA.PARTITION, tp.partition())
      )
      def offset: Future[Seq[Document]] = colOffset.find(query).toFuture()
      for {
        docs <- offset
        ofs <- FastFuture.successful(
          if (docs.isEmpty) None else Some(Offsets.fromDocument(docs.head)))
      } yield ofs
    }
    val listFut = partitions.toList.map(getOffset)
    val futList: Future[List[Option[KfkModels.Offsets]]] = FastFuture.sequence(listFut)
    futList.map { oofs =>
      oofs.foldRight(Map[TopicPartition, Long]()) { (oof, m) =>
        oof match {
          case None => m
          case ofs  => m + (new TopicPartition(ofs.get.topic, ofs.get.partition) -> ofs.get.offset)
        }
      }
    }
  }

Note the type of loadOffsets: Set[TopicPartition] => Future[Map[TopicPartition, Long]]; it returns a map from partition to offset.
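As a purely illustrative example (the topic name and offset values are invented), the map produced by loadOffsets simply tells the source where each assigned partition should resume:

  import org.apache.kafka.common.TopicPartition

  // hypothetical result of loadOffsets for two assigned partitions of topic "stock-docs":
  // partition 0 resumes from offset 1042, partition 1 from offset 887
  val resumeFrom: Map[TopicPartition, Long] = Map(
    new TopicPartition("stock-docs", 0) -> 1042L,
    new TopicPartition("stock-docs", 1) -> 887L
  )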

Also, plainPartitionedManualOffsetSource returns a Source whose elements are themselves Sources of ConsumerRecord[K, V] (one per partition), so it has to be flattened with flatMapMerge:

  /**
   * Transform each input element into a `Source` of output elements that is
   * then flattened into the output stream by merging, where at most `breadth`
   * substreams are being consumed at any given time.
   *
   * '''Emits when''' a currently consumed substream has an element available
   *
   * '''Backpressures when''' downstream backpressures
   *
   * '''Completes when''' upstream completes and all consumed substreams complete
   *
   * '''Cancels when''' downstream cancels
   */
  def flatMapMerge[T, M](breadth: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
    map(f).via(new FlattenMerge[T, M](breadth))

The breadth parameter is the maximum number of substream sources consumed and merged at any one time.

Also note that saveOffset and writeStkTxns are executed inside the same transaction:

  def docToStkTxns(jsonDoc: String, partition: Int, offset: Long, observable: SingleObservable[ClientSession]) = {
    val bizDoc = fromJson[BizDoc](jsonDoc)
    log.step(s"TxnalDocToStkTxns-docToStkTxns: $bizDoc")(Messages.MachineId("", ""))

    observable.map(clientSession => {
      val transactionOptions = TransactionOptions.builder()
        .readPreference(ReadPreference.primary())
        .readConcern(ReadConcern.SNAPSHOT)
        .writeConcern(WriteConcern.MAJORITY)
        .build()
      clientSession.startTransaction(transactionOptions)
      val txns = StkTxns.docToTxns(dbStkTxn, dbVtx, dbVendor, bizDoc, trace)
      StkTxns.writeStkTxns(clientSession, colStkTxn, colPcm, txns, trace)
      offsetStore.saveOffset(clientSession, partition, offset)
      clientSession.commitTransaction()
      clientSession
    })
  }

Note: MongoDB transactions must run against a replica set. This is easy to understand, since only a replica set makes transaction rollback practical.
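As a sketch only (the host names and replica-set name are placeholders, not from the project), the MongoDB client used by the transactional code would be pointed at a replica set roughly like this:

  import org.mongodb.scala.MongoClient

  // connect to a (placeholder) three-node replica set named "rs0";
  // startTransaction/commitTransaction as used above require this topology,
  // though a single-node replica set is enough for development
  val client: MongoClient = MongoClient(
    "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0")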

The complete exactly-once implementation code is as follows:

  private def loadOffsets(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] = {
    offsetStore.getOffsets(partitions)
  }

  val mergedSource = Consumer
    .plainPartitionedManualOffsetSource(consumerSettings, subscription, loadOffsets)
    .flatMapMerge(maxReaders, _._2)

  def start = {
    (1 to numReaders).toList.map { _ =>
      RestartSource
        .onFailuresWithBackoff(restartSource) { () => mergedSource }
        // .viaMat(KillSwitches.single)(Keep.right)
        .async.mapAsync(1) { msg =>
          for {
            cmt <- stkTxns.stkTxnsWithRetry(msg.value(), msg.partition(), msg.offset()).toFuture().map(_ => "Completed")
            pmsg <- FastFuture.successful {
              log.step(s"ExactlyOnceReaderGroup-stkTxnsWithRetry: committed transaction-$cmt")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            curstks <- curStk.updateStk(msg.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: curstks-$curstks")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            pcm <- pcmTxns.writePcmTxn(msg.value())
            pmsg <- FastFuture.successful {
              log.step(s"AtMostOnceReaderGroup-updateStk: writePcmTxn-$pcm")(Messages.MachineId("", ""))
              msg
            }
          } yield pmsg
        }
        .async.mapAsync(1) { msg =>
          for {
            _ <- pcmTxns.updatePcm(msg.value())
          } yield "Completed"
        }
        .toMat(Sink.seq)(Keep.left)
        .run()
    }
  }

Only the first asynchronous stage uses transaction processing, which guarantees that writeStkTxns is executed exactly once. The main job of this function is to persist all transactions produced by the front end. To avoid front-end transactions being lost or double-booked when the consumption process is interrupted by an exception, they must be persisted exactly once. The data processing in the other stages is based entirely on those correctly persisted transaction records; if anything goes wrong, the correct state can be recovered by recomputing from them. To keep the platform efficient, the remaining updates are done without transaction processing.

 
