51qsx 发表于 2019-1-31 09:28:45

kafka spring 实例

使用定时器发送后结果如下






kafka 代码下载
  Java代码

[*]  15.安装kafka
[*]  cd /usr/local/
[*]  wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
[*]  tar xf kafka_2.10-0.10.0.0.tgz
[*]  ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
[*]  chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
[*]  chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka
[*]  
[*]  /usr/local/zookeeper/bin/zkCli.sh
[*]  create /kafka ''
[*]  
[*]  vim /usr/local/kafka/config/server.properties
[*]  broker.id=0
[*]  zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka
[*]  
[*]  scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/
[*]  scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/
[*]  
[*]  scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties
[*]  scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties
[*]  
[*]  master slave 启动
[*]  /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
[*]  创建topic
[*]  /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic
[*]  /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic
[*]  
[*]  
[*]  /usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic
[*]  
[*]  /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic


安装完成 后测试 下载
productor

consumer



spring 接受信息

代码部分
applicationContext-kafka-productor.xml
  Java代码 下载

[*]  
[*]  
[*]  
[*]        
[*]        
[*]        
[*]           
[*]        
[*]        
[*]           
[*]              
[*]                  3600000
[*]                  5
[*]                  kafka.serializer.StringEncoder
[*]                  1
[*]              
[*]           
[*]        
[*]  
[*]        
[*]  
[*]        
[*]           
[*]        
[*]  
[*]        
[*]           
[*]        
[*]        
[*]        
[*]     
[*]        
[*]           
[*]              
[*]           
[*]        
[*]  

applicationContext-kafka-consumer.xml
  Java代码

[*]  
[*]  
[*]  
[*]        
[*]        
[*]           
[*]        
[*]        
[*]        
[*]        
[*]        
[*]           
[*]        
[*]        
[*]        
[*]  
[*]        
[*]           
[*]              
[*]                  smallest
[*]                  10485760   
[*]                  5242880
[*]                  1000
[*]              
[*]           
[*]        
[*]        
[*]        
[*]        
[*]        
[*]  
[*]        
[*]           
[*]                 
[*]                    
[*]              
[*]           
[*]        
[*]  

KafkaConsumerService
  Java代码 下载

[*]  @Service
[*]  public class KafkaConsumerService {
[*]  
[*]        
[*]      public void process(Map msgs) {
[*]        for (Map.Entry entry : msgs.entrySet()) {
[*]              System.out.println("======================================Consumer Message received: ");
[*]              System.out.println("=====================================Suchit Topic:" + entry.getKey());
[*]              for (String msg : entry.getValue().values()) {
[*]                  System.out.println("================================Suchit Consumed Message: " + msg);
[*]              }
[*]        }
[*]      }
[*]  
[*]  }

KafkaProductorService
  Java代码

[*]  @Service
[*]  ublic class KafkaProductorService {
[*]  
[*]     
[*]     @Autowired
[*]     @Qualifier("pChannel")
[*]     private MessageChannel messageChannel;
[*]  
[*]        
[*]     public void sendInfo(String topic, Object obj) {
[*]         System.out.println("---Service:KafkaService------sendInfo------");   
[*]         messageChannel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC,topic).build());
[*]     }
[*]     

pom

  Java代码 下载

[*]  
[*]  4.0.0
[*]  com.curiousby.baoyou.cn
[*]  SpringKafkaDEMO
[*]  war
[*]  0.0.1-SNAPSHOT
[*]  SpringKafkaDEMO Maven Webapp
[*]  http://maven.apache.org
[*]     
[*]  
[*]        
[*]        
[*]        4.2.5.RELEASE
[*]        
[*]  
[*]        
[*]           
[*]           
[*]              junit
[*]              junit
[*]              4.7
[*]              jar
[*]              test
[*]           
[*]           
[*]              org.dbunit
[*]              dbunit
[*]              2.4.9
[*]              test
[*]           
[*]           
[*]              com.github.springtestdbunit
[*]              spring-test-dbunit
[*]              1.1.0
[*]              test
[*]           
[*]           
[*]              org.springframework
[*]              spring-test
[*]              ${spring.version}
[*]              test
[*]           
[*]           
[*]           
[*]           
[*]              javax.servlet
[*]              javax.servlet-api
[*]              3.1.0
[*]              provided
[*]           
[*]           
[*]              org.aspectj
[*]              aspectjrt
[*]              1.7.2
[*]           
[*]           
[*]              org.aspectj
[*]              aspectjweaver
[*]              1.7.2
[*]           
[*]           
[*]              org.springframework
[*]              spring-aspects
[*]              ${spring.version}
[*]              jar
[*]           
[*]           
[*]              org.springframework
[*]              spring-core
[*]              ${spring.version}
[*]           
[*]           
[*]              org.springframework
[*]              spring-web
[*]              ${spring.version}
[*]           
[*]           
[*]              org.springframework
[*]              spring-webmvc
[*]              ${spring.version}
[*]           
[*]           
[*]              org.springframework.integration
[*]              spring-integration-kafka
[*]              1.3.0.RELEASE
[*]           
[*]           
[*]              commons-logging
[*]              commons-logging
[*]              1.1.1
[*]           
[*]        
[*]              org.slf4j
[*]              slf4j-api
[*]              1.6.4
[*]              jar
[*]           
[*]           
[*]              org.slf4j
[*]              slf4j-log4j12
[*]              1.6.4
[*]              jar
[*]            下载
[*]           
[*]              javax
[*]              javaee-api
[*]              7.0
[*]           
[*]           
[*]              com.fasterxml.jackson.core
[*]              jackson-core
[*]              2.7.6
[*]           
[*]           
[*]              com.fasterxml.jackson.core
[*]              jackson-databind
[*]              2.7.6
[*]           
[*]           
[*]              com.fasterxml.jackson.core
[*]              jackson-annotations
[*]              2.7.6
[*]           
[*]        
[*]           
[*]              org.apache.avro
[*]              avro
[*]              1.7.7
[*]           
[*]  
[*]        
[*]        
[*]        SpringKafkaDEMO
[*]           
[*]              
[*]                  org.apache.maven.plugins
[*]                  maven-compiler-plugin
[*]                  3.3
[*]                    
[*]                       
[*]                          org.codehaus.plexus
[*]                          plexus-compiler-javac
[*]                          2.5
[*]                       
[*]                    
[*]                    
[*]                    1.7
[*]                    1.7
[*]                    UTF-8
[*]                       
[*]                          
[*]                          ${java.home}/lib/rt.jar:${java.home}/lib/jce.jar
[*]                       
[*]                    
[*]              
[*]           
[*]        
[*]  

遇到的问题:下载地址
1. spring 中 日志 中的 logback必须 保持一致   ,这里我使用 org.slf4j 1.6.4
  Java代码

[*]  org.slf4j
[*]              slf4j-api
[*]              1.6.4
[*]              jar
[*]           
[*]           
[*]              org.slf4j
[*]              slf4j-log4j12
[*]              1.6.4
[*]              jar
[*]           
  




页: [1]
查看完整版本: kafka spring 实例