導航:首頁 > 源碼編譯 > spout源碼

spout源碼

發布時間:2023-01-13 05:44:56

1. java.使用FileReader字元流統計一篇英文中的單詞,要求如下

我已經給你寫好了,你要求的功能都能實現,以下是源代碼:

packageregular;

importjava.io.BufferedReader;
importjava.io.BufferedWriter;
importjava.io.File;
importjava.io.FileNotFoundException;
importjava.io.FileReader;
importjava.io.FileWriter;
importjava.io.IOException;
importjava.util.ArrayList;

importjava.util.Collections;
importjava.util.Comparator;
importjava.util.HashMap;
importjava.util.List;
importjava.util.Map;
importjava.util.Map.Entry;
importjava.util.regex.Pattern;

/**
*統計一篇英文中的單詞,要求如下:
*①一共出現了多少個單詞;②有多少個互不相同的單詞;③給出每個單詞出現的頻率,並將這些單詞按頻率大小順序輸出到文件words.txt文件中。
**/
publicclassWordStatistics{
=null;
=null;


publicstaticvoidmain(String[]args){
WordStatisticswordStatistics=newWordStatistics();
Map<String,Integer>word_map=wordStatistics.readFile();

// for(Map.Entry<String,Integer>mapping:word_map.entrySet()){
// System.out.println(mapping.getKey()+":"+mapping.getValue());
// }
wordStatistics.sortAndWrite(word_map);

}

/**
*從指定路徑讀取英文文章,並形成Map集合
**/
publicMap<String,Integer>readFile(){
//讀文件
StringBufferstringBuffer=newStringBuffer();
try{
bufferedReader=newBufferedReader(newFileReader(newFile("F:\text1.txt")));//文件路徑可自定義
Stringline="";
while((line=bufferedReader.readLine())!=null)
stringBuffer.append(line);
bufferedReader.close();
}catch(FileNotFoundExceptione){
e.printStackTrace();
}catch(IOExceptione){
e.printStackTrace();
}

//生成<單詞,次數>鍵值對
Patternpattern=Pattern.compile("(\.)?");
String[]words=pattern.split(stringBuffer.toString());
Map<String,Integer>word_map=newHashMap<String,Integer>();
for(Strings:words){
if(!word_map.containsKey(s)){
word_map.put(s,1);
}
else{
intcount=word_map.get(s);
word_map.replace(s,count,count+1);
}
}
returnword_map;
}

/**
*按單詞的出現頻率排序並輸出到words.txt文件中
**/
publicvoidsortAndWrite(Map<String,Integer>word_map){
//排序
List<Map.Entry<String,Integer>>list=newArrayList<Map.Entry<String,Integer>>(word_map.entrySet());
Collections.sort(list,newComparator<Map.Entry<String,Integer>>(){
publicintcompare(Entry<String,Integer>o1,
Entry<String,Integer>o2){
//TODOAuto-generatedmethodstub
returno1.getValue().compareTo(o2.getValue());
}

});

//寫入文件
try{
bufferedWriter=newBufferedWriter(newFileWriter(newFile("F:\words.txt")));
bufferedWriter.write("一共出現了"+word_map.size()+"個單詞,每個單詞和它出現的頻率分別是:");
bufferedWriter.flush();
bufferedWriter.newLine();
for(Map.Entry<String,Integer>mapping:list){
bufferedWriter.write(mapping.getKey()+":"+mapping.getValue());
bufferedWriter.flush();
bufferedWriter.newLine();
}

bufferedWriter.close();
System.out.println("WorkOut");
}catch(IOExceptione){
e.printStackTrace();
}

}

}


測試用例:

She had been shopping with her Mom in Wal-Mart. She must have been 6 years old, this beautiful brown haired, freckle-faced image of innocence. It was pouring outside. The kind of rain that gushes over the top of rain gutters, so much in a hurry to hit the Earth, it has no time to flow down the spout.


輸出:

(PS:這可全部是原創手寫的,望採納)

2. jstorm 核心

生成Topology

IRichSpout
IRichSpout 為最簡單的Spout介面

其中注意:
=>spout對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
=>spout可以有構造函數,但構造函數只執行一次,是在提交任務時,創建spout對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個=>task內(因為提交任務時將spout序列化到文件中去,在worker起來時再將spout從文件中反序列化出來)。
=>open是當task起來後執行的初始化動作
=>close是當task被shutdown後執行的動作
=>activate 是當task被激活時,觸發的動作
=>deactivate 是task被deactive時,觸發的動作
=>nextTuple 是spout實現核心, nextuple完成自己的邏輯,即每一次取消息後,用collector 將消息emit出去。
=>ack, 當spout收到一條ack消息時,觸發的動作,詳情可以參考 ack機制
=>fail, 當spout收到一條fail消息時,觸發的動作,詳情可以參考 ack機制
=>declareOutputFields, 定義spout發送數據,每個欄位的含義
=>getComponentConfiguration 獲取本spout的component 配置

