Kafka Usage and Error Resolution
I. Download and extract Kafka, then configure the environment variables
vim /etc/profile
export KAFKA_HOME=/root/kafka_2.11-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
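To confirm the variables took effect (assuming the install path above matches your machine), a quick check:
# echo $KAFKA_HOME
/root/kafka_2.11-1.0.0
# which kafka-topics.sh
/root/kafka_2.11-1.0.0/bin/kafka-topics.sh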
II. Kafka needs ZooKeeper
(A) Using the ZooKeeper bundled with Kafka
1. Start ZooKeeper first. In a pseudo-distributed setup Kafka already ships with ZooKeeper; its configuration lives in Kafka's config directory.
You can edit config/zookeeper.properties to change the ZooKeeper port.
Start ZooKeeper in the background:
# nohup zookeeper-server-start.sh ../config/zookeeper.properties &
2. Start the broker
# nohup kafka-server-start.sh ../config/server.properties &
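As an optional sanity check, jps should now show both processes, and nohup.out in the current directory should contain the broker's "started" line (the PIDs below are only examples):
# jps
2101 QuorumPeerMain
2456 Kafka
2873 Jps
# grep "started (kafka.server.KafkaServer)" nohup.out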
3. Test: simulate message production and consumption
(1) Create a topic
# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Created topic "KafkaTestTopic".
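You can verify the topic was registered with --list / --describe (same port 2281 as above; the output should look roughly like this, and the leader id depends on broker.id):
# kafka-topics.sh --list --zookeeper localhost:2281
KafkaTestTopic
# kafka-topics.sh --describe --zookeeper localhost:2281 --topic KafkaTestTopic
Topic:KafkaTestTopic  PartitionCount:1  ReplicationFactor:1  Configs:
    Topic: KafkaTestTopic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0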
(2) Create a producer
# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092
Check the #listeners=PLAINTEXT://:9092 line in server.properties to find Kafka's port.
(3) Create a consumer
# kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2281
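A minimal end-to-end test: whatever you type in the producer terminal should show up in the consumer terminal (add --from-beginning to the consumer if you also want messages sent before it started):
# producer terminal
> hello kafka
> second message
# consumer terminal
hello kafka
second message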
(B) Using an external ZooKeeper (not the one bundled with Kafka)
# bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
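bin/zkServer.sh status confirms the standalone instance is actually serving before Kafka is pointed at it:
# bin/zkServer.sh status conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Mode: standalone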
(1) Create a topic
# bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic secondTopic --partitions 1 --replication-factor 1
Created topic "secondTopic".
(2) Start Kafka
# nohup bin/kafka-server-start.sh config/server.properties &
(3) Kafka producer
# kafka-console-producer.sh --topic KafkaTestTopic --broker-list localhost:9092
(4) Kafka consumer
# bin/kafka-console-consumer.sh --topic KafkaTestTopic --zookeeper localhost:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
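As the warning says, the ZooKeeper-based console consumer is deprecated; the equivalent invocation with the new consumer talks to the broker directly:
# bin/kafka-console-consumer.sh --topic KafkaTestTopic --bootstrap-server localhost:9092 --from-beginning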
(5) Inspect the data stored in Kafka
# ls
bin  config  libs  LICENSE  logs  logs-kafka  nohup.out  NOTICE  site-docs
# cd logs-kafka/    # Kafka's data directory
## This directory is configured in Kafka's config/server.properties:
log.dirs=/root/kafka/kafka_2.11-1.0.0/logs-kafka
# ls    # list the topic data stored in Kafka
cleaner-offset-checkpoint  __consumer_offsets-20  __consumer_offsets-33  __consumer_offsets-46  kafka_test-0
__consumer_offsets-0   __consumer_offsets-21  __consumer_offsets-34  __consumer_offsets-47  KafkaTestTopic-0
__consumer_offsets-1   __consumer_offsets-22  __consumer_offsets-35  __consumer_offsets-48  log-start-offset-checkpoint
__consumer_offsets-10  __consumer_offsets-23  __consumer_offsets-36  __consumer_offsets-49  meta.properties
__consumer_offsets-11  __consumer_offsets-24  __consumer_offsets-37  __consumer_offsets-5   My_LOVE_TOPIC-0
__consumer_offsets-12  __consumer_offsets-25  __consumer_offsets-38  __consumer_offsets-6   mytopic-0
__consumer_offsets-13  __consumer_offsets-26  __consumer_offsets-39  __consumer_offsets-7   recovery-point-offset-checkpoint
__consumer_offsets-14  __consumer_offsets-27  __consumer_offsets-4   __consumer_offsets-8   replication-offset-checkpoint
__consumer_offsets-15  __consumer_offsets-28  __consumer_offsets-40  __consumer_offsets-9   stock-quotation-0
__consumer_offsets-16  __consumer_offsets-29  __consumer_offsets-41  hello-0                stock-quotation-avro-0
__consumer_offsets-17  __consumer_offsets-3   __consumer_offsets-42  hello-1                stock-quotation-partition-0
__consumer_offsets-18  __consumer_offsets-30  __consumer_offsets-43  hello-2                TEST-TOPIC-0
__consumer_offsets-19  __consumer_offsets-31  __consumer_offsets-44  hello-3
__consumer_offsets-2   __consumer_offsets-32  __consumer_offsets-45  hello-4
# cd KafkaTestTopic-0/    # look at partition 0 of the KafkaTestTopic topic
# ls
00000000000000000000.index  00000000000000000000.timeindex  leader-epoch-checkpoint
00000000000000000000.log    00000000000000000063.snapshot
# tail -f 00000000000000000000.log    # Kafka's data storage file
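The .log segment is a binary file, so tail -f mostly prints unreadable bytes. To see the records in readable form you can use the DumpLogSegments tool that ships with Kafka (exact output varies by version):
# bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log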
(6) Change the number of partitions and observe what changes in Kafka
## Increase the partition count
# bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic KafkaTestTopic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
# ls
bin  config  libs  LICENSE  logs  logs-kafka  nohup.out  NOTICE  site-docs
# cd logs-kafka/
# The KafkaTestTopic topic now has partition directories 0, 1 and 2, i.e. three partitions in total
# ls
cleaner-offset-checkpoint  __consumer_offsets-20  __consumer_offsets-33  __consumer_offsets-46  kafka_test-0
__consumer_offsets-0   __consumer_offsets-21  __consumer_offsets-34  __consumer_offsets-47  KafkaTestTopic-0
__consumer_offsets-1   __consumer_offsets-22  __consumer_offsets-35  __consumer_offsets-48  KafkaTestTopic-1
__consumer_offsets-10  __consumer_offsets-23  __consumer_offsets-36  __consumer_offsets-49  KafkaTestTopic-2
__consumer_offsets-11  __consumer_offsets-24  __consumer_offsets-37  __consumer_offsets-5   log-start-offset-checkpoint
__consumer_offsets-12  __consumer_offsets-25  __consumer_offsets-38  __consumer_offsets-6   meta.properties
__consumer_offsets-13  __consumer_offsets-26  __consumer_offsets-39  __consumer_offsets-7   My_LOVE_TOPIC-0
__consumer_offsets-14  __consumer_offsets-27  __consumer_offsets-4   __consumer_offsets-8   mytopic-0
__consumer_offsets-15  __consumer_offsets-28  __consumer_offsets-40  __consumer_offsets-9   recovery-point-offset-checkpoint
__consumer_offsets-16  __consumer_offsets-29  __consumer_offsets-41  hello-0                replication-offset-checkpoint
__consumer_offsets-17  __consumer_offsets-3   __consumer_offsets-42  hello-1                stock-quotation-0
__consumer_offsets-18  __consumer_offsets-30  __consumer_offsets-43  hello-2                stock-quotation-avro-0
__consumer_offsets-19  __consumer_offsets-31  __consumer_offsets-44  hello-3                stock-quotation-partition-0
__consumer_offsets-2   __consumer_offsets-32  __consumer_offsets-45  hello-4                TEST-TOPIC-0
# cd KafkaTestTopic-1/    # look at partition 1 of the KafkaTestTopic topic
# ls
00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint
# tail -f 00000000000000000000.log
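Besides the new partition directories, the change can also be confirmed from Kafka's metadata (port 2181 here because this part uses the external ZooKeeper; output trimmed to the header line):
# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic KafkaTestTopic
Topic:KafkaTestTopic  PartitionCount:3  ReplicationFactor:1  Configs: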
III. Possible errors:
(1)
# kafka-topics.sh --create --zookeeper localhost:2281 --topic KafkaTestTopic --partitions 1 --replication-factor 1
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.
(kafka.admin.TopicCommand$)
Solution: in server.properties set zookeeper.connect=localhost:2281 so that the port 2281 matches the ZooKeeper port configured in zookeeper.properties, then restart Kafka.
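For example, with the bundled ZooKeeper listening on 2281 (the port used in this walkthrough), the two files should agree:
# grep clientPort config/zookeeper.properties
clientPort=2281
# grep zookeeper.connect config/server.properties
zookeeper.connect=localhost:2281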
(2)
kafka.common.KafkaException: fetching topic metadata for topics from broker failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:77)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:98)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:67)
(3)
WARN Error while fetching metadata with correlation id 52 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
WARN Error while fetching metadata with correlation id 53 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
WARN Error while fetching metadata with correlation id 54 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
WARN Error while fetching metadata with correlation id 55 : {KafkaTestTopic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Solution for errors (2) and (3):
Modify the following in server.properties:
I. listeners=PLAINTEXT://localhost:9092
II. advertised.listeners=PLAINTEXT://localhost:9092
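After editing, both lines should be uncommented and point at an address the clients can actually reach, and the broker needs a restart to pick them up:
# grep -E '^(advertised\.)?listeners' config/server.properties
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
# kafka-server-stop.sh
# nohup kafka-server-start.sh config/server.properties &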
(4) WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Solution: a likely cause is that Kafka has not been started; restart Kafka.
Checking ZooKeeper from the Kafka distribution:
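Before retrying, you can also check whether anything is listening on the broker port at all (9092 is the default listener port; use netstat instead of ss on systems without it):
# ss -tlnp | grep 9092    # no output means no broker is listening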
bin/zookeeper-shell.sh localhost:2181
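Inside the shell you can check, for example, which broker ids are registered; an empty list [] means no broker is connected to this ZooKeeper (the id shown depends on broker.id in server.properties):
ls /brokers/ids
[0]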