‘壹’ 如何实现让用户在网页中上传下载文件到HDFS中
hadoop计算需要在hdfs文件系统上进行,文件上传到hdfs上通常有三种方法:a hadoop自带的dfs服务,put;b hadoop的API,Writer对象可以实现这一功能;c 调用OTL可执行程序,数据从数据库直接进入hadoop
hadoop计算需要在hdfs文件系统上进行,因此每次计算之前必须把需要用到的文件(我们称为原始文件)都上传到hdfs上。文件上传到hdfs上通常有三种方法:
a hadoop自带的dfs服务,put;
b hadoop的API,Writer对象可以实现这一功能;
c 调用OTL可执行程序,数据从数据库直接进入hadoop
由于存在ETL层,因此第三种方案不予考虑
将a、b方案进行对比,如下:
1 空间:方案a在hdfs上占用空间同本地,因此假设只上传日志文件,则保存一个月日志文件将消耗掉约10T空间,如果加上这期间的各种维表、事实表,将占用大约25T空间
方案b经测试,压缩比大约为3~4:1,因此假设hdfs空间为100T,原来只能保存约4个月的数据,现在可以保存约1年
2 上传时间:方案a的上传时间经测试,200G数据上传约1小时
方案b的上传时间,程序不做任何优化,大约是以上的4~6倍,但存在一定程度提升速度的余地
3 运算时间:经过对200G数据,大约4亿条记录的测试,如果程序以IO操作为主,则压缩数据的计算可以提高大约50%的速度,但如果程序以内存操作为主,则只能提高5%~10%的速度
4 其它:未压缩的数据还有一个好处是可以直接在hdfs上查看原始数据。压缩数据想看原始数据只能用程序把它导到本地,或者利用本地备份数据
压缩格式:按照hadoop api的介绍,压缩格式分两种:BLOCK和RECORD,其中RECORD是只对value进行压缩,一般采用BLOCK进行压缩。
对压缩文件进行计算,需要用SequenceFileInputFormat类来读入压缩文件,以下是计算程序的典型配置代码:
JobConf conf = new JobConf(getConf(), log.class);
conf.setJobName(”log”);
conf.setOutputKeyClass(Text.class);//set the map output key type
conf.setOutputValueClass(Text.class);//set the map output value type
conf.setMapperClass(MapClass.class);
//conf.setCombinerClass(Rece.class);//set the combiner class ,if havenot, use Recuce class for default
conf.setRecerClass(Rece.class);
conf.setInputFormat(SequenceFileInputFormat.class);//necessary if use compress
接下来的处理与非压缩格式的处理一样
‘贰’ HDFS文件
Hadoop支持的文件系统由很多(见下图),HDFS只是其中一种实现。java抽象类 org.apache.hadoop.fs.FileSystem 定义了Hadoop中一个文件系统的客户端接口,并且该抽象类有几个具体实现。Hadoop一般使用URI(下图)方案来选取合适的文件系统实例进行交互。
特别的,HDFS文件系统的操作可以使用 FsSystem shell 、客户端(http rest api、Java api、C api等)。
FsSystem shell 的用法基本同本地shell类似,命令可参考 FsSystem shell
Hadoop是用Java写的,通过Java Api( FileSystem 类)可以调用大部分Hadoop文件系统的交互操作。更详细的介绍可参考 hadoop Filesystem 。
非Java开发的应用可以使用由WebHDFS协议提供的HTTP REST API,但是HTTP比原生的Java客户端要慢,所以不到万不得已尽量不要使用HTTP传输特大数据。通过HTTP来访问HDFS有两种方法:
两种如图
在第一种情况中,namenode和datanode内嵌的web服务作为WebHDFS的端节点运行(是否启用WebHDFS可通过dfs.webhdfs.enabled设置,默认为true)。文件元数据在namenode上,文件读写操作首先被发往namenode,有namenode发送一个HTTP重定向至某个客户端,指示以流的方式传输文件数据的目的或源datanode。
第二种方法依靠一个或多个独立代理服务器通过HTTP访问HDFS。所有集群的网络通信都需要通过代理,因此客户端从来不直接访问namenode或datanode。使用代理后可以使用更严格的防火墙策略和带宽策略。
HttpFs代理提供和WebHDFS相同的HTTP接口,这样客户端能够通过webhdfs URI访问接口。HttpFS代理启动独立于namenode和datanode的守护进程,使用httpfs.sh 脚本,默认在一个不同的端口上监听(14000)。
下图描述了
读文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。
对上图的解释如下:
在读取过程中, 如果 FSDataInputStream 在和一个 datanode 进行交流时出现了一个错误,他就去试一试下一个最接近的块,他当然也会记住刚才发生错误的 datanode 以至于之后不会再在这个 datanode 上进行没必要的尝试。 DFSInputStream 也会在 datanode 上传输出的数据上核查检查数(checknums).如果损坏的块被发现了, DFSInputStream 就试图从另一个拥有备份的 datanode 中去读取备份块中的数据。
在这个设计中一个重要的方面就是客户端直接从 datanode 上检索数据,并通过 namenode 指导来得到每一个块的最佳 datanode。这种设计允许 HDFS 扩展大量的并发客户端,因为数据传输只是集群上的所有 datanode 展开的。期间,namenode 仅仅只需要服务于获取块位置的请求(块位置信息是存放在内存中,所以效率很高)。如果不这样设计,随着客户端数据量的增长,数据服务就会很快成为一个瓶颈。
我们知道,相对于客户端(之后就是 maprece task 了),块的位置有以下可能性:
我们认为他们对于客户端的带宽递减,距离递增(括号中表示距离)。示意图如下:
如果集群中的机器都在同一个机架上,我们无需其他配置,若集群比较复杂,由于hadoop无法自动发现网络拓扑,所以需要额外配置网络拓扑。
基本读取程序,将文件内容输出到console
FileSystemCat
随机读取
展开原码
下图描述了写文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。
对上图的解释如下:
如果在任何一个 datanode 在写入数据的时候失败了,接下来所做的一切对客户端都是透明的:首先, pipeline 被关闭,在确认队列中的剩下的包会被添加进数据队列的起始位置上,以至于在失败的节点下游的任 何节点都不会丢失任何的包。然后与 namenode 联系后,当前在一个好的 datanode 会联系 namenode, 给失败节点上还未写完的块生成一个新的标识ID, 以至于如果这个失败的 datanode 不久后恢复了,这个不完整的块将会被删除。失败节点会从 pipeline 中移除,然后剩下两个好的 datanode 会组成一个的新的 pipeline ,剩下的 这些块的包(也就是刚才放在数据队列队首的包)会继续写进 pipeline 中好的 datanode 中。最后,namenode 注意到块备份数小于规定的备份数,他就安排在另一个节点上创建完成备份,直接从已有的块中复制就可以。然后一直到满足了备份数( dfs.replication )。如果有多个节点的写入失败了,如果满足了最小备份数的设置( dfs.namenode.repliction.min ),写入也将会成功,然后剩下的备份会被集群异步的执行备份,直到满足了备份数( dfs.replication )。
创建目录
文件压缩有两大好处:
Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。
Hadoop中有多种压缩格式、算法和工具,下图列出了常用的压缩方法。
表中的“是否可切分”表示对应的压缩算法是否支持切分,也就是说是否可以搜索数据流的任意位置并进一步往下读取数据,可切分的压缩格式尤其适合MapRece。
所有的压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间。不同的压缩工具有不同的特性:
更详细的比较如下
1.压缩性能比较
2.优缺点
另外使用hadoop原生(native)类库比其他java实现有更快的压缩和解压缩速度。特征比较如下:
使用容器文件格式结合压缩算法也能更好的提高效率。顺序文件、Arvo文件、ORCFiles、Parqurt文件同时支持压缩和切分。
压缩举例(Java)
压缩
解压缩
六、文件序列化
序列化是指将结构化数据转换为字节流以便在网络上传输或写到磁盘进行永久存储。反序列化狮子将字节流转换回结构化对象的逆过程。
序列化用于分布式数据处理的两大领域:进程间通信和永久存储。
对序列化的要求时是格式紧凑(高效使用存储空间)、快速(读写效率高)、可扩展(可以透明地读取老格式数据)且可以互操作(可以使用不同的语言读写数据)。
Hadoop使用的是自己的序列化格式 Writable ,它绝对紧凑、速度快,但不太容易用java以外的语言进行扩展或使用。
当然,用户也可以使用其他序列化框架或者自定义序列化方式,如 Avro 框架。
Hadoop内部还使用了 Apache Thrift 和 Protocal Buffers 来实现RPC和数据交换。
‘叁’ 刚学习spark,想上传文件给hdfs,是不是需要hadoop然后java编程这样是用eclip
spark会把hdfs当做一个数据源来处理, 所以数据存储都要做, 之后编程是从Hadoop改成spark就可以了. 是否用eclipse无所谓, 只要能编译运行就可以
‘肆’ 如何使用Java API读写HDFS
Java API读写HDFS
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 创建文件目录
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 创建文件夹
System.out.println("make dir :" + result);
// 创建文件,并写入内容
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 删除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下载,将hdfs文件下载到本地磁盘
*
* @param localSrc1
* 本地的文件地址,即文件的路径
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.ToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上传,将本地文件到hdfs系统中
*
* @param localSrc
* 本地的文件地址,即文件的路径
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置对象
FileSystem fs; // 文件系统
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 输出流,创建一个输出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重写progress方法
public void progress() {
// System.out.println("上传完一个设定缓存区大小容量的文件!");
}
});
// 连接两个流,形成通道,使输入流向输出流传输数据,
IOUtils.Bytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移动
*
* @param old_st原来存放的路径
* @param new_st移动到的路径
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下载到服务器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 删除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 从服务器本地传到新路径
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
if (down_flag && uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// 本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.FromLocalFile(src, dst);
fs.close();
}
// 获取给定目录下的所有子目录以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i < fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判断文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//获取hdfs集群所有主机结点数据
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i < dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i < blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
‘伍’ hadoop上传文件出错:java.io.IOException: Mkdirs failed to create /user/hadoop
不了解Hadoop,不过看错误是创建文件夹失败,最好检查下权限问题
‘陆’ 用java向hdfs上传文件时,如何实现断点续传
@Component("javaLargeFileUploaderServlet")
@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" })
public class UploadServlet extends HttpRequestHandlerServlet
implements HttpRequestHandler {
private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);
@Autowired
UploadProcessor uploadProcessor;
@Autowired
FileUploaderHelper fileUploaderHelper;
@Autowired
ExceptionCodeMappingHelper exceptionCodeMappingHelper;
@Autowired
Authorizer authorizer;
@Autowired
StaticStateIdentifierManager staticStateIdentifierManager;
@Override
public void handleRequest(HttpServletRequest request, HttpServletResponse response)
throws IOException {
log.trace("Handling request");
Serializable jsonObject = null;
try {
// extract the action from the request
UploadServletAction actionByParameterName =
UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));
// check authorization
checkAuthorization(request, actionByParameterName);
// then process the asked action
jsonObject = processAction(actionByParameterName, request);
// if something has to be written to the response
if (jsonObject != null) {
fileUploaderHelper.writeToResponse(jsonObject, response);
}
}
// If exception, write it
catch (Exception e) {
exceptionCodeMappingHelper.processException(e, response);
}
}
private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
throws MissingParameterException, AuthorizationException {
// check authorization
// if its not get progress (because we do not really care about authorization for get
// progress and it uses an array of file ids)
if (!actionByParameterName.equals(UploadServletAction.getProgress)) {
// extract uuid
final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);
// if this is init, the identifier is the one in parameter
UUID clientOrJobId;
String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
if (actionByParameterName.equals(UploadServletAction.getConfig) && parameter != null) {
clientOrJobId = UUID.fromString(parameter);
}
// if not, get it from manager
else {
clientOrJobId = staticStateIdentifierManager.getIdentifier();
}
// call authorizer
authorizer.getAuthorization(
request,
actionByParameterName,
clientOrJobId,
fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);
}
}
private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
throws Exception {
log.debug("Processing action " + actionByParameterName.name());
Serializable returnObject = null;
switch (actionByParameterName) {
case getConfig:
String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
returnObject =
uploadProcessor.getConfig(
parameterValue != null ? UUID.fromString(parameterValue) : null);
break;
case verifyCrcOfUncheckedPart:
returnObject = verifyCrcOfUncheckedPart(request);
break;
case prepareUpload:
returnObject = prepareUpload(request);
break;
case clearFile:
uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case clearAll:
uploadProcessor.clearAll();
break;
case pauseFile:
List<UUID> uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
uploadProcessor.pauseFile(uuids);
break;
case resumeFile:
returnObject =
uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case setRate:
uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),
Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));
break;
case getProgress:
returnObject = getProgress(request);
break;
}
return returnObject;
}
List<UUID> getFileIdsFromString(String fileIds) {
String[] splittedFileIds = fileIds.split(",");
List<UUID> uuids = Lists.newArrayList();
for (int i = 0; i < splittedFileIds.length; i++) {
uuids.add(UUID.fromString(splittedFileIds[i]));
}
return uuids;
}
private Serializable getProgress(HttpServletRequest request)
throws MissingParameterException {
Serializable returnObject;
String[] ids =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);
Collection<UUID> uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() {
@Override
public UUID apply(String input) {
return UUID.fromString(input);
}
});
returnObject = Maps.newHashMap();
for (UUID fileId : uuids) {
try {
ProgressJson progress = uploadProcessor.getProgress(fileId);
((HashMap<String, ProgressJson>) returnObject).put(fileId.toString(), progress);
}
catch (FileNotFoundException e) {
log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage());
}
}
return returnObject;
}
private Serializable prepareUpload(HttpServletRequest request)
throws MissingParameterException, IOException {
// extract file information
PrepareUploadJson[] fromJson =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);
// prepare them
final HashMap<String, UUID> prepareUpload = uploadProcessor.prepareUpload(fromJson);
// return them
return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>() {
public String apply(UUID input) {
return input.toString();
};
}));
}
private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {
UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
try {
uploadProcessor.verifyCrcOfUncheckedPart(fileId,
fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));
}
catch (InvalidCrcException e) {
// no need to log this exception, a fallback behaviour is defined in the
// throwing method.
// but we need to return something!
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}
‘柒’ 关于用java写程序把本地文件上传到HDFS中的问题
将这FileSystem hdfs = FileSystem.get(config);
改成FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"),config)
上面那句取得的是本地文件系统对象,改成下面这个才是取得hdfs文件系统对象,当你要操作本地文件对象的时候就要用上面那句取得本地文件对象,我在2.7.4刚开始也是跟你一样的错误,改为下面的就可以了
‘捌’ hadoop使用put上传文件出错
先看看 /user/xxx目录的权限:drwxr-xr-x - hdfs supergroup 表示它属于hdfs用户,组名为 supergroup
因此需要使用 sudo -u hdfs hadoop fs -put localfile /user/xxx 来指定使用 hdfs 用户来执行上传命令。参考
当高兴地执行sudo -u hdfs hadoop fs -put localfile /user/xxx 以为能成功上传时,又报错:
put: localfile No such file or directory 说找不到本地文件localfile,可是用 ls 明明 能看到 localfile ,后来在一篇文章(参考)中发现发来是lcoalfile的权限问题。