设为首页 收藏本站
查看: 676|回复: 0

[经验分享] 4.Hadoop I/O

[复制链接]

尚未签到

发表于 2016-12-7 10:21:42 | 显示全部楼层 |阅读模式
4.Hadoop I/O
4.1.Data Integrity 数据完整性
一般使用checksum检查数据的完整性,但是他仅能检查完整性,而不提供任何的修复办法,checksum的值也有出错的可能。
Hadoop采取了不同策略的checksum来克服以上的不足
4.1.1.Data Integrity In HDFS HDFS中的数据完整性
1)HDFS透明的计算其内部数据的checksum,并在读取数据的时候验证checksum。
2)HDFS为每io.bytes.per.checksum个字节创建checksum,默认是每512bytes创建一次
3)Datanode负责在存储数据和数据的checksum之前验证数据。在管道中的datanodes,最后一个负责验证数据的checksum,若错误则抛出ChecksumException。
4)当client读取的数据的时候也验证checksum,它将获取的数据的checksum与datanode的checksum进行比较。(每个datanode都保存checksum的验证记录,他客户端成功验证一个block之后,它将告诉给datanode,datanode更新验证记录。保存这样的记录对检测硬盘损坏非常有价值)
5)除了在client验证block之外,每个datanode都在运行了一个后台线程——DataBlockScanner——来定期对数据进行校验
6)因为HDFS存储了block的复制,所以HDFS可以通过拷贝没有被损坏的datanode上面的block来产生新的block复制。
7)可以在通过Open()方法读取文件之前,通过给FIleSystem的setVerifyChecksum()方法传递false来禁止数据校验。也可以在shell中使用-ignoreCrc选项配合-get或者-copyToLocal命令来禁止数据校验
4.1.2.LocalFileSystem
LocalFileSystem进行client断的checksum计算。当你写filename文件的时候,文件系统会透明的创建一个隐藏的filename.crc文件。在与filename.crc文件的同一个目录中包含了filename的每个chunk的checksum。Chunk的大小由io.bytes.per.checksum控制(默认512),并作为metadata存储在.crc文件中。正因为这样的设计,所以在以后改变了chunk size的值以后也可以准确的读取出来数据。
Checksum的开销比较小,但也可以禁用。禁用办法:
全局:设置fs.file.impl为org.apache.hadoop.fs.RawLocalFileSystem.
也可以创建一个RawLocalFileSystem的实例,这样当你需要为某些读操作禁止checksum的时候比较有用:
Configuration conf=……
FIleSystem fs=new RawLocalFileSystem();
fs.initalize(null,conf);
4.1.3.ChecksumFileSystem
LocalFileSystem使用ChecksumFileSystem来完成它的工作。ChecksumFileSystem这个类使得给没有Checksum功能的FileSystem添加Checksum变得简单。一个典型的用法如下:
FileSystem rawFs=……
FileSystem ChecksumFs=new ChecksumFIleSystem(rawFs);
底层的文件系统成为raw filesystem,可以使用ChecksumFileSystem的getFileSystem方法获得。ChecksumFileSystem还有几个好用的方法,例如getChecksumFile()可以获得指定文件的checksum文件路径。
4.2.Compression 压缩
文件压缩的好处:节省空间、加快在网络上的传输速度
下面是几种压缩格式:
DSC0000.bmp
其中要注意的问题是压缩比和压缩速度的权衡。压缩速度快相应的压缩比就小,而压缩比大压缩的速度就会慢下来。
可分割的(splitable)压缩非常适合MapReduce程序(因为可以seek to any point in the stream)。
4.2.1.Codecs
Codecs为压缩,解压缩的算法实现。
在Hadoop中,codecs由CompressionCode的实现来表示。下面是一些实现:
DSC0001.bmp

