Example of simple client:

package main

import (
	"fmt"
	"log"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	broker := "kafka:9092"
	topic := "my_topic"

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		"group.id":          "myGroup",
		"auto.offset.reset": "latest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	err = c.Subscribe(topic, nil)
	if err != nil {
		panic(err)
	}

	count := 0;

	for {
		msg, err := c.ReadMessage(-1) 
		if err != nil {
			
			fmt.Printf("Consumer error: %v\n", err)
			continue
		}
		
                count++
                if count%100000 == 0 {
                        log.Printf("Received %d messages (%s)\n", count, msg.Key)
                }
	}
}

Leave a Reply

Your email address will not be published. Required fields are marked *