package sarama // ProducerInterceptor allows you to intercept (and possibly mutate) the records // received by the producer before they are published to the Kafka cluster. // https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation type ProducerInterceptor interface { // OnSend is called when the producer message is intercepted. Please avoid // modifying the message until it's safe to do so, as this is _not_ a copy // of the message. OnSend(*ProducerMessage) } // ConsumerInterceptor allows you to intercept (and possibly mutate) the records // received by the consumer before they are sent to the messages channel. // https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation type ConsumerInterceptor interface { // OnConsume is called when the consumed message is intercepted. Please // avoid modifying the message until it's safe to do so, as this is _not_ a // copy of the message. OnConsume(*ConsumerMessage) } func (msg *ProducerMessage) safelyApplyInterceptor(interceptor ProducerInterceptor) { defer func() { if r := recover(); r != nil { Logger.Printf("Error when calling producer interceptor: %s, %w\n", interceptor, r) } }() interceptor.OnSend(msg) } func (msg *ConsumerMessage) safelyApplyInterceptor(interceptor ConsumerInterceptor) { defer func() { if r := recover(); r != nil { Logger.Printf("Error when calling consumer interceptor: %s, %w\n", interceptor, r) } }() interceptor.OnConsume(msg) }