zhangxinba 发表于 2018-9-19 12:45:30

golang kafka

  consumer.go
package main import (    "fmt"    "github.com/Shopify/sarama"    "log"    "os"    "strings"    "sync") var (    wg   sync.WaitGroup    logger = log.New(os.Stderr, "", log.LstdFlags)) func main() {   sarama.Logger = logger   consumer, err := sarama.NewConsumer(strings.Split("localhost:9092", ","), nil)    if err != nil {      logger.Println("Failed to start consumer: %s", err)    }   partitionList, err := consumer.Partitions("hello")    if err != nil {      logger.Println("Failed to get the list of partitions: ", err)    }   for partition := range partitionList {      pc, err := consumer.ConsumePartition("hello", int32(partition), sarama.OffsetNewest)      if err != nil {            logger.Printf("Failed to start consumer for partition %d: %s\n", partition, err)      }      defer pc.AsyncClose()         wg.Add(1)         go func(sarama.PartitionConsumer) {            defer wg.Done()            for msg := range pc.Messages() {                fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))                fmt.Println()            }      }(pc)    }   wg.Wait()   logger.Println("Done consuming topic hello")    consumer.Close()}  producer.go
package main import (    "github.com/Shopify/sarama"    "log"    "os"    "strings") var (    logger = log.New(os.Stderr, "", log.LstdFlags)) func main() {    sarama.Logger = logger   config := sarama.NewConfig()    config.Producer.RequiredAcks = sarama.WaitForAll    config.Producer.Partitioner = sarama.NewRandomPartitioner   msg := &sarama.ProducerMessage{}    msg.Topic = "hello"    msg.Partition = int32(-1)    msg.Key = sarama.StringEncoder("key")    msg.Value = sarama.ByteEncoder("你好, 世界!")   producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)    if err != nil {      logger.Println("Failed to produce message: %s", err)      os.Exit(500)    }    defer producer.Close()   partition, offset, err := producer.SendMessage(msg)    if err != nil {      logger.Println("Failed to produce message: ", err)    }    logger.Printf("partition=%d, offset=%d\n", partition, offset)}
页: [1]
查看完整版本: golang kafka