Bolt

其中注意:
=>bolt對象必須是繼承Serializable, 因此要求spout內所有數據結構必須是可序列化的
=>bolt可以有構造函數,但構造函數只執行一次,是在提交任務時,創建bolt對象,因此在task分配到具體worker之前的初始化工作可以在此處完成,一旦完成,初始化的內容將攜帶到每一個task內(因為提交任務時將bolt序列化到文件中去,在worker起來時再將bolt從文件中反序列化出來)。
=>prepare是當task起來後執行的初始化動作
=>cleanup是當task被shutdown後執行的動作
=>execute是bolt實現核心, 完成自己的邏輯,即接受每一次取消息後,處理完,有可能用collector 將產生的新消息emit出去。 ** 在executor中,當程序處理一條消息時,需要執行collector.ack, 詳情可以參考 ack機制 ** 在executor中,當程序無法處理一條消息時或出錯時,需要執行collector.fail ,詳情可以參考 ack機制
=>declareOutputFields, 定義bolt發送數據,每個欄位的含義
=>getComponentConfiguration 獲取本bolt的component 配置

打包

提交jar
xxxx.jar 為打包後的jar
com.alibaba.xxxx.xx 為入口類,即提交任務的類
parameter即為提交參數

Storm中有個特殊的task名叫acker,他們負責跟蹤spout發出的每一個Tuple的Tuple樹(因為一個tuple通過spout發出了,經過每一個bolt處理後,會生成一個新的tuple發送出去)。當acker(框架自啟動的task)發現一個Tuple樹已經處理完成了,它會發送一個消息給產生這個Tuple的那個task。Acker的跟蹤演算法是Storm的主要突破之一,對任意大的一個Tuple樹,它只需要恆定的20位元組就可以進行跟蹤。

Acker跟蹤演算法的原理:acker對於每個spout-tuple保存一個ack-val的校驗值,它的初始值是0,然後每發射一個Tuple或Ack一個Tuple時,這個Tuple的id就要跟這個校驗值異或一下,並且把得到的值更新為ack-val的新值。那麼假設每個發射出去的Tuple都被ack了,那麼最後ack-val的值就一定是0。Acker就根據ack-val是否為0來判斷是否完全處理,如果為0則認為已完全處理。
要實現ack機制:

阿里自己的Jstorm會提供
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
這樣更合理一些, 可以直接取得系統cache的msg values

ack機制即,spout發送的每一條消息,在規定的時間內,spout收到Acker的ack響應,即認為該tuple 被後續bolt成功處理

在規定的時間內(默認是30秒),沒有收到Acker的ack響應tuple,就觸發fail動作,即認為該tuple處理失敗,timeout時間可以通過Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS來設定。
l或者收到Acker發送的fail響應tuple,也認為失敗,觸發fail動作
注意,我開始以為如果繼承BaseBasicBolt那麼程序拋出異常,也會讓spout進行重發,但是我錯了,程序直接異常停止了
這里我以分布式程序入門案例worldcount為例子吧。

有人問到Storm 是怎麼處理重復的tuple?
因為Storm 要保證tuple 的可靠處理,當tuple 處理失敗或者超時的時候,spout 會fail並重新發送該tuple,那麼就會有tuple 重復計算的問題。這個問題是很難解決的,storm也沒有提供機制幫助你解決。不過也有一些可行的策略:
(1)不處理,這也算是種策略。因為實時計算通常並不要求很高的精確度,後
續的批處理計算會更正實時計算的誤差。
(2)使用第三方集中存儲來過濾,比如利用 MySQL 、MemCached 或者 Redis 根據邏輯主鍵來去重。
(3)使用bloom filter 做過濾,簡單高效。

在學習storm的過程中,有不少人對storm的Spout組件中的ack及fail相關的問題存在困惑,這里做一個簡要的概述。

