// main.go (3.4 KB)
  1. package main
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "flag"
  6. "io/ioutil"
  7. "log"
  8. "os"
  9. "strings"
  10. "github.com/Shopify/sarama"
  11. )
  12. func init() {
  13. sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
  14. }
  15. var (
  16. brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
  17. userName = flag.String("username", "", "The SASL username")
  18. passwd = flag.String("passwd", "", "The SASL password")
  19. algorithm = flag.String("algorithm", "", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism")
  20. topic = flag.String("topic", "default_topic", "The Kafka topic to use")
  21. certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
  22. keyFile = flag.String("key", "", "The optional key file for client authentication")
  23. caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
  24. verifySSL = flag.Bool("verify", false, "Optional verify ssl certificates chain")
  25. useTLS = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
  26. logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
  27. )
  28. func createTLSConfiguration() (t *tls.Config) {
  29. t = &tls.Config{
  30. InsecureSkipVerify: *verifySSL,
  31. }
  32. if *certFile != "" && *keyFile != "" && *caFile != "" {
  33. cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
  34. if err != nil {
  35. log.Fatal(err)
  36. }
  37. caCert, err := ioutil.ReadFile(*caFile)
  38. if err != nil {
  39. log.Fatal(err)
  40. }
  41. caCertPool := x509.NewCertPool()
  42. caCertPool.AppendCertsFromPEM(caCert)
  43. t = &tls.Config{
  44. Certificates: []tls.Certificate{cert},
  45. RootCAs: caCertPool,
  46. InsecureSkipVerify: *verifySSL,
  47. }
  48. }
  49. return t
  50. }
  51. func main() {
  52. flag.Parse()
  53. if *brokers == "" {
  54. log.Fatalln("at least one brocker is required")
  55. }
  56. if *userName == "" {
  57. log.Fatalln("SASL username is required")
  58. }
  59. if *passwd == "" {
  60. log.Fatalln("SASL password is required")
  61. }
  62. conf := sarama.NewConfig()
  63. conf.Producer.Retry.Max = 1
  64. conf.Producer.RequiredAcks = sarama.WaitForAll
  65. conf.Producer.Return.Successes = true
  66. conf.Metadata.Full = true
  67. conf.Version = sarama.V0_10_0_0
  68. conf.ClientID = "sasl_scram_client"
  69. conf.Metadata.Full = true
  70. conf.Net.SASL.Enable = true
  71. conf.Net.SASL.User = *userName
  72. conf.Net.SASL.Password = *passwd
  73. conf.Net.SASL.Handshake = true
  74. if *algorithm == "sha512" {
  75. conf.Net.SASL.SCRAMClient = &XDGSCRAMClient{HashGeneratorFcn: SHA512}
  76. conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
  77. } else if *algorithm == "sha256" {
  78. conf.Net.SASL.SCRAMClient = &XDGSCRAMClient{HashGeneratorFcn: SHA256}
  79. conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
  80. } else {
  81. log.Fatalf("invalid SHA algorithm \"%s\": can be either \"sha256\" or \"sha512\"", *algorithm)
  82. }
  83. if *useTLS {
  84. conf.Net.TLS.Enable = true
  85. conf.Net.TLS.Config = createTLSConfiguration()
  86. }
  87. syncProcuder, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), conf)
  88. if err != nil {
  89. logger.Fatalln("failed to create producer: ", err)
  90. }
  91. partition, offset, err := syncProcuder.SendMessage(&sarama.ProducerMessage{
  92. Topic: *topic,
  93. Value: sarama.StringEncoder("test_message"),
  94. })
  95. if err != nil {
  96. logger.Fatalln("failed to send message to ", *topic, err)
  97. }
  98. logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
  99. _ = syncProcuder.Close()
  100. logger.Println("Bye now !")
  101. }