Storm入門之第6章一個實際的例子

杜老師說 2022-01-07 12:59:35 阅读数:419

storm 例子

本文翻譯自《Getting Started With Storm》譯者:吳京潤    編輯:郭蕾 方騰飛

本章要闡述一個典型的網絡分析解决方案,而這類問題通常利用Hadoop批處理作為解决方案。與Hadoop不同的是,基於Storm的方案會實時輸出結果。

 

 

我們的這個例子有三個主要組件(見圖6-1)

  • 一個基於Node.js的web應用,用於測試系統
  • 一個Redis服務器,用於持久化數據
  • 一個Storm拓撲,用於分布式實時處理數據

架構概覽

圖6-1  架構概覽

NOTE:你如果想先把這個例子運行起來,請首先閱讀附錄C

基於Node.js的web應用

我們已經偽造了簡單的電子商務網站。這個網站只有三個頁面:一個主頁、一個產品頁和一個產品統計頁面。這個應用基於ExpressSocket.io兩個框架實現了向瀏覽器推送內容更新。制作這個應用的目的是為了讓你體驗Storm集群功能並看到處理結果,但它不是本書的重點,所以我們不會對它的頁面做更詳細描述。

主頁

這個頁面提供了全部有效產品的鏈接。它從Redis服務器獲取數據並在頁面上把它們顯示出來。這個頁面的URL是http://localhost:3000/。(見圖6-2,譯者注,圖6-2翻譯如下,全是文字就不制圖了)

有效產品:

DVD播放器(帶環繞立體聲系統)

全高清藍光dvd播放器

媒體播放器(帶USB 2.0接口)

全高清攝像機

防水高清攝像機

防震防水高清攝像機

反射式攝像機

雙核安卓智能手機(帶64GB SD卡)

普通移動電話

衛星電話

64GB SD卡

32GB SD卡

16GB SD卡

粉紅色智能手機殼

黑色智能手機殼

小山羊皮智能手機殼

圖6-2 首頁

產品頁

產品頁用來顯示指定產品的相關信息,例如,價格、名稱、分類。這個頁面的URL是:http://localhost:3000/product/:id。(見圖6-3,譯者注:全是文字不再制圖,翻譯如下)

產品頁:32英寸液晶電視

分類:電視機

價格:400

相關分類

圖6-3,產品頁

產品統計頁

這個頁面顯示通過收集用戶瀏覽站點,用Storm集群計算的統計信息。可以顯示為如下概要:瀏覽這個產品的用戶,在那些分類下面瀏覽了n次產品。該頁的URL是:http://localhost:3000/product/:id/stats。(見圖6-4,譯者注:全是文字,不再制圖,翻譯如下)

瀏覽了該產品的用戶也瀏覽了以下分類的產品:

1.攝像機

2.播放器

3.手機殼

4.存儲卡

圖6-4. 產品統計視圖

啟動這個Node.js web應用

首先啟動Redis服務器,然後執行如下命令啟動web應用:

 node webapp/app.js

為了向你演示,這個應用會自動向Redis填充一些產品數據作為樣本。

Storm拓撲

為這個系統搭建Storm拓撲的目標是改進產品統計的實時性。產品統計頁顯示了一個分類計數器列錶,用來顯示訪問了其它同類產品的用戶數。這樣可以幫助賣家了解他們的用戶需求。拓撲接收瀏覽日志,並更新產品統計結果(見圖6-5)。

圖6-5  Storm拓撲的輸入與輸出

我們的Storm拓撲有五個組件:一個spout向拓撲提供數據,四個bolt完成統計任務。

UsersNavigationSpout

從用戶瀏覽數據隊列讀取數據發送給拓撲

GetCategoryBolt

從Redis服務器讀取產品信息,向數據流添加產品分類

UserHistoryBolt

讀取用戶以前的產品瀏覽記錄,向下一步分發Product:Category鍵值對,在下一步更新計數器

ProductCategoriesCounterBolt

