|
@@ -7,6 +7,7 @@ import (
|
|
|
"io/ioutil"
|
|
|
"log"
|
|
|
"os"
|
|
|
+ "os/signal"
|
|
|
"strings"
|
|
|
|
|
|
"github.com/Shopify/sarama"
|
|
@@ -27,6 +28,8 @@ var (
|
|
|
caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
|
|
|
verifySSL = flag.Bool("verify", false, "Optional verify ssl certificates chain")
|
|
|
useTLS = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
|
|
|
+ mode = flag.String("mode", "produce", "Mode to run in: \"produce\" to produce, \"consume\" to consume")
|
|
|
+ logMsg = flag.Bool("logmsg", false, "True to log consumed messages to console")
|
|
|
|
|
|
logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
|
|
|
)
|
|
@@ -62,8 +65,9 @@ func main() {
|
|
|
flag.Parse()
|
|
|
|
|
|
if *brokers == "" {
|
|
|
- log.Fatalln("at least one brocker is required")
|
|
|
+ log.Fatalln("at least one broker is required")
|
|
|
}
|
|
|
+ splitBrokers := strings.Split(*brokers, ",")
|
|
|
|
|
|
if *userName == "" {
|
|
|
log.Fatalln("SASL username is required")
|
|
@@ -101,18 +105,66 @@ func main() {
|
|
|
conf.Net.TLS.Config = createTLSConfiguration()
|
|
|
}
|
|
|
|
|
|
- syncProcuder, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), conf)
|
|
|
- if err != nil {
|
|
|
- logger.Fatalln("failed to create producer: ", err)
|
|
|
- }
|
|
|
- partition, offset, err := syncProcuder.SendMessage(&sarama.ProducerMessage{
|
|
|
- Topic: *topic,
|
|
|
- Value: sarama.StringEncoder("test_message"),
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- logger.Fatalln("failed to send message to ", *topic, err)
|
|
|
+ if *mode == "consume" {
|
|
|
+ consumer, err := sarama.NewConsumer(splitBrokers, conf)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+ log.Println("consumer created")
|
|
|
+ defer func() {
|
|
|
+ if err := consumer.Close(); err != nil {
|
|
|
+ log.Fatalln(err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ log.Println("commence consuming")
|
|
|
+ partitionConsumer, err := consumer.ConsumePartition(*topic, 0, sarama.OffsetOldest)
|
|
|
+ if err != nil {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ defer func() {
|
|
|
+ if err := partitionConsumer.Close(); err != nil {
|
|
|
+ log.Fatalln(err)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+
|
|
|
+ signals := make(chan os.Signal, 1)
|
|
|
+ signal.Notify(signals, os.Interrupt)
|
|
|
+
|
|
|
+ consumed := 0
|
|
|
+ ConsumerLoop:
|
|
|
+ for {
|
|
|
+ log.Println("in the for")
|
|
|
+ select {
|
|
|
+ case msg := <-partitionConsumer.Messages():
|
|
|
+ log.Printf("Consumed message offset %d\n", msg.Offset)
|
|
|
+ if *logMsg {
|
|
|
+ log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
|
|
|
+ }
|
|
|
+ consumed++
|
|
|
+ case <-signals:
|
|
|
+ break ConsumerLoop
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.Printf("Consumed: %d\n", consumed)
|
|
|
+
|
|
|
+ } else {
|
|
|
+ syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatalln("failed to create producer: ", err)
|
|
|
+ }
|
|
|
+ partition, offset, err := syncProducer.SendMessage(&sarama.ProducerMessage{
|
|
|
+ Topic: *topic,
|
|
|
+ Value: sarama.StringEncoder("test_message"),
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ logger.Fatalln("failed to send message to ", *topic, err)
|
|
|
+ }
|
|
|
+ logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
|
|
|
+ _ = syncProducer.Close()
|
|
|
}
|
|
|
- logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
|
|
|
- _ = syncProcuder.Close()
|
|
|
logger.Println("Bye now !")
|
|
|
+
|
|
|
}
|