導航:首頁 > 編程語言 > python訪問hdfs文件

python訪問hdfs文件

發布時間:2023-05-28 01:40:56

1. 為什麼從事大數據行業,一定要學習python

Python這只小蟲子最近隨著大數據的興起可以說是十分的火了。有越來越多的人不敢小覷Python這門語言了。也有更多的人在學習Python。Python為何會有如此大的魅力?為什麼從事大數據行業必學Python?這還要從Python這門語言的優點開始講起。

雖然Python這種語言不如Java、C++這些語言普及,卻早在1991年就已經誕生了。它的語法簡單清晰,以實用為主,是門十分樸素的語言。同時,它還是編程語言中的「和事佬」,被人戲稱為膠水語言。因為它能夠將其他語言製作的各種模塊很輕松的聯結在一起。

如果將Python語言擬人化,它絕對屬於「老好人」的那一類,讓人容易親近,人們與它交流並不需要花太多心思。但它卻擁有強大的功能。很多語言不能完成的任務,Python都能輕易完成。它幾乎可以被用來做任何事情,應用於多個系統和平台。無論是系統操作還是Web開發,抑或是伺服器和管理工具、部署、科學建模等,它都能輕松掌握。因此,從事海量數據處理的大數據行業,自然少不了這個「萬能工具」。

除此之外,Python這只小蟲子還受到了大數據老大哥Google的青睞。Google的很多開發都用到了Python。這使得人們能夠找到Python的很多指南和教程。讓你學起來更方便,你在使用中可能遇到的很多問題大多數都已經被Google給解決了,並把解決方法發布到了網路平台。

Python還擁有一系列非常優秀的庫,這省了你編程中的很多時間。尤其是在人工智慧和機器學習領域,這些庫的價值體現得更為明顯。

不管怎麼說,從事大數據工作,少不得要在網路上爬取數據,不用Python爬蟲,你還打算用什麼呢?

因此,在當前的大數據領域,從事大數據行業必學Python。
人工智慧、大數據、雲計算和物聯網的未來發展值得重視,均為前沿產業,多智時代專注於人工智慧和大數據的入門和科譜,在此為你推薦幾篇優質好文:
————————————————
版權聲明:本文為CSDN博主「oshidai」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/oshidai/article/details/88712833

2. Python使用hdfs存放文件時報Proxy error: 502 Server dropped connection解決方案

Python3 使用hdfs分布式文件儲存系統

from pyhdfs import *

client = HdfsClient(hosts="testhdfs.org, 50070",

user_name="web_crawler")    #    創建一個連接

client.get_home_directory()    # 獲取hdfs根路徑

client.listdir(PATH)    # 獲取hdfs指定路徑下的文件列表

client._from_local(file_path, hdfs_path, overwrite=True)    # 把本地文件拷貝到伺服器,不支持文件夾;overwrite=True表示存在則覆蓋

​client.delete(PATH, recursive=True)    # 刪除指定文件

hdfs_path必須包含文件名及其後綴,不握殲然不會成功

如果連接

HdfsClient

報錯

Traceback (most recent call last):

  File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2963, in run_code

    exec(code_obj, self.user_global_ns, self.user_ns)

  File "

    client.get_home_directory()

  File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 565, in get_home_directory

    return _json(self._get('/', 'GETHOMEDIRECTORY', **kwargs))['Path']

  File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 391, in _get

    return self._request('get', *args, **kwargs)

 伍皮並 File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 377, in _request

    _check_response(response, expected_status)

  File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 799, in _check_response

 腔跡   remote_exception = _json(response)['RemoteException']

  File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 793, in _json

    "Expected JSON. Is WebHDFS enabled? Got {!r}".format(response.text))

pyhdfs.HdfsException: Expected JSON. Is WebHDFS enabled? Got '\n\n\n\n

502 Server dropped connection

\n

The following error occurred while trying to access http://%2050070:50070/webhdfs/v1/?user.name=web_crawler&op=GETHOMEDIRECTORY :

