大數據培訓 | 電商用戶行為分析之訂單支付實時監控

InfoQ 2022-06-23 11:42:38 阅读数:352

商用分析支付
​在電商網站中,訂單的支付作為直接與營銷收入掛鉤的一環,在業務流程中非常重要。對於訂單而言,為了正確控制業務流程,也為了增加用戶的支付意願,網站一般會設置一個支付失效時間,超過一段時間不支付的訂單就會被取消。另外,對於訂單的支付,我們還應保證用戶支付的正確性,這可以通過第三方支付平臺的交易數據來做一個實時對賬。在接下來的內容中,我們將實現這兩個需求。

模塊創建和數據准備

同樣地,在 UserBehaviorAnalysis 下新建一個 maven module 作為子項目,命名為 OrderTimeoutDetect。在這個子模塊中,我們同樣將會用到 flink 的 CEP 庫來實現事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相關依賴:

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

同樣,在 src/main/目錄下,將默認源文件目錄 java 改名為 scala。

更多Java –大數據 – 前端 – UI/UE - Android - 人工智能資料下載,可訪問百度:尚矽穀官網(www.atguigu.com)

代碼實現

在電商平臺中,最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動作的時候。用戶下單的行為可以錶明用戶對商品的需求,但在現實中,並不是每次下單都會被用戶立刻支付。當拖延一段時間後,用戶支付的意願會降低。所以為了讓用戶更有緊迫感從而提高支付轉化率,同時也為了防範訂單支付環節的安全風險,電商網站往往會對訂單狀態進行監控,設置一個失效時間(比如 15 分鐘),如果下單後一段時間仍未支付,訂單就會被取消。

使用 CEP 實現

我們首先還是利用 CEP 庫來實現這個功能。我們先將事件流按照訂單號 orderId分流,然後定義這樣的一個事件模式:在 15 分鐘內,事件“create”與“pay”非嚴格緊鄰:

val orderPayPattern = Pattern.beginOrderEvent

.where(_.eventType == &quot;create&quot;)

.followedBy(&quot;follow&quot;)

.where(_.eventType == &quot;pay&quot;)

.within(Time.seconds(5))

這樣調用.select 方法時,就可以同時獲取到匹配出的事件和超時未匹配的事件了。

在 src/main/scala 下繼續創建 OrderTimeout.scala 文件,新建一個單例對象。定義樣例類 OrderEvent,這是輸入的訂單事件流;另外還有 OrderResult,這是輸出顯示 的 訂 單 狀 態 結 果 。 訂 單 數 據 也 本 應 該 從 UserBehavior 日 志 裏 提 取 , 由 於UserBehavior.csv 中沒有做相關埋點,我們從另一個文件 OrderLog.csv 中讀取登錄數據_
大數據培訓

完整代碼如下:

OrderTimeoutDetect/src/main/scala/OrderTimeout.scala

case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)

case class OrderResult(orderId: Long, eventType: String)

object OrderTimeout {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val orderEventStream = env.readTextFile(&quot;YOUR_PATH\resources\OrderLog.csv&quot;)

.map( data => {

val dataArray = data.split(&quot;,&quot;)

OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)

})

.assignAscendingTimestamps(_.eventTime * 1000)

// 定義一個帶匹配時間窗口的模式

val orderPayPattern = Pattern.beginOrderEvent

.where(_.eventType == &quot;create&quot;)

.followedBy(&quot;follow&quot;)

.where(_.eventType == &quot;pay&quot;)

.within(Time.minutes(15))

// 定義一個輸出標簽

val orderTimeoutOutput = OutputTagOrderResult

// 訂單事件流根據 orderId 分流,然後在每一條流中匹配出定義好的模式

val patternStream = CEP.pattern(orderEventStream.keyBy(&quot;orderId&quot;), orderPayPattern)

val completedResult = patternStream.select(orderTimeoutOutput) {

// 對於已超時的部分模式匹配的事件序列,會調用這個函數

(pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {

val createOrder = pattern.get(&quot;begin&quot;)

OrderResult(createOrder.get.iterator.next().orderId, &quot;timeout&quot;)

}

} {

// 檢測到定義好的模式序列時,就會調用這個函數

pattern: Map[String, Iterable[OrderEvent]] => {

val payOrder = pattern.get(&quot;follow&quot;)

OrderResult(payOrder.get.iterator.next().orderId, &quot;success&quot;)

}

}

// 拿到同一輸出標簽中的 timeout 匹配結果(流)

val timeoutResult = completedResult.getSideOutput(orderTimeoutOutput)

completedResult.print()

timeoutResult.print()

env.execute(&quot;Order Timeout Detect Job&quot;)

}

}

使用 Process Function 實現

我們同樣可以利用 Process Function,自定義實現檢測訂單超時的功能。為了簡化問題,我們只考慮超時報警的情形,在 pay 事件超時未發生的情况下,輸出超時報警信息。

一個簡單的思路是,可以在訂單的 create 事件到來後注册定時器,15 分鐘後觸發;然後再用一個布爾類型的 Value 狀態來作為標識比特,錶明 pay 事件是否發生過。如果 pay 事件已經發生,狀態被置為 true,那麼就不再需要做什麼操作;而如果 pay事件一直沒來,狀態一直為 false,到定時器觸發時,就應該輸出超時報警信息_
大數據視頻