4.2.1.1.使用CompressionCodec压缩、解压缩流
使用CompressionCodec的CreateOutputStream(OutputStream out)方法创建CompressionOutputSream。可以把你的未压缩的数据使用压缩格式写入流
使用CompressionCodec的CreateInputStream(InputStream in)获得CompressionInputStream。可以把从流中解压缩之后的数据。
CompressionOutputSream和CompressionInputStream提供重设compressor和decompressor的能力。
下面的例子演示了从标准输入读取并压缩数据,然后写入到标准输出流中:
public class StreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils
.newInstance(codecClass, conf);
CompressionOutputStream out = codec.createOutputStream(System.out);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
}
}
使用ReflectionUtils创建一个codecs的实例。给createOutputStream传递一个标准输入进而获得一个封装了system.out的CompressionOutputStream,通过IOUtils的copyBytes方法把标准输入的数据拷贝到CompressionOutputStream中。
下面是运行的方式:
%echo “Text” | hadoop StreamCompressor org.apache.hadoop.io.compress.Gzipcodec | gunzip –
Text
4.2.1.2.使用compressionCodecFactory推断CompressionCodecs
CompressionCodecFactory提供从扩展名到CompressionCodec的映射。下面的例子是一个演示:
public class FileDecompressor {
public static void main(String[] args) throws Exception {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri, codec
.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
下面是运行方式:
%hadoop FileDecompressor file.gz
4.2.1.3.native libraries
使用本地库可以提高效率,但是请注意并不是所有的实现都提供了本地库,但是有的实现却只提供了本地库(native实现,native implementation)。下面是几种实现以及本地库情况:
DSC0002.bmp

使用本地库默认不需要修改任何设置。可以通过设置hadoop.native.lib为false禁用本地库。
4.2.1.4.CodecPool codec池
顾名思义,使用CodecPool可以降低反复创建Codec实例的开销。下面的例子演示了如何使用:
public class PooledStreamCompressor {
public static void main(String[] args) throws Exception {
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
CompressionCodec codec = (CompressionCodec) ReflectionUtils
.newInstance(codecClass, conf);
Compressor compressor = null;
try {
compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out = codec.createOutputStream(
System.out, compressor);
IOUtils.copyBytes(System.in, out, 4096, false);
out.finish();
} finally {
CodecPool.returnCompressor(compressor);
}
}
}
注意,使用完compressor之后要归还给pool。
4.3.Compression and InputSplit 压缩与输入分块
当考虑将怎样压缩将被MapReduce处理的数据的时候,考虑一下压缩格式是否支持split非常重要。对于gzip格式来说,因为gzip格式的stream不支持从任意点读取,因此不能用于MapReduce。这种情况下,MapReduce将不对文件进行切分(split),因为通过文件扩展名可以知道文件是gzip格式的,从而不切分.但是因为不能保证文件的每一个block都在同一个datanode上,所以,这样做效率不高,因为有可能需要通过网络从另外的datanode上面读取数据。
Bzip2支持作为split输入。
Zip死一个归档文件,里面存放多个文件。文件的位置保存在zip文件尾的中央目录中,所以理论上它支持,但事实上Hadoop现在还没有能支持zip格式的split输入。

怎么挑选压缩格式:
对于大的,没有边界的文件,像log文件,下面的策略可供参考:

1)存储成非压缩的
2)使用一个支持spliting的,像bzip2
3)在程序中把文件分成chunks,对chunks进行单独压缩(使用任何压缩格式都行)。这种情况下,你需要选择chunk的大小,以期能与HDFS的block大小接近。
4)使用SequenceFile,它支持压缩和切分(split)
4.4.在MapReduce中使用压缩
在4.2.1.2使用CompressionCodecFactory推断CompressionCodec一节中提到了如果输入文件是压缩的,他们将会在被MapReduce读取的时候自动被解压(使用文件扩展名来确定要使用的codec)。
压缩MapReduce job输出结果的方法:
1)在job configuration中设置mapred.output.compress属性为true
2)设置mapred.output.compression.codec属性为想要使用的codec对应的类名
下面的例子是一个演示:
public class MaxTemperatureWithCompression {
public static void main(String[] args) throws IOException {
if (args.length != 2) {
System.err
.println("Usage: MaxTemperatureWithCompression <input path> "
+ "<output path>");
System.exit(-1);
}
JobConf conf = new JobConf(MaxTemperatureWithCompression.class);
conf.setJobName("Max temperature with output compression");
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec", GzipCodec.class,
CompressionCodec.class);
conf.setMapperClass(MaxTemperatureMapper.class);
conf.setCombinerClass(MaxTemperatureReducer.class);
conf.setReducerClass(MaxTemperatureReducer.class);
JobClient.runJob(conf);
}
}
运行方法:
%hadoop MaxTemperatureWithCompression input/ncdc/sample.txt.gz output
最终的输出结果是压缩的,这个例子的结果如下:
%gunzip –c output/part-00000.gz
1949 111
1950 22
如果输入是sequencefile,那么你可以设置mapred.output.compression.type的属性来控制压缩的类型。默认是RECORD(压缩单个的records),若换成BLOCK他将压缩成组的records,这是推荐使用的。
Compressing map output
可以为map出来的中间文件进行压缩,因为中间文件是要在网络上进行传输的,所以压缩可以节省开销。
下面是涉及的两个属性值及例子
DSC0003.bmp
下面两行添加到程序中
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);
4.5.Serialization
4.6.File-Based Data Structures

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-310886-1-1.html 上篇帖子: 关于hadoop的入门和进阶文章收录 下篇帖子: Hadoop系统通信协议介绍
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表