\n 502 Server dropped connection

\n

Generated Fri, 21 Dec 2018 02:03:18 GMT by Polipo on .\n\r\n'

則一般是訪問認證錯誤,可能原因是賬戶密碼不正確或者無許可權,或者本地網路不在可訪問名單中

3. 如何使用Java API讀寫HDFS

Java API讀寫HDFS

public class FSOptr {

/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);

}

// 創建文件目錄
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 創建文件夾
System.out.println("make dir :" + result);

// 創建文件,並寫入內容
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}

// 重命名文件
private static void rename(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);

FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}

// 刪除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}

// 或者
fs.delete(path, true);

fs.close();
}

/**
* 下載,將hdfs文件下載到本地磁碟
*
* @param localSrc1
* 本地的文件地址,即文件的路徑
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);

fs.ToLocalFile(hdfs_path, local_path);

return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}

/**
* 上傳,將本地文件到hdfs系統中
*
* @param localSrc
* 本地的文件地址,即文件的路徑
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置對象
FileSystem fs; // 文件系統
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 輸出流,創建一個輸出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重寫progress方法
public void progress() {
// System.out.println("上傳完一個設定緩存區大小容量的文件!");
}
});
// 連接兩個流,形成通道,使輸入流向輸出流傳輸數據,
IOUtils.Bytes(in, out, 10240, true); // in為輸入流對象,out為輸出流對象,4096為緩沖區大小,true為上傳後關閉流
return true;
} catch (IOException e) {
e.printStackTrace();
}

} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}

/**
* 移動
*
* @param old_st原來存放的路徑
* @param new_st移動到的路徑
*/
public boolean moveFileName(String old_st, String new_st) {

try {

// 下載到伺服器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文檔/temp");
Configuration conf = new Configuration();
FileSystem fs = null;

// 刪除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}

// 從伺服器本地傳到新路徑
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文檔/temp", new_st);

if (down_flag && uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}

// 本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.FromLocalFile(src, dst);
fs.close();
}

// 獲取給定目錄下的所有子目錄以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}

private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i < fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}

//判斷文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}

//獲取hdfs集群所有主機結點數據
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info

for(int i=0; i < dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info

}
}

//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i < blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts

System.out.println(hosts);
}
}

//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);

long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch

Date d = new Date(modificationTime);
System.out.println(d);
}

}

4. 如何使用Python為Hadoop編寫一個簡單的MapRece程序

在這個實例中,我將會向大家介紹如何使用Python 為 Hadoop編寫一個簡單的MapRece
程序。
盡管Hadoop 框架是使用Java編寫的但是我們仍然需要使用像C++、Python等語言來實現Hadoop程序。盡管Hadoop官方網站給的示常式序是使用Jython編寫並打包成Jar文件,這樣顯然造成了不便,其實,不一定非要這樣來實現,我們可以使用Python與Hadoop 關聯進行編程,看看位於/src/examples/python/WordCount.py 的例子,你將了解到我在說什麼。

我們想要做什麼?

我們將編寫一個簡單的 MapRece 程序,使用的是C-Python,而不是Jython編寫後打包成jar包的程序。
我們的這個例子將模仿 WordCount 並使用Python來實現,例子通過讀取文本文件來統計出單詞的出現次數。結果也以文本形式輸出,每一行包含一個單詞和單詞出現的次數,兩者中間使用製表符來想間隔。

先決條件

編寫這個程序之前,你學要架設好Hadoop 集群,這樣才能不會在後期工作抓瞎。如果你沒有架設好,那麼在後面有個簡明教程來教你在Ubuntu linux 上搭建(同樣適用於其他發行版linux、unix)

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立單節點的 Hadoop 集群

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多節點的 Hadoop 集群

Python的MapRece代碼

使用Python編寫MapRece代碼的技巧就在於我們使用了 HadoopStreaming 來幫助我們在Map 和 Rece間傳遞數據通過STDIN (標准輸入)和STDOUT (標准輸出).我們僅僅使用Python的sys.stdin來輸入數據,使用sys.stdout輸出數據,這樣做是因為HadoopStreaming會幫我們辦好其他事。這是真的,別不相信!

