main.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strings"
  9. "time"
  10. "github.com/Shopify/sarama"
  11. "go.opentelemetry.io/otel/exporters/stdout"
  12. )
  13. var (
  14. brokers = flag.String("brokers", "localhost:9092", "The Kafka brokers to connect to, as a comma separated list")
  15. topic = flag.String("topic", "default_topic", "The Kafka topic to use")
  16. logger = log.New(os.Stdout, "[OTelInterceptor] ", log.LstdFlags)
  17. )
  18. func main() {
  19. flag.Parse()
  20. if *brokers == "" {
  21. logger.Fatalln("at least one broker is required")
  22. }
  23. splitBrokers := strings.Split(*brokers, ",")
  24. sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
  25. // oTel stdout example
  26. pusher, err := stdout.InstallNewPipeline([]stdout.Option{
  27. stdout.WithQuantiles([]float64{0.5, 0.9, 0.99}),
  28. }, nil)
  29. if err != nil {
  30. logger.Fatalf("failed to initialize stdout export pipeline: %v", err)
  31. }
  32. defer pusher.Stop()
  33. // simple sarama producer that adds a new producer interceptor
  34. conf := sarama.NewConfig()
  35. conf.Version = sarama.V0_11_0_0
  36. conf.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor(splitBrokers)}
  37. producer, err := sarama.NewAsyncProducer(splitBrokers, conf)
  38. if err != nil {
  39. panic("Couldn't create a Kafka producer")
  40. }
  41. defer producer.AsyncClose()
  42. // kill -2, trap SIGINT to trigger a shutdown
  43. signals := make(chan os.Signal, 1)
  44. signal.Notify(signals, os.Interrupt)
  45. // ticker
  46. bulkSize := 2
  47. duration := 5 * time.Second
  48. ticker := time.NewTicker(duration)
  49. logger.Printf("Starting to produce %v messages every %v", bulkSize, duration)
  50. for {
  51. select {
  52. case t := <-ticker.C:
  53. now := t.Format(time.RFC3339)
  54. logger.Printf("\nproducing %v messages to topic %s at %s", bulkSize, *topic, now)
  55. for i := 0; i < bulkSize; i++ {
  56. producer.Input() <- &sarama.ProducerMessage{
  57. Topic: *topic, Key: nil,
  58. Value: sarama.StringEncoder(fmt.Sprintf("test message %v/%v from kafka-client-go-test at %s", i+1, bulkSize, now)),
  59. }
  60. }
  61. case <-signals:
  62. logger.Println("terminating the program")
  63. logger.Println("Bye :)")
  64. return
  65. }
  66. }
  67. }