Spark 1.5.1 Study Notes (1): Standalone Cluster Configuration
Lu Chunli's work notes: the faintest ink beats the best memory.
Apache Spark is a general-purpose engine for fast, large-scale data processing. It is implemented in Scala but provides APIs in several languages (Java, Scala, Python and R), along with a rich toolset including Spark SQL, MLlib, GraphX and Spark Streaming.
Spark download: http://spark.apache.org/downloads.html
Scala download: http://www.scala-lang.org/download/
Spark runs on Windows or Unix-like systems and can run in local mode on a single machine.
The latest release at the time of writing is Spark 1.5.1, whose documented requirements are:
Spark runs on Java 7+, Python 2.6+ and R 3.1+. For the Scala API, Spark 1.5.1 uses Scala 2.10. You will need to use a compatible Scala version (2.10.x).
Note: Scala 2.11 users should download the Spark source package and build with Scala 2.11 support.
Spark's four headline features:
Speed
Ease of Use
Generality
Runs Everywhere
Package used: spark-1.5.1-bin-hadoop2.6.tgz
Note: on the download page you can choose the package type and version you need (just be careful not to download the Source Code package by mistake).
Spark's main components are Spark SQL, Spark Streaming, MLlib (machine learning), and GraphX.
Spark supports the following run modes (a short sketch of how each mode is selected follows this list):
Local: runs Spark on a single machine.
Standalone: a cluster mode in which Spark manages resources itself.
YARN: uses YARN for resource management.
Mesos: similar to YARN; provides efficient resource isolation and sharing across distributed applications and frameworks.
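Which of these modes a job actually uses comes down to the master URL, passed either with --master on the command line or programmatically through SparkConf. The following is a minimal sketch, not part of the original notes; the Mesos host is a placeholder, while nnode:7077 matches the standalone Master configured later in this post.

import org.apache.spark.{SparkConf, SparkContext}

// The master URL decides the run mode (Spark 1.5.x forms shown).
val conf = new SparkConf()
  .setAppName("ModeDemo")
  .setMaster("spark://nnode:7077")    // Standalone cluster (this post's setup)
  // .setMaster("local[*]")           // Local mode, one worker thread per core
  // .setMaster("yarn-client")        // YARN, client deploy mode (Spark 1.5 syntax)
  // .setMaster("mesos://host:5050")  // Mesos (placeholder host and port)
val sc = new SparkContext(conf)
println(sc.master)                    // prints the master URL actually in use
sc.stop()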
For the standalone mode configuration, refer to http://spark.apache.org/docs/1.5.1/spark-standalone.html
Extract spark-1.5.1-bin-hadoop2.6.tgz
$ tar -xzv -f spark-1.5.1-bin-hadoop2.6.tgz
$ mv spark-1.5.1-bin-hadoop2.6 spark1.5.1
Configure the environment variables
$ vim ~/.bash_profile
# add the following lines
export SPARK_HOME=/usr/local/spark1.5.1
export PATH=$SPARK_HOME/bin:$PATH
Configure the Worker nodes
$ cp slaves.template slaves
$ vim slaves
# A Spark Worker will be started on each of the machines listed below.
dnode1
dnode2
# this file lists the Worker nodes
Configure spark-env.sh
$ cp spark-env.sh.template spark-env.sh
$ vim spark-env.sh
export JAVA_HOME=/usr/local/jdk1.7
# directory for scratch/temporary files generated during computation
SPARK_LOCAL_DIRS=${SPARK_HOME}/tmp
# the Master host of the cluster
SPARK_MASTER_IP=nnode
SPARK_PID_DIR=${SPARK_HOME}
Start the Master
$ sbin/start-master.sh
Start the slaves
$ sbin/start-slaves.sh
Check the processes
# master node
$ jps
13050 Master
13149 Jps
$
# worker nodes
$ jps
12540 Jps
12408 Worker
$
$ jps
12371 Worker
12500 Jps
$
Verify through the Master node's WebUI
Check the Worker node logs
Verify through the Worker node's WebUI
Client access
Note:
bin/spark-shell --master local                 // spark-shell started on its own defaults to local mode
bin/spark-shell --master spark://nnode:7077    // start against the standalone cluster
$ bin/spark-shell --master spark://nnode:7077
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
# (intermediate output omitted)
SQL context available as sqlContext.
scala> :quit
Stopping spark context.
The spark-shell session also has its own WebUI:
http://nnode:4040
As shown above, sc inside spark-shell is the ready-made SparkContext instance.
http://spark.apache.org/docs/1.5.1/api/scala/index.html#org.apache.spark.SparkContext
SparkContext is the entry point to Spark. It connects to the Spark cluster and is used to create RDDs, accumulators and broadcast variables. In essence, SparkContext is Spark's external interface, exposing Spark's functionality to the caller.
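As a concrete illustration of those responsibilities, here is a minimal sketch (not from the original notes) that creates an RDD, an accumulator, and a broadcast variable; inside spark-shell, sc already exists, so only the lines after the SparkContext creation would be typed there.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("SparkContextDemo").setMaster("spark://nnode:7077")
val sc   = new SparkContext(conf)

val rdd   = sc.parallelize(1 to 100)       // RDD built from a local collection
val acc   = sc.accumulator(0, "sum")       // accumulator: executors add, driver reads
val bcast = sc.broadcast(Map("k" -> 42))   // broadcast variable: read-only on executors

rdd.foreach(n => acc += n)                 // accumulate 1..100 on the executors
println(acc.value)                         // 5050, read back on the driver
println(bcast.value("k"))                  // 42
sc.stop()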
scala> sc.textFile("/home/hadoop/test.txt").collect;
15/12/24 23:26:46 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 13, 192.168.137.119): java.io.FileNotFoundException: File file:/home/hadoop/test.txt does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:534)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:747)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:524)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:140)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:108)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:239)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:216)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
res5: Array[String] = Array(hello world, hello you, how are you)
scala> sc.textFile("/home/hadoop/test.txt").collect;
res6: Array[String] = Array(hello world, hello you, how are you)
Note: sc.textFile loads the data on both Worker nodes. If the file exists on only one Worker, tasks scheduled on the other machine fail with the FileNotFoundException shown above; the job can still succeed, as it does here, because the failed task is retried on the node that does have the file.
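One common way to avoid this requirement (not covered in the original notes) is to put the input on a shared file system such as HDFS, so every executor reads the same copy. The NameNode address and path below are illustrative placeholders:

// Hypothetical HDFS location; adjust the host, port and path to your cluster.
val lines = sc.textFile("hdfs://nnode:9000/user/hadoop/test.txt")
lines.collect().foreach(println)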
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
Read a text file from HDFS, a local file system (available on all nodes), or any
Hadoop-supported file system URI, and return it as an RDD of Strings.
WordCount with Spark
scala> var text = sc.textFile("/home/hadoop/test.txt");
text: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[...] at textFile at <console>:21
# Version 1
scala> text.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect;
res10: Array[(String, Int)] = Array((are,1), (how,1), (hello,2), (world,1), (you,2))
# Version 2
scala> text.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect;
res11: Array[(String, Int)] = Array((are,1), (how,1), (hello,2), (world,1), (you,2))
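The same WordCount can also be packaged as a standalone application rather than typed into spark-shell. The sketch below is illustrative and not part of the original notes; the object name and input path are assumptions, and the resulting jar would be launched with bin/spark-submit.

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // The master can be omitted here and supplied via spark-submit --master instead.
    val conf = new SparkConf().setAppName("WordCount")
    val sc   = new SparkContext(conf)

    sc.textFile("/home/hadoop/test.txt")   // same test file as above
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)

    sc.stop()
  }
}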
Note: the Master WebUI listens on port 8080 by default, which clashes with Tomcat's default port. To avoid the conflict, change the default in sbin/start-master.sh (the same setting can also be made via SPARK_MASTER_WEBUI_PORT in conf/spark-env.sh):
$ vim sbin/start-master.sh
if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
SPARK_MASTER_WEBUI_PORT=8080
fi