Map: mapper.py

將下列的代碼保存在/home/hadoop/mapper.py中,他將從STDIN讀取數據並將單詞成行分隔開,生成一個列表映射單詞與發生次數的關系:
注意:要確保這個腳本有足夠許可權(chmod +x /home/hadoop/mapper.py)。

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Rece step, i.e. the input for recer.py
#
# tab-delimited; the trivial word count is 1
print '%s\\t%s' % (word, 1)在這個腳本中,並不計算出單詞出現的總數,它將輸出 "<word> 1" 迅速地,盡管<word>可能會在輸入中出現多次,計算是留給後來的Rece步驟(或叫做程序)來實現。當然你可以改變下編碼風格,完全尊重你的習慣。

Rece: recer.py

將代碼存儲在/home/hadoop/recer.py 中,這個腳本的作用是從mapper.py 的STDIN中讀取結果,然後計算每個單詞出現次數的總和,並輸出結果到STDOUT。
同樣,要注意腳本許可權:chmod +x /home/hadoop/recer.py

#!/usr/bin/env python

from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()

# parse the input we got from mapper.py
word, count = line.split('\\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass

# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\\t%s'% (word, count)
測試你的代碼(cat data | map | sort | rece)

我建議你在運行MapRece job測試前嘗試手工測試你的mapper.py 和 recer.py腳本,以免得不到任何返回結果
這里有一些建議,關於如何測試你的Map和Rece的功能:
——————————————————————————————————————————————
\r\n
# very basic test
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
——————————————————————————————————————————————
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/recer.py
bar 1
foo 3
labs 1
——————————————————————————————————————————————

# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
[...]
(you get the idea)

quux 2

quux 1

——————————————————————————————————————————————

在Hadoop平台上運行Python腳本

為了這個例子,我們將需要三種電子書:

The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson\r\n
The Notebooks of Leonardo Da Vinci\r\n
Ulysses by James Joyce
下載他們,並使用us-ascii編碼存儲 解壓後的文件,保存在臨時目錄,比如/tmp/gutenberg.

hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
total 3592
-rw-r--r-- 1 hadoop hadoop 674425 2007-01-22 12:56 20417-8.txt
-rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
-rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
hadoop@ubuntu:~$

復制本地數據到HDFS

在我們運行MapRece job 前,我們需要將本地的文件復制到HDFS中:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -FromLocal /tmp/gutenberg gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
/user/hadoop/gutenberg <dir>
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
Found 3 items
/user/hadoop/gutenberg/20417-8.txt <r 1> 674425
/user/hadoop/gutenberg/7ldvc10.txt <r 1> 1423808
/user/hadoop/gutenberg/ulyss12.txt <r 1> 1561677

執行 MapRece job

現在,一切准備就緒,我們將在運行Python MapRece job 在Hadoop集群上。像我上面所說的,我們使用的是
HadoopStreaming 幫助我們傳遞數據在Map和Rece間並通過STDIN和STDOUT,進行標准化輸入輸出。

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -recer /home/hadoop/recer.py -input gutenberg/*
-output gutenberg-output
在運行中,如果你想更改Hadoop的一些設置,如增加Rece任務的數量,你可以使用「-jobconf」選項:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-jobconf mapred.rece.tasks=16 -mapper ...

一個重要的備忘是關於Hadoop does not honor mapred.map.tasks
這個任務將會讀取HDFS目錄下的gutenberg並處理他們,將結果存儲在獨立的結果文件中,並存儲在HDFS目錄下的
gutenberg-output目錄。
之前執行的結果如下:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -recer /home/hadoop/recer.py -input gutenberg/*
-output gutenberg-output

additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob: map 0% rece 0%
[...] INFO streaming.StreamJob: map 43% rece 0%
[...] INFO streaming.StreamJob: map 86% rece 0%
[...] INFO streaming.StreamJob: map 100% rece 0%
[...] INFO streaming.StreamJob: map 100% rece 33%
[...] INFO streaming.StreamJob: map 100% rece 70%
[...] INFO streaming.StreamJob: map 100% rece 77%
[...] INFO streaming.StreamJob: map 100% rece 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021

[...] INFO streaming.StreamJob: Output: gutenberg-output hadoop@ubuntu:/usr/local/hadoop$

正如你所見到的上面的輸出結果,Hadoop 同時還提供了一個基本的WEB介面顯示統計結果和信息。
當Hadoop集群在執行時,你可以使用瀏覽器訪問 http://localhost:50030/ ,如圖:

檢查結果是否輸出並存儲在HDFS目錄下的gutenberg-output中:

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg-output
Found 1 items
/user/hadoop/gutenberg-output/part-00000 <r 1> 903193 2007-09-21 13:00
hadoop@ubuntu:/usr/local/hadoop$

可以使用dfs -cat 命令檢查文件目錄

hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat gutenberg-output/part-00000
"(Lo)cra" 1
"1490 1
"1498," 1
"35" 1
"40," 1
"A 2
"AS-IS". 2
"A_ 1
"Absoluti 1
[...]
hadoop@ubuntu:/usr/local/hadoop$

注意比輸出,上面結果的(")符號不是Hadoop插入的。

轉載僅供參考,版權屬於原作者。祝你愉快,滿意請採納哦

5. 關於python使用hdfs3模塊,提示找不到libhdfs3的處理

我在自己的Linux環境下安裝了libhdfs3,發現不工作,提示找不到hdfs3這個庫
於是按照網上的提示,先嘗試用pip來安裝解決,但是發現還是無解!

於是我轉向anaconda2: https://www.anaconda.com/download/#macos
找到對應的installer安裝,總算安裝尺仔吵成陵侍戚擾功

開始安裝hdfs3

然後找到對應的安裝路徑

在我的python文件頭前加入以下幾句話,就可以解決這個問題

6. 現在學好python能幹什麼

Python培訓課程大同小異,整理如下:

Python語言基礎:主要學習Python基礎知識,如Python3、數據類型、字元串、函數、類、文件操作等。

Python語言高級:主要學習Python庫、正則表達式帆慎行、進程線程、爬蟲、遍歷以及MySQL資料庫。

Pythonweb開發:主要學習HTML、CSS、JavaScript、jQuery等前端知識,掌握python三大後端框架(Django、 Flask以及Tornado)。

Linux基礎:主要態嘩學習Linux相關的各種命令,如文件處理命令、壓縮解壓命令、許可權管理以及Linux Shell開發等。

Linux運維自動化開發:主要學習Python開發Linux運維、Linux運維報警工具開發、Linux運維報警安全審計開發、Linux業務質量報表工具開發、Kali安全檢測工具檢測以及Kali 密碼破解實戰。

Python爬蟲:主要學習python爬蟲技術,掌握多線程爬蟲技術,分布式爬蟲技術。

Python數據分析和大數據:主要學習numpy數據處理、pandas數據分析、matplotlib數據可視化、scipy數據統計分析以及python 金融數據分析;Hadoop HDFS、python Hadoop MapRece、python Spark core、python Spark SQL以及python Spark MLlib。

Python機器學習:主要學習KNN演算法、線性回歸、邏輯斯蒂回歸演算法、決策樹演算法、孝清樸素貝葉斯演算法、支持向量機以及聚類k-means演算法。

7. python的map和rece和Hadoop的MapRece有什麼關系

關系就是都是基於Map-Rece的處理思想設計出來的。
從用戶角度看功能其實差不多,
Python的Map函數和Hadoop的Map階段對輸入進行逐行處理;
Python的Rece函數和Hadoop的Rece階段對輸入進行累積處理。
但是其實完整的Hadoop MapRece是Map+Shuffle+Sort+Rece過程。
其中Shuffle過程是為了讓分布式機群之間將同Key數據進行互相交換,Sort過程是根據Key對所有數據進行排序,從而才能完成類WordCount功能,而這兩步在Python裡面當然是需要用戶自己去編寫的。

8. 如何使用Python為Hadoop編寫一個簡單的MapRece程序

我們將編寫一個簡單的 MapRece 程序,使用的是C-Python,而不是Jython編寫後打包成jar包的程序。
我們的這個例子將模仿 WordCount 並使用Python來實現,例子通過讀取文本文件來統計出單詞的出現次數。結果也以文本形式輸出,每一行包含一個單詞和單詞出現的次數,兩者中間使用製表符來想間隔。

先決條件

編寫這個程序之前,你學要架設好Hadoop 集群,這樣才能不會在後期工作抓瞎。如果你沒有架設好,那麼在後面有個簡明教程來教你在Ubuntu Linux 上搭建(同樣適用於其他發行版linux、unix)

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立單節點的 Hadoop 集群

如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多節點的 Hadoop 集群

Python的MapRece代碼

使用Python編寫MapRece代碼的技巧就在於我們使用了 HadoopStreaming 來幫助我們在Map 和 Rece間傳遞數據通過STDIN (標准輸入)和STDOUT (標准輸出).我們僅僅使用Python的sys.stdin來輸入數據,使用sys.stdout輸出數據,這樣做是因為HadoopStreaming會幫我們辦好其他事。這是真的,別不相信!

9. HDFS由什麼組成

大數據平台包含了採集層、存儲層、計算層和應用層,是一個復雜的IT系統,需要學會Hadoop等分布式系統的開發技能。
1.1採集層:Sqoop可用來採集導入傳統關系型資料庫的數據、Flume對於日誌型數據採集,另外使用Python一類的語言開發網路爬蟲獲取網路數據;
1.2儲存層:分布式文件系統HDFS最為常用;採用了主從(Master/Slave)結構模型,一個HDFS集群是由一個NameNode和若干個DataNode組成的。其中NameNode作為主伺服器,管理文件系統的命名空察沖碧間和客戶端對文件的訪問操作;集群中的DataNode管理存儲的數據。
1.3計算層:有不同的計算框架可以選擇,常見的如MapRece、Spark等,一般來講,如果能使用計算框架的「原生語言」,運算效率會最高(MapRece的原生支敗舉持Java,而Spark原生支持Scala);
1.4應用層:包括結果數據的可視化、交互界面開發以及應用管理工具的開發等,更多的用到Java、Python等通用IT開發前端、後判迅端的能力;

10. Python怎麼獲取HDFS文件的編碼格式

你好,你可以利用python3的python3-magic來獲得文賣漏舉件的編碼格式。下面是對中碧應的代碼搜陪
import magic

blob = open('unknown-file').read()

m = magic.open(magic.MAGIC_MIME_ENCODING)

m.load()

encoding = m.buffer(blob) # "utf-8" "us-ascii" etc

閱讀全文

與python訪問hdfs文件相關的資料

熱點內容
失控的演算法代碼 瀏覽:293
程序員說有人愛你怎麼回答 瀏覽:106
騰訊游戲安卓怎麼用ios登錄 瀏覽:759
石獅雲存儲伺服器 瀏覽:180
python滲透入門到精通 瀏覽:272
如何真機調試安卓進程 瀏覽:739
農行app怎麼交公共維修基金 瀏覽:667
python中字典增加元素 瀏覽:240
伺服器端渲染的數據怎麼爬 瀏覽:163
壓縮空氣噴射器 瀏覽:488
python提高效率 瀏覽:796
華為文件管理怎麼樣輸入解壓碼 瀏覽:800
深思加密狗初始化 瀏覽:566
黃金崩潰pdf 瀏覽:310
華為特定簡訊息加密 瀏覽:375
微機原理與單片機技術李精華答案 瀏覽:816
pic12c508單片機 瀏覽:309
androidgps調用 瀏覽:226
金文編pdf 瀏覽:445
14乘87減147的簡便演算法 瀏覽:473