追踪用戶瀏覽特定分類下的商品次數

NewsNotifierBolt

通知web應用立即更新用戶界面

下圖展示了拓撲的工作方式(見圖6-6)

package storm.analytics;...public class TopologyStarter { public static void main(String[] args) { Logger.getRootLogger().removeAllAppenders(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("read-feed", new UsersNavigationSpout(),3); builder.setBolt("get-categ", new GetCategoryBolt(),3) .shuffleGrouping("read-feed"); builder.setBolt("user-history", new UserHistoryBolt(),5) .fieldsGrouping("get-categ", new Fields("user")); builder.setBolt("product-categ-counter", new ProductCategoriesCounterBolt(),5) .fieldsGrouping("user-history", new Fields("product")); builder.setBolt("news-notifier", new NewsNotifierBolt(),5) .shuffleGrouping("product-categ-counter"); Config conf = new Config(); conf.setDebug(true); conf.put("redis-host",REDIS_HOST); conf.put("redis-port",REDIS_PORT); conf.put("webserver", WEBSERVER); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("analytics", conf, builder.createTopology()); }}

Storm topology
Figure 6-6 Storm拓撲

UsersNavigationSpout

UsersNavigationSpout負責向拓撲提供瀏覽數據。每條瀏覽數據都是一個用戶瀏覽過的產品頁的引用。它們都被web應用保存在Redis服務器。我們一會兒就要看到更多信息。

你可以使用https://github.com/xetorthio/jedis從Redis服務器讀取數據,這是個極為輕巧簡單的Java Redis客戶端。

NOTE:下面的代碼塊就是相關代碼。

package storm.analytics;public class UsersNavigationSpout extends BaseRichSpout { Jedis jedis; ... @Override public void nextTuple() { String content = jedis.rpop("navigation"); if(content==null || "nil".equals(content)){ try { Thread.sleep(300); } catch (InterruptedException e) {} } else { JSONObject obj=(JSONObject)JSONValue.parse(content); String user = obj.get("user").toString(); String product = obj.get("product").toString(); String type = obj.get("type").toString(); HashMap<String, String> map = new HashMap<String, String>(); map.put("product", product); NavigationEntry entry = new NavigationEntry(user, type, map); collector.emit(new Values(user, entry)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("user", "otherdata")); }}

spout首先調用jedis.rpop(“navigation”)從Redis删除並返回”navigation”列錶最右邊的元素。如果列錶已經是空的,就休眠0.3秒,以免使用忙等待循環阻塞服務器。如果得到一條數據(數據是JSON格式),就解析它,並創建一個包含該數據的NavigationEntry POJO:

  • 瀏覽頁面的用戶
  • 用戶瀏覽的頁面類型
  • 由頁面類型决定的額外頁面信息。“產品”頁的額外信息就是用戶瀏覽的產品ID。

spout調用collector.emit(new Values(user, entry))分發包含這些信息的元組。這個元組的內容是拓撲裏下一個bolt的輸入。

GetCategoryBolt

這個bolt非常簡單。它只負責反序列化前面的spout分發的元組內容。如果這是產品頁的數據,就通過ProductsReader類從Redis讀取產品信息,然後基於輸入的元組再分發一個新的包含具體產品信息的元組:

  • 用戶
  • 產品
  • 產品類別
package storm.analytics;public class GetCategoryBolt extends BaseBasicBolt { private ProductReader reader; ... @Override public void execute(Tuple input, BasicOutputCollector collector) { NavigationEntry entry = (NavigationEntry)input.getValue(1); if("PRODUCT".equals(entry.getPageType())){ try { String product = (String)entry.getOtherData().get("product"); //調用產品條目API,得到產品信息 Product itm = reader.readItem(product); if(itm == null) { return; } String categ = itm.getCategory(); collector.emit(new Values(entry.getUserId(), product, categ)); } catch (Exception ex) { System.err.println("Error processing PRODUCT tuple"+ ex); ex.printStackTrace(); } } } ...}

正如前面所提到的, 使用ProductsReader類讀取產品具體信息。

package storm.analytics.utilities;...public class ProductReader { ... public Product readItem(String id) throws Exception{ String content = jedis.get(id); if(content == null || ("nil".equals(content))){ return null; } Object obj = JSONValue.parse(content); JSONObjectproduct = (JSONObject)obj; Product i = new Product((Long)product.get("id"), (String)product.get("title"), (Long)product.get("price"), (String)product.get("category")); return i; } ...}

UserHistoryBolt

UserHistoryBolt是整個應用的核心。它負責持續追踪每個用戶瀏覽過的產品,並决定應當增加計數的鍵值對。

我們使用Redis保存用戶的產品瀏覽曆史,同時基於性能方面的考慮,還應該保留一份本地副本。我們把數據訪問細節隱藏在方法getUserNavigationHistory(user)addProductToHistory(user,prodKey)裏,分別用來讀/寫訪問。它們的實現如下

package storm.analytics;...public class UserHistoryBolt extends BaseRichBolt{ @Override public void execute(Tuple input) { String user = input.getString(0); String prod1 = input.getString(1); String cat1 = input.getString(2); //產品鍵嵌入了產品類別信息 String prodKey = prod1+":"+cat1; Set productsNavigated = getUserNavigationHistory(user); //如果用戶以前瀏覽過->忽略它 if(!productsNavigated.contains(prodKey)) { //否則更新相關條目 for (String other : productsNavigated) { String[] ot = other.split(":"); String prod2 = ot[0]; String cat2 = ot[1]; collector.emit(new Values(prod1, cat2)); collector.emit(new Values(prod2, cat1)); } addProductToHistory(user, prodKey); } }}

需要注意的是,這個bolt的輸出是那些類別計數應當獲得增長的產品。

看一看代碼。這個bolt維護著一組被每個用戶瀏覽過的產品。值得注意的是,這個集合包含產品:類別鍵值對,而不是只有產品。這是因為你會在接下來的調用中用到類別信息,而且這樣也比每次從數據庫獲取更高效。這樣做的原因是基於以下考慮,產品可能只有一個類別,而且它在整個產品的生命周期當中不會改變。

讀取了用戶以前瀏覽過的產品集合之後(以及它們的類別),檢查當前產品以前有沒有被瀏覽過。如果瀏覽過,這條瀏覽數據就被忽略了。如果這是首次瀏覽,遍曆用戶瀏覽曆史,並執行collector.emit(new Values(prod1,cat2))分發一個元組,這個元組包含當前產品和所有瀏覽曆史類別。第二個元組包含所有瀏覽曆史產品和當前產品類別,由collectior.emit(new Values(prod2,cat1))。最後,將當前產品和它的類別添加到集合。

比如,假設用戶John有以下瀏覽曆史:

下面是將要處理的瀏覽數據

該用戶沒有瀏覽過產品8,因此你需要處理它。

因此要分發以下元組:

注意,左邊的產品和右邊的類別之間的關系應當作為一個整體遞增。

現在,讓我們看看這個Bolt用到的持久化實現。

public class UserHistoryBolt extends BaseRichBolt{ ... private Set getUserNavigationHistory(String user) { Set userHistory = usersNavigatedItems.get(user); if(userHistory == null) { userHistory = jedis.smembers(buildKey(user)); if(userHistory == null) userHistory = new HashSet(); usersNavigatedItems.put(user, userHistory); } return userHistory; } private void addProductToHistory(String user, String product) { Set userHistory = getUserNavigationHistory(user); userHistory.add(product); jedis.sadd(buildKey(user), product); } ...}

getUserNavigationHistory方法返回用戶瀏覽過的產品集。首先,通過usersNavigatedItems.get(user)方法試圖從本地內存得到用戶瀏覽曆史,否則,使用jedis.smembers(buildKey(user))從Redis服務器獲取,並把數據添加到本地數據結構usersNavigatedItems

當用戶瀏覽一個新產品時,調用addProductToHistory,通過userHistory.add(product)jedis.sadd(buildKey(user),product)同時更新內存數據結構和Redis服務器。

需要注意的是,當你需要做並行化處理時,只要bolt在內存中維護著用戶數據,你就得首先通過用戶做域數據流分組(譯者注:原文是fieldsGrouping,詳細情况請見第三章的域數據流組),這是一件很重要的事情,否則集群內將會有用戶瀏覽曆史的多個不同步的副本。

ProductCategoriesCounterBolt

該類持續追踪所有的產品-類別關系。它通過由UsersHistoryBolt分發的產品-類別數據對更新計數。

每個數據對的出現次數保存在Redis服務器。基於性能方面的考慮,要使用一個本地讀寫緩存,通過一個後臺線程向Redis發送數據。

Bolt會向拓撲的下一個Bolt——NewsNotifierBolt——發送包含最新記數的元組,這也是最後一個Bolt,它會向最終用戶廣播實時更新的數據。

public class ProductCategoriesCounterBolt extends BaseRichBolt { ... @Override public void execute(){ String product = input.getString(0); String categ = input.getString(1); int total = count(product, categ); collector.emit(new Values(product, categ, total)); } ... private int count(String product, String categ) { int count = getProductCategoryCount(categ, product); count++; storeProductCategoryCount(categ, product, count); return count; } ...}

這個bolt的持久化工作隱藏在getProductCategoryCountstoreProductCategoryCount兩個方法中。它們的具體實現如下:

package storm.analytics;...public class ProductCategoriesCounterBolt extends BaseRichBolt { // 條目:分類 -> 計數 HashMap<String,Integer> counter = new HashMap<String, Integer>(); //條目:分類 -> 計數 HashMap<String,Integer> pendingToSave = new HashMap<String,Integer>(); ... public int getProductCategoryCount(String categ, String product) { Integer count = counter.get(buildLocalKey(categ, product)); if(count == null) { String sCount = jedis.hget(buildRedisKey(product), categ); if(sCount == null || "nil".equals(sCount)) { count = 0; } else { count = Integer.valueOf(sCount); } } return count; } ... private void storeProductCategoryCount(String categ, String product, int count) { String key = buildLocalKey(categ, product); counter.put(key, count); synchronized (pendingToSave) { pendingToSave.put(key, count); } } ...}

方法getProductCategoryCount首先檢查內存緩存計數器。如果沒有有效令牌,就從Redis服務器取得數據。

方法storeProductCategoryCount更新計數器緩存和pendingToSae緩沖。緩沖數據由下述後臺線程持久化。

package storm.analytics;public class ProductCategoriesCounterBolt extends BaseRichBolt {... private void startDownloaderThread() { TimerTask t = startDownloaderThread() { @Override public void run () { HashMap<String, Integer> pendings; synchronized (pendingToSave) { pendings = pendingToSave; pendingToSave = new HashMap<String,Integer>(); } for (String key : pendings.keySet) { String[] keys = key.split(":"); String product = keys[0]; String categ = keys[1]; Integer count = pendings.get(key); jedis.hset(buildRedisKey(product), categ, count.toString()); } } }; timer = new Timer("Item categories downloader"); timer.scheduleAtFixedRate(t, downloadTime, downloadTime); } ...}

下載線程鎖定pendingToSave, 向Redis發送數據時會為其它線程創建一個新的緩沖。這段代碼每隔downloadTime毫秒運行一次,這個值可由拓撲配置參數download-time配置。download-time值越大,寫入Redis的次數就越少,因為一對數據的連續計數只會向Redis寫一次。

NewsNotifierBolt

為了讓用戶能够實時查看統計結果,由NewsNotifierBolt負責向web應用通知統計結果的變化。通知機制由Apache HttpClient通過HTTP POST訪問由拓撲配置參數指定的URL。POST消息體是JSON格式。

測試時把這個bolt從拜年中删除。

package storm.analytics;...public class NewsNotifierBolt extends BaseRichBolt {[email protected] void execute(Tuple input) {String product = input.getString(0);String categ = input.getString(1);int visits = input.getInteger(2);</code>String content = "{\"product\":\"+product+"\",\"categ\":\""+categ+"\",\"visits\":"+visits+"}";HttpPost post = new HttpPost(webserver);try {post.setEntity(new StringEntity(content));HttpResponse response = client.execute(post);org.apache.http.util.EntityUtils.consume(response.getEntity());} catch (Exception e) {e.printStackTrace();reconnect();}}...}

Redis服務器

Redis是一種先進的、基於內存的、支持持久化的鍵值存儲(見http://redis.io)。本例使用它存儲以下信息:

  • 產品信息,用來為web站點服務
  • 用戶瀏覽隊列,用來為Storm拓撲提供數據
  • Storm拓撲的中間數據,用於拓撲發生故障時恢複數據
  • Storm拓撲的處理結果,也就是我們期望得到的結果。

產品信息

Redis服務器以產品ID作為鍵,以JSON字符串作為值保存著產品信息。

redis-cliredis 127.0.0.1:6379&gt; get 15"{\"title\":\"Kids smartphone cover\",\"category\":\"Covers\",\"price\":30,\"id\":15}"

用戶瀏覽隊列

用戶瀏覽隊列保存在Redis中一個鍵為navigation的先進先出隊列中。用戶瀏覽一個產品頁時,服務器從隊列左側添加用戶瀏覽數據。Storm集群不斷的從隊列右側獲取並移除數據。

redis 127.0.0.1:6379&gt; llen navigation(integer) 5redis 127.0.0.1:6379&gt; lrange navigation 0 41) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"type\":\"PRODUCT\"}"2) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"1\",\"type\":\"PRODUCT\"}"3) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"2\",\"type\":\"PRODUCT\"}"4) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"3\",\"type\":\"PRODUCT\"}"5) "{\"user\":\"59c34159-0ecb-4ef3-a56b-99150346f8d5\",\"product\":\"5\",\"type\":\"PRODUCT\"}"

中間數據

集群需要分開保存每個用戶的曆史數據。為了實現這一點,它在Redis服務器上保存著一個包含所有用戶瀏覽過的產品和它們的分類的集合。

redis 127.0.0.1:6379&gt; smembers history:59c34159-0ecb-4ef3-a56b-99150346f8d51) "1:Players"2) "5:Cameras"3) "2:Players"4) "3:Cameras"

結果

Storm集群生成關於用戶瀏覽的有用數據,並把它們的產品ID保存在一個名為“prodcnt”的Redis hash中。

redis 127.0.0.1:6379&gt; hgetall prodcnt:21) "Players"2) "1"3) "Cameras"4) "2"

