㈠ How to read and write HDFS with the Java API
// streams for reading and writing
InputStream in = null;
// build the HDFS configuration
Configuration conf = new Configuration();
// get a handle on the HDFS filesystem
FileSystem hdfs = FileSystem.get(conf);
// a buffered reader is used to read line by line
BufferedReader buff = null;
// root directory that holds the log files
Path listf = new Path("hdfs://10.2.143.5:9090/root/myfile/");
// list all second-level subdirectories under the root
FileStatus stats[] = hdfs.listStatus(listf);
// counter j, handy when checking how many records were processed
int j = 0;
for (int i = 0; i < stats.length; i++) {
    // list the files under each subdirectory
    FileStatus temp[] = hdfs.listStatus(new Path(stats[i].getPath().toString()));
    for (int k = 0; k < temp.length; k++) {
        System.out.println("file path: " + temp[k].getPath().toString());
        // build the Path
        Path p = new Path(temp[k].getPath().toString());
        // open the file stream
        in = hdfs.open(p);
        // wrap the stream in a BufferedReader
        buff = new BufferedReader(new InputStreamReader(in));
        String str = null;
        while ((str = buff.readLine()) != null) {
            System.out.println(str);
        }
        buff.close();
        in.close();
    }
}
㈡ A problem when using a Java program to upload a local file to HDFS
Change
FileSystem hdfs = FileSystem.get(config);
to
FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), config);
The first call returns whatever fs.defaultFS points to, which by default is the local filesystem; the second call explicitly returns the HDFS filesystem object. Use the first form only when you actually want to operate on local files. I hit the same error on Hadoop 2.7.4 at first, and switching to the second form fixed it.
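A minimal sketch of the difference, assuming a NameNode at hdfs://master:9000 (the address used above); the class name and printed output are illustrative:
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsGetDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Without a URI, the filesystem named by fs.defaultFS is returned;
        // if that property is unset (or file:///), this is the local filesystem.
        FileSystem defaultFs = FileSystem.get(conf);

        // With an explicit URI, the HDFS filesystem is returned regardless of fs.defaultFS.
        FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"), conf);

        System.out.println("default scheme: " + defaultFs.getUri().getScheme());
        System.out.println("hdfs scheme: " + hdfs.getUri().getScheme());
        System.out.println("root exists on HDFS: " + hdfs.exists(new Path("/")));
    }
}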
㈢ When calling the HDFS filesystem through the Java API and getting garbled text, how do you fix it
Use the right character-set encoding.
HDFS itself only stores bytes: uploading a local file simply encodes it into a byte stream and writes that stream into the filesystem, and each local file may well have been written with a different encoding. Decode with the same charset the file was written in when you read the data back.
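A short sketch of reading an HDFS file with an explicit charset; the path is hypothetical and UTF-8 is assumed to be the encoding the file was written with:
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadWithCharset {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // hypothetical path; decode with the charset the uploader used when writing the bytes
        Path p = new Path("/user/hadoop/data/sample.txt");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(p), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        fs.close();
    }
}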
㈤ How do you connect to the HDFS filesystem from Java, and which packages are needed
The Apache Hadoop project ships a Java API for working with files in HDFS: opening, reading, writing and deleting files, as well as creating, deleting and listing directories.
1. Download Hadoop from http://hadoop.apache.org/releases.html, unpack it, and add all of its jars to the project's lib directory.
2. A program follows three steps: 1) obtain a Configuration object, 2) obtain a FileSystem object, 3) perform the file operations. A simple example:
/**
*
*/
package org.jrs.wlh;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/**
 * PutMeger.java
 * Uploads local files into HDFS through the Java API.
 * @version $Revision$
 * update: $Date$
*/
public class PutMeger {
public static void main(String[] args) throws IOException {
String[] str = new String[]{"E:\\hadoop\\UploadFileClient.java","hdfs://master:9000/user/hadoop/inccnt.java"};
Configuration conf = new Configuration();
FileSystem fileS= FileSystem.get(conf);
FileSystem localFile = FileSystem.getLocal(conf); // get a FileSystem object for the local filesystem
Path input = new Path(str[0]); // local input path
Path out = new Path(str[1]); // destination path on HDFS
try{
FileStatus[] inputFile = localFile.listStatus(input); // listStatus returns the files under the input path
FSDataOutputStream outStream = fileS.create(out); // create the output stream on HDFS
for (int i = 0; i < inputFile.length; i++) {
System.out.println(inputFile[i].getPath().getName());
FSDataInputStream in = localFile.open(inputFile[i].getPath());
byte buffer[] = new byte[1024];
int bytesRead = 0;
while((bytesRead = in.read(buffer))>0){ // read the file in byte-sized chunks
System.out.println(new String(buffer, 0, bytesRead)); // echo the chunk that was just read
outStream.write(buffer,0,bytesRead);
}
in.close();
}
outStream.close(); // close the HDFS output stream after all inputs have been merged
}catch(Exception e){
e.printStackTrace();
}
}
}
㈥ Using the Java API, what are the three ways to write data to a file on the HDFS filesystem? Describe them.
1. Set up the environment
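The answer above stops at the environment step. As a hedged sketch, and an assumption rather than something taken from the source, three write paths commonly used with the Java API are FileSystem.create(), FileSystem.append(), and a stream copy with IOUtils.copyBytes(); the NameNode address and paths below are illustrative:
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsWriteDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), conf);
        Path dst = new Path("/user/hadoop/data/demo.txt");

        // 1) create(): write a brand-new file through an FSDataOutputStream.
        try (FSDataOutputStream out = fs.create(dst, true)) {
            out.write("first line\n".getBytes(StandardCharsets.UTF_8));
        }

        // 2) append(): add data to the end of an existing file
        //    (append must be enabled on the cluster; it is by default on recent versions).
        try (FSDataOutputStream out = fs.append(dst)) {
            out.write("appended line\n".getBytes(StandardCharsets.UTF_8));
        }

        // 3) stream copy: pipe any InputStream (an in-memory one here, or a local
        //    FileInputStream) into an HDFS file with IOUtils.copyBytes.
        try (InputStream in = new ByteArrayInputStream("copied content\n".getBytes(StandardCharsets.UTF_8));
             FSDataOutputStream out = fs.create(new Path("/user/hadoop/data/demo-copy.txt"))) {
            IOUtils.copyBytes(in, out, 4096, false);
        }

        fs.close();
    }
}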
㈦ How to read and write HDFS with the Java API
Reading and writing HDFS with the Java API:
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// create a directory
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// create the directory
System.out.println("make dir :" + result);
// create a file and write content into it
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// rename a file
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// delete files
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// or simply delete the directory recursively:
fs.delete(path, true);
fs.close();
}
/**
* Download: copy a file from HDFS to the local disk.
*
* @param hdfsSrc1
*            path of the source file in HDFS
* @param localSrc1
*            destination path on the local filesystem
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* Upload: copy a local file into HDFS.
*
* @param localSrc
*            path of the source file on the local filesystem
* @param hdfsSrc
*            destination path in HDFS
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// obtain the configuration object
FileSystem fs; // the target filesystem
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// create an output stream for the destination path in HDFS
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// override the progress callback
public void progress() {
// System.out.println("uploaded another buffer-sized chunk");
}
});
// pipe the input stream into the output stream
IOUtils.copyBytes(in, out, 10240, true); // in: source stream, out: destination stream, 10240: buffer size, true: close both streams when done
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* Move a file within HDFS: download it, delete the source, then upload it to the new path.
*
* @param old_st original path
* @param new_st destination path
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// download to the server's local disk
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// delete the source file
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// upload from the server's local disk to the new path
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
if (down_flag && uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// copy a local file into HDFS
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.copyFromLocalFile(src, dst);
fs.close();
}
// list all subdirectories and files under a given directory
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i < fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
// check whether a file exists
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
// list all DataNode hosts in the HDFS cluster
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i < dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i < blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(String.join(",", hosts));
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
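A brief usage sketch for the instance methods above; the NameNode address hdfs://master:9000 and the file paths are assumptions, not taken from the source:
public class FSOptrDemo {
    public static void main(String[] args) {
        FSOptr optr = new FSOptr();
        // upload a local file into HDFS, then move it to another HDFS directory
        boolean uploaded = optr.sendToHdfs1("/home/hadoop/word.txt",
                "hdfs://master:9000/user/hadoop/data/word.txt");
        boolean moved = optr.moveFileName("hdfs://master:9000/user/hadoop/data/word.txt",
                "hdfs://master:9000/user/hadoop/archive");
        System.out.println("uploaded=" + uploaded + ", moved=" + moved);
    }
}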
㈧ How to implement resumable uploads when uploading files to HDFS with Java
The answer below is a chunk-upload servlet: it dispatches prepare / pause / resume / progress actions and verifies the CRC of each uploaded part, which is the bookkeeping a resumable upload needs.
@Component("javaLargeFileUploaderServlet")
@WebServlet(name = "javaLargeFileUploaderServlet", urlPatterns = { "/javaLargeFileUploaderServlet" })
public class UploadServlet extends HttpRequestHandlerServlet
implements HttpRequestHandler {
private static final Logger log = LoggerFactory.getLogger(UploadServlet.class);
@Autowired
UploadProcessor uploadProcessor;
@Autowired
FileUploaderHelper fileUploaderHelper;
@Autowired
ExceptionCodeMappingHelper exceptionCodeMappingHelper;
@Autowired
Authorizer authorizer;
@Autowired
StaticStateIdentifierManager staticStateIdentifierManager;
@Override
public void handleRequest(HttpServletRequest request, HttpServletResponse response)
throws IOException {
log.trace("Handling request");
Serializable jsonObject = null;
try {
// extract the action from the request
UploadServletAction actionByParameterName =
UploadServletAction.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.action));
// check authorization
checkAuthorization(request, actionByParameterName);
// then process the asked action
jsonObject = processAction(actionByParameterName, request);
// if something has to be written to the response
if (jsonObject != null) {
fileUploaderHelper.writeToResponse(jsonObject, response);
}
}
// If exception, write it
catch (Exception e) {
exceptionCodeMappingHelper.processException(e, response);
}
}
private void checkAuthorization(HttpServletRequest request, UploadServletAction actionByParameterName)
throws MissingParameterException, AuthorizationException {
// check authorization
// if it's not getProgress (we do not really enforce authorization for
// getProgress, and it uses an array of file ids)
if (!actionByParameterName.equals(UploadServletAction.getProgress)) {
// extract uuid
final String fileIdFieldValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId, false);
// if this is init, the identifier is the one in parameter
UUID clientOrJobId;
String parameter = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
if (actionByParameterName.equals(UploadServletAction.getConfig) && parameter != null) {
clientOrJobId = UUID.fromString(parameter);
}
// if not, get it from manager
else {
clientOrJobId = staticStateIdentifierManager.getIdentifier();
}
// call authorizer
authorizer.getAuthorization(
request,
actionByParameterName,
clientOrJobId,
fileIdFieldValue != null ? getFileIdsFromString(fileIdFieldValue).toArray(new UUID[] {}) : null);
}
}
private Serializable processAction(UploadServletAction actionByParameterName, HttpServletRequest request)
throws Exception {
log.debug("Processing action " + actionByParameterName.name());
Serializable returnObject = null;
switch (actionByParameterName) {
case getConfig:
String parameterValue = fileUploaderHelper.getParameterValue(request, UploadServletParameter.clientId, false);
returnObject =
uploadProcessor.getConfig(
parameterValue != null ? UUID.fromString(parameterValue) : null);
break;
case verifyCrcOfUncheckedPart:
returnObject = verifyCrcOfUncheckedPart(request);
break;
case prepareUpload:
returnObject = prepareUpload(request);
break;
case clearFile:
uploadProcessor.clearFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case clearAll:
uploadProcessor.clearAll();
break;
case pauseFile:
List<UUID> uuids = getFileIdsFromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
uploadProcessor.pauseFile(uuids);
break;
case resumeFile:
returnObject =
uploadProcessor.resumeFile(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)));
break;
case setRate:
uploadProcessor.setUploadRate(UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId)),
Long.valueOf(fileUploaderHelper.getParameterValue(request, UploadServletParameter.rate)));
break;
case getProgress:
returnObject = getProgress(request);
break;
}
return returnObject;
}
List<UUID> getFileIdsFromString(String fileIds) {
String[] splittedFileIds = fileIds.split(",");
List<UUID> uuids = Lists.newArrayList();
for (int i = 0; i < splittedFileIds.length; i++) {
uuids.add(UUID.fromString(splittedFileIds[i]));
}
return uuids;
}
private Serializable getProgress(HttpServletRequest request)
throws MissingParameterException {
Serializable returnObject;
String[] ids =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId), String[].class);
Collection<UUID> uuids = Collections2.transform(Arrays.asList(ids), new Function<String, UUID>() {
@Override
public UUID apply(String input) {
return UUID.fromString(input);
}
});
returnObject = Maps.newHashMap();
for (UUID fileId : uuids) {
try {
ProgressJson progress = uploadProcessor.getProgress(fileId);
((HashMap<String, ProgressJson>) returnObject).put(fileId.toString(), progress);
}
catch (FileNotFoundException e) {
log.debug("No progress will be retrieved for " + fileId + " because " + e.getMessage());
}
}
return returnObject;
}
private Serializable prepareUpload(HttpServletRequest request)
throws MissingParameterException, IOException {
// extract file information
PrepareUploadJson[] fromJson =
new Gson()
.fromJson(fileUploaderHelper.getParameterValue(request, UploadServletParameter.newFiles), PrepareUploadJson[].class);
// prepare them
final HashMap<String, UUID> prepareUpload = uploadProcessor.prepareUpload(fromJson);
// return them
return Maps.newHashMap(Maps.transformValues(prepareUpload, new Function<UUID, String>() {
public String apply(UUID input) {
return input.toString();
};
}));
}
private Boolean verifyCrcOfUncheckedPart(HttpServletRequest request)
throws IOException, MissingParameterException, FileCorruptedException, FileStillProcessingException {
UUID fileId = UUID.fromString(fileUploaderHelper.getParameterValue(request, UploadServletParameter.fileId));
try {
uploadProcessor.verifyCrcOfUncheckedPart(fileId,
fileUploaderHelper.getParameterValue(request, UploadServletParameter.crc));
}
catch (InvalidCrcException e) {
// no need to log this exception, a fallback behaviour is defined in the
// throwing method.
// but we need to return something!
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}
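On the HDFS side, resuming can be implemented by checking how many bytes the target file already holds and appending only the remainder. A minimal sketch, assuming append is enabled on the cluster and an illustrative NameNode address:
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ResumableHdfsUpload {
    /** Upload localFile to hdfsPath, resuming after however many bytes HDFS already holds. */
    public static void upload(String localFile, String hdfsPath) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), conf);
        Path dst = new Path(hdfsPath);

        long alreadyUploaded = fs.exists(dst) ? fs.getFileStatus(dst).getLen() : 0L;
        long localLength = new File(localFile).length();
        if (alreadyUploaded >= localLength) {
            fs.close();
            return; // nothing left to transfer
        }

        try (InputStream in = new FileInputStream(localFile)) {
            // skip the bytes that were transferred before the interruption
            long skipped = 0;
            while (skipped < alreadyUploaded) {
                long n = in.skip(alreadyUploaded - skipped);
                if (n <= 0) {
                    break;
                }
                skipped += n;
            }
            // append the remaining bytes, or create the file on the first attempt
            try (FSDataOutputStream out =
                    alreadyUploaded > 0 ? fs.append(dst) : fs.create(dst, false)) {
                IOUtils.copyBytes(in, out, 64 * 1024, false);
            }
        }
        fs.close();
    }
}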
㈨ Write a Java class method that lists all the files stored under the root directory of an HDFS cluster
public void listMyFile() throws Exception {
//get the FileSystem
//access the cluster as the "hdfs" user
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.83.141:8020"), new Configuration(), "hdfs");
//recursively list every file under the target directory
RemoteIterator<LocatedFileStatus> iterator =
fileSystem.listFiles(new Path("/"), true);
//walk the iterator
while (iterator.hasNext()) {
//details of the current file
LocatedFileStatus fileStatus = iterator.next();
//print the storage path of the file
System.out.println("path: " + fileStatus.getPath() +
"---" + fileStatus.getPath().getName());
//block storage information of the file
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
//print the number of blocks
System.out.println("number of blocks: " + blockLocations.length);
//print the location of every block replica
for (BlockLocation blockLocation : blockLocations) {
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.println("host: " + host);
}
}
}
}