Browse Source

producer: explicitly register partitions

Introduce a `syn` flag (a la TCP) and send a syn message from partitionProducer
to brokerProducer on setup. This lets brokerProducer know exactly which
partitions it should handle at any time, even if they haven't sent an actual
message yet.

For symmetry, rename `chaser` to `fin`, since it is already the last message
from a partitionProducer to a brokerProducer, and also the last from
brokerProducer to partitionProducer.

First piece of a refactor my subconscious has been working on without me.
Evan Huus 10 years ago
parent
commit
77c54f710e
1 changed files with 27 additions and 19 deletions
  1. 27 19
      async_producer.go

+ 27 - 19
async_producer.go

@@ -105,7 +105,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 type flagSet int8
 type flagSet int8
 
 
 const (
 const (
-	chaser   flagSet = 1 << iota // message is last in a group that failed
+	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
+	fin                          // final message from partitionProducer to brokerProducer and back
 	shutdown                     // start the shutdown process
 	shutdown                     // start the shutdown process
 )
 )
 
 
@@ -365,7 +366,7 @@ type partitionProducer struct {
 
 
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
-	// retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
+	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
 	// therefore whether our buffer is complete and safe to flush)
 	// therefore whether our buffer is complete and safe to flush)
 	highWatermark int
 	highWatermark int
 	retryState    []partitionRetryState
 	retryState    []partitionRetryState
@@ -397,6 +398,8 @@ func (pp *partitionProducer) dispatch() {
 	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
 	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
 	if pp.leader != nil {
 	if pp.leader != nil {
 		pp.output = pp.parent.getBrokerProducer(pp.leader)
 		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
+		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
 	}
 	}
 
 
 	for msg := range pp.input {
 	for msg := range pp.input {
@@ -407,20 +410,20 @@ func (pp *partitionProducer) dispatch() {
 		} else if pp.highWatermark > 0 {
 		} else if pp.highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			if msg.retries < pp.highWatermark {
 			if msg.retries < pp.highWatermark {
-				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a chaser)
-				if msg.flags&chaser == chaser {
+				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
+				if msg.flags&fin == fin {
 					pp.retryState[msg.retries].expectChaser = false
 					pp.retryState[msg.retries].expectChaser = false
-					pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
+					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
 				} else {
 				} else {
 					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
 					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
 				}
 				}
 				continue
 				continue
-			} else if msg.flags&chaser == chaser {
-				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
+			} else if msg.flags&fin == fin {
+				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
 				// meaning this retry level is done and we can go down (at least) one level and flush that
 				// meaning this retry level is done and we can go down (at least) one level and flush that
 				pp.retryState[pp.highWatermark].expectChaser = false
 				pp.retryState[pp.highWatermark].expectChaser = false
 				pp.flushRetryBuffers()
 				pp.flushRetryBuffers()
-				pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
+				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
 				continue
 				continue
 			}
 			}
 		}
 		}
@@ -449,11 +452,11 @@ func (pp *partitionProducer) newHighWatermark(hwm int) {
 	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
 	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
 	pp.highWatermark = hwm
 	pp.highWatermark = hwm
 
 
-	// send off a chaser so that we know when everything "in between" has made it
+	// send off a fin so that we know when everything "in between" has made it
 	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
 	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
 	pp.retryState[pp.highWatermark].expectChaser = true
 	pp.retryState[pp.highWatermark].expectChaser = true
-	pp.parent.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
-	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}
+	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
+	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
 
 
 	// a new HWM means that our current broker selection is out of date
 	// a new HWM means that our current broker selection is out of date
 	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
@@ -501,6 +504,9 @@ func (pp *partitionProducer) updateLeader() error {
 		}
 		}
 
 
 		pp.output = pp.parent.getBrokerProducer(pp.leader)
 		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
+		pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
+
 		return nil
 		return nil
 	})
 	})
 }
 }
@@ -578,10 +584,19 @@ func (bp *brokerProducer) run() {
 				goto shutdown
 				goto shutdown
 			}
 			}
 
 
+			if msg.flags&syn == syn {
+				if bp.currentRetries[msg.Topic] == nil {
+					bp.currentRetries[msg.Topic] = make(map[int32]error)
+				}
+				bp.currentRetries[msg.Topic][msg.Partition] = nil
+				bp.parent.inFlight.Done()
+				continue
+			}
+
 			if reason := bp.needsRetry(msg); reason != nil {
 			if reason := bp.needsRetry(msg); reason != nil {
 				bp.parent.retryMessage(msg, reason)
 				bp.parent.retryMessage(msg, reason)
 
 
-				if bp.closing == nil && msg.flags&chaser == chaser {
+				if bp.closing == nil && msg.flags&fin == fin {
 					// we were retrying this partition but we can start processing again
 					// we were retrying this partition but we can start processing again
 					delete(bp.currentRetries[msg.Topic], msg.Partition)
 					delete(bp.currentRetries[msg.Topic], msg.Partition)
 					Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
 					Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
@@ -643,10 +658,6 @@ func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
 		return bp.closing
 		return bp.closing
 	}
 	}
 
 
-	if bp.currentRetries[msg.Topic] == nil {
-		return nil
-	}
-
 	return bp.currentRetries[msg.Topic][msg.Partition]
 	return bp.currentRetries[msg.Topic][msg.Partition]
 }
 }
 
 
@@ -716,9 +727,6 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
 			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
 				bp.broker.ID(), topic, partition, block.Err)
 				bp.broker.ID(), topic, partition, block.Err)
-			if bp.currentRetries[topic] == nil {
-				bp.currentRetries[topic] = make(map[int32]error)
-			}
 			bp.currentRetries[topic][partition] = block.Err
 			bp.currentRetries[topic][partition] = block.Err
 			bp.parent.retryMessages(msgs, block.Err)
 			bp.parent.retryMessages(msgs, block.Err)
 			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
 			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)