1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
- package main
- import (
- "context"
- "strings"
- "github.com/Shopify/sarama"
- "go.opentelemetry.io/otel/api/global"
- "go.opentelemetry.io/otel/api/kv"
- "go.opentelemetry.io/otel/api/trace"
- )
- type OTelInterceptor struct {
- tracer trace.Tracer
- fixedAttrs []kv.KeyValue
- }
- // NewOTelInterceptor processes span for intercepted messages and add some
- // headers with the span data.
- func NewOTelInterceptor(brokers []string) *OTelInterceptor {
- oi := OTelInterceptor{}
- oi.tracer = global.TraceProvider().Tracer("shopify.com/sarama/examples/interceptors")
- // These are based on the spec, which was reachable as of 2020-05-15
- // https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md
- oi.fixedAttrs = []kv.KeyValue{
- kv.String("messaging.destination_kind", "topic"),
- kv.String("span.otel.kind", "PRODUCER"),
- kv.String("messaging.system", "kafka"),
- kv.String("net.transport", "IP.TCP"),
- kv.String("messaging.url", strings.Join(brokers, ",")),
- }
- return &oi
- }
// Names of the record headers that carry tracing data on a message.
const (
	// MessageIDHeaderName is the header key holding the message identifier.
	MessageIDHeaderName = "message_id"
	// SpanHeaderName is the header key holding the span identifier.
	SpanHeaderName = "span_id"
	// TraceHeaderName is the header key holding the trace identifier.
	TraceHeaderName = "trace_id"
)
- func shouldIgnoreMsg(msg *sarama.ProducerMessage) bool {
- // check message hasn't been here before (retries)
- var traceFound, spanFound, msgIDFound bool
- for _, h := range msg.Headers {
- if string(h.Key) == TraceHeaderName {
- traceFound = true
- continue
- }
- if string(h.Key) == SpanHeaderName {
- spanFound = true
- continue
- }
- if string(h.Key) == MessageIDHeaderName {
- msgIDFound = true
- }
- }
- return traceFound && spanFound && msgIDFound
- }
- func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) {
- if shouldIgnoreMsg(msg) {
- return
- }
- _ = oi.tracer.WithSpan(context.TODO(), msg.Topic,
- func(ctx context.Context) error {
- span := trace.SpanFromContext(ctx)
- spanContext := span.SpanContext()
- attWithTopic := append(
- oi.fixedAttrs,
- kv.String("messaging.destination", msg.Topic),
- kv.String("messaging.message_id", spanContext.SpanID.String()),
- )
- span.SetAttributes(attWithTopic...)
- // remove existing partial tracing headers if exists
- noTraceHeaders := msg.Headers[:0]
- for _, h := range msg.Headers {
- key := string(h.Key)
- if key != TraceHeaderName && key != SpanHeaderName && key != MessageIDHeaderName {
- noTraceHeaders = append(noTraceHeaders, h)
- }
- }
- traceHeaders := []sarama.RecordHeader{
- {Key: []byte(TraceHeaderName), Value: []byte(spanContext.TraceID.String())},
- {Key: []byte(SpanHeaderName), Value: []byte(spanContext.SpanID.String())},
- {Key: []byte(MessageIDHeaderName), Value: []byte(spanContext.SpanID.String())},
- }
- msg.Headers = append(noTraceHeaders, traceHeaders...)
- return nil
- })
- }
|