Browse Source

refactor out common code

Evan Huus 12 years ago
parent
commit
0d5af9417d
2 changed files with 44 additions and 51 deletions
  1. 19 0
      broker.go
  2. 25 51
      broker_manager.go

+ 19 - 0
broker.go

@@ -196,3 +196,22 @@ func (b *broker) sendRequest(clientID *string, body encoder, expectResponse bool
 	b.correlation_id++
 	b.correlation_id++
 	return &request.response, nil
 	return &request.response, nil
 }
 }
+
+func (b *broker) sendAndReceive(clientID *string, req encoder, res decoder) error {
+	responseChan, err := b.sendRequest(clientID, req, res != nil)
+	if err != nil {
+		return err
+	}
+
+	select {
+	case buf := <-responseChan.packets:
+		// Only try to decode if we got a response.
+		if buf != nil {
+			decoder := realDecoder{raw: buf}
+			err = res.decode(&decoder)
+		}
+	case err = <-responseChan.errors:
+	}
+
+	return err
+}

+ 25 - 51
broker_manager.go

@@ -110,31 +110,21 @@ func (bm *brokerManager) sendToPartition(topic string, partition int32, req enco
 		return err
 		return err
 	}
 	}
 
 
-	responseChan, err := b.sendRequest(bm.client.id, req, res != nil)
-	if err != nil {
-		// errors that would make us refresh the broker metadata don't get returned here,
-		// they'd come through responseChan.errors, so it's safe to just return here
-		return err
-	}
-
-	select {
-	case buf := <-responseChan.packets:
-		if res != nil {
-			decoder := realDecoder{raw: buf}
-			err = res.decode(&decoder)
-		}
-	case err = <-responseChan.errors:
-	}
-
-	if err == nil {
+	err = b.sendAndReceive(bm.client.id, req, res)
+	switch err.(type) {
+	case nil:
 		// successfully received and decoded the packet, we're done
 		// successfully received and decoded the packet, we're done
-		// (the actual decoded data is stored in `res decoder`)
+		// (the actual decoded data is stored in the res parameter)
 		return nil
 		return nil
+	case EncodingError:
+		// encoding errors are our problem, not the broker's, so just return them
+		// rather than refreshing the broker metadata
+		return err
+	default:
+		// broker error, so discard that broker
+		bm.terminateBroker(b.id)
 	}
 	}
 
 
-	// we got an error, so discard that broker
-	bm.terminateBroker(b.id)
-
 	// then do the whole thing again
 	// then do the whole thing again
 	// (the metadata for the broker gets refreshed automatically in getValidLeader)
 	// (the metadata for the broker gets refreshed automatically in getValidLeader)
 	// if we get a broker here, it's guaranteed to be fresh, so if it fails then
 	// if we get a broker here, it's guaranteed to be fresh, so if it fails then
@@ -144,22 +134,7 @@ func (bm *brokerManager) sendToPartition(topic string, partition int32, req enco
 		return err
 		return err
 	}
 	}
 
 
-	responseChan, err = b.sendRequest(bm.client.id, req, res != nil)
-	if err != nil {
-		return err
-	}
-
-	select {
-	case buf := <-responseChan.packets:
-		if res != nil {
-			decoder := realDecoder{raw: buf}
-			err = res.decode(&decoder)
-		}
-	case err = <-responseChan.errors:
-	}
-
-	return err // will be nil if this broker worked
-
+	return b.sendAndReceive(bm.client.id, req, res)
 }
 }
 
 
 func (bm *brokerManager) getDefault() *broker {
 func (bm *brokerManager) getDefault() *broker {
@@ -176,21 +151,20 @@ func (bm *brokerManager) getDefault() *broker {
 	return bm.defaultBroker
 	return bm.defaultBroker
 }
 }
 
 
-func (bm *brokerManager) tryDefaultBrokers(req encoder, res decoder) error {
+func (bm *brokerManager) sendToAny(req encoder, res decoder) error {
 	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
 	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
-		responseChan, err := b.sendRequest(bm.client.id, req, res != nil)
-		if err != nil {
-			return err
-		}
-
-		select {
-		case buf := <-responseChan.packets:
-			if res != nil {
-				decoder := realDecoder{raw: buf}
-				err = res.decode(&decoder)
-			}
+		err := b.sendAndReceive(bm.client.id, req, res)
+		switch err.(type) {
+		case nil:
+			// successfully received and decoded the packet, we're done
+			// (the actual decoded data is stored in the res parameter)
+			return nil
+		case EncodingError:
+			// encoding errors are our problem, not the broker's, so just return them
+			// rather than trying another broker
 			return err
 			return err
-		case <-responseChan.errors:
+		default:
+			// broker error, so discard that broker
 			bm.defaultBroker = nil
 			bm.defaultBroker = nil
 			bm.terminateBroker(b.id)
 			bm.terminateBroker(b.id)
 		}
 		}
@@ -200,7 +174,7 @@ func (bm *brokerManager) tryDefaultBrokers(req encoder, res decoder) error {
 
 
 func (bm *brokerManager) refreshTopics(topics []*string) error {
 func (bm *brokerManager) refreshTopics(topics []*string) error {
 	response := new(metadata)
 	response := new(metadata)
-	err := bm.tryDefaultBrokers(&metadataRequest{topics}, response)
+	err := bm.sendToAny(&metadataRequest{topics}, response)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}