<!--[if !supportLists]-->1. <!--[endif]-->MapReduce和分布式缓存
4.1. 创建 Hadoop作业
Hadoop配置完成之后需要提交一些作业。 SHDP让这个过程变得很简单,不管是 map-reduce类型的还是 streaming。下面看一个例子:
<hdp:job id= ”mr-job ”
input-path=”/input/ ” output-path= ”/output/ ”
mapper= ”org.apache.hadoop.examples.WordCount.TokenizerMapper ”
reducer= ”org.apache.hadoop.examples.WordCount.IntSumReducer ”/>
上面的声明创建一个典型的Hadoop Job:指定它的输入 /输出目录,以及 mapper和 reducer类。注意声明中并没有指向 Hadoop的配置 bean —因为如果没有指定,默认的约定命名(hadoopConfiguration)会被使用。声明中也没有说明 mapper和 reducer输入输出的 key或 value的类型 —这两个值的类型会通过解析mapper和 reducer类信息经最优尝试做出设定。当然,这些设定可以被覆盖:前者通过 configuration-ref,后者通过 key和 value的属性。还有大量可用的选项没有在例子中展示,例如 jar包(直接通过类指定了),排序分组比较器, combiner, partitioner,使用的解码器或是输入 /输出仅仅做了命名。只要看一看 SHDP的 schema或者在 ide中使用自动补全功能,如果它支持 XML的命名空间并被正确配置就能显示出可用的元素。另外,还可以扩展默认的 Hadoop配置对象添加任何在命名空间中不可用的特殊属性或者他的支持 bean( backing bean )(JobFactoryBean)。
值得指出的是每个job的特定的配置都可以被属性直接设定或者引用这些配置。
<hdp:job id= ”mr-job ”
input-path=”/input /” output-path= ”/output/ ”
mapper= ”mapper class ” reducer= “reducer class ”
jar-by-class= ”class used for jar detection,检测 jar包的类 ”
properties-location= ”classpath:special-job.properties>
electric=sea
</hdp:job>
<hdp:job>提供了一些额外的属性,例如 #hadoop:generic-options[通用属性 ],值得特别说明的是 jar属性可以将 job(以及其依赖的包)通过一个指定的 jar包完全加载。这在独立运行的 job中可以避免 classpath和版本冲突。注意:将 jar包提交至集群仍然依赖于目标环境 —察看前面提及的#hadoop:generic-options部分获取更多信息 (比如 libs属性 )。
4.1.1.创建 Hadoop Streaming作业
Hadoop Streaming是一个常用的属性,它允许使用任何可执行的脚本创建 Map/Reduce作业。从命令行启动 streaming非常简单,类似 java环境中编码方式是实现,但是由于各种类型的参数(以及它们的顺序)需要被解析会比较麻烦。 SHDP简化了这个过程 —可以像前面的例子一样非常简单直接地声明一个作业,事实上多数属性都是一样的。
<hdp:streaming id= ”streaming ”
Input-path= ”/input ” output-path= ”/output/ ”
Mapper= ”${path.cat} ” reducer= ”${path.wc} ”/>
终端用户可能比较想了解如何给命令行传递参数,例如-D或者 -cmdenv。前面已经定制了 Hadoop的配置,接下来看看如何使用 cmd-env元素。
<hdp:streaming id= ”streaming ”
Input-path= ”/input/ ” output-path= ”/output/ ”
Mapper= ”${path.cat} ” reducer= ”${path.wc} ”
<hdp:cmd-env>
EXAMPLE_DIR=/home/example/dictionaries/
......
</hdp:cmd-env>
</hdp:streaming>
像Job一样 streaming支持 #hadoop: generic-option[generic options]。
4.2. 运行 Hadoop 作业
作业被创建和配置之后,接下来的任务就是提交至集群并运行。对于比较重大的需要多个步骤连接运行的作业,推荐使用工作流解决方案,例如 Spring Batch。但是对于基础性的作业 SHDP提供了 job-runner(使用 JobRunner类进行支持)元素进行提交,它可以顺序提交多个 job(默认情况下一个运行结束再提交下一个)。
<hdp:job-runner id= ”myjob-runner ” pre-action= ”cleanup-script ” post-action= ”export-results ” job-ref= ”myjob ” run-at-startup= ”true ” />
<hdp:job id= ”myjob ” input-path= ”/input/ ” output-path= ”/output/ ”
Mapper= ”org.apache.hadoop.example.WordCount.TokenizerMapper ”
Reducer= ”org.apache.hadoop.example.WordCount.IntSumReducer ”
如果作业在runner之外没有被使用过,多个 job可以被同时提交。
<hdp:job-runner id= ”myjobs-runner ” pre-action= ”cleanup-script ” post-action= ”export-results ” job-ref= ”myjob,myjob2 ” run-at-startup= ”true ”/>
<hdp:job id= ”myjob ” ... />
<hdp:job id= ”myjob2 ” .. ./>
一个或多个Map-Reduce作业可以通过 job属性被同时提交,按照定义的顺序进行执行。 Runner在应用启动之后被触发(注意 run-at-startup的标志位默认是 false)。注意除非手工触发或者 run-at-startup设为 true,否则 runner不会被触发。另外 runner允许一个或者多个 pre和 post处理被指定,它们会在作业每次运行之前或之后运行。更过 runner的信息参见专门的章节。
注意:当Hadoop作业的提交和运行被阻塞( wait-for-completion为 true)时, JobRunner使用一个 JDK 执行器启动(停止)一个作业。默认的实现是 SyncTaskExecutor使用调用线程执行作业,模仿 hadoop 命令行的行为。但是 hadoop作业都是时间消型的,某些情况下会导致应用被冻结,影响应用的正常操作甚至意外退出。生产应用之前,要检查一下 job策略是否合适,控制数据流量或者数据池的实现方式是否更好。可以通过 executor-ref参数改变 job-runner的行为。
作业runner同时也支持应用关闭时取消某个作业(或者 kill掉)。这种方式只应用于 runner的等待完成模式( wait-for-completion为 true)使用一个不同的执行器,使用一个不同的线程调用(也就是说调用线程在执行下一个任务之前必须等待当前作业完成。)
4.2.1.使用 Hadoop作业的任务调度器
在 Spring Batch环境下, SHDP提供了一个专用任务调度器把 Hadoop的作业作为 Spring Batch工作流中的一个步骤执行。例子如下:
<hdp:job-tasklet id= ”hadoop-tasklet ” job-ref= ”mr-job ” wait-for-completion= ”true ”/>
上面的调度器指向了一个名为 ”mr-job ”的作业。默认情况下,wait-for-completion为真,因此当它执行时调度器会等待作业直至完成。把 wait-for-completion设置为 false可以把 job提交至 Hadoop,但是不会等待其结束。
4.3. 运行 Hadoop的工具
通过命令行启动 Hadoop的工具类和相关库司空见惯。 SHDP对这种情况的通用支持提供了建立在 Hadoop标准基础之上的的包,即 Tool类和 ToolRunner类。与命令行用法不同的是, Tool类的实例可以继承 Spring IOC的特性,他们可以按照需要被参数化,被创建被销毁并可拥有他们自己的属性(例如 Hadoop的 configuration)。
运行hadoop jor包典型的例子如下:调用一个含有两个参数的类,(注意同时也传递了 hadoop configuration的属性)
bin/hadoop jar -conf hadoop-site.xml -jt darwin:50020 -Dproperty=value someJar.jar
既然SHDP用第一个类支持了 Hadoop的 #hadoop: config[configuring]属性,那么被称为的“ generic options”就不再需要了,典型情况下甚至每个应用可能只有一个 Hadoop配置对象。通过 tool-runner元素(和它的支撑类 ToolRunner)通常只需指定 Tool实现类和它的参数。
<hdp:tool-runner id= ”someTool ” tool-class= ”org.foo.SomeTool ” run-at-startup= ”true ”>
<hdp:arg value= ”data/int.xt ”/>
<hdp:arg value= ”data/out.txt ”/>
Property=value
</hdp:tool-runner>
另外,tool-runner也允许在 tool-class运行之前或之后指定一个或多个 pre或者 post处理。通常指定的作业或者脚本不能是基于 JDK运行的。注意除非手动触发或者将 run-at-startup设置为 true,否则 tool-runner不能被运行。
前面的例子中假设了Tool依赖的类在 classpath中存在。如果不是这种情况, tool-runner允许指定 jar包:
<hdp:tool-runner ... Jar= ”myTool.jar ”>
...
</hdp:tool-runner>
这个jar被用来实例化并启动这个 tool,事实上所有依赖的都从这个 jar加载,它们可以不必是 classpath的一部分。这个运行机制为 tools以及它们依赖的特定包不同的版本之间建立隔离,而不用把它们加入同一个 app(消除版本冲突的影响),这些 tools可以被简单地指向不同的 jar包,并以其自有的方式运行。需要指出的是如果使用的 jar包的 main类就是目标 Tool,就不用指定这个 tool的类了。
像其他的SHDP元素一样, tool-runner允许传递相应的 Hadoop的配置 #hadoop:config:properties[customized]。通常 Tool的实现类有一个默认的参数 tool-class属性,不过指向其他的 Tool的实例或者声明一个嵌套的 Tool实现类也是可以的。
<hdp:tool-runner id = ”someTool ” run-at-startup= ”true ”>
<hdp:tool>
<bean class= ”org.foo.AnotherTool ” p:input= ”data/in.txt ” p:output= ”data/out.txt ”/>
</hdp:tool>
</hdp:tool-runner>
如果Tool类提供了 setters或者带参构造器这样做非常方便。 Tool-runner不会执行 Tool工具类除非工具类被真正的调用了,这个行为可以被上述的 run-at-startup属性改变。
4.3.1. 使用 tool-runner 替换Hadoop的命令行调用
Tool-runner是一个迁移通过 shell调用或者通过脚本连接的一系列 Java对象的好方法。比如下面的 shell脚本:
hadoop jar job1.jar -files fullpath:properties -Dconfig=config.properties ...
hadoop jar job2.jar arg1 arg2 ...
...
Hadoop jar job10.jar...
每个作业都被包含在特定的jar包中,包括其所有的依赖包。另外每个调用都可能要提供一些通用选项或者参数,但是多数情况下都共享相同的设置(就像他们都运行在同一个集群上)。
上面的脚本使用tool-runner元素移植到 SHDP会是如下的样子:
<hdp:tool-runner id= ”job1 ” tool-class= ”job1.Tool ” jar= ”job1.jar ” file= ”fullpath:props.properties ” properties-location= ”config.properties ”/>
<hdp:tool-runner id= ”job2 ” jar= ”job2.jar ”
<hdp:arg value= ”arg1 ”/>
<hdp:arg value= ”arg2 ”/>
</hdp:tool-runner>
<hdp:tool-runner id= ”job3 ” jar= ”job3.jar ”/>
...
所有的特性在前面的章节已经解释过了,让我们重新审视一下看看到底发生了什么。像前面所说的一样,所有的tool都读取 hadoopConfiguration的配置;但是 job1使用其自己的属性替代了统一的配置。对于 Tool类指定的第一个 jar,假定 jar_Main-Class_es实现了 Tool的接口,命名空间会自动发现并使用他们。当需要的时候额外的文件或者类库也会被提交至集群。这种方式同样适用与作业的参数。
很多事情可以通过使用这个配置避免使用脚本,每个作业不但可以不通过本地文件系统拥有多个属性加载或者内联声明,而且可以从classpath或者其他 url获取这些属性。事实上,整个配置可以通过 Spring的 property placeholder或者 Environment abstraction被外部化和参数化。此外,每个作业都可以通过 Spring的 depends-on或功能更加强大的 Spring Batch和 tool-tasklet独自运行或者作为工作流的一部分被运行
4.3.2. 使用 Hadoop的工具任务调度器
在 Hadoop Batch环境中, SHDP提供了一个专门的任务调度器把 Hadoop的任务当作 Spring Batch工作流中的一个环节运行。 Tasklet元素支持除了 run-at-startup之外的和 #hadoop:tool-runner[tool-runner](不支持在工作流中使用)一样的配置选项:
<hdp:tool-tasklet id= ”tool-tasket ” tool-ref= ”some-tool ” />
4.4. 运行 Hadoop的 Jar
SHDP还提供了对 Hadoop jar运行的支持。例如最著名的 WordCount例子:
bin/hadoop jar hadoop-example.jar wordcount /wrodcount/input /wordcount/output
变成了:
<hdp:jar-runner id= ”wordcount ” jar= ”hadoop-example.jar ” run-at-startup= ”true ”>
<hdp:arg value= ”wordcount ”/>
<hdp:arg value= ”/wordcount/input ”/>
<hdp:arg value= ”/wordcount/output ”/>
</hdp:jar-runner>
注意:和hadoop jar命令一样如果没有指明 jar的 Main-Class会自动读取。也可以通过 main-class属性进行设置。
另外,jar-runner和 job-runner一样允许指定一个或多个预处理或者后处理。通常情况下都是非 JDK调用的 runner。除非手动触发或者 run-at-startup设置为 true否则 jar-runner不会运行。
Jar support提供了一个代替命令行进行 jar调用的不错的方式。特别指出的是由于 jar被执行的时候 Hadoop Configuration对象的启用可以自动继承 Hadoop配置的上下文信息。实际上,像其他的 SHDP元素一样, jar元素允许为 jar的运行本地声明 #hadoop: config: properties[configuration properties]。例如,使用如下的声明:
<hdp:jar-runner id= ”wordcount ” jar= ”hadoop-example.jar ” run-at-startup= ”true ”>
<hdp:arg value= ”wordcount ”/>
...
speed=fast
</hdp:jar-runner>
在jar包的编码中可以这样使用添加的属性
Assert “fast ”.equals(new Configruation().get( “speed ”);
这种方式可以让Hadoop基础 jar包在不进行修改的情况下使用封闭的应用范围的 Hadoop配置。
我们认为这是一个不错的特性,强烈建议使用这个支持工具替换目前的方式或者迁移过来,主要有几个原因无约束和低耦合:
没有标准配置注入
SHDP为向 jar传递 Hadoop配置做了很好多的努力但是也不能保证 jar本身没有使用特定的初始化机制导致忽略这些传入的属性。一个 vanilla配置不是特别有用因此应用倾向于提供定制的编码来解决。
System.exit()调用
多数运行jar的例子假设它们在命令行被启动,不代码成功与否都通过调用 System.exit来关闭 JVM。 SHDP想要阻止这种情况的发生(否则整个应用程序会突然关闭),但这明显是一个糟糕的代码协作标志。
SHDP尽可能地用智能的默认方式提供最好的集成体验,但是直到现在还没有任何有保证的成型的使用模式。目前来看使用 Tool接口是个不错的选择。
4.4.1 使用 Hadoop Jar 包 任务调度器
像 Spring Batch环境下的其他任务一样, SHDP为将 Hadoop jar当做 Spring Batch的一个步骤运行提供了专用的调度器。调度器元素支持除“ run-at-startup”之外的像 #hadoop:jar-runner一样的配置选项。
<hdp:jar-tasklet id= ”jar-tasklet ” jar= ”some-jar.jar ”/>
4.5. 配置 Hadoop的分布式缓存
分布式缓存是Hadoop为分布式应用特别定制的组件,对于处理只读大文件有很高的效率( text、归档、 jar包等等)。应用通过 urls指定使用 DsiatributedCache缓存的文件,框架会在 job的任何任务在计算节点上运行之前将必要的文件加载至这个节点。这是一种效率很高的机制因为每个 job只需要拷贝一次并且只缓存没有归档的数据文件。需要注意的是 DistributedCache假设需要被缓存的文件(即通过 hdfs://urls指定的文件)在 Hadoop的 FileSystem上已经存在。
SHDP通过 cache元素(其支撑类为 DistributedCacheFactoryBean)为分布式缓存提供了非常易用的配置,通过计算节点何以方便地进行分布式缓存文件和归档。
<hdp:cache create-symlink= ”true ”>
<hdp:classpath value= ”/cp/some-library.jar#library.jar ” />
<hdp:cache value= ”/cache/some-archive.tgz#main-archive ” />
<hdp:cache value= ”/cache/some-resource.res ” />
<hdp:local value= ”some-file.txt ” />
</hdp:cache>
上面的定义使用cache注册了几个资源(把它们添加至 job的缓存或者 classpath)并且为它们创建了符号连接。声明的格式为 absolute-path#link-name。 Link-name如果没有指定则由 URI的片段自动设定, cache bean会根据 resource文件名生成一个。请注意,所有的设定并没有 hdfs://node:port前缀,这个前缀是根据与 bean联系的配置决定的;这样可以避免环境配置中以硬编码的方式进行设定,增加了应用的移植性。另外根据不同的扩展名( .tgz、 .tar.gz、 .zip和 .tar)对于需要解压的归档定义方法也是不同的,一般文件就进行简单的 copy。命令空间中剩下的声明对于默认依赖的 Hadoop Configuration和 FileSystem对象也没有相应的配置(通过 configuration-ref 和 file-system-ref)它会倒回到默认命名中与名为 hadoopConfiguration的 bean进行关联并自动创建 FileSystem对象。
注意:如果客户端在Windows平台在 DistributedCache中建立 classpath应该将系统的 path.separator属性设置为“:”。否则 classpath的设置就是错误的并被忽略 --查看 HADOOP-9123的 bug报告获取更多信息。修改 path.separator有多种方法,最快捷的方法就是在启动时运行一个 JavaScript脚本:
<hdp:script language= ”javascript ” run-at-startup= ”true ”>
java.lang.System.setProperty( “path.separator ”, ”: ”)
</hdp:script>
4.6. Map Reduce通用选项
Job、 streaming和 tool都支持通用选项的子集,尤其是 archives, files和 libs。扩展 job的 classpath时 libs无疑是最有用的一个属性,虽然其他两个属性允许为 job的运行在集群范围内 copy资源和归档。当面临依赖供应问题时重新审视这些属性有极大的帮助。注意, fs、 jt和 conf选项并不被支持,它们都是被设计用在命令行使用的,用来引导引用程序。在 SHDP提供了优秀的 Hadoop配置的定义和定制后这些就不再需要了。
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com