Browse Source

Merge pull request #385 from Shopify/consumer-logging

Add better logs to the consumer
Evan Huus 10 years ago
parent
commit
94a77f758b
1 changed files with 5 additions and 1 deletions
  1. 5 1
      consumer.go

+ 5 - 1
consumer.go

@@ -279,6 +279,7 @@ func (child *partitionConsumer) dispatcher() {
 				child.broker = nil
 			}
 
+			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
 			if err := child.dispatch(); err != nil {
 				child.sendError(err)
 				child.trigger <- none{}
@@ -435,7 +436,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
 		response, err := w.fetchNewMessages()
 
 		if err != nil {
-			Logger.Printf("Unexpected error processing FetchRequest; disconnecting from broker %s: %s\n", w.broker.addr, err)
+			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", w.broker.ID(), err)
 			w.abort(err)
 			return
 		}
@@ -450,6 +451,7 @@ func (w *brokerConsumer) subscriptionConsumer() {
 					// these three are not fatal errors, but do require redispatching
 					child.trigger <- none{}
 					delete(w.subscriptions, child)
+					Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err)
 				}
 			}
 		}
@@ -460,6 +462,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
 	// take new subscriptions, and abandon subscriptions that have been closed
 	for _, child := range newSubscriptions {
 		w.subscriptions[child] = none{}
+		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition)
 	}
 
 	for child := range w.subscriptions {
@@ -467,6 +470,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
 		case <-child.dying:
 			close(child.trigger)
 			delete(w.subscriptions, child)
+			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition)
 		default:
 		}
 	}