Storm保證每一個數據都得到有效處理,這是如何保證的呢?正是ack及fail機制確保數據都得到處理的保證,但是storm只是提供給我們一個介面,而具體的方法得由我們自己來實現。例如在spout下一個拓撲節點的bolt上,我們定義某種情況下為數據處理失敗,則調用fail,則我們可以在fail方法中進行數據重發,這樣就保證了數據都得到了處理。其實,通過讀storm的源碼,裡面有講到,有些類(BaseBasicBolt?)是會自動調用ack和fail的,不需要我們程序員去ack和fail,但是其他Bolt就沒有這種功能了。

3. wso2-ei 編譯

在眾多規則引擎中,找到了 siddhi 能處理流式數據,支持 etl 等功能,但其為基礎的單機版本,除非啟動多個實例,網上有 siddhi-storm 的版本,將 siddhi 作為 storm 中的 Bolt 進行執行。 wso2 公司自己的實現也是依賴 storm 實現分布式實時流處理。

在 wso2 眾多產品中 wso2-ei (proct-ie) 是依賴 siddhi 實現的 carbon-event-processing 來執行所有流式規則,如下以源碼的方式進行編譯:

注 根據日誌提示即可訪問 https://10.1.2.3:9443/carbon/ 直接進行訪問,默認用戶為 admin / admin

WSO2 ESB 允許系統管理員和SOA架構師輕松的配置消息路由, 虛擬化, 中介, 轉換, 日誌記錄, 任務調度, 負載均衡, 失敗路由, 事件中介等等. 運行時被設計為完全非同步, 非阻塞 、連續的。基於 Apache Synapse 中介引擎,Apache Synapse是使用Apache Axis2創建的。

各種數據通過 receiving 獲取數據,然後通過 SiddhiSpout 接收數據,發射到後端 siddhiBolt 中逐步執行,並完成所有的規則:

4. 想轉行到大數據開發需要學習哪些技術

如果要學習大數據,不管你是零基礎,還是有一定的基礎,都是要懂至少一種計算機編程語言,因為大數據的開發離不開編程語言,不僅要懂,還要精通!但這門編程語言不一定是java。

比如說,如果你主攻Hadoop開發方向,是一定要學習java的,因為Hadoop是由java來開發的。

如果你想要主攻spark方向,是要學習Scala語言的,每個方向要求的編程語言是不同的。

如果你是想要走數據分析方向,那你就要從python編程語言下手,這個也是看自己未來的需求的。

大數據是需要一定的編程基礎的,但具體學習哪一門編程,自己可以選擇的。其實只要學會了一門編程語言,其他編程語言也是不在話下的。

5. 怎樣學習大數據

首先我們要了解Java語言和Linux操作系統,這兩個是學習大數據的基礎,學習的順序不分前後。

Java :只要了解一些基礎即可,做大數據不需要很深的Java 技術,學java SE 就相當於有學習大數據基礎。

Linux:因為大數據相關軟體都是在Linux上運行的,所以Linux要學習的扎實一些,學好Linux對你快速掌握大數據相關技術會有很大的幫助,能讓你更好的理解hadoop、hive、hbase、spark等大數據軟體的運行環境和網路環境配置,能少踩很多坑,學會shell就能看懂腳本這樣能更容易理解和配置大數據集群。還能讓你對以後新出的大數據技術學習起來更快。

Hadoop:這是現在流行的大數據處理平台幾乎已經成為大數據的代名詞,所以這個是必學的。Hadoop裡麵包括幾個組件HDFS、MapRece和YARN,HDFS是存儲數據的地方就像我們電腦的硬碟一樣文件都存儲在這個上面,MapRece是對數據進行處理計算的,它有個特點就是不管多大的數據只要給它時間它就能把數據跑完,但是時間可能不是很快所以它叫數據的批處理。

Zookeeper:這是個萬金油,安裝Hadoop的HA的時候就會用到它,以後的Hbase也會用到它。它一般用來存放一些相互協作的信息,這些信息比較小一般不會超過1M,都是使用它的軟體對它有依賴,對於我們個人來講只需要把它安裝正確,讓它正常的run起來就可以了。

Mysql:我們學習完大數據的處理了,接下來學習學習小數據的處理工具mysql資料庫,因為一會裝hive的時候要用到,mysql需要掌握到什麼層度那?你能在Linux上把它安裝好,運行起來,會配置簡單的許可權,修改root的密碼,創建資料庫。這里主要的是學習SQL的語法,因為hive的語法和這個非常相似。

Sqoop:這個是用於把Mysql里的數據導入到Hadoop里的。當然你也可以不用這個,直接把Mysql數據表導出成文件再放到HDFS上也是一樣的,當然生產環境中使用要注意Mysql的壓力。