測試拓撲

使用LocalCluster和一個本地Redis服務器執行測試(見圖6-7)。向Redis填充產品數據,偽造訪問日志。我們的斷言會在讀取拓撲向Redis輸出的數據時執行。測試用戶用java和groovy完成。
測試架構
圖6-7. 測試架構

初始化測試

初始化由以下三步組成:
啟動LocalCluster並提交拓撲。初始化在AbstractAnalyticsTest實現,所有測試用例都繼承該類。當初始化多個AbstractAnalyticsTest子類對象時,由一個名為topologyStarted的靜態標志屬性確定初始化工作只會進行一次。

需要注意的是,sleep語句是為了確保在試圖獲取結果之前LocalCluster已經正確啟動了。

public abstract class AbstractAnalyticsTest extends Assert { def jedis static topologyStarted = false static sync= new Object() private void reconnect() { jedis = new Jedis(TopologyStarter.REDIS_HOST, TopologyStarter.REDIS_PORT) } @Before public void startTopology(){ synchronized(sync){ reconnect() if(!topologyStarted){ jedis.flushAll() populateProducts() TopologyStarter.testing = true TopologyStarter.main(null) topologyStarted = true sleep 1000 } } } ... public void populateProducts() { def testProducts = [ [id: 0, title:"Dvd player with surround sound system", category:"Players", price: 100], [id: 1, title:"Full HD Bluray and DVD player", category:"Players", price:130], [id: 2, title:"Media player with USB 2.0 input", category:"Players", price:70], ... [id: 21, title:"TV Wall mount bracket 50-55 Inches", category:"Mounts", price:80] ] testProducts.each() { product -&gt; def val = "{ \"title\": \"${product.title}\" , \"category\": \"${product.category}\"," + " \"price\": ${product.price}, \"id\": ${product.id} }" println val jedis.set(product.id.toString(), val.toString()) } } ...}

