sdfsdnfslk posted on 2017-05-23 17:16:45

[Spark 21] Spark Streaming with Kafka

  This post runs KafkaWordCount, one of the examples bundled with Spark Streaming. Running it requires setting up an environment; specifically, we need to:


[*]Install and run Kafka
[*]Install and run ZooKeeper (Kafka depends on ZooKeeper to register topics on it). Besides installing and running a standalone ZooKeeper, Kafka can also use the ZooKeeper bundled in its own package; to use that one, start ZooKeeper from the scripts under Kafka's bin directory. Accordingly, when using a standalone ZooKeeper there is no need to start the one under Kafka. The ZooKeeper-related lines seen in Kafka's startup log show Kafka, acting as a ZooKeeper client, establishing communication with the ZooKeeper server.

[*]Run Spark Streaming

Installing Kafka
   1. Download Kafka

http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
     In the name, 2.10 is the Scala version and 0.8.1.1 is the Kafka version.
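As a quick illustration of this naming convention, the archive name can be split into the two versions (a hypothetical helper, not part of Kafka):

```python
def parse_kafka_artifact(name):
    """Split a Kafka archive name like 'kafka_2.10-0.8.1.1'
    into (scala_version, kafka_version)."""
    # Strip the 'kafka_' prefix, then split on the first '-'
    rest = name[len("kafka_"):]
    scala_version, kafka_version = rest.split("-", 1)
    return scala_version, kafka_version

print(parse_kafka_artifact("kafka_2.10-0.8.1.1"))  # → ('2.10', '0.8.1.1')
```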

  2. Extract Kafka
      Surprisingly, Kafka bundles a ZooKeeper installation plus scripts to start and stop it; the bundled version is fairly old (3.3.4). In principle the copy bundled with Kafka should not be relied on, because ZooKeeper is a general-purpose distributed configuration and coordination system.

Configuring Kafka
  1. Edit the configuration file config/server.properties
     host.name and advertised.host.name are commented out by default; uncomment them:

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=localhost
  2. Configure ZooKeeper

      This post uses a separately installed ZooKeeper rather than the one bundled with Kafka. So that Kafka knows the address of the ZooKeeper it must connect to, the configuration file provides a set of ZooKeeper-related parameters.


[*]config/server.properties

############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# 2181 is ZooKeeper's clientPort
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
   


[*]config/producer.properties
      No relevant configuration here.


[*]config/consumer.properties

# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=127.0.0.1:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
   

Starting ZooKeeper
  1. Per the Kafka configuration above, ZooKeeper should listen on port 2181.
  2. Start the standalone ZooKeeper (e.g. with bin/zkServer.sh start from the ZooKeeper installation directory). The ZooKeeper configuration used here is:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/hadoop/software/zookeeper-3.4.6/data
# the port at which the clients will connect
clientPort=2181
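Before starting Kafka, it can help to verify that ZooKeeper is actually listening on the configured clientPort. A minimal sketch (a hypothetical helper, not part of either project) that simply attempts a TCP connection:

```python
import socket

def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# After 'bin/zkServer.sh start', this should report True for port 2181
print(port_is_open("localhost", 2181))
```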
 
Starting Kafka
  1. Start Kafka

$ bin/kafka-server-start.sh config/server.properties
  2. Startup log:
     At first glance the log might suggest Kafka is starting a ZooKeeper of its own, but the ZooKeeper lines below actually come from Kafka's ZooKeeper client connecting to the external server ("Connecting to zookeeper on localhost:2181"), consistent with the note at the top of this post.

INFO Verifying properties (kafka.utils.VerifiableProperties)
INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)
INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
INFO , starting (kafka.server.KafkaServer)
INFO , Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
INFO Client environment:host.name=hadoop.master (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.version=1.7.0_67 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.home=/home/hadoop/software/jdk1.7.0_67/jre (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.class.path=:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.dir=/home/hadoop/software/kafka_2.10-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7a50a6d4 (org.apache.zookeeper.ZooKeeper)
INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x14ad79bb13d0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
INFO , Started (kafka.network.SocketServer)
INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
INFO Registered broker 0 at path /brokers/ids/0 with address hadoop.master:9092. (kafka.utils.ZkUtils$)
INFO , started (kafka.server.KafkaServer)
INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
 
Testing Kafka
  1. Create a topic named test

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
# List the created topics; only test exists here
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test
   2. Produce messages
     On startup, nothing is printed apart from the SLF4J notices. Messages can then be typed directly:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

This is mesage
This is a test
   3. Consume messages
      On startup, nothing is printed apart from the SLF4J notices:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

This is mesage
This is a test
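To make the producer/consumer behavior above concrete, here is a toy in-memory model (an illustration only, nothing like Kafka's real implementation): a topic is an append-only log, and a consumer started with --from-beginning reads from offset 0 instead of seeing only new messages.

```python
class ToyTopic:
    """A topic modeled as an append-only log of messages."""
    def __init__(self):
        self.log = []

    def produce(self, message):
        self.log.append(message)

    def consume(self, from_beginning=True, offset=0):
        # from_beginning mirrors the console consumer's --from-beginning
        # flag: start at offset 0 rather than at the given offset.
        start = 0 if from_beginning else offset
        return self.log[start:]

topic = ToyTopic()
topic.produce("This is mesage")   # typed exactly as in the session above
topic.produce("This is a test")
print(topic.consume(from_beginning=True))
# → ['This is mesage', 'This is a test']
```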
 
 Running the Spark Streaming Kafka Example
  1. First, stop the producer and consumer that were started to verify Kafka works.
  2. Run KafkaWordCountProducer

bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test 3 5
   
  Argument description:


[*]KafkaWordCountProducer is the producer class name
[*]localhost:9092 is the Kafka broker's IP and port
[*]test is the topic
[*]3 is the number of messages sent per second
[*]5 is the number of words per message
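Logically, the producer builds, every second, <messagesPerSec> messages of <wordsPerMessage> random single-digit "words" each and sends them to the topic. A sketch of just the message-building step (the Spark example does roughly this in Scala; the Python names here are illustrative):

```python
import random

def build_messages(messages_per_sec, words_per_message):
    """Build one second's worth of messages: each message is
    words_per_message random digits joined by spaces."""
    return [
        " ".join(str(random.randint(0, 9)) for _ in range(words_per_message))
        for _ in range(messages_per_sec)
    ]

batch = build_messages(3, 5)
print(batch)  # e.g. ['3 7 1 0 9', '2 2 8 4 6', '5 1 7 3 0']
```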
   3. Run KafkaWordCount

bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test 1
  Argument description:


[*]KafkaWordCount is the message consumer
[*]localhost:2181 is ZooKeeper's client listen IP and port
[*]test-consumer-group is the consumer group, configured in Kafka's config/consumer.properties. What about multiple groups?

#consumer group id
group.id=test-consumer-group
   


[*]test is the topic
[*]1 is the number of threads
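Per batch, what KafkaWordCount computes amounts to a plain word count over the lines received from the topic; a sketch of that one-batch logic (the real example additionally uses updateStateByKey to keep running counts across batches):

```python
from collections import Counter

def word_count(lines):
    """Count words across a batch of messages, like
    flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return dict(counts)

print(word_count(["1 1 2", "2 3 2"]))  # → {'1': 2, '2': 3, '3': 1}
```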

 Summary:
  1. As with NetworkWordCount, once this example runs there is no visible output anywhere. Where exactly is this going wrong???
  Reference: http://www.cnblogs.com/hseagle/p/3887507.html