Browse Source

Refactor API key ownership

It is now owned by the appropriate object (ie metadataRequest owns the fact that
the metadataRequest key is 3).
Evan Huus 11 years ago
parent
commit
8dc6fa8ea4
6 changed files with 35 additions and 35 deletions
  1. 11 0
      api.go
  2. 3 13
      broker.go
  3. 2 2
      broker_manager.go
  4. 8 0
      metadata_request.go
  5. 8 0
      produce_request.go
  6. 3 20
      request.go

+ 11 - 0
api.go

@@ -0,0 +1,11 @@
+package kafka
+
+type api interface {
+	key() int16
+	version() int16
+}
+
+type apiEncoder interface {
+	encoder
+	api
+}

+ 3 - 13
broker.go

@@ -164,21 +164,11 @@ func (b *broker) rcvResponseLoop() {
 	}
 }
 
-func (b *broker) sendRequest(clientID *string, body encoder, expectResponse bool) (*responsePromise, error) {
+func (b *broker) sendRequest(clientID *string, body apiEncoder, expectResponse bool) (*responsePromise, error) {
 	var prepEnc prepEncoder
 	var realEnc realEncoder
-	var api API
-
-	switch body.(type) {
-	case *metadataRequest:
-		api = REQUEST_METADATA
-	case *produceRequest:
-		api = REQUEST_PRODUCE
-	default:
-		return nil, EncodingError{"Unknown API."}
-	}
 
-	req := request{api, b.correlation_id, clientID, body}
+	req := request{b.correlation_id, clientID, body}
 
 	req.encode(&prepEnc)
 	if prepEnc.err != nil {
@@ -197,7 +187,7 @@ func (b *broker) sendRequest(clientID *string, body encoder, expectResponse bool
 	return &request.response, nil
 }
 
-func (b *broker) sendAndReceive(clientID *string, req encoder, res decoder) error {
+func (b *broker) sendAndReceive(clientID *string, req apiEncoder, res decoder) error {
 	responseChan, err := b.sendRequest(clientID, req, res != nil)
 	if err != nil {
 		return err

+ 2 - 2
broker_manager.go

@@ -104,7 +104,7 @@ func (bm *brokerManager) choosePartition(topic string, p partitionChooser) (int3
 	return p.choosePartition(partitions), nil
 }
 
-func (bm *brokerManager) sendToPartition(topic string, partition int32, req encoder, res decoder) error {
+func (bm *brokerManager) sendToPartition(topic string, partition int32, req apiEncoder, res decoder) error {
 	b, err := bm.getValidLeader(topic, partition)
 	if err != nil {
 		return err
@@ -151,7 +151,7 @@ func (bm *brokerManager) getDefault() *broker {
 	return bm.defaultBroker
 }
 
-func (bm *brokerManager) sendToAny(req encoder, res decoder) error {
+func (bm *brokerManager) sendToAny(req apiEncoder, res decoder) error {
 	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
 		err := b.sendAndReceive(bm.client.id, req, res)
 		switch err.(type) {

+ 8 - 0
metadata_request.go

@@ -27,3 +27,11 @@ func (mr *metadataRequest) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (mr *metadataRequest) key() int16 {
+	return 3
+}
+
+func (mr *metadataRequest) version() int16 {
+	return 0
+}

+ 8 - 0
produce_request.go

@@ -48,3 +48,11 @@ func newSingletonProduceRequest(topic string, partition int32, set *messageSet)
 	req.topics[0].partitions[0].msgSet = set
 	return req
 }
+
+func (p *produceRequest) key() int16 {
+	return 0
+}
+
+func (p *produceRequest) version() int16 {
+	return 0
+}

+ 3 - 20
request.go

@@ -1,31 +1,14 @@
 package kafka
 
-type API struct {
-	key     int16
-	version int16
-}
-
-var (
-	REQUEST_PRODUCE        = API{0, 0}
-	REQUEST_FETCH          = API{1, 0}
-	REQUEST_OFFSET         = API{2, 0}
-	REQUEST_METADATA       = API{3, 0}
-	REQUEST_LEADER_AND_ISR = API{4, 0}
-	REQUEST_STOP_REPLICA   = API{5, 0}
-	REQUEST_OFFSET_COMMIT  = API{6, 0}
-	REQUEST_OFFSET_FETCH   = API{7, 0}
-)
-
 type request struct {
-	api            API
 	correlation_id int32
 	id             *string
-	body           encoder
+	body           apiEncoder
 }
 
 func (r *request) encode(pe packetEncoder) {
-	pe.putInt16(r.api.key)
-	pe.putInt16(r.api.version)
+	pe.putInt16(r.body.key())
+	pe.putInt16(r.body.version())
 	pe.putInt32(r.correlation_id)
 	pe.putString(r.id)
 	r.body.encode(pe)