Browse Source

Add version check for headers

Evan Huus 7 years ago
parent
commit
6d9cc201b5
1 changed files with 3 additions and 0 deletions
  1. 3 0
      async_producer.go

+ 3 - 0
async_producer.go

@@ -271,6 +271,9 @@ func (p *asyncProducer) dispatcher() {
 		version := 1
 		if p.conf.Version.IsAtLeast(V0_11_0_0) {
 			version = 2
+		} else if msg.Headers != nil {
+			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
+			continue
 		}
 		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
 			p.returnError(msg, ErrMessageSizeTooLarge)