|
|
// Persist the hourly stats (metric name -> Long value, keyed by `stattime`)
// into MySQL. foreachPartition is used so each partition opens exactly ONE
// connection / prepared statement, instead of one per record.
sc.parallelize(List((stattime, "pv", 1L),
  (stattime, "ip", 2L),
  (stattime, "uv", 3L),
  (stattime, "newuser", 4L),
  (stattime, "beakrate", 5L),
  (stattime, "visittimes", 6L),
  (stattime, "avgvisittime", 7L)
)).foreachPartition {
  it =>
    var conn: Connection = null
    var ps: PreparedStatement = null
    try {
      // Explicit driver registration is required here: when the job is
      // submitted to the cluster, executors otherwise fail to locate the
      // MySQL driver and the rows are silently never inserted.
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      conn = DriverManager.getConnection("jdbc:mysql://10.0.0.46:3306/logviewtest", "logviewtest", "logviewtest")
      ps = conn.prepareStatement("insert into loging_chexun_hour(stat_hour,type,value) values (?,?,?)")
      for (data <- it) {
        ps.setString(1, data._1)
        ps.setString(2, data._2)
        // data._3 is a Long; the original setFloat silently widened
        // Long -> Float and could lose precision for large counts.
        ps.setLong(3, data._3)
        // Accumulate the partition's rows and send them in one batch
        // round trip instead of one executeUpdate() per record.
        ps.addBatch()
      }
      ps.executeBatch()
    } catch {
      case e: Exception =>
        println("MySQL Exception")
        println(e.getMessage)
    } finally {
      // Close in reverse order of acquisition; null checks guard against
      // failures before the resource was obtained.
      if (ps != null) ps.close()
      if (conn != null) conn.close()
    }
}
注意:需要把Class.forName("com.mysql.jdbc.Driver").newInstance()加上,否则在分布式提交的时候,数据没有插入到数据库中。
提交的命令如下:
bin/spark-submit --master spark://10.0.0.37:7077 --class com.chexun.statistic.ChexunHourCount --executor-memory 8g --jars /opt/soft/spark/lib/mysql-connector-java-5.1.34.jar /opt/soft/spark/test-example/chexun.jar
|
|
|