noel0217 发表于 2016-12-7 11:19:58

Hadoop API 读/写数据库

  利用 Hadoop API 从数据库中读出数据，进行简单处理后，再写入数据库中。

package dbio;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
/**
 * MapReduce job that reads rows from a database table through
 * {@link DBInputFormat}, forwards them unchanged through the shuffle, and
 * writes them back to the database through {@link DBOutputFormat}.
 *
 * <p>NOTE(review): the JDBC settings below, including the password, are
 * hard-coded in source. They should be moved to external configuration
 * before this is used for anything other than a demo.
 */
public class DBIO extends Configured {
    static String driverClassName = "oracle.jdbc.driver.OracleDriver";
    static String url = "jdbc:oracle:thin:@10.10.31.81:1521/oradev";
    static String username = "scott";
    static String password = "test001";

    /**
     * User-defined record holding one row: product id, price and sales count.
     *
     * <p>Implements {@link DBWritable} so it can be read from a
     * {@link ResultSet} and bound to a {@link PreparedStatement}, and
     * {@link WritableComparable} so it can be serialized to and from a
     * binary stream and ordered.
     */
    public static class AccessRecord implements
            WritableComparable<AccessRecord>, DBWritable {
        int prodid; // product code
        int price; // product price
        int count; // number of units sold

        /**
         * Binds the fields to parameters 1 to 3 of the given statement, in
         * the order prodid, price, count.
         *
         * @param statement statement whose first three parameters are set
         * @throws SQLException if a parameter cannot be set
         */
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, prodid);
            statement.setInt(2, price);
            statement.setInt(3, count);
        }

        /**
         * Loads the fields from columns 1 to 3 of the current row, in the
         * order prodid, price, count.
         *
         * @param resultSet result set positioned on the row to read
         * @throws SQLException if a column cannot be read
         */
        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.prodid = resultSet.getInt(1);
            this.price = resultSet.getInt(2);
            this.count = resultSet.getInt(3);
        }

        /**
         * Sets the prodid, price and count values.
         *
         * @param prodid product code
         * @param price product price
         * @param count number of units sold
         */
        public void set(int prodid, int price, int count) {
            this.prodid = prodid;
            this.price = price;
            this.count = count;
        }

        /** @return the product code */
        public int getProdid() {
            return prodid;
        }

        /** @return the product price */
        public int getPrice() {
            return price;
        }

        /** @return the number of units sold */
        public int getCount() {
            return count;
        }

        /**
         * Deserializes this record from a binary stream. Reads three ints in
         * the order prodid, price, count.
         *
         * @param in stream to read from
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            prodid = in.readInt();
            price = in.readInt();
            count = in.readInt();
        }

        /**
         * Serializes this record to a binary stream. Writes three ints in
         * the order prodid, price, count.
         *
         * @param out stream to write to
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(prodid);
            out.writeInt(price);
            out.writeInt(count);
        }

        /**
         * Orders records in descending order of count. Among records with
         * the same count, the order is descending by prodid first when the
         * count is zero, and descending by price first otherwise. The
         * remaining field is used as the last tie-breaker, so the result is
         * zero exactly when {@link #equals(Object)} is true.
         *
         * <p>{@link Integer#compare(int, int)} is used instead of
         * subtraction, because the difference of two ints can overflow and
         * produce the wrong sign.
         *
         * @param o record to compare against
         * @return negative, zero or positive as this record sorts before,
         *         equal to or after {@code o}
         */
        @Override
        public int compareTo(AccessRecord o) {
            int byCount = Integer.compare(o.count, count);
            if (byCount != 0) {
                return byCount;
            }
            int byProdid = Integer.compare(o.prodid, prodid);
            int byPrice = Integer.compare(o.price, price);
            if (count == 0) {
                return byProdid != 0 ? byProdid : byPrice;
            }
            return byPrice != 0 ? byPrice : byProdid;
        }

        /**
         * Hash derived from count and prodid. Records that are equal always
         * share a hash, since both fields take part in
         * {@link #equals(Object)}.
         */
        @Override
        public int hashCode() {
            return count + prodid * 3;
        }

        /**
         * Two records are equal when prodid, price and count all match.
         *
         * @param right object to compare against; may be {@code null}
         * @return {@code true} if {@code right} is an equal record
         */
        @Override
        public boolean equals(Object right) {
            if (this == right) {
                return true;
            }
            // instanceof is false for null, so no separate null check is needed.
            if (!(right instanceof AccessRecord)) {
                return false;
            }
            AccessRecord r = (AccessRecord) right;
            return r.prodid == prodid && r.price == price
                    && r.count == count;
        }
    }

    /**
     * Mapper that emits every input record unchanged under the same
     * default-constructed {@link LongWritable} key, so all records share one
     * map output key.
     */
    public static class MapDataBaseInHDFS extends
            Mapper<Object, AccessRecord, LongWritable, AccessRecord> {
        final LongWritable writeKey = new LongWritable();

        /**
         * Forwards {@code value} to the reduce phase; the input key is ignored.
         *
         * @param key input key (unused)
         * @param value record read from the input
         * @param context task context used to emit the output pair
         */
        @Override
        public void map(Object key, AccessRecord value, Context context)
                throws IOException, InterruptedException {
            context.write(writeKey, value); // pass through to reduce
        }
    }

    /**
     * Reducer that emits each received record as the output key, paired with
     * a default-constructed {@link LongWritable} as the output value.
     *
     * <p>NOTE(review): presumably the output format persists only the key
     * and the value is a placeholder; confirm against the Hadoop version in
     * use.
     */
    public static class ReduceHDFSInDataBase extends
            Reducer<LongWritable, AccessRecord, AccessRecord, LongWritable> {
        LongWritable writKey = new LongWritable();

        /**
         * Writes every record of the group to the output.
         *
         * @param key map output key of the group (unused)
         * @param values records grouped under {@code key}
         * @param context task context used to emit the output pairs
         */
        @Override
        public void reduce(LongWritable key, Iterable<AccessRecord> values,
                Context context) throws IOException, InterruptedException {
            for (AccessRecord record : values) {
                context.write(record, writKey);
            }
        }
    }

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the job completes unsuccessfully
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, driverClassName, url, username, password);

        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "dbio");
        job.setJarByClass(DBIO.class);

        // Mapper and reducer.
        job.setMapperClass(MapDataBaseInHDFS.class);
        job.setReducerClass(ReduceHDFSInDataBase.class);

        // Map output key/value types.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(AccessRecord.class);

        // Reduce output key/value types.
        job.setOutputKeyClass(AccessRecord.class);
        job.setOutputValueClass(LongWritable.class);

        // Input: rows selected from the database. The count query carries the
        // same rownum filter as the input query so that both describe the
        // same set of rows.
        job.setInputFormatClass(DBInputFormat.class);
        DBInputFormat.setInput(job, AccessRecord.class,
                "SELECT PRODID ,PRICE ,COUNT FROM TMP_HADOOP WHERE rownum<100",
                "SELECT count(PRODID) FROM TMP_HADOOP WHERE rownum<100");
        // Alternative table-based form:
        // DBInputFormat.setInput(job, AccessRecord.class, "CUST_CLUB",
        // "rownum<10", "PRODID", new String[] { "PRODID","PRICE","COUNT"});

        // Output: rows written to the database table.
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, "TMP_HADOOP", new String[] { "PRODID", "PRICE", "COUNT" });

        // Surface a failed job to the caller instead of returning as if it succeeded.
        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("Job dbio did not complete successfully");
        }
    }
}
页: [1]
查看完整版本: hadoop API 读/写数据库