main.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package main
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "flag"
  6. "io/ioutil"
  7. "log"
  8. "os"
  9. "os/signal"
  10. "strings"
  11. "github.com/Shopify/sarama"
  12. )
  13. func init() {
  14. sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
  15. }
  16. var (
  17. brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
  18. userName = flag.String("username", "", "The SASL username")
  19. passwd = flag.String("passwd", "", "The SASL password")
  20. algorithm = flag.String("algorithm", "", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism")
  21. topic = flag.String("topic", "default_topic", "The Kafka topic to use")
  22. certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
  23. keyFile = flag.String("key", "", "The optional key file for client authentication")
  24. caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
  25. verifySSL = flag.Bool("verify", false, "Optional verify ssl certificates chain")
  26. useTLS = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
  27. mode = flag.String("mode", "produce", "Mode to run in: \"produce\" to produce, \"consume\" to consume")
  28. logMsg = flag.Bool("logmsg", false, "True to log consumed messages to console")
  29. logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
  30. )
  31. func createTLSConfiguration() (t *tls.Config) {
  32. t = &tls.Config{
  33. InsecureSkipVerify: *verifySSL,
  34. }
  35. if *certFile != "" && *keyFile != "" && *caFile != "" {
  36. cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
  37. if err != nil {
  38. log.Fatal(err)
  39. }
  40. caCert, err := ioutil.ReadFile(*caFile)
  41. if err != nil {
  42. log.Fatal(err)
  43. }
  44. caCertPool := x509.NewCertPool()
  45. caCertPool.AppendCertsFromPEM(caCert)
  46. t = &tls.Config{
  47. Certificates: []tls.Certificate{cert},
  48. RootCAs: caCertPool,
  49. InsecureSkipVerify: *verifySSL,
  50. }
  51. }
  52. return t
  53. }
  54. func main() {
  55. flag.Parse()
  56. if *brokers == "" {
  57. log.Fatalln("at least one broker is required")
  58. }
  59. splitBrokers := strings.Split(*brokers, ",")
  60. if *userName == "" {
  61. log.Fatalln("SASL username is required")
  62. }
  63. if *passwd == "" {
  64. log.Fatalln("SASL password is required")
  65. }
  66. conf := sarama.NewConfig()
  67. conf.Producer.Retry.Max = 1
  68. conf.Producer.RequiredAcks = sarama.WaitForAll
  69. conf.Producer.Return.Successes = true
  70. conf.Metadata.Full = true
  71. conf.Version = sarama.V0_10_0_0
  72. conf.ClientID = "sasl_scram_client"
  73. conf.Metadata.Full = true
  74. conf.Net.SASL.Enable = true
  75. conf.Net.SASL.User = *userName
  76. conf.Net.SASL.Password = *passwd
  77. conf.Net.SASL.Handshake = true
  78. if *algorithm == "sha512" {
  79. conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
  80. conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
  81. } else if *algorithm == "sha256" {
  82. conf.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
  83. conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
  84. } else {
  85. log.Fatalf("invalid SHA algorithm \"%s\": can be either \"sha256\" or \"sha512\"", *algorithm)
  86. }
  87. if *useTLS {
  88. conf.Net.TLS.Enable = true
  89. conf.Net.TLS.Config = createTLSConfiguration()
  90. }
  91. if *mode == "consume" {
  92. consumer, err := sarama.NewConsumer(splitBrokers, conf)
  93. if err != nil {
  94. panic(err)
  95. }
  96. log.Println("consumer created")
  97. defer func() {
  98. if err := consumer.Close(); err != nil {
  99. log.Fatalln(err)
  100. }
  101. }()
  102. log.Println("commence consuming")
  103. partitionConsumer, err := consumer.ConsumePartition(*topic, 0, sarama.OffsetOldest)
  104. if err != nil {
  105. panic(err)
  106. }
  107. defer func() {
  108. if err := partitionConsumer.Close(); err != nil {
  109. log.Fatalln(err)
  110. }
  111. }()
  112. // Trap SIGINT to trigger a shutdown.
  113. signals := make(chan os.Signal, 1)
  114. signal.Notify(signals, os.Interrupt)
  115. consumed := 0
  116. ConsumerLoop:
  117. for {
  118. log.Println("in the for")
  119. select {
  120. case msg := <-partitionConsumer.Messages():
  121. log.Printf("Consumed message offset %d\n", msg.Offset)
  122. if *logMsg {
  123. log.Printf("KEY: %s VALUE: %s", msg.Key, msg.Value)
  124. }
  125. consumed++
  126. case <-signals:
  127. break ConsumerLoop
  128. }
  129. }
  130. log.Printf("Consumed: %d\n", consumed)
  131. } else {
  132. syncProducer, err := sarama.NewSyncProducer(splitBrokers, conf)
  133. if err != nil {
  134. logger.Fatalln("failed to create producer: ", err)
  135. }
  136. partition, offset, err := syncProducer.SendMessage(&sarama.ProducerMessage{
  137. Topic: *topic,
  138. Value: sarama.StringEncoder("test_message"),
  139. })
  140. if err != nil {
  141. logger.Fatalln("failed to send message to ", *topic, err)
  142. }
  143. logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
  144. _ = syncProducer.Close()
  145. }
  146. logger.Println("Bye now !")
  147. }