浏览代码

Do not modify ProducerMessage.MetaData during the whole publishing process.

tianji 6 年之前
父节点
当前提交
1e2740380a
共有 2 个文件被更改,包括 7 次插入21 次删除
  1. 3 2
      async_producer.go
  2. 4 19
      sync_producer.go

+ 3 - 2
async_producer.go

@@ -145,8 +145,9 @@ type ProducerMessage struct {
 	// least version 0.10.0.
 	Timestamp time.Time
 
-	retries int
-	flags   flagSet
+	retries     int
+	flags       flagSet
+	expectation chan *ProducerError
 }
 
 const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

+ 4 - 19
sync_producer.go

@@ -90,13 +90,8 @@ func verifyProducerConfig(config *Config) error {
 }
 
 func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
-	oldMetadata := msg.Metadata
-	defer func() {
-		msg.Metadata = oldMetadata
-	}()
-
 	expectation := make(chan *ProducerError, 1)
-	msg.Metadata = expectation
+	msg.expectation = expectation
 	sp.producer.Input() <- msg
 
 	if err := <-expectation; err != nil {
@@ -107,21 +102,11 @@ func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offs
 }
 
 func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
-	savedMetadata := make([]interface{}, len(msgs))
-	for i := range msgs {
-		savedMetadata[i] = msgs[i].Metadata
-	}
-	defer func() {
-		for i := range msgs {
-			msgs[i].Metadata = savedMetadata[i]
-		}
-	}()
-
 	expectations := make(chan chan *ProducerError, len(msgs))
 	go func() {
 		for _, msg := range msgs {
 			expectation := make(chan *ProducerError, 1)
-			msg.Metadata = expectation
+			msg.expectation = expectation
 			sp.producer.Input() <- msg
 			expectations <- expectation
 		}
@@ -144,7 +129,7 @@ func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
 func (sp *syncProducer) handleSuccesses() {
 	defer sp.wg.Done()
 	for msg := range sp.producer.Successes() {
-		expectation := msg.Metadata.(chan *ProducerError)
+		expectation := msg.expectation
 		expectation <- nil
 	}
 }
@@ -152,7 +137,7 @@ func (sp *syncProducer) handleSuccesses() {
 func (sp *syncProducer) handleErrors() {
 	defer sp.wg.Done()
 	for err := range sp.producer.Errors() {
-		expectation := err.Msg.Metadata.(chan *ProducerError)
+		expectation := err.Msg.expectation
 		expectation <- err
 	}
 }