SpringCloud-Stream實戰入門(二,瘋狂複習半個月

程序員妹控 2021-09-19 20:20:51 阅读数:483

springcloud-stream springcloud stream

如果你的消費者端的代碼還像以前一樣寫,接收的是String類型的,你會發現接收到的字符串是JSON格式的,因為發送端默認會把對象轉換為JSON格式進行發送SpringCloud-Stream實戰入門(二,瘋狂複習半個月_後端

這裏也可以直接以User類型進行接收,此時Spring Cloud將自動將接收到的JSON字符串轉換為消費者方法的入參對象,比如下面這樣。SpringCloud-Stream實戰入門(二,瘋狂複習半個月_Java_02

上面應用的名為output1和input1的Binding的配置如下。

#發布者配置

spring.cloud.stream.bindings.output1.destination=test-topic1

#消費者配置

spring.cloud.stream.bindings.input1.destination=test-topic1

spring.cloud.stream.bindings.input1.group=test-group1

自定義MessageConverter

Spring Cloud Stream在進行對象和JSON轉換時默認使用的是org.springframework.messaging.converter.MappingJackson2MessageConverter

有時候我們也可以實現自己的MessageConverter,在實現自定義的MessageConverter時通常不直接實現MessageConverter接口而是繼承org.springframework.messaging.converter.AbstractMessageConverter,然後重寫其supports(…)、convertFromInternal(…)和convertToInternal

比如下面的代碼中實現了一個只能轉換User對象的MessageConverter底層使用的是FastJson,在進行發送消息時重置了user的name屬性,加上了t-前綴。

SpringCloud-Stream實戰入門(二,瘋狂複習半個月_程序員_03

然後為了使它生效,我們需要把它定義為一個bean,並標注@StreamMessageConverter,比如下面這樣。SpringCloud-Stream實戰入門(二,瘋狂複習半個月_Java_04

如果在轉換為JSON時不希望使用默認的Jackson實現,而希望使用Alibaba的FastJson也是可以的

FastJson已經提供了MessageConverter的實現類com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter。

所以如果希望使用FastJson的實現,只需要進行類似如下這樣的定義。SpringCloud-Stream實戰入門(二,瘋狂複習半個月_Java_05

异常處理

在接收消息時,如果消息處理失敗,Spring Cloud會把失敗的消息轉到名為<destination>.<group>.errors的Channel,並可通過@ServiceActivator方法進行接收。比如有如下Binding定義。

spring.cloud.stream.bindings.input1.destination=test-topic1

spring.cloud.stream.bindings.input1.group=test-group1

當消息消費失敗時將轉發包裝了失敗原因的消息到名為test-topic1.test-group1.errors的Channel,我們可以通過在某個bean中定義一個@ServiceActivator方法處理相應的异常。

SpringCloud-Stream實戰入門(二,瘋狂複習半個月_Java_06

上面介紹的方法是處理某個特定Binding的消息消費异常的,如果你的消息消費异常的處理方式都是一樣的,你可能希望有一個統一的入口來處理所有的消息消費异常,而不用管當前的Binding是什麼。

Spring Cloud Stream也考慮到了這一點,它提供了一個名為errorChannel的Binding,所有的消息消費异常都會轉發到該Binding,所以如果我們想有一個統一的處理所有的消息消費异常的入口則可以定義一個Binding名為errorChannel的@StreamListener方法。SpringCloud-Stream實戰入門(二,瘋狂複習半個月_程序員_07

重試機制

Spring Cloud Stream在進行消息的接收處理時也是利用Spring Retry進行了包裝的。當消息消費失敗時默認會最多試3次(加上第一次),使用的是Spring Retry的RetryTemplate的默認配置。

如果默認的重試邏輯不能滿足你的需求,你也可以定義自己的RetryTemplate,但是需要使用@StreamRetryTemplate進行標注(StreamRetryTemplate上標注了@Bean)。SpringCloud-Stream實戰入門(二,瘋狂複習半個月_後端_08

上面的代碼中就應用了自定義的RetryTemplate,指定最多嘗試5次的消息消費嘗試5次後仍然失敗將走前面介紹的异常處理邏輯,即投遞消息到相應的异常處理的Channel。

也可以通過配置的方式,最多嘗試次數通過Binding的consumer.maxAttempts參數進行指定,如果需要指定名為input1的Binding在消費某條消息時最多允許嘗試5次,則可以進行如下定義。如果將該屬性值定義為1,則錶示不允許進行重試。

spring.cloud.stream.bindings.input1.consumer.maxAttempts=5

定制消費者線程數

默認情况下,每個Binding對應的消費者線程數是1,可以通過spring.cloud.stream.bindings.<bindingName>.consumer.concurrency屬性進行指定,比如下面的配置就指定了名稱為input1的Binding的消費者線程是3即Spring Cloud Stream將同時啟動3個線程用於從名為input1的Binding進行消費

spring.cloud.stream.bindings.input1.consumer.concurrency=3

總結

互聯網大廠比較喜歡的人才特點:對技術有熱情,强硬的技術基礎實力;主動,善於團隊協作,善於總結思考。無論是哪家公司,都很重視高並發高可用技術,重視基礎,所以千萬別小看任何知識。面試是一個雙向選擇的過程,不要抱著畏懼的心態去面試,不利於自己的發揮。同時看中的應該不止薪資,還要看你是不是真的喜歡這家公司,是不是能真的得到鍛煉。其實我寫了這麼多,只是我自己的總結,並不一定適用於所有人,相信經過一些面試,大家都會有這些感觸。

**另外本人還整理收藏了2021年多家公司面試知識點以及各種技術點整理 **

 CodeChina開源項目:【一線大廠Java面試題解析+核心總結學習筆記+最新講解視頻】

下面有部分截圖希望能對大家有所幫助。

SpringCloud-Stream實戰入門(二,瘋狂複習半個月_後端_09

版权声明:本文为[程序員妹控]所创,转载请带上原文链接,感谢。 https://gsmany.com/2021/09/20210919202050610U.html