设为首页 收藏本站
查看: 509|回复: 0

[经验分享] Hadoop与关系数据库

[复制链接]

尚未签到

发表于 2016-12-6 06:46:29 | 显示全部楼层 |阅读模式
Hadoop对关系数据库无非两种操作,即从关系数据库输入到HDFS和从HDFS输出到关系数据库。Hadoop中分别提供了DBInputFormat类和DBOutputFormat类,前者用于从关系数据库输入到HDFS,该类将关系数据库中的一条记录作为向Mapper输入的value值,后者用于将HDFS中的文件输出到关系数据库,该类将Reducer输出的key值存储到数据库。我们只要在主程序中设置job的输入输出格式为这两个类中的一种,就可以让Hadoop从关系数据库输入或者向关系数据库输出。
正如我上面提到的,我们在操作的过程中使用了“记录”这个对象,因此需要写一个类对应到关系数据库中我们要操作的那个表,这个类要实现DBWritable接口和Writable接口,具体参见HadoopAPI。
具体代码参见文档。
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
public class SDBConnInput {
public static class CustomerRecord implements Writable,DBWritable{
String customerID;
String customerName;
String phoneNumber;
public void readFields(ResultSet resultSet)  throws SQLException{
customerID=resultSet.getString(1);
customerName=resultSet.getString(2);
phoneNumber=resultSet.getString(3);
}
public void write(PreparedStatement statement)  throws SQLException{
statement.setString(1, customerID);
statement.setString(2, customerName);
statement.setString(3,phoneNumber);
}
public void readFields(DataInput in) throws IOException{
customerID=in.readUTF();
customerName=in.readUTF();
phoneNumber=in.readUTF();
}
public void write(DataOutput out) throws IOException{
out.writeUTF(customerID);
out.writeUTF(customerName);
out.writeUTF(phoneNumber);
}
public void setCustomerID(String customerID){
this.customerID=customerID;
}
public void setCustomerName(String customerName){
this.customerName=customerName;
}
public void setPhoneNumber(String phoneNumber){
this.phoneNumber=phoneNumber;
}
public String toString(){
return this.customerID+","+this.customerName+","+this.phoneNumber;
}
}
public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,CustomerRecord,LongWritable,Text>{
Text result= new Text();
public void map(LongWritable key, CustomerRecord value,OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException{
result.set(value.toString());
collector.collect(key, result);
}
}
public static class ReducerClass extends MapReduceBase implements Reducer<LongWritable, Text,NullWritable,Text>{
public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<NullWritable,Text> output, Reporter reporter) throws IOException{
String str="";
while(values.hasNext()){
str+=values.next().toString();
}
output.collect(null, new Text(str));
}
}
public static void main(String [] args) throws Exception{
/**
* 从关系数据库读取数据到HDFS
*/
JobConf job = new JobConf();
job.setJarByClass(SDBConnInput.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setInputFormat(DBInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/user/xuyizhen/out"));
DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
String fieldNames []={"customerID","customerName","phoneNumber"};
DBInputFormat.setInput(job, CustomerRecord.class,"customers",null,"customerID", fieldNames);
job.setMapperClass(MapperClass.class);
job.setReducerClass(ReducerClass.class);
JobClient.runJob(job);
}
}
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.lib.db.*;
import java.sql.*;
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.*;
public class SDBConnOutput {
public static class CustomerRecord implements Writable,DBWritable{
String customerID;
String customerName;
String phoneNumber;
public void readFields(ResultSet resultSet)  throws SQLException{
customerID=resultSet.getString(1);
customerName=resultSet.getString(2);
phoneNumber=resultSet.getString(3);
}
public void write(PreparedStatement statement)  throws SQLException{
statement.setString(1, customerID);
statement.setString(2, customerName);
statement.setString(3,phoneNumber);
}
public void readFields(DataInput in) throws IOException{
customerID=in.readUTF();
customerName=in.readUTF();
phoneNumber=in.readUTF();
}
public void write(DataOutput out) throws IOException{
out.writeUTF(customerID);
out.writeUTF(customerName);
out.writeUTF(phoneNumber);
}
public void setCustomerID(String customerID){
this.customerID=customerID;
}
public void setCustomerName(String customerName){
this.customerName=customerName;
}
public void setPhoneNumber(String phoneNumber){
this.phoneNumber=phoneNumber;
}
public String toString(){
return this.customerID+","+this.customerName+","+this.phoneNumber;
}
}
public static class MapperClass extends MapReduceBase implements Mapper<LongWritable,Text,CustomerRecord,Text>{
CustomerRecord customer=new CustomerRecord();
public void map(LongWritable key, Text value,OutputCollector<CustomerRecord,Text> collector, Reporter reporter)  throws IOException{
String [] strs=value.toString().split(",");
customer.setCustomerID(strs[0]);
customer.setCustomerName(strs[1]);
customer.setPhoneNumber(strs[2]);
collector.collect( customer,value);
}
}
/**
*将HDFS中的文件输出到数据库
*/
public static void main(String [] args) throws Exception{

/**
* 从关系数据库读取数据到HDFS
*/
JobConf job = new JobConf(SDBConnInput.class);
//DBOutputFormat类只会将MapReduce框架输出结果的K值输出到关系数据库中
job.setOutputFormat(DBOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("hdfs://master:9000/user/xuyizhen/in/customer.txt"));
DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
"jdbc:mysql://192.168.0.25:3306/hadoop","root","1117");
String fieldNames []={"customerID","customerName","phoneNumber"};
DBOutputFormat.setOutput(job, "customers", fieldNames);
job.setMapperClass(MapperClass.class);
job.setNumReduceTasks(0);
JobClient.runJob(job);
}
}
注意:运行MapReduce时候报错:
java.io.IOException: com.mysql.jdbc.Driver
一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
1.在每个节点下的${HADOOP_HOME}/lib下添加该包,然后重启集群,这是比较原始的方法。
2.把包传到集群上:hadoop fs -put mysql驱动jar包名称/lib,并且在提交job前,添加语句DistributedCache.addFileToClassPath(new Path("/lib/mysql驱动jar包名称"),conf);
以上方法使用与所有需要额外jar包的MapReduce代码。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.iyunv.com/thread-310105-1-1.html 上篇帖子: hadoop相关知识总结 下篇帖子: Greenplum Hadoop视频教程
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表