// trace_interceptor.go (2.7 KB)

  1. package main
  2. import (
  3. "context"
  4. "strings"
  5. "github.com/Shopify/sarama"
  6. "go.opentelemetry.io/otel/api/global"
  7. "go.opentelemetry.io/otel/api/kv"
  8. "go.opentelemetry.io/otel/api/trace"
  9. )
  10. type OTelInterceptor struct {
  11. tracer trace.Tracer
  12. fixedAttrs []kv.KeyValue
  13. }
  14. // NewOTelInterceptor processes span for intercepted messages and add some
  15. // headers with the span data.
  16. func NewOTelInterceptor(brokers []string) *OTelInterceptor {
  17. oi := OTelInterceptor{}
  18. oi.tracer = global.TraceProvider().Tracer("shopify.com/sarama/examples/interceptors")
  19. // These are based on the spec, which was reachable as of 2020-05-15
  20. // https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md
  21. oi.fixedAttrs = []kv.KeyValue{
  22. kv.String("messaging.destination_kind", "topic"),
  23. kv.String("span.otel.kind", "PRODUCER"),
  24. kv.String("messaging.system", "kafka"),
  25. kv.String("net.transport", "IP.TCP"),
  26. kv.String("messaging.url", strings.Join(brokers, ",")),
  27. }
  28. return &oi
  29. }
  // Names of the record headers written by OTelInterceptor.OnSend. A message
  // that already carries all three is treated as previously intercepted.
  const (
  	// TraceHeaderName is the header that carries the trace ID.
  	TraceHeaderName = "trace_id"
  	// SpanHeaderName is the header that carries the span ID.
  	SpanHeaderName = "span_id"
  	// MessageIDHeaderName is the header that carries the message ID, which
  	// OnSend fills with the span ID.
  	MessageIDHeaderName = "message_id"
  )
  35. func shouldIgnoreMsg(msg *sarama.ProducerMessage) bool {
  36. // check message hasn't been here before (retries)
  37. var traceFound, spanFound, msgIDFound bool
  38. for _, h := range msg.Headers {
  39. if string(h.Key) == TraceHeaderName {
  40. traceFound = true
  41. continue
  42. }
  43. if string(h.Key) == SpanHeaderName {
  44. spanFound = true
  45. continue
  46. }
  47. if string(h.Key) == MessageIDHeaderName {
  48. msgIDFound = true
  49. }
  50. }
  51. return traceFound && spanFound && msgIDFound
  52. }
  53. func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) {
  54. if shouldIgnoreMsg(msg) {
  55. return
  56. }
  57. _ = oi.tracer.WithSpan(context.TODO(), msg.Topic,
  58. func(ctx context.Context) error {
  59. span := trace.SpanFromContext(ctx)
  60. spanContext := span.SpanContext()
  61. attWithTopic := append(
  62. oi.fixedAttrs,
  63. kv.String("messaging.destination", msg.Topic),
  64. kv.String("messaging.message_id", spanContext.SpanID.String()),
  65. )
  66. span.SetAttributes(attWithTopic...)
  67. // remove existing partial tracing headers if exists
  68. noTraceHeaders := msg.Headers[:0]
  69. for _, h := range msg.Headers {
  70. key := string(h.Key)
  71. if key != TraceHeaderName && key != SpanHeaderName && key != MessageIDHeaderName {
  72. noTraceHeaders = append(noTraceHeaders, h)
  73. }
  74. }
  75. traceHeaders := []sarama.RecordHeader{
  76. {Key: []byte(TraceHeaderName), Value: []byte(spanContext.TraceID.String())},
  77. {Key: []byte(SpanHeaderName), Value: []byte(spanContext.SpanID.String())},
  78. {Key: []byte(MessageIDHeaderName), Value: []byte(spanContext.SpanID.String())},
  79. }
  80. msg.Headers = append(noTraceHeaders, traceHeaders...)
  81. return nil
  82. })
  83. }