Hive:這個東西對於會SQL語法的來說就是神器,它能讓你處理大數據變的很簡單,不會再費勁的編寫MapRece程序。有的人說Pig那?它和Pig差不多掌握一個就可以了。

Oozie:既然學會Hive了,我相信你一定需要這個東西,它可以幫你管理你的Hive或者MapRece、Spark腳本,還能檢查你的程序是否執行正確,出錯了給你發報警並能幫你重試程序,最重要的是還能幫你配置任務的依賴關系。我相信你一定會喜歡上它的,不然你看著那一大堆腳本,和密密麻麻的crond是不是有種想屎的感覺。

Hbase:這是Hadoop生態體系中的NOSQL資料庫,他的數據是按照key和value的形式存儲的並且key是唯一的,所以它能用來做數據的排重,它與MYSQL相比能存儲的數據量大很多。所以他常被用於大數據處理完成之後的存儲目的地。

Kafka:這是個比較好用的隊列工具,隊列是干嗎的?排隊買票你知道不?數據多了同樣也需要排隊處理,這樣與你協作的其它同學不會叫起來,你干嗎給我這么多的數據(比如好幾百G的文件)我怎麼處理得過來,你別怪他因為他不是搞大數據的,你可以跟他講我把數據放在隊列里你使用的時候一個個拿,這樣他就不在抱怨了馬上灰流流的去優化他的程序去了,因為處理不過來就是他的事情。而不是你給的問題。當然我們也可以利用這個工具來做線上實時數據的入庫或入HDFS,這時你可以與一個叫Flume的工具配合使用,它是專門用來提供對數據進行簡單處理,並寫到各種數據接受方(比如Kafka)的。

Spark:它是用來彌補基於MapRece處理數據速度上的缺點,它的特點是把數據裝載到內存中計算而不是去讀慢的要死進化還特別慢的硬碟。特別適合做迭代運算,所以演算法流們特別稀飯它。它是用scala編寫的。Java語言或者Scala都可以操作它,因為它們都是用JVM的。

6. 大數據培訓需要多長時間難不難學

一般大數據的學習方式有兩種:

線下脫產學習,線上視頻教學。如果是0基礎學員參加線下脫產班學習的話,大多數培訓機構都是6個月左右的周期。

大數據的學習有一定難度,對於0基礎的小白來說,一定要細心、耐心,認真聽課,多多練習。大數據的薪資待遇是比較可觀的,目前大數據開發招聘還是以技術為主,大數據需要學習hadoop、spark、storm、超大集群調優、機器學習、並發編程等,加米穀的具體如下:

Java,大數據基礎:Linux基礎、Maven基礎

HDFS分布式文件系統

MapRece分布式計算模型+Yarn分布式資源管理器+Zookeeper分布式協調服務

Hbase分布式數據 庫+Hive分布式數據倉庫

FlumeNG分布式數據採集系統+Sqoop大數據遷移系統

Scala大數據黃金語言+kafka分布式匯流排系統

SparkCore大數據計算基石+SparkSQL數據挖掘利器+SparkStreaming流式計算平台

SparkMllib機器學習平台+SparkGraphx圖計算平台

大數據項目實戰

7. storm基本概念

流式計算中,各個中間件產品對計算過程中的角色的抽象都不盡相同,實現方式也是千差萬別。本文針對storm中間件在進行流式計算中的幾個概念做個概括總結。

storm分布式計算結構稱為topology(拓撲)由stream,spout,bolt組成。

spout代表一個storm拓撲中的數據入口,連接到數據源,將數據轉化為一個個tuple,並發射tuple

stream是由無限制個tuple組成的序列。tuple為storm的核心數據結構,是包含了一個或多個鍵值對的列表。

bolt可以理解為計算程序中的運算或者函數,bolt的上游是輸入流,經過bolt實施運算後,可輸出一個或者多個輸出流。

bolt可以訂閱多個由spout或者其他bolt發射的數據流,用以構建復雜的數據流轉換網路。

上述即為storm最基本的組成元素,無論storm如何運行,都是以stream,spout,bolt做為最基本的運行單元。而這三者則是共同構成了一個storm拓撲topology。

首先需要明確一個概念,bolt,spout實例,都屬於任務,spout產生數據流,並發射,bolt消費數據流,進行計算,並進行落地或再發射,他們的存在以及運行過程都需要消耗資源,而storm集群是一個提供了資源的集群,我們要做的就是將spout/boult實例合理分配到storm集群提供的計算資源上,這樣就可以讓spout/bolt得以執行。

worker為JVM進程,一個topology會分配到一個或者多個worker上運行。

