Browse Source

Merge pull request #184 from Shopify/prod-logging

prod: explicitly log every state change
Evan Huus 11 years ago
parent
commit
8df83bf44d
1 changed files with 10 additions and 8 deletions
  1. 10 8
      producer.go

+ 10 - 8
producer.go

@@ -243,6 +243,7 @@ func (p *Producer) topicDispatcher() {
 		}
 
 		if msg.flags&shutdown != 0 {
+			Logger.Println("Producer shutting down.")
 			break
 		}
 
@@ -265,8 +266,6 @@ func (p *Producer) topicDispatcher() {
 		handler <- msg
 	}
 
-	Logger.Println("Producer shutting down.")
-
 	for _, handler := range handlers {
 		close(handler)
 	}
@@ -330,15 +329,15 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			if backlog == nil {
 				// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
 				// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
+				Logger.Printf("producer/leader state change to [retrying] on %s/%d\n", topic, partition)
 				output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
-				Logger.Println("Producer dispatching retried messages to new leader.")
 				backlog = make([]*MessageToSend, 0)
 				p.unrefBrokerWorker(leader)
 				output = nil
 			}
 		} else {
 			// retry *and* chaser flag set, flush the backlog and return to normal processing
-			Logger.Println("Producer finished dispatching retried messages, processing backlog.")
+			Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
 			if output == nil {
 				err := p.client.RefreshTopicMetadata(topic)
 				if err != nil {
@@ -360,7 +359,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			for _, msg := range backlog {
 				output <- msg
 			}
-			Logger.Println("Producer backlog processsed.")
+			Logger.Printf("producer/leader state change to [normal] on %s/%d\n", topic, partition)
 
 			backlog = nil
 			continue
@@ -417,7 +416,7 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend)
 
 			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
 				(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) {
-				Logger.Println("Producer accumulated maximum request size, forcing blocking flush.")
+				Logger.Println("producer/aggregator hit maximum request size, forcing blocking flush")
 				flusher <- buffer
 				buffer = nil
 				doFlush = nil
@@ -467,6 +466,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
 				if msg.flags&chaser == chaser {
 					// we can start processing this topic/partition again
+					Logger.Printf("producer/flusher state change to [normal] on %s/%d\n",
+						msg.Topic, msg.partition)
 					currentRetries[msg.Topic][msg.partition] = nil
 				}
 				p.retryMessages([]*MessageToSend{msg}, currentRetries[msg.Topic][msg.partition])
@@ -498,6 +499,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 			continue
 		default:
 			p.client.disconnectBroker(broker)
+			Logger.Println("producer/flusher state change to [closing] because", err)
 			closing = err
 			p.retryMessages(batch, err)
 			continue
@@ -532,6 +534,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 						p.returnSuccesses(msgs)
 					}
 				case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+					Logger.Printf("producer/flusher state change to [retrying] on %s/%d because %v\n",
+						topic, partition, block.Err)
 					if currentRetries[topic] == nil {
 						currentRetries[topic] = make(map[int32]error)
 					}
@@ -696,7 +700,6 @@ func (p *Producer) returnSuccesses(batch []*MessageToSend) {
 }
 
 func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
-	Logger.Println("Producer requeueing batch of", len(batch), "messages due to error:", err)
 	for _, msg := range batch {
 		if msg == nil {
 			continue
@@ -708,7 +711,6 @@ func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
 			p.retries <- msg
 		}
 	}
-	Logger.Println("Messages requeued")
 }
 
 type brokerWorker struct {