|
|
@@ -54,7 +54,7 @@ func main() {
|
|
|
|
|
|
version, err := sarama.ParseKafkaVersion(version)
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Panicf("Error parsing Kafka version: %v", err)
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -78,7 +78,7 @@ func main() {
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Panicf("Error creating consumer group client: %v", err)
|
|
|
}
|
|
|
|
|
|
wg := &sync.WaitGroup{}
|
|
|
@@ -86,9 +86,8 @@ func main() {
|
|
|
wg.Add(1)
|
|
|
defer wg.Done()
|
|
|
for {
|
|
|
- err := client.Consume(ctx, strings.Split(topics, ","), &consumer)
|
|
|
- if err != nil {
|
|
|
- panic(err)
|
|
|
+ if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
|
|
|
+ log.Panicf("Error from consumer: %v", err)
|
|
|
}
|
|
|
// check if context was cancelled, signaling that the consumer should stop
|
|
|
if ctx.Err() != nil {
|
|
|
@@ -112,7 +111,7 @@ func main() {
|
|
|
cancel()
|
|
|
wg.Wait()
|
|
|
if err = client.Close(); err != nil {
|
|
|
- panic(err)
|
|
|
+ log.Panicf("Error closing client: %v", err)
|
|
|
}
|
|
|
}
|
|
|
|