在AbstractAnalyticsTest中實現一個名為navigate的方法。為了測試不同的場景,我們要模擬用戶瀏覽站點的行為,這一步向Redis的瀏覽隊列(譯者注:就是前文提到的鍵是navigation的隊列)插入瀏覽數據。

public abstract class AbstractAnalyticsTest extends Assert { ...public void navigate(user, product) { String nav = "{\"user\": \"${user}\", \"product\": \"${product}\", \"type\": \"PRODUCT\"}".toString() println "Pushing navigation: ${nav}" jedis.lpush('navigation', nav) } ...}

實現一個名為getProductCategoryStats的方法,用來讀取指定產品與分類的數據。不同的測試同樣需要斷言統計結果,以便檢查拓撲是否按照期望的那樣執行了。

public abstract class AbstractAnalyticsTest extends Assert { ... public int getProductCategoryStats(String product, String categ) { String count = jedis.hget("prodcnt:${product}", categ) if(count == null || "nil".equals(count)) return 0 return Integer.valueOf(count) } ...}

一個測試用例

下一步,為用戶“1”模擬一些瀏覽記錄,並檢查結果。注意執行斷言之前要給系統留出兩秒鐘處理數據。(記住ProductCategoryCounterBolt維護著一份計數的本地副本,它是在後臺异步保存到Redis的。)