具體代碼實現如下:

OrderTimeoutDetect/src/main/scala/OrderTimeoutWithoutCep.scala

object OrderTimeoutWithoutCep {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.setParallelism(1)

val orderEventStream = env.readTextFile(&quot;YOUR_PATH\resources\OrderLog.csv&quot;)

.map( data => {

val dataArray = data.split(&quot;,&quot;)

OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)

})

更多Java –大數據 – 前端 – UI/UE - Android - 人工智能資料下載,可訪問百度:尚矽穀官網(www.atguigu.com)

.assignAscendingTimestamps(_.eventTime * 1000)

.keyBy(_.orderId)

// 自定義一個 process function,進行 order 的超時檢測,輸出超時報警信息

val timeoutWarningStream = orderEventStream

.process(new OrderTimeoutAlert)

timeoutWarningStream.print()

env.execute()

}

class OrderTimeoutAlert extends KeyedProcessFunction[Long, OrderEvent, OrderResult]

{

lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new

ValueStateDescriptor[Boolean](&quot;ispayed-state&quot;, classOf[Boolean]))

override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long,

OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {

val isPayed = isPayedState.value()

if (value.eventType == &quot;create&quot; && !isPayed) {

ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L + 15 * 60 *

1000L)

} else if (value.eventType == &quot;pay&quot;) {

isPayedState.update(true)

}

}

override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent,

OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {

val isPayed = isPayedState.value()

if (!isPayed) {

out.collect(OrderResult(ctx.getCurrentKey, &quot;order timeout&quot;))

}

isPayedState.clear()

}

}

}

來自兩條流的訂單交易匹配

對於訂單支付事件,用戶支付完成其實並不算完,我們還得確認平臺賬戶上是否到賬了。而往往這會來自不同的日志信息,所以我們要同時讀入兩條流的數據來做 合 並 處 理 。 這 裏 我 們 利 用 connect 將 兩 條 流 進 行 連 接 , 然 後 用 自 定 義 的CoProcessFunction 進行處理。

具體代碼如下:

TxMatchDetect/src/main/scala/TxMatch

case class OrderEvent( orderId: Long, eventType: String, txId: String, eventTime: Long )

case class ReceiptEvent( txId: String, payChannel: String, eventTime: Long )

object TxMatch {

val unmatchedPays = new OutputTagOrderEvent

val unmatchedReceipts = new OutputTagReceiptEvent

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val orderEventStream = env.readTextFile(&quot;YOUR_PATH\resources\OrderLog.csv&quot;)

.map( data => {

val dataArray = data.split(&quot;,&quot;)

OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2),

dataArray(3).toLong)

})

.filter(_.txId != &quot;&quot;)

.assignAscendingTimestamps(_.eventTime * 1000L)

.keyBy(_.txId)

val receiptEventStream = env.readTextFile(&quot;YOUR_PATH\resources\ReceiptLog.csv&quot;)

.map( data => {

val dataArray = data.split(&quot;,&quot;)

ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)

})

.assignAscendingTimestamps(_.eventTime * 1000L)

.keyBy(_.txId)

val processedStream = orderEventStream

.connect(receiptEventStream)

.process(new TxMatchDetection)

processedStream.getSideOutput(unmatchedPays).print(&quot;unmatched pays&quot;)

processedStream.getSideOutput(unmatchedReceipts).print(&quot;unmatched receipts&quot;)

processedStream.print(&quot;processed&quot;)

env.execute()

}

class TxMatchDetection extends CoProcessFunction[OrderEvent, ReceiptEvent,

(OrderEvent, ReceiptEvent)]{

lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new

ValueStateDescriptorOrderEvent )

lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new

ValueStateDescriptor[ReceiptEvent](&quot;receipt-state&quot;, classOf[ReceiptEvent]) )

override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent,

ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent,

ReceiptEvent)]): Unit = {

val receipt = receiptState.value()

if( receipt != null ){

receiptState.clear()

out.collect((pay, receipt))

} else{

payState.update(pay)

ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L)

}

}

override def processElement2(receipt: ReceiptEvent, ctx:

CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out:

Collector[(OrderEvent, ReceiptEvent)]): Unit = {

val payment = payState.value()

if( payment != null ){

payState.clear()

out.collect((payment, receipt))

} else{

receiptState.update(receipt)

ctx.timerService().registerEventTimeTimer(receipt.eventTime * 1000L)

}

}

更多Java –大數據 – 前端 – UI/UE - Android - 人工智能資料下載,可訪問百度:尚矽穀官網(www.atguigu.com)

override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent,

ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent,

ReceiptEvent)]): Unit = {

if ( payState.value() != null ){

ctx.output(unmatchedPays, payState.value())

}

if ( receiptState.value() != null ){

ctx.output(unmatchedReceipts, receiptState.value())

}

payState.clear()

receiptState.clear()

}

}

}

版权声明:本文为[InfoQ]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/174/202206231134478532.html