|
@@ -6,6 +6,7 @@ import (
|
|
|
"log"
|
|
|
"os"
|
|
|
"os/signal"
|
|
|
+ "strconv"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
|
|
@@ -15,6 +16,7 @@ import (
|
|
|
var (
|
|
|
brokerList = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The comma separated list of brokers in the Kafka cluster")
|
|
|
topic = flag.String("topic", "", "REQUIRED: the topic to consume")
|
|
|
+ partitions = flag.String("partitions", "all", "The partitions to consume, can be 'all' or comma-separated numbers")
|
|
|
offset = flag.String("offset", "newest", "The offset to start with. Can be `oldest`, `newest`")
|
|
|
verbose = flag.Bool("verbose", false, "Whether to turn on sarama logging")
|
|
|
bufferSize = flag.Int("buffer-size", 256, "The buffer size of the message channel.")
|
|
@@ -52,7 +54,7 @@ func main() {
|
|
|
printErrorAndExit(69, "Failed to start consumer: %s", err)
|
|
|
}
|
|
|
|
|
|
- partitions, err := c.Partitions(*topic)
|
|
|
+ partitionList, err := getPartitions(c)
|
|
|
if err != nil {
|
|
|
printErrorAndExit(69, "Failed to get the list of partitions: %s", err)
|
|
|
}
|
|
@@ -71,7 +73,7 @@ func main() {
|
|
|
close(closing)
|
|
|
}()
|
|
|
|
|
|
- for _, partition := range partitions {
|
|
|
+ for _, partition := range partitionList {
|
|
|
pc, err := c.ConsumePartition(*topic, partition, initialOffset)
|
|
|
if err != nil {
|
|
|
printErrorAndExit(69, "Failed to start consumer for partition %d: %s", partition, err)
|
|
@@ -110,6 +112,24 @@ func main() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func getPartitions(c sarama.Consumer) ([]int32, error) {
|
|
|
+ if *partitions == "all" {
|
|
|
+ return c.Partitions(*topic)
|
|
|
+ }
|
|
|
+
|
|
|
+ tmp := strings.Split(*partitions, ",")
|
|
|
+ var pList []int32
|
|
|
+ for i := range tmp {
|
|
|
+ val, err := strconv.ParseInt(tmp[i], 10, 32)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ pList = append(pList, int32(val))
|
|
|
+ }
|
|
|
+
|
|
|
+ return pList, nil
|
|
|
+}
|
|
|
+
|
|
|
func printErrorAndExit(code int, format string, values ...interface{}) {
|
|
|
fmt.Fprintf(os.Stderr, "ERROR: %s\n", fmt.Sprintf(format, values...))
|
|
|
fmt.Fprintln(os.Stderr)
|