executor是worker內的java線程,是具體執行bolt/spout實例用的。下篇文章在介紹如何提供storm並行計算能力時會介紹worker以及executor的配置。

在storm中,worker是由supervisor進程創建,並進行監控的。storm集群遵循主從模式,主為nimbus,從為supervisor,storm集群由一個主節點(確實有單點問題),和多個工作節點(supervisor)組成,並使用zookeeper來協調集群中的狀態信息,比如任務分配情況,worker狀態,supervisor的拓撲度量。

通過配置可指定supervisor上可運行多少worker。一個worker代表一個slot。

nimbus守護進程的主要職責是管理,協調和監控在集群上運行的topology.包括topology的發布,任務指派,事件處理失敗時重新指派任務。

supervisor守護進程等待nimbus分配任務後生成並監控workers執行任務。supervosior和worker都是運行在不同的JVM進程上。

了解了集群模式下,storm大致的分布概念,下面結合筆者做的一個實例,了解一下如何發布計算資源到storm集群上。

筆者定義了一個spout,兩個bolt 運算過程如下:

其中streamMaking是一個不斷生成隨機數(5~30)的spout實例,Step1Bolt會過濾掉15以下的隨機數(過濾),15以上的隨機數會乘以16(計算),再將結果向後發射。Step2Bolt訂閱Step1Bolt發射的數據,接收數據後,列印輸出。流程結束。

筆者在定義spout/bolt實例時,配置了spout,bolt的並行執行數。其中

streamMaking:4   Step1Bolt:2  Step2Bolt 1

這樣,發布成功後,storm會根據我的配置,分配足夠的計算資源給予spout/bolt進行執行。

發布:

發布時,spout和bolt都是在一起以jar的形式發布到nimbus上的,分配後,內部定義的spout和bolt將以組件的形式被nimbus分配至worker進程中執行。

其中worker都是由supervisor創建的,創建出來的worker進程與supervisor是分開的不同進程。一個supervisor可創建多少worker可通過修改storm安裝目錄下的storm.yaml進行配置。

task是執行的最小單元。spout/bolt實例在定義中指定了,要起多少task,以及多少executor。也即一個topology發布之前已經定義了task總量,和需要多少資源來執行我的task總量。nimbus將根據已有的計算資源進行分配。

下圖中:  nimbus左邊代表著計算任務量,和所需計算配置

nimbus右邊代表著計算資源

nimbus將根據計算資源信息,合理的分發計算任務量。

發布成功後,通過storm自帶的UI功能,可以查看你發布的topology運行以及其中每個組件的分布執行情況。

監控圖像中清晰的顯示了,目前部署的topology,以及topology中每個組件所分配的計算資源所在host,以及每個組件發射了多少tuple,接收了多少tuple,以及有多少個executor在並行執行。

本文講述了storm內的基本元素以及基本概念,後續將講述storm的重點配置信息,以及如何提高並發計算能力,窗口概念等高級特性,後續會進行源碼分析,以及與其他實時計算中間件的比較。

8. 大數據行業有哪些工作機會,招聘的崗位技能有哪些

大數據主要有以下職位: 1)數據分析師Data analyst:指熟悉相關業務,熟練搭建數據分析框架,掌握和使用相關的分析常用工具和基本的分析方法,進行數據搜集、整理、分析,針對數據分析結論給管理銷售運營提供指導意義的分析意見。

閱讀全文

與spout源碼相關的資料

熱點內容
少兒編程作品美麗的小房子 瀏覽:970
伺服器卡在網頁上怎麼辦 瀏覽:54
用python自製編譯器 瀏覽:950
android分享新浪微博客戶端 瀏覽:26
系統中伺服器在哪裡下載地址 瀏覽:1001
新a4安卓手機怎麼投屏 瀏覽:173
pdftoemf 瀏覽:886
java介面可以實現介面嗎 瀏覽:59
vb編程10個隨機函數 瀏覽:21
程序員個人簡介100 瀏覽:772
土木工程師演算法工程師 瀏覽:92
javaexcel導入oracle 瀏覽:880
如何設置異地伺服器 瀏覽:882
為什麼安卓手機藍牙耳機不會彈窗 瀏覽:547
linuxf77編譯器安裝教程 瀏覽:949
android本地錄音許可權 瀏覽:446
加密u盤內容怎麼拷貝 瀏覽:284
安卓手機為什麼看不到iso文件 瀏覽:582
用圖片做文件夾圖標 瀏覽:693
java正則表達式語法 瀏覽:865