1. 为什么从事大数据行业,一定要学习python
Python这只小虫子最近随着大数据的兴起可以说是十分的火了。有越来越多的人不敢小觑Python这门语言了。也有更多的人在学习Python。Python为何会有如此大的魅力?为什么从事大数据行业必学Python?这还要从Python这门语言的优点开始讲起。
虽然Python这种语言不如Java、C++这些语言普及,却早在1991年就已经诞生了。它的语法简单清晰,以实用为主,是门十分朴素的语言。同时,它还是编程语言中的“和事佬”,被人戏称为胶水语言。因为它能够将其他语言制作的各种模块很轻松的联结在一起。
如果将Python语言拟人化,它绝对属于“老好人”的那一类,让人容易亲近,人们与它交流并不需要花太多心思。但它却拥有强大的功能。很多语言不能完成的任务,Python都能轻易完成。它几乎可以被用来做任何事情,应用于多个系统和平台。无论是系统操作还是Web开发,抑或是服务器和管理工具、部署、科学建模等,它都能轻松掌握。因此,从事海量数据处理的大数据行业,自然少不了这个“万能工具”。
除此之外,Python这只小虫子还受到了大数据老大哥Google的青睐。Google的很多开发都用到了Python。这使得人们能够找到Python的很多指南和教程。让你学起来更方便,你在使用中可能遇到的很多问题大多数都已经被Google给解决了,并把解决方法发布到了网络平台。
Python还拥有一系列非常优秀的库,这省了你编程中的很多时间。尤其是在人工智能和机器学习领域,这些库的价值体现得更为明显。
不管怎么说,从事大数据工作,少不得要在网络上爬取数据,不用Python爬虫,你还打算用什么呢?
因此,在当前的大数据领域,从事大数据行业必学Python。
人工智能、大数据、云计算和物联网的未来发展值得重视,均为前沿产业,多智时代专注于人工智能和大数据的入门和科谱,在此为你推荐几篇优质好文:
————————————————
版权声明:本文为CSDN博主“oshidai”的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/oshidai/article/details/88712833
2. Python使用hdfs存放文件时报Proxy error: 502 Server dropped connection解决方案
Python3 使用hdfs分布式文件储存系统
from pyhdfs import *
client = HdfsClient(hosts="testhdfs.org, 50070",
user_name="web_crawler") # 创建一个连接
client.get_home_directory() # 获取hdfs根路径
client.listdir(PATH) # 获取hdfs指定路径下的文件列表
client._from_local(file_path, hdfs_path, overwrite=True) # 把本地文件拷贝到服务器,不支持文件夹;overwrite=True表示存在则覆盖
client.delete(PATH, recursive=True) # 删除指定文件
hdfs_path必须包含文件名及其后缀,不握歼然不会成功
如果连接
HdfsClient
报错
Traceback (most recent call last):
File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2963, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "
client.get_home_directory()
File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 565, in get_home_directory
return _json(self._get('/', 'GETHOMEDIRECTORY', **kwargs))['Path']
File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 391, in _get
return self._request('get', *args, **kwargs)
伍皮并 File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 377, in _request
_check_response(response, expected_status)
File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 799, in _check_response
腔迹 remote_exception = _json(response)['RemoteException']
File "C:\Users\billl\AppData\Local\Continuum\anaconda3\lib\site-packages\pyhdfs.py", line 793, in _json
"Expected JSON. Is WebHDFS enabled? Got {!r}".format(response.text))
pyhdfs.HdfsException: Expected JSON. Is WebHDFS enabled? Got '\n\n\n\n
502 Server dropped connection
\n
The following error occurred while trying to access http://%2050070:50070/webhdfs/v1/?user.name=web_crawler&op=GETHOMEDIRECTORY :
\n 502 Server dropped connection
\n
Generated Fri, 21 Dec 2018 02:03:18 GMT by Polipo on .\n\r\n'
则一般是访问认证错误,可能原因是账户密码不正确或者无权限,或者本地网络不在可访问名单中
3. 如何使用Java API读写HDFS
Java API读写HDFS
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 创建文件目录
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 创建文件夹
System.out.println("make dir :" + result);
// 创建文件,并写入内容
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 删除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下载,将hdfs文件下载到本地磁盘
*
* @param localSrc1
* 本地的文件地址,即文件的路径
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.ToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上传,将本地文件到hdfs系统中
*
* @param localSrc
* 本地的文件地址,即文件的路径
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置对象
FileSystem fs; // 文件系统
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 输出流,创建一个输出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重写progress方法
public void progress() {
// System.out.println("上传完一个设定缓存区大小容量的文件!");
}
});
// 连接两个流,形成通道,使输入流向输出流传输数据,
IOUtils.Bytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移动
*
* @param old_st原来存放的路径
* @param new_st移动到的路径
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下载到服务器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 删除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 从服务器本地传到新路径
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
if (down_flag && uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// 本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.FromLocalFile(src, dst);
fs.close();
}
// 获取给定目录下的所有子目录以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i < fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判断文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//获取hdfs集群所有主机结点数据
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i < dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i < blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
4. 如何使用Python为Hadoop编写一个简单的MapRece程序
在这个实例中,我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的MapRece
程序。
尽管Hadoop 框架是使用Java编写的但是我们仍然需要使用像C++、Python等语言来实现Hadoop程序。尽管Hadoop官方网站给的示例程序是使用Jython编写并打包成Jar文件,这样显然造成了不便,其实,不一定非要这样来实现,我们可以使用Python与Hadoop 关联进行编程,看看位于/src/examples/python/WordCount.py 的例子,你将了解到我在说什么。
我们想要做什么?
我们将编写一个简单的 MapRece 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。
我们的这个例子将模仿 WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。结果也以文本形式输出,每一行包含一个单词和单词出现的次数,两者中间使用制表符来想间隔。
先决条件
编写这个程序之前,你学要架设好Hadoop 集群,这样才能不会在后期工作抓瞎。如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu linux 上搭建(同样适用于其他发行版linux、unix)
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立单节点的 Hadoop 集群
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多节点的 Hadoop 集群
Python的MapRece代码
使用Python编写MapRece代码的技巧就在于我们使用了 HadoopStreaming 来帮助我们在Map 和 Rece间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好其他事。这是真的,别不相信!
Map: mapper.py
将下列的代码保存在/home/hadoop/mapper.py中,他将从STDIN读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:
注意:要确保这个脚本有足够权限(chmod +x /home/hadoop/mapper.py)。
#!/usr/bin/env python
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words
words = line.split()
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Rece step, i.e. the input for recer.py
#
# tab-delimited; the trivial word count is 1
print '%s\\t%s' % (word, 1)在这个脚本中,并不计算出单词出现的总数,它将输出 "<word> 1" 迅速地,尽管<word>可能会在输入中出现多次,计算是留给后来的Rece步骤(或叫做程序)来实现。当然你可以改变下编码风格,完全尊重你的习惯。
Rece: recer.py
将代码存储在/home/hadoop/recer.py 中,这个脚本的作用是从mapper.py 的STDIN中读取结果,然后计算每个单词出现次数的总和,并输出结果到STDOUT。
同样,要注意脚本权限:chmod +x /home/hadoop/recer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\\t%s'% (word, count)
测试你的代码(cat data | map | sort | rece)
我建议你在运行MapRece job测试前尝试手工测试你的mapper.py 和 recer.py脚本,以免得不到任何返回结果
这里有一些建议,关于如何测试你的Map和Rece的功能:
——————————————————————————————————————————————
\r\n
# very basic test
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
——————————————————————————————————————————————
hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/recer.py
bar 1
foo 3
labs 1
——————————————————————————————————————————————
# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py
The 1
Project 1
Gutenberg 1
EBook 1
of 1
[...]
(you get the idea)
quux 2
quux 1
——————————————————————————————————————————————
在Hadoop平台上运行Python脚本
为了这个例子,我们将需要三种电子书:
The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson\r\n
The Notebooks of Leonardo Da Vinci\r\n
Ulysses by James Joyce
下载他们,并使用us-ascii编码存储 解压后的文件,保存在临时目录,比如/tmp/gutenberg.
hadoop@ubuntu:~$ ls -l /tmp/gutenberg/
total 3592
-rw-r--r-- 1 hadoop hadoop 674425 2007-01-22 12:56 20417-8.txt
-rw-r--r-- 1 hadoop hadoop 1423808 2006-08-03 16:36 7ldvc10.txt
-rw-r--r-- 1 hadoop hadoop 1561677 2004-11-26 09:48 ulyss12.txt
hadoop@ubuntu:~$
复制本地数据到HDFS
在我们运行MapRece job 前,我们需要将本地的文件复制到HDFS中:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -FromLocal /tmp/gutenberg gutenberg
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
/user/hadoop/gutenberg <dir>
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg
Found 3 items
/user/hadoop/gutenberg/20417-8.txt <r 1> 674425
/user/hadoop/gutenberg/7ldvc10.txt <r 1> 1423808
/user/hadoop/gutenberg/ulyss12.txt <r 1> 1561677
执行 MapRece job
现在,一切准备就绪,我们将在运行Python MapRece job 在Hadoop集群上。像我上面所说的,我们使用的是
HadoopStreaming 帮助我们传递数据在Map和Rece间并通过STDIN和STDOUT,进行标准化输入输出。
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -recer /home/hadoop/recer.py -input gutenberg/*
-output gutenberg-output
在运行中,如果你想更改Hadoop的一些设置,如增加Rece任务的数量,你可以使用“-jobconf”选项:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-jobconf mapred.rece.tasks=16 -mapper ...
一个重要的备忘是关于Hadoop does not honor mapred.map.tasks
这个任务将会读取HDFS目录下的gutenberg并处理他们,将结果存储在独立的结果文件中,并存储在HDFS目录下的
gutenberg-output目录。
之前执行的结果如下:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.19.1-streaming.jar
-mapper /home/hadoop/mapper.py -recer /home/hadoop/recer.py -input gutenberg/*
-output gutenberg-output
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [/usr/local/hadoop-datastore/hadoop-hadoop/hadoop-unjar54543/]
[] /tmp/streamjob54544.jar tmpDir=null
[...] INFO mapred.FileInputFormat: Total input paths to process : 7
[...] INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop-datastore/hadoop-hadoop/mapred/local]
[...] INFO streaming.StreamJob: Running job: job_200803031615_0021
[...]
[...] INFO streaming.StreamJob: map 0% rece 0%
[...] INFO streaming.StreamJob: map 43% rece 0%
[...] INFO streaming.StreamJob: map 86% rece 0%
[...] INFO streaming.StreamJob: map 100% rece 0%
[...] INFO streaming.StreamJob: map 100% rece 33%
[...] INFO streaming.StreamJob: map 100% rece 70%
[...] INFO streaming.StreamJob: map 100% rece 77%
[...] INFO streaming.StreamJob: map 100% rece 100%
[...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
[...] INFO streaming.StreamJob: Output: gutenberg-output hadoop@ubuntu:/usr/local/hadoop$
正如你所见到的上面的输出结果,Hadoop 同时还提供了一个基本的WEB接口显示统计结果和信息。
当Hadoop集群在执行时,你可以使用浏览器访问 http://localhost:50030/ ,如图:
检查结果是否输出并存储在HDFS目录下的gutenberg-output中:
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls gutenberg-output
Found 1 items
/user/hadoop/gutenberg-output/part-00000 <r 1> 903193 2007-09-21 13:00
hadoop@ubuntu:/usr/local/hadoop$
可以使用dfs -cat 命令检查文件目录
hadoop@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat gutenberg-output/part-00000
"(Lo)cra" 1
"1490 1
"1498," 1
"35" 1
"40," 1
"A 2
"AS-IS". 2
"A_ 1
"Absoluti 1
[...]
hadoop@ubuntu:/usr/local/hadoop$
注意比输出,上面结果的(")符号不是Hadoop插入的。
转载仅供参考,版权属于原作者。祝你愉快,满意请采纳哦
5. 关于python使用hdfs3模块,提示找不到libhdfs3的处理
我在自己的Linux环境下安装了libhdfs3,发现不工作,提示找不到hdfs3这个库
于是按照网上的提示,先尝试用pip来安装解决,但是发现还是无解!
于是我转向anaconda2: https://www.anaconda.com/download/#macos
找到对应的installer安装,总算安装尺仔吵成陵侍戚扰功
开始安装hdfs3
然后找到对应的安装路径
在我的python文件头前加入以下几句话,就可以解决这个问题
6. 现在学好python能干什么
Python培训课程大同小异,整理如下:
Python语言基础:主要学习Python基础知识,如Python3、数据类型、字符串、函数、类、文件操作等。
Python语言高级:主要学习Python库、正则表达式帆慎行、进程线程、爬虫、遍历以及MySQL数据库。
Pythonweb开发:主要学习HTML、CSS、JavaScript、jQuery等前端知识,掌握python三大后端框架(Django、 Flask以及Tornado)。
Linux基础:主要态哗学习Linux相关的各种命令,如文件处理命令、压缩解压命令、权限管理以及Linux Shell开发等。
Linux运维自动化开发:主要学习Python开发Linux运维、Linux运维报警工具开发、Linux运维报警安全审计开发、Linux业务质量报表工具开发、Kali安全检测工具检测以及Kali 密码破解实战。
Python爬虫:主要学习python爬虫技术,掌握多线程爬虫技术,分布式爬虫技术。
Python数据分析和大数据:主要学习numpy数据处理、pandas数据分析、matplotlib数据可视化、scipy数据统计分析以及python 金融数据分析;Hadoop HDFS、python Hadoop MapRece、python Spark core、python Spark SQL以及python Spark MLlib。
Python机器学习:主要学习KNN算法、线性回归、逻辑斯蒂回归算法、决策树算法、孝清朴素贝叶斯算法、支持向量机以及聚类k-means算法。
7. python的map和rece和Hadoop的MapRece有什么关系
关系就是都是基于Map-Rece的处理思想设计出来的。
从用户角度看功能其实差不多,
Python的Map函数和Hadoop的Map阶段对输入进行逐行处理;
Python的Rece函数和Hadoop的Rece阶段对输入进行累积处理。
但是其实完整的Hadoop MapRece是Map+Shuffle+Sort+Rece过程。
其中Shuffle过程是为了让分布式机群之间将同Key数据进行互相交换,Sort过程是根据Key对所有数据进行排序,从而才能完成类WordCount功能,而这两步在Python里面当然是需要用户自己去编写的。
8. 如何使用Python为Hadoop编写一个简单的MapRece程序
我们将编写一个简单的 MapRece 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。
我们的这个例子将模仿 WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。结果也以文本形式输出,每一行包含一个单词和单词出现的次数,两者中间使用制表符来想间隔。
先决条件
编写这个程序之前,你学要架设好Hadoop 集群,这样才能不会在后期工作抓瞎。如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu Linux 上搭建(同样适用于其他发行版linux、unix)
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立单节点的 Hadoop 集群
如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多节点的 Hadoop 集群
Python的MapRece代码
使用Python编写MapRece代码的技巧就在于我们使用了 HadoopStreaming 来帮助我们在Map 和 Rece间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好其他事。这是真的,别不相信!
9. HDFS由什么组成
大数据平台包含了采集层、存储层、计算层和应用层,是一个复杂的IT系统,需要学会Hadoop等分布式系统的开发技能。
1.1采集层:Sqoop可用来采集导入传统关系型数据库的数据、Flume对于日志型数据采集,另外使用Python一类的语言开发网络爬虫获取网络数据;
1.2储存层:分布式文件系统HDFS最为常用;采用了主从(Master/Slave)结构模型,一个HDFS集群是由一个NameNode和若干个DataNode组成的。其中NameNode作为主服务器,管理文件系统的命名空察冲碧间和客户端对文件的访问操作;集群中的DataNode管理存储的数据。
1.3计算层:有不同的计算框架可以选择,常见的如MapRece、Spark等,一般来讲,如果能使用计算框架的“原生语言”,运算效率会最高(MapRece的原生支败举持Java,而Spark原生支持Scala);
1.4应用层:包括结果数据的可视化、交互界面开发以及应用管理工具的开发等,更多的用到Java、Python等通用IT开发前端、后判迅端的能力;
10. Python怎么获取HDFS文件的编码格式
你好,你可以利用python3的python3-magic来获得文卖漏举件的编码格式。下面是对中碧应的代码搜陪
import magic
blob = open('unknown-file').read()
m = magic.open(magic.MAGIC_MIME_ENCODING)
m.load()
encoding = m.buffer(blob) # "utf-8" "us-ascii" etc