interceptors.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. package sarama
  2. // ProducerInterceptor allows you to intercept (and possibly mutate) the records
  3. // received by the producer before they are published to the Kafka cluster.
  4. // https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
  5. type ProducerInterceptor interface {
  6. // OnSend is called when the producer message is intercepted. Please avoid
  7. // modifying the message until it's safe to do so, as this is _not_ a copy
  8. // of the message.
  9. OnSend(*ProducerMessage)
  10. }
  11. // ConsumerInterceptor allows you to intercept (and possibly mutate) the records
  12. // received by the consumer before they are sent to the messages channel.
  13. // https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
  14. type ConsumerInterceptor interface {
  15. // OnConsume is called when the consumed message is intercepted. Please
  16. // avoid modifying the message until it's safe to do so, as this is _not_ a
  17. // copy of the message.
  18. OnConsume(*ConsumerMessage)
  19. }
  20. func (msg *ProducerMessage) safelyApplyInterceptor(interceptor ProducerInterceptor) {
  21. defer func() {
  22. if r := recover(); r != nil {
  23. Logger.Printf("Error when calling producer interceptor: %s, %w\n", interceptor, r)
  24. }
  25. }()
  26. interceptor.OnSend(msg)
  27. }
  28. func (msg *ConsumerMessage) safelyApplyInterceptor(interceptor ConsumerInterceptor) {
  29. defer func() {
  30. if r := recover(); r != nil {
  31. Logger.Printf("Error when calling consumer interceptor: %s, %w\n", interceptor, r)
  32. }
  33. }()
  34. interceptor.OnConsume(msg)
  35. }