1. 如何检测hadoop中gz压缩文件是否损坏
执行hive任务的时候,进入到8088的map详细进度列表,即是RUNNING MAP attempts in job_1456816082333_1354,查看最后出错的map是哪个节点或者在页面直接点击logs进入详细log日志查看,或者进入到节点的Hadoop的logs/userlogs目录
根据jobid找到对应的目录: application_1456816082333_1354,里面有错误的文件id,然后删除掉hdfs的对应的损坏文件。
2. Hadoop 压缩从理论到实战
在大数据领域,无论上层计算引擎采用的是什么,在存储过程中,压缩都是一个避不开的问题。合适的压缩选择可以降低存储成本、减少网络传输I/O。而错误的压缩选择则可能让 cpu 负荷达到瓶颈、降低并发度等等,所以是否选择压缩、选择什么压缩格式在大数据存储环节中都是一个至关重要的问题。
点评:压缩时间和压缩率之间的取舍本质上是 cpu 资源和存储资源的取舍。是否需册猛要支持分片也不是绝对的,如果单个文件大小均小于 splitSize,则没必要支持分片。
点评:一阶段考虑尽可能支持分片(单个文件大于 splitSize 时)。二阶段考虑尽可能快的压悄姿基缩速度。三阶段根据是作为长期归档(几乎不用)or 作为下一作业输入,考虑尽可能高的压缩能力 or 支持分片。
点评:有两点需要注意,第一点:这里的速度和压缩率没有具体测试数据,而是给出了一个模糊的表达。因为即使具体测试了速度和压缩率,也会因数据不同而结果有很大的差异。后面会给出测试的脚本,大家可以结合自己的表数据自行测试。第二点:有些压缩格式性能参数很相似,为什么 Hadoop 中要搞这么多种?较为直观的一个原因是:不同存储格式支持的压缩是不一样的,比如 orc 存储格式只支持 zlib 和 snappy 两种压缩 [8] ,parquet 虽然支持很多压缩格式,但是不支持 bzip2 [7]
以下摘自《Hadoop The Definitive Guide》
重点阅读文中加粗片段。大致意思是:因为 gzip 压缩格式使用的 DEFLATE 压缩算法没办法做到随机任意读取,必须同步顺序读取。也就意味着没办法为每一个 block 创建一个分片(split),然后为该分片启一个 mapper 去读取数据。所以即使 gzip 文件有很多 block,MR 程序也只会启动一个 Mapper 去读取所有的 block。也即 gzip 这种压缩格式不支持分片。相反的,如果压缩格式使用的算法支持随机任意读取,那么就可以为每一个 block 创建一个分片,同时启动一个 mapper 去读取数据,这样有多少个 block 就有多少个分片,就有多少个 mapper ,这些 mapper 并行读取数据,效率大大提升。上述涉及到几个小概念,接下来分别进行详述。
一句话总结: zlib、gzip 在大数据语境中都是一种 压缩格式 ,他们使用相同的 压缩算法: DEFLATE,DefaultCodec 是 zlib 使用的 编解码器 ,Gzip 使用的编解码器是 GzipCodec
我们知道,Hadoop 在任务切分时,是按照文件的粒度进行的。即一个文件一个文件启谨进行切分。而每一个文件切分成几块,取决于 splitSize 的大小。比如两个文件,第一个文件 300M,第二个文件150M。分片大小是128M,那么对于第一个文件将会切分成3片(128M,128M,44M),第二个文件会切分成2片(128M,22M)。共计5片。所以分片数量除了由文件数决定,另一个决定因素就是 splitSize 即分片大小。
splitSize 如何计算?
几个前提:
影响参数:
接下来进行实际验证:
经过了 2.4.2 中的一系列实验,验证了一个结论:当一个输入格式支持分片时,mapper 数量是无限制的,反之 mapper 数量小于等于文件的数量。所以我们可以通过设置参数来试图调小分片大小来增加 mapper 数量看其上限是否等于文件数量即可。假如输入的文件个数只有一个,那么当 mapper 数量大于1的时候,说明该输入格式是支持分片的。
大家可以根据自己数据集和想测试的压缩和存储格式自行修改脚本。通过以上脚本跑出来的结果如下:
由 2.1 中评价压缩的三项指标可知,压缩率、压缩/解压速度、是否支持分片是衡量压缩最重要的三项指标。3.1.1小节中只对压缩率进行了测试。压缩/解压速度可以通过跑一些查询语句进一步测试。这里就不展开测试了。业界中常用的存储格式一般是 parquet, orc,所以上面测试除了纯文本只测试了这两种存储格式。
我们可以通过 hive> set io.compression.codecs; 来查看当前Hadoop集群支持的压缩,在公司的集群中查询得到的结果是:
可以看到 lzo 有两种编解码器: LzoCodec 和 LzopCodec。他们之间有什么区别呢?
如果你阅读过关于 Hadoop 压缩的文章,应该可以看到,绝大多数文章中对于 snappy 是否支持分片都是直接给出的否定的答案。 CDH 的文档中也指出来 snappy 是不支持分片的。
看文中加粗片段,虽然 snappy 本身是不支持分片的,但是如果 snappy 存储在一些特定的存储格式比如 SequenceFile 或者 Avro 中,那么是可以支持分片的。也就是说 snappy 是否支持分片是分情况讨论的。不能说使用了 snappy 压缩就一定不支持分片。前面提到了,业界中常用的存储格式一般是 parquet 或者 orc,而上面 CDH 的文章中恰恰没有提到 parquet 和 orc 是否支持,接下来以 parquet 为例,进行测试。测试内容即为 parquet + snappy 组合,是否支持分片。
首先准备数据,因为之前做压缩率测试,已经有了 parquet + snappy 文件了,这里直接拿来用。
一共3个输入文件,启了6个mapper,说明输入文件是可以分片的。即 parquet + snappy 的组合是支持分片的。在《Hadoop The Definitive Guide》中也对 parquet 是否支持分片有说明:
以 maprece.output.fileoutputformat.compress.codec 为例,这个参数可以在三个地方配置:
那么当三者都设置时,以哪个为准呢?按照经验来看,一定是粒度小的优先级大于粒度大的优先级。经过测试也验证了这种猜测。即:表级别 > hive > hadoop
初学者往往容易混淆存储格式和压缩格式之间的关系,其实二者是完全独立的。如果完整的阅读了该篇文章,应该已经消除了这一块理解对误区。这里总结一下:比如 parquet, orc,他们都是常见的 存储格式 。是否使用压缩,使用何种压缩都是可以设置的。而 zlib、gzip、lzo、lz4、snappy 等等这些都是常见的 压缩格式 ,他们既可以依附于某些 存储格式 ,比如之前提到的 parquet + snappy,orc + zlib 等等。也可以脱离特定的 存储格式 ,比如纯文本文件进行压缩,text + parquet, text + bzip2 等等。
3. 如何在Scala中读取Hadoop集群上的gz压缩文件
(1)一个从文件创建的Scala对象,或(2)一个并行切片(分布在各个节点之间),或(3)从其他RDD转换得来,或(4)改变已有RDD的持久性,如请求将已有RDD缓存在内存中。Spark应用称为driver,实现单个节点或一组节点上的操作。
4. 如何实现让用户在网页中上传下载文件到HDFS中
hadoop计算需要在hdfs文件系统上进行,文件上传到hdfs上通常有三种方法:a hadoop自带的dfs服务,put;b hadoop的API,Writer对象可以实现这一功能;c 调用OTL可执行程序,数据从数据库直接进入hadoop
hadoop计算需要在hdfs文件系统上进行,因此每次计算之前必须把需要用到的文件(我们称为原始文件)都上传到hdfs上。文件上传到hdfs上通常有三种方法:
a hadoop自带的dfs服务,put;
b hadoop的API,Writer对象可以实现这一功能;
c 调用OTL可执行程序,数据从数据库直接进入hadoop
由于存在ETL层,因此第三种方案不予考虑
将a、b方案进行对比,如下:
1 空间:方案a在hdfs上占用空间同本地,因此假设只上传日志文件,则保存一个月日志文件将消耗掉约10T空间,如果加上这期间的各种维表、事实表,将占用大约25T空间
方案b经测试,压缩比大约为3~4:1,因此假设hdfs空间为100T,原来只能保存约4个月的数据,现在可以保存约1年
2 上传时间:方案a的上传时间经测试,200G数据上传约1小时
方案b的上传时间,程序不做任何优化,大约是以上的4~6倍,但存在一定程度提升速度的余地
3 运算时间:经过对200G数据,大约4亿条记录的测试,如果程序以IO操作为主,则压缩数据的计算可以提高大约50%的速度,但如果程序以内存操作为主,则只能提高5%~10%的速度
4 其它:未压缩的数据还有一个好处是可以直接在hdfs上查看原始数据。压缩数据想看原始数据只能用程序把它导到本地,或者利用本地备份数据
压缩格式:按照hadoop api的介绍,压缩格式分两种:BLOCK和RECORD,其中RECORD是只对value进行压缩,一般采用BLOCK进行压缩。
对压缩文件进行计算,需要用SequenceFileInputFormat类来读入压缩文件,以下是计算程序的典型配置代码:
JobConf conf = new JobConf(getConf(), log.class);
conf.setJobName(”log”);
conf.setOutputKeyClass(Text.class);//set the map output key type
conf.setOutputValueClass(Text.class);//set the map output value type
conf.setMapperClass(MapClass.class);
//conf.setCombinerClass(Rece.class);//set the combiner class ,if havenot, use Recuce class for default
conf.setRecerClass(Rece.class);
conf.setInputFormat(SequenceFileInputFormat.class);//necessary if use compress
接下来的处理与非压缩格式的处理一样