近期由於Hadoop集群機器硬盤資源緊張,有需求讓把 Hadoop 集群上的歷史數據進行下壓縮,開始從網上查找的都是關於各種壓縮機制的對比,很少有關於怎麼壓縮的教程(我沒找到。。),再此特記錄下本次壓縮的過程,方便以後查閱,利己利人。
本文涉及的所有 jar包、腳本、native lib 見文末的相關下載 ~
我的壓縮版本:
Jdk 1.7及以上
Hadoop-2.2.0 版本
壓縮前環境準備:
關於壓縮算法對比,網上資料很多,這裏我用的是 Bzip2 的壓縮方式,比較中庸,由於是Hadoop自帶的壓縮機制,也不需要額外下載別的東西,只需要在 Hadoop根目錄下 lib/native 文件下有如下文件即可:
壓縮之前要檢查 Hadoop 集群支持的壓縮算法: hadoop checknative
每台機器都要檢查一下,都显示如圖 true 則說明 集群支持 bzip2 壓縮,
如果显示false 則需要將上圖的文件下載拷貝到 Hadoop根目錄下 lib/native
壓縮程序介紹:
壓縮程序用到的類 getFileList(獲取文件路徑) 、 FileHdfsCompress(壓縮類)、FileHdfsDeCompress(解壓縮類) ,只用到這三個類即可完成壓縮/解壓縮操作。
getFileList 作用:遞歸打印 傳入文件目錄下文件的根路徑,包括子目錄下的文件。開始想直接輸出到文件中,後來打包放到集群上運行時,發現文件沒有內容,可能是由於分佈式運行的關係,所以就把路徑打印出來,人工在放到文件中。
核心代碼:
public static void listAllFiles(String path, List<String> strings) throws IOException {
FileSystem hdfs = getHdfs(path);
Path[] filesAndDirs = getFilesAndDirs(path);
for(Path p : filesAndDirs){
if(hdfs.getFileStatus(p).isFile()){
if(!p.getName().contains("SUCCESS")){
System.out.println(p);
}
}else{
listAllFiles(p.toString(),strings);
}
}
// FileUtils.writeLines(new File(FILE_LIST_OUTPUT_PATH), strings,true);
}
public static FileSystem getHdfs(String path) throws IOException {
Configuration conf = new Configuration();
return FileSystem.get(URI.create(path),conf);
}
public static Path[] getFilesAndDirs(String path) throws IOException {
FileStatus[] fs = getHdfs(path).listStatus(new Path(path));
return FileUtil.stat2Paths(fs);
}
FileHdfsCompress:壓縮程序非常簡單,對應程序里的 FileHdfsCompress 類,(解壓縮是 FileHdfsDeCompress),採用的是Hadoop 原生API ,將Hadoop集群上原文件讀入作為輸入流,將壓縮路徑的輸入流作為輸出,再使用相關的壓縮算法即,代碼如下:
核心代碼:
//指定壓縮方式
Class<?> codecClass = Class.forName(COMPRESS_CLASS_NAME);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, conf);
// FileSystem fs = FileSystem.get(conf);
FileSystem fs = FileSystem.get(URI.create(inputPath),conf);
//原文件路徑 txt 用原本的輸入流讀入
FSDataInputStream in = fs.open(new Path(inputPath));
//創建 HDFS 上的輸出流,壓縮路徑
//通過文件系統獲取輸出流
OutputStream out = fs.create(new Path(FILE_OUTPUT_PATH));
//對輸出流的數據壓縮
CompressionOutputStream compressOut = codec.createOutputStream(out);
//讀入原文件 壓縮到HDFS上 輸入--普通流 輸出-壓縮流
IOUtils.copyBytes(in, compressOut, 4096,true);
以下是代碼優化的過程,不涉及壓縮程序使用,不感興趣同學可以跳過 ~ :
在實際編碼中,我其實是走了彎路的,一開始並沒有想到用 Hadoop API 就能實現壓縮解壓縮功能,代碼到此其實是經歷了優化迭代的過程。
最開始時壓縮的思路就是 將文件讀進來,再壓縮出去,一開始使用了 MapReduce 的方式,在編碼過程中,由於對生成壓縮文件的路徑還有要求,又在 Hadoop 輸出時自定義了輸出類來使的輸出文件的名字符合要求,不是 part-r-0000.txt ,而是時間戳.txt 的格式,至此符合原線上路徑的要求。
而在實際運行過程中發現,MR 程序需要啟動 Yarn,並佔用Yarn 資源,由於壓縮時間較長,有可能會長時間佔用 集群資源不釋放,後來發現 MR 程序的初衷是用來做并行計算的,而壓縮僅僅是 map 任務讀取一條就寫一條,不涉及計算,就是內容的簡單搬運。所以這裏放棄了使用 MR 想着可不可以就用簡單的 Hadoop API 就完成壓縮功能,經過一番嘗試后,發現真的可行! 使用了 Hadoop API 釋放了集群資源,壓縮速度也還可以,這樣就把這個壓縮程序當做一個後台進程跑就行了也不用考慮集群資源分配的問題。
實測壓縮步驟:
1 將項目打包,上傳到hadoop 集群任一節點即可,準備好相應的腳本,輸入數據文件,日誌文件,如下圖:
2 使用獲取文件路徑腳本,打印路徑:
getFileLish.sh 腳本內容如下,就是簡單調用,傳入參數為 hadoop集群上 HDFS 上目錄路徑:
#!/bin/sh
echo "begin get fileList"
echo "第一個參數$1"
if [ ! -n "$1" ]; then
echo "check param!"
fi
#original file
hadoop jar hadoop-compress-1.0.jar com.people.util.getFileList $1
3 將 打印的路徑 粘貼到 compress.txt 中,第 2 步中會把目錄的文件路徑包括子目錄路徑都打印出來,將其粘貼進 compress.txt 中即可,注意 文件名可隨意定。
4 使用壓縮腳本即可,sh compress.sh /data/new_compress/compress.txt ,加粗的部分是腳本的參數意思是 第3步中文件的路徑,注意:這裏只能是絕對路徑,不然可能報找不到文件的異常。
compress.sh 腳本內容如下,就是簡單調用,傳入參數為 第3步中文件的絕對路徑:
#!/bin/sh
echo "begin compress"
echo "第一個參數$1"
if [ ! -n "$1" ]; then
echo "check param!"
fi
hadoop jar hadoop-compress-1.0.jar com.people.compress.FileHdfsCompress $1 >> /data/new_compress/compress.log 2>&1 &
5 查看壓縮日誌,發現後台程序已經開始壓縮了!,tail -f compress.log
6 如果感覺壓縮速度不夠,可以多台機器執行腳本,也可以一台機器執行多個任務,因為這個腳本任務是一個後台進程,不會佔用集群 Yarn 資源。
相關下載:
程序源碼下載: git@github.com:fanpengyi/hadoop-compress.git
Hadoop 壓縮相關需要的 腳本、jar包、lib 下載: 關注公眾號 "大數據江湖",後台回復 "hadoop壓縮",即可下載
長按即可關注
--- The End ---
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※為什麼 USB CONNECTOR 是電子產業重要的元件?
※網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!
※想要讓你的商品成為最夯、最多人討論的話題?網頁設計公司讓你強力曝光
※想知道最厲害的台北網頁設計公司推薦、台中網頁設計公司推薦專業設計師"嚨底家"!!
※專營大陸快遞台灣服務
※台灣快遞大陸的貨運公司有哪些呢?
Orignal From: Hadoop壓縮的圖文教程
留言
張貼留言