Example of simple client:
package main
import (
"fmt"
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
broker := "kafka:9092"
topic := "my_topic"
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": "myGroup",
"auto.offset.reset": "latest",
})
if err != nil {
panic(err)
}
defer c.Close()
err = c.Subscribe(topic, nil)
if err != nil {
panic(err)
}
count := 0;
for {
msg, err := c.ReadMessage(-1)
if err != nil {
fmt.Printf("Consumer error: %v\n", err)
continue
}
count++
if count%100000 == 0 {
log.Printf("Received %d messages (%s)\n", count, msg.Key)
}
}
}