Kafka Spring example
After sending messages with a timer, the result is as follows.
Kafka
Shell commands
[*] # 15. Install Kafka
[*] cd /usr/local/
[*] wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
[*] tar xf kafka_2.10-0.10.0.0.tgz
[*] ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
[*] chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
[*] chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
[*]
[*] /usr/local/zookeeper/bin/zkCli.sh
[*] create /kafka ''
[*]
[*] vim /usr/local/kafka/config/server.properties
[*] broker.id=0
[*] zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka
[*]
[*] scp /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
[*] scp /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/
[*] # On dev05 and dev06, extract the archive and create the /usr/local/kafka symlink as above before copying the config.
[*]
[*] scp /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
[*] scp /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties
[*] # The copied file still says broker.id=0; edit it on each node so that every broker has a unique broker.id.
[*]
[*] # Start the broker on the master and on each slave
[*] /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
[*] # Create the topic
[*] /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
[*] /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic
[*]
[*]
[*] /usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic
[*]
[*] /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
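The `zookeeper.connect` setting and every `--zookeeper` argument above end in `/kafka`, the znode created earlier with `create /kafka ''`. In a ZooKeeper connection string the chroot path is written once, after the last host, not after each host. A minimal Java sketch of assembling that string follows; the class and method names (`ZkConnectSketch`, `connectString`) are illustrative and do not come from the original post.

```java
import java.util.Arrays;
import java.util.List;

class ZkConnectSketch {

    /** Joins host:port pairs with commas and appends the chroot path once, at the very end. */
    static String connectString(List<String> hostPorts, String chroot) {
        StringBuilder sb = new StringBuilder();
        for (String hostPort : hostPorts) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(hostPort);
        }
        return sb.append(chroot).toString();
    }

    public static void main(String[] args) {
        // Hosts and chroot taken from the server.properties line above.
        List<String> nodes = Arrays.asList(
                "dev10.aoiplus.openpf:2181",
                "dev06.aoiplus.openpf:2181",
                "dev05.aoiplus.openpf:2181");
        System.out.println(connectString(nodes, "/kafka"));
    }
}
```

The same value can then be reused for `server.properties` and for the `kafka-topics.sh` calls, so that the brokers and the tools agree on the chroot.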
Testing after installation
producer
consumer
Spring receiving messages
Code
applicationContext-kafka-productor.xml
XML
[*] [XML markup lost in extraction; only the following property values survive]
[*] 3600000
[*] 5
[*] kafka.serializer.StringEncoder
[*] 1
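Only four values of this file are readable in the listing above: 3600000, 5, kafka.serializer.StringEncoder and 1. They line up with four settings of the old (Scala) Kafka producer that spring-integration-kafka 1.x samples commonly pass in a `producerProperties` bean. The mapping of values to keys below is an assumption, since the keys themselves are not in the source, and `ProducerPropsSketch` is a made-up name. As a hedged sketch in plain Java:

```java
import java.util.Properties;

class ProducerPropsSketch {

    /** Builds producer settings; the key names are assumed, only the values appear in the post. */
    static Properties producerProperties() {
        Properties p = new Properties();
        // How often topic metadata is refreshed, in milliseconds (assumed key).
        p.setProperty("topic.metadata.refresh.interval.ms", "3600000");
        // Number of retries for a failed send (assumed key).
        p.setProperty("message.send.max.retries", "5");
        // Encoder used to turn the payload into bytes (assumed key).
        p.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // 1 = wait for the partition leader to acknowledge the write (assumed key).
        p.setProperty("request.required.acks", "1");
        return p;
    }

    public static void main(String[] args) {
        Properties p = producerProperties();
        for (String key : p.stringPropertyNames()) {
            System.out.println(key + "=" + p.getProperty(key));
        }
    }
}
```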
applicationContext-kafka-consumer.xml
XML
[*] [XML markup lost in extraction; only the following property values survive]
[*] smallest
[*] 10485760
[*] 5242880
[*] 1000
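The consumer file is in the same state: the readable values are smallest, 10485760, 5242880 and 1000. These match four settings of the old high-level Kafka consumer that spring-integration-kafka 1.x samples usually place in a `consumerProperties` bean. The key names below are an assumption made for illustration, not something recovered from the post, and `ConsumerPropsSketch` is a hypothetical name.

```java
import java.util.Properties;

class ConsumerPropsSketch {

    /** Builds consumer settings; the key names are assumed, only the values appear in the post. */
    static Properties consumerProperties() {
        Properties p = new Properties();
        // Start from the earliest offset when no committed offset exists (assumed key).
        p.setProperty("auto.offset.reset", "smallest");
        // Socket receive buffer: 10485760 bytes = 10 MiB (assumed key).
        p.setProperty("socket.receive.buffer.bytes", "10485760");
        // Maximum bytes fetched per partition per request: 5242880 bytes = 5 MiB (assumed key).
        p.setProperty("fetch.message.max.bytes", "5242880");
        // Offsets are auto-committed every 1000 ms (assumed key).
        p.setProperty("auto.commit.interval.ms", "1000");
        return p;
    }

    public static void main(String[] args) {
        Properties p = consumerProperties();
        for (String key : p.stringPropertyNames()) {
            System.out.println(key + "=" + p.getProperty(key));
        }
    }
}
```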
KafkaConsumerService
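Java code
[*] import java.util.Map;
[*]
[*] import org.springframework.stereotype.Service;
[*]
[*] @Service
[*] public class KafkaConsumerService {
[*]
[*]     // Payload: topic name -> (partition id, assumed to be an Integer key) -> message text.
[*]     public void process(Map<String, Map<Integer, String>> msgs) {
[*]         for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
[*]             System.out.println("======================================Consumer Message received: ");
[*]             System.out.println("=====================================Suchit Topic:" + entry.getKey());
[*]             // Print every message received for this topic, across all partitions.
[*]             for (String msg : entry.getValue().values()) {
[*]                 System.out.println("================================Suchit Consumed Message: " + msg);
[*]             }
[*]         }
[*]     }
[*]
[*] }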
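The nested loops in `process` imply a two-level payload: the outer map is keyed by topic name and the inner map holds the message strings. The sketch below builds such a payload by hand and walks it the same way, without any Spring or Kafka dependency. The inner key type (`Integer`, read here as a partition id) is an assumption, and `ConsumerPayloadSketch` and `describe` are illustrative names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConsumerPayloadSketch {

    /** Walks topic -> (partition -> message) and returns one "topic: message" line per message. */
    static List<String> describe(Map<String, Map<Integer, String>> msgs) {
        List<String> lines = new ArrayList<String>();
        for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) {
            for (String msg : entry.getValue().values()) {
                lines.add(entry.getKey() + ": " + msg);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hand-built payload standing in for what the inbound adapter would deliver.
        Map<Integer, String> byPartition = new LinkedHashMap<Integer, String>();
        byPartition.put(0, "hello");
        byPartition.put(3, "world");
        Map<String, Map<Integer, String>> msgs = new LinkedHashMap<String, Map<Integer, String>>();
        msgs.put("baoy-topic", byPartition);

        for (String line : describe(msgs)) {
            System.out.println(line);
        }
    }
}
```

Returning the lines instead of printing them inside the loop keeps the traversal easy to check in a unit test.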
KafkaProductorService
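Java code
[*] import org.springframework.beans.factory.annotation.Autowired;
[*] import org.springframework.beans.factory.annotation.Qualifier;
[*] import org.springframework.integration.kafka.support.KafkaHeaders;
[*] import org.springframework.messaging.MessageChannel;
[*] import org.springframework.messaging.support.MessageBuilder;
[*] import org.springframework.stereotype.Service;
[*]
[*] @Service
[*] public class KafkaProductorService {
[*]
[*]     // Outbound channel bean named "pChannel"
[*]     @Autowired
[*]     @Qualifier("pChannel")
[*]     private MessageChannel messageChannel;
[*]
[*]     // Wraps the payload in a message whose header names the target topic, then sends it to the channel.
[*]     public void sendInfo(String topic, Object obj) {
[*]         System.out.println("---Service:KafkaService------sendInfo------");
[*]         messageChannel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC, topic).build());
[*]     }
[*]
[*] }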
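The post mentions sending messages with a timer, but the timer code itself is not shown. One possible way to drive `sendInfo` periodically is a `ScheduledExecutorService`, sketched below under stated assumptions: the `Sender` interface is a hypothetical stand-in for `KafkaProductorService`, and the message text, count and period are invented for illustration. Anonymous classes are used instead of lambdas because the pom targets Java 1.7.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TimedSenderSketch {

    /** Hypothetical stand-in for KafkaProductorService.sendInfo(topic, obj). */
    interface Sender {
        void sendInfo(String topic, Object payload);
    }

    /**
     * Sends `count` numbered messages, one every `periodMillis` milliseconds,
     * then stops the timer. Returns true if all messages were sent in time.
     */
    static boolean sendPeriodically(final Sender sender, final String topic,
                                    int count, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        final CountDownLatch remaining = new CountDownLatch(count);
        final AtomicInteger next = new AtomicInteger(0);
        timer.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // Guard against an extra tick between the last send and shutdown.
                if (remaining.getCount() > 0) {
                    sender.sendInfo(topic, "message-" + next.getAndIncrement());
                    remaining.countDown();
                }
            }
        }, 0L, periodMillis, TimeUnit.MILLISECONDS);
        try {
            // Wait for the last send, with a generous upper bound.
            return remaining.await(count * periodMillis + 5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Sender printing = new Sender() {
            @Override
            public void sendInfo(String topic, Object payload) {
                System.out.println("send to " + topic + ": " + payload);
            }
        };
        sendPeriodically(printing, "baoy-topic", 3, 100L);
    }
}
```

In the Spring application the `Sender` would be replaced by the injected `KafkaProductorService`; Spring's own scheduling support would be another option.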
pom
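XML
[*] <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
[*]          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
[*]     <modelVersion>4.0.0</modelVersion>
[*]     <groupId>com.curiousby.baoyou.cn</groupId>
[*]     <artifactId>SpringKafkaDEMO</artifactId>
[*]     <packaging>war</packaging>
[*]     <version>0.0.1-SNAPSHOT</version>
[*]     <name>SpringKafkaDEMO Maven Webapp</name>
[*]     <url>http://maven.apache.org</url>
[*]
[*]     <properties>
[*]         <spring.version>4.2.5.RELEASE</spring.version>
[*]     </properties>
[*]
[*]     <dependencies>
[*]         <dependency>
[*]             <groupId>junit</groupId>
[*]             <artifactId>junit</artifactId>
[*]             <version>4.7</version>
[*]             <type>jar</type>
[*]             <scope>test</scope>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.dbunit</groupId>
[*]             <artifactId>dbunit</artifactId>
[*]             <version>2.4.9</version>
[*]             <scope>test</scope>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>com.github.springtestdbunit</groupId>
[*]             <artifactId>spring-test-dbunit</artifactId>
[*]             <version>1.1.0</version>
[*]             <scope>test</scope>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.springframework</groupId>
[*]             <artifactId>spring-test</artifactId>
[*]             <version>${spring.version}</version>
[*]             <scope>test</scope>
[*]         </dependency>
[*]
[*]         <dependency>
[*]             <groupId>javax.servlet</groupId>
[*]             <artifactId>javax.servlet-api</artifactId>
[*]             <version>3.1.0</version>
[*]             <scope>provided</scope>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.aspectj</groupId>
[*]             <artifactId>aspectjrt</artifactId>
[*]             <version>1.7.2</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.aspectj</groupId>
[*]             <artifactId>aspectjweaver</artifactId>
[*]             <version>1.7.2</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.springframework</groupId>
[*]             <artifactId>spring-aspects</artifactId>
[*]             <version>${spring.version}</version>
[*]             <type>jar</type>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.springframework</groupId>
[*]             <artifactId>spring-core</artifactId>
[*]             <version>${spring.version}</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.springframework</groupId>
[*]             <artifactId>spring-web</artifactId>
[*]             <version>${spring.version}</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.springframework</groupId>
[*]             <artifactId>spring-webmvc</artifactId>
[*]             <version>${spring.version}</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.springframework.integration</groupId>
[*]             <artifactId>spring-integration-kafka</artifactId>
[*]             <version>1.3.0.RELEASE</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>commons-logging</groupId>
[*]             <artifactId>commons-logging</artifactId>
[*]             <version>1.1.1</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.slf4j</groupId>
[*]             <artifactId>slf4j-api</artifactId>
[*]             <version>1.6.4</version>
[*]             <type>jar</type>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>org.slf4j</groupId>
[*]             <artifactId>slf4j-log4j12</artifactId>
[*]             <version>1.6.4</version>
[*]             <type>jar</type>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>javax</groupId>
[*]             <artifactId>javaee-api</artifactId>
[*]             <version>7.0</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>com.fasterxml.jackson.core</groupId>
[*]             <artifactId>jackson-core</artifactId>
[*]             <version>2.7.6</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>com.fasterxml.jackson.core</groupId>
[*]             <artifactId>jackson-databind</artifactId>
[*]             <version>2.7.6</version>
[*]         </dependency>
[*]         <dependency>
[*]             <groupId>com.fasterxml.jackson.core</groupId>
[*]             <artifactId>jackson-annotations</artifactId>
[*]             <version>2.7.6</version>
[*]         </dependency>
[*]
[*]         <dependency>
[*]             <groupId>org.apache.avro</groupId>
[*]             <artifactId>avro</artifactId>
[*]             <version>1.7.7</version>
[*]         </dependency>
[*]     </dependencies>
[*]
[*]     <build>
[*]         <finalName>SpringKafkaDEMO</finalName>
[*]         <plugins>
[*]             <plugin>
[*]                 <groupId>org.apache.maven.plugins</groupId>
[*]                 <artifactId>maven-compiler-plugin</artifactId>
[*]                 <version>3.3</version>
[*]                 <dependencies>
[*]                     <dependency>
[*]                         <groupId>org.codehaus.plexus</groupId>
[*]                         <artifactId>plexus-compiler-javac</artifactId>
[*]                         <version>2.5</version>
[*]                     </dependency>
[*]                 </dependencies>
[*]                 <configuration>
[*]                     <source>1.7</source>
[*]                     <target>1.7</target>
[*]                     <encoding>UTF-8</encoding>
[*]                     <compilerArguments>
[*]                         <bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath>
[*]                     </compilerArguments>
[*]                 </configuration>
[*]             </plugin>
[*]         </plugins>
[*]     </build>
[*] </project>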
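Problems encountered:
1. The logging libraries used alongside Spring (logback/slf4j) must be kept at consistent versions; here I use org.slf4j 1.6.4 for both artifacts.
XML
[*] <dependency>
[*]     <groupId>org.slf4j</groupId>
[*]     <artifactId>slf4j-api</artifactId>
[*]     <version>1.6.4</version>
[*]     <type>jar</type>
[*] </dependency>
[*] <dependency>
[*]     <groupId>org.slf4j</groupId>
[*]     <artifactId>slf4j-log4j12</artifactId>
[*]     <version>1.6.4</version>
[*]     <type>jar</type>
[*] </dependency>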