1. 如何檢測hadoop中gz壓縮文件是否損壞
執行hive任務的時候,進入到8088的map詳細進度列表,即是RUNNING MAP attempts in job_1456816082333_1354,查看最後出錯的map是哪個節點或者在頁面直接點擊logs進入詳細log日誌查看,或者進入到節點的Hadoop的logs/userlogs目錄
根據jobid找到對應的目錄: application_1456816082333_1354,裡面有錯誤的文件id,然後刪除掉hdfs的對應的損壞文件。
2. Hadoop 壓縮從理論到實戰
在大數據領域,無論上層計算引擎採用的是什麼,在存儲過程中,壓縮都是一個避不開的問題。合適的壓縮選擇可以降低存儲成本、減少網路傳輸I/O。而錯誤的壓縮選擇則可能讓 cpu 負荷達到瓶頸、降低並發度等等,所以是否選擇壓縮、選擇什麼壓縮格式在大數據存儲環節中都是一個至關重要的問題。
點評:壓縮時間和壓縮率之間的取捨本質上是 cpu 資源和存儲資源的取捨。是否需冊猛要支持分片也不是絕對的,如果單個文件大小均小於 splitSize,則沒必要支持分片。
點評:一階段考慮盡可能支持分片(單個文件大於 splitSize 時)。二階段考慮盡可能快的壓悄姿基縮速度。三階段根據是作為長期歸檔(幾乎不用)or 作為下一作業輸入,考慮盡可能高的壓縮能力 or 支持分片。
點評:有兩點需要注意,第一點:這里的速度和壓縮率沒有具體測試數據,而是給出了一個模糊的表達。因為即使具體測試了速度和壓縮率,也會因數據不同而結果有很大的差異。後面會給出測試的腳本,大家可以結合自己的表數據自行測試。第二點:有些壓縮格式性能參數很相似,為什麼 Hadoop 中要搞這么多種?較為直觀的一個原因是:不同存儲格式支持的壓縮是不一樣的,比如 orc 存儲格式只支持 zlib 和 snappy 兩種壓縮 [8] ,parquet 雖然支持很多壓縮格式,但是不支持 bzip2 [7]
以下摘自《Hadoop The Definitive Guide》
重點閱讀文中加粗片段。大致意思是:因為 gzip 壓縮格式使用的 DEFLATE 壓縮演算法沒辦法做到隨機任意讀取,必須同步順序讀取。也就意味著沒辦法為每一個 block 創建一個分片(split),然後為該分片啟一個 mapper 去讀取數據。所以即使 gzip 文件有很多 block,MR 程序也只會啟動一個 Mapper 去讀取所有的 block。也即 gzip 這種壓縮格式不支持分片。相反的,如果壓縮格式使用的演算法支持隨機任意讀取,那麼就可以為每一個 block 創建一個分片,同時啟動一個 mapper 去讀取數據,這樣有多少個 block 就有多少個分片,就有多少個 mapper ,這些 mapper 並行讀取數據,效率大大提升。上述涉及到幾個小概念,接下來分別進行詳述。
一句話總結: zlib、gzip 在大數據語境中都是一種 壓縮格式 ,他們使用相同的 壓縮演算法: DEFLATE,DefaultCodec 是 zlib 使用的 編解碼器 ,Gzip 使用的編解碼器是 GzipCodec
我們知道,Hadoop 在任務切分時,是按照文件的粒度進行的。即一個文件一個文件啟謹進行切分。而每一個文件切分成幾塊,取決於 splitSize 的大小。比如兩個文件,第一個文件 300M,第二個文件150M。分片大小是128M,那麼對於第一個文件將會切分成3片(128M,128M,44M),第二個文件會切分成2片(128M,22M)。共計5片。所以分片數量除了由文件數決定,另一個決定因素就是 splitSize 即分片大小。
splitSize 如何計算?
幾個前提:
影響參數:
接下來進行實際驗證:
經過了 2.4.2 中的一系列實驗,驗證了一個結論:當一個輸入格式支持分片時,mapper 數量是無限制的,反之 mapper 數量小於等於文件的數量。所以我們可以通過設置參數來試圖調小分片大小來增加 mapper 數量看其上限是否等於文件數量即可。假如輸入的文件個數只有一個,那麼當 mapper 數量大於1的時候,說明該輸入格式是支持分片的。
大家可以根據自己數據集和想測試的壓縮和存儲格式自行修改腳本。通過以上腳本跑出來的結果如下:
由 2.1 中評價壓縮的三項指標可知,壓縮率、壓縮/解壓速度、是否支持分片是衡量壓縮最重要的三項指標。3.1.1小節中只對壓縮率進行了測試。壓縮/解壓速度可以通過跑一些查詢語句進一步測試。這里就不展開測試了。業界中常用的存儲格式一般是 parquet, orc,所以上面測試除了純文本只測試了這兩種存儲格式。
我們可以通過 hive> set io.compression.codecs; 來查看當前Hadoop集群支持的壓縮,在公司的集群中查詢得到的結果是:
可以看到 lzo 有兩種編解碼器: LzoCodec 和 LzopCodec。他們之間有什麼區別呢?
如果你閱讀過關於 Hadoop 壓縮的文章,應該可以看到,絕大多數文章中對於 snappy 是否支持分片都是直接給出的否定的答案。 CDH 的文檔中也指出來 snappy 是不支持分片的。
看文中加粗片段,雖然 snappy 本身是不支持分片的,但是如果 snappy 存儲在一些特定的存儲格式比如 SequenceFile 或者 Avro 中,那麼是可以支持分片的。也就是說 snappy 是否支持分片是分情況討論的。不能說使用了 snappy 壓縮就一定不支持分片。前面提到了,業界中常用的存儲格式一般是 parquet 或者 orc,而上面 CDH 的文章中恰恰沒有提到 parquet 和 orc 是否支持,接下來以 parquet 為例,進行測試。測試內容即為 parquet + snappy 組合,是否支持分片。
首先准備數據,因為之前做壓縮率測試,已經有了 parquet + snappy 文件了,這里直接拿來用。
一共3個輸入文件,啟了6個mapper,說明輸入文件是可以分片的。即 parquet + snappy 的組合是支持分片的。在《Hadoop The Definitive Guide》中也對 parquet 是否支持分片有說明:
以 maprece.output.fileoutputformat.compress.codec 為例,這個參數可以在三個地方配置:
那麼當三者都設置時,以哪個為准呢?按照經驗來看,一定是粒度小的優先順序大於粒度大的優先順序。經過測試也驗證了這種猜測。即:表級別 > hive > hadoop
初學者往往容易混淆存儲格式和壓縮格式之間的關系,其實二者是完全獨立的。如果完整的閱讀了該篇文章,應該已經消除了這一塊理解對誤區。這里總結一下:比如 parquet, orc,他們都是常見的 存儲格式 。是否使用壓縮,使用何種壓縮都是可以設置的。而 zlib、gzip、lzo、lz4、snappy 等等這些都是常見的 壓縮格式 ,他們既可以依附於某些 存儲格式 ,比如之前提到的 parquet + snappy,orc + zlib 等等。也可以脫離特定的 存儲格式 ,比如純文本文件進行壓縮,text + parquet, text + bzip2 等等。
3. 如何在Scala中讀取Hadoop集群上的gz壓縮文件
(1)一個從文件創建的Scala對象,或(2)一個並行切片(分布在各個節點之間),或(3)從其他RDD轉換得來,或(4)改變已有RDD的持久性,如請求將已有RDD緩存在內存中。Spark應用稱為driver,實現單個節點或一組節點上的操作。
4. 如何實現讓用戶在網頁中上傳下載文件到HDFS中
hadoop計算需要在hdfs文件系統上進行,文件上傳到hdfs上通常有三種方法:a hadoop自帶的dfs服務,put;b hadoop的API,Writer對象可以實現這一功能;c 調用OTL可執行程序,數據從資料庫直接進入hadoop
hadoop計算需要在hdfs文件系統上進行,因此每次計算之前必須把需要用到的文件(我們稱為原始文件)都上傳到hdfs上。文件上傳到hdfs上通常有三種方法:
a hadoop自帶的dfs服務,put;
b hadoop的API,Writer對象可以實現這一功能;
c 調用OTL可執行程序,數據從資料庫直接進入hadoop
由於存在ETL層,因此第三種方案不予考慮
將a、b方案進行對比,如下:
1 空間:方案a在hdfs上佔用空間同本地,因此假設只上傳日誌文件,則保存一個月日誌文件將消耗掉約10T空間,如果加上這期間的各種維表、事實表,將佔用大約25T空間
方案b經測試,壓縮比大約為3~4:1,因此假設hdfs空間為100T,原來只能保存約4個月的數據,現在可以保存約1年
2 上傳時間:方案a的上傳時間經測試,200G數據上傳約1小時
方案b的上傳時間,程序不做任何優化,大約是以上的4~6倍,但存在一定程度提升速度的餘地
3 運算時間:經過對200G數據,大約4億條記錄的測試,如果程序以IO操作為主,則壓縮數據的計算可以提高大約50%的速度,但如果程序以內存操作為主,則只能提高5%~10%的速度
4 其它:未壓縮的數據還有一個好處是可以直接在hdfs上查看原始數據。壓縮數據想看原始數據只能用程序把它導到本地,或者利用本地備份數據
壓縮格式:按照hadoop api的介紹,壓縮格式分兩種:BLOCK和RECORD,其中RECORD是只對value進行壓縮,一般採用BLOCK進行壓縮。
對壓縮文件進行計算,需要用SequenceFileInputFormat類來讀入壓縮文件,以下是計算程序的典型配置代碼:
JobConf conf = new JobConf(getConf(), log.class);
conf.setJobName(」log」);
conf.setOutputKeyClass(Text.class);//set the map output key type
conf.setOutputValueClass(Text.class);//set the map output value type
conf.setMapperClass(MapClass.class);
//conf.setCombinerClass(Rece.class);//set the combiner class ,if havenot, use Recuce class for default
conf.setRecerClass(Rece.class);
conf.setInputFormat(SequenceFileInputFormat.class);//necessary if use compress
接下來的處理與非壓縮格式的處理一樣