Bladeren bron

Merge pull request #562 from Shopify/producer-syn-fin

producer: explicitly register partitions
Evan Huus 10 jaren geleden
bovenliggende
commit
b8b4670075
1 gewijzigde bestanden met toevoegingen van 30 en 20 verwijderingen
  1. 30 20
      async_producer.go

+ 30 - 20
async_producer.go

@@ -105,7 +105,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 type flagSet int8
 type flagSet int8
 
 
 const (
 const (
-	chaser   flagSet = 1 << iota // message is last in a group that failed
+	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
+	fin                          // final message from partitionProducer to brokerProducer and back
 	shutdown                     // start the shutdown process
 	shutdown                     // start the shutdown process
 )
 )
 
 
@@ -365,7 +366,7 @@ type partitionProducer struct {
 
 
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
-	// retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
+	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
 	// therefore whether our buffer is complete and safe to flush)
 	// therefore whether our buffer is complete and safe to flush)
 	highWatermark int
 	highWatermark int
 	retryState    []partitionRetryState
 	retryState    []partitionRetryState
@@ -397,6 +398,8 @@ func (pp *partitionProducer) dispatch() {
 	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
 	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
 	if pp.leader != nil {
 	if pp.leader != nil {
 		pp.output = pp.parent.getBrokerProducer(pp.leader)
 		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
+		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
 	}
 	}
 
 
 	for msg := range pp.input {
 	for msg := range pp.input {
@@ -407,20 +410,20 @@ func (pp *partitionProducer) dispatch() {
 		} else if pp.highWatermark > 0 {
 		} else if pp.highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			if msg.retries < pp.highWatermark {
 			if msg.retries < pp.highWatermark {
-				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a chaser)
-				if msg.flags&chaser == chaser {
+				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
+				if msg.flags&fin == fin {
 					pp.retryState[msg.retries].expectChaser = false
 					pp.retryState[msg.retries].expectChaser = false
-					pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
+					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
 				} else {
 				} else {
 					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
 					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
 				}
 				}
 				continue
 				continue
-			} else if msg.flags&chaser == chaser {
-				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
+			} else if msg.flags&fin == fin {
+				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
 				// meaning this retry level is done and we can go down (at least) one level and flush that
 				// meaning this retry level is done and we can go down (at least) one level and flush that
 				pp.retryState[pp.highWatermark].expectChaser = false
 				pp.retryState[pp.highWatermark].expectChaser = false
 				pp.flushRetryBuffers()
 				pp.flushRetryBuffers()
-				pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
+				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
 				continue
 				continue
 			}
 			}
 		}
 		}
@@ -449,11 +452,11 @@ func (pp *partitionProducer) newHighWatermark(hwm int) {
 	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
 	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
 	pp.highWatermark = hwm
 	pp.highWatermark = hwm
 
 
-	// send off a chaser so that we know when everything "in between" has made it
+	// send off a fin so that we know when everything "in between" has made it
 	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
 	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
 	pp.retryState[pp.highWatermark].expectChaser = true
 	pp.retryState[pp.highWatermark].expectChaser = true
-	pp.parent.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
-	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}
+	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
+	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
 
 
 	// a new HWM means that our current broker selection is out of date
 	// a new HWM means that our current broker selection is out of date
 	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
@@ -501,6 +504,9 @@ func (pp *partitionProducer) updateLeader() error {
 		}
 		}
 
 
 		pp.output = pp.parent.getBrokerProducer(pp.leader)
 		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
+		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
+
 		return nil
 		return nil
 	})
 	})
 }
 }
@@ -578,13 +584,24 @@ func (bp *brokerProducer) run() {
 				goto shutdown
 				goto shutdown
 			}
 			}
 
 
+			if msg.flags&syn == syn {
+				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
+					bp.broker.ID(), msg.Topic, msg.Partition)
+				if bp.currentRetries[msg.Topic] == nil {
+					bp.currentRetries[msg.Topic] = make(map[int32]error)
+				}
+				bp.currentRetries[msg.Topic][msg.Partition] = nil
+				bp.parent.inFlight.Done()
+				continue
+			}
+
 			if reason := bp.needsRetry(msg); reason != nil {
 			if reason := bp.needsRetry(msg); reason != nil {
 				bp.parent.retryMessage(msg, reason)
 				bp.parent.retryMessage(msg, reason)
 
 
-				if bp.closing == nil && msg.flags&chaser == chaser {
+				if bp.closing == nil && msg.flags&fin == fin {
 					// we were retrying this partition but we can start processing again
 					// we were retrying this partition but we can start processing again
 					delete(bp.currentRetries[msg.Topic], msg.Partition)
 					delete(bp.currentRetries[msg.Topic], msg.Partition)
-					Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
+					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
 						bp.broker.ID(), msg.Topic, msg.Partition)
 						bp.broker.ID(), msg.Topic, msg.Partition)
 				}
 				}
 
 
@@ -643,10 +660,6 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
 		return bp.closing
 		return bp.closing
 	}
 	}
 
 
-	if bp.currentRetries[msg.Topic] == nil {
-		return nil
-	}
-
 	return bp.currentRetries[msg.Topic][msg.Partition]
 	return bp.currentRetries[msg.Topic][msg.Partition]
 }
 }
 
 
@@ -716,9 +729,6 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
 			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
 				bp.broker.ID(), topic, partition, block.Err)
 				bp.broker.ID(), topic, partition, block.Err)
-			if bp.currentRetries[topic] == nil {
-				bp.currentRetries[topic] = make(map[int32]error)
-			}
 			bp.currentRetries[topic][partition] = block.Err
 			bp.currentRetries[topic][partition] = block.Err
 			bp.parent.retryMessages(msgs, block.Err)
 			bp.parent.retryMessages(msgs, block.Err)
 			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
 			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)