package functionalclass StatsTest extends AbstractAnalyticsTest { @Test public void testNoDuplication(){ navigate("1", "0") // Players navigate("1", "1") // Players navigate("1", "2") // Players navigate("1", "3") // Cameras Thread.sleep(2000) // Give two seconds for the system to process the data. assertEquals 1, getProductCategoryStats("0", "Cameras") assertEquals 1, getProductCategoryStats("1", "Cameras") assertEquals 1, getProductCategoryStats("2", "Cameras") assertEquals 2, getProductCategoryStats("0", "Players") assertEquals 3, getProductCategoryStats("3", "Players") }}

對可擴展性和可用性的提示

為了能在一章的篇幅中講明白整個方案,它已經被簡化了。正因如此,一些與可擴展性和可用性有關的必要複雜性也被去掉了。這方面主要有兩個問題。
Redis服務器不只是一個故障的節點,還是性能瓶頸。你能接收的數據最多就是Redis能處理的那些。Redis可以通過分片增强擴展性,它的可用性可以通過主從配置得到改進。這都需要修改拓撲和web應用的代碼實現。
另一個缺點就是web應用不能通過增加服務器成比例的擴展。這是因為當產品統計數據發生變化時,需要通知所有關注它的瀏覽器。這一“通知瀏覽器”的機制通過Socket.io實現,但是它要求監聽器和通知器在同一主機上。這一點只有當GET /product/:id/statsPOST /news滿足以下條件時才能實現,那就是這二者擁有相同的分片標准,確保引用相同產品的請求由相同的服務器處理。

原創文章,轉載請注明: 轉載自並發編程網 – ifeve.com本文鏈接地址: Storm入門之第6章一個實際的例子

FavoriteLoading添加本文到我的收藏
版权声明:本文为[杜老師說]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/01/202201071259349885.html