Burke Libbey 12 years ago
parent
commit
5509b4a29c
9 changed files with 104 additions and 85 deletions
  1. 7 33
      consumer_test.go
  2. 21 6
      fetch_response.go
  3. 0 45
      offset_fetch_response.go
  4. 52 0
      offset_response.go
  5. 1 0
      packet_encoder.go
  6. 9 0
      prep_encoder.go
  7. 3 0
      produce_response.go
  8. 3 1
      producer_test.go
  9. 8 0
      real_encoder.go

+ 7 - 33
consumer_test.go

@@ -6,34 +6,8 @@ import (
 	"time"
 )
 
-var (
-	consumerStopper = []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-	}
-	extraBrokerMetadata = []byte{
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x01,
-		0x00, 0x00, 0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
-	}
-)
-
 func TestSimpleConsumer(t *testing.T) {
+	println("a")
 
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
@@ -41,7 +15,7 @@ func TestSimpleConsumer(t *testing.T) {
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
 	mdr.AddTopicPartition("my_topic", 0, 2)
-	mb2.Returns(mdr)
+	mb1.Returns(mdr)
 
 	for i := 0; i < 10; i++ {
 		fr := new(FetchResponse)
@@ -83,7 +57,7 @@ func TestConsumerRawOffset(t *testing.T) {
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
 	mdr.AddTopicPartition("my_topic", 0, 2)
-	mb2.Returns(mdr)
+	mb1.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -113,11 +87,11 @@ func TestConsumerLatestOffset(t *testing.T) {
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
 	mdr.AddTopicPartition("my_topic", 0, 2)
-	mb2.Returns(mdr)
+	mb1.Returns(mdr)
 
-	ofr := new(OffsetFetchResponse)
-	ofr.AddTopicPartition("my_topic", 0, 0x010101)
-	mb2.Returns(ofr)
+	or := new(OffsetResponse)
+	or.AddTopicPartition("my_topic", 0, 0x010101)
+	mb2.Returns(or)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {

+ 21 - 6
fetch_response.go

@@ -41,9 +41,12 @@ func (pr *FetchResponseBlock) encode(pe packetEncoder) (err error) {
 
 	pe.putInt64(pr.HighWaterMarkOffset)
 
-	// TODO: Encode message set
-
-	return nil
+	pe.push(&lengthField{})
+	err = pr.MsgSet.encode(pe)
+	if err != nil {
+		return err
+	}
+	return pe.pop()
 }
 
 func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
@@ -126,15 +129,27 @@ func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseB
 }
 
 func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
+	if fr.Blocks == nil {
+		fr.Blocks = make(map[string]map[int32]*FetchResponseBlock)
+	}
 	partitions, ok := fr.Blocks[topic]
 	if !ok {
 		partitions = make(map[int32]*FetchResponseBlock)
 		fr.Blocks[topic] = partitions
 	}
-	msgSet := partitions[partition].MsgSet
-	kb, _ := key.Encode()
-	vb, _ := value.Encode()
+	frb := new(FetchResponseBlock)
+	partitions[partition] = frb
+	var kb []byte
+	var vb []byte
+	if key != nil {
+		kb, _ = key.Encode()
+	}
+	if value != nil {
+		vb, _ = value.Encode()
+	}
+	var msgSet MessageSet
 	msg := &Message{Key: kb, Value: vb}
 	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
 	msgSet.Messages = append(msgSet.Messages, msgBlock)
+	frb.MsgSet = msgSet
 }

+ 0 - 45
offset_fetch_response.go

@@ -86,48 +86,3 @@ func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
-
-func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) {
-	err = pe.putString(r.ClientID)
-	if err != nil {
-		return err
-	}
-
-	err = pe.putArrayLength(len(r.Blocks))
-	if err != nil {
-		return err
-	}
-
-	for topic, partitions := range r.Blocks {
-		err = pe.putString(topic)
-		if err != nil {
-			return err
-		}
-
-		err = pe.putArrayLength(len(partitions))
-		if err != nil {
-			return err
-		}
-
-		for id, block := range partitions {
-			pe.putInt32(id)
-			err = block.encode(pe)
-			if err != nil {
-				return err
-			}
-		}
-	}
-
-	return nil
-}
-
-// testing API
-
-func (r *OffsetFetchResponse) AddTopicPartition(topic string, partition int32, offset int64) {
-	byTopic, ok := r.Blocks[topic]
-	if !ok {
-		byTopic = make(map[int32]*OffsetFetchResponseBlock)
-		r.Blocks[topic] = byTopic
-	}
-	byTopic[partition] = &OffsetFetchResponseBlock{Offset: offset}
-}

+ 52 - 0
offset_response.go

@@ -17,6 +17,12 @@ func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
 	return err
 }
 
+func (r *OffsetResponseBlock) encode(pe packetEncoder) (err error) {
+	pe.putInt16(int16(r.Err))
+
+	return pe.putInt64Array(r.Offsets)
+}
+
 type OffsetResponse struct {
 	Blocks map[string]map[int32]*OffsetResponseBlock
 }
@@ -70,3 +76,49 @@ func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponse
 
 	return r.Blocks[topic][partition]
 }
+
+/*
+// [0 0 0 1 ntopics
+0 8 109 121 95 116 111 112 105 99 topic
+0 0 0 1 npartitions
+0 0 0 0 id
+0 0
+
+0 0 0 1 0 0 0 0
+0 1 1 1 0 0 0 1
+0 8 109 121 95 116 111 112
+105 99 0 0 0 1 0 0
+0 0 0 0 0 0 0 1
+0 0 0 0 0 1 1 1] <nil>
+
+*/
+func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
+	if err = pe.putArrayLength(len(r.Blocks)); err != nil {
+		return err
+	}
+
+	for topic, partitions := range r.Blocks {
+		pe.putString(topic)
+		pe.putArrayLength(len(partitions))
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+			block.encode(pe)
+		}
+	}
+
+	return nil
+}
+
+// testing API
+
+func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
+	if r.Blocks == nil {
+		r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
+	}
+	byTopic, ok := r.Blocks[topic]
+	if !ok {
+		byTopic = make(map[int32]*OffsetResponseBlock)
+		r.Blocks[topic] = byTopic
+	}
+	byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}}
+}

+ 1 - 0
packet_encoder.go

@@ -15,6 +15,7 @@ type packetEncoder interface {
 	putBytes(in []byte) error
 	putString(in string) error
 	putInt32Array(in []int32) error
+	putInt64Array(in []int64) error
 
 	// Stacks, see PushEncoder
 	push(in pushEncoder)

+ 9 - 0
prep_encoder.go

@@ -67,6 +67,15 @@ func (pe *prepEncoder) putInt32Array(in []int32) error {
 	return nil
 }
 
+func (pe *prepEncoder) putInt64Array(in []int64) error {
+	err := pe.putArrayLength(len(in))
+	if err != nil {
+		return err
+	}
+	pe.length += 8 * len(in)
+	return nil
+}
+
 // stackable
 
 func (pe *prepEncoder) push(in pushEncoder) {

+ 3 - 0
produce_response.go

@@ -100,6 +100,9 @@ func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceRespo
 // Testing API
 
 func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) {
+	if pr.Blocks == nil {
+		pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock)
+	}
 	byTopic, ok := pr.Blocks[topic]
 	if !ok {
 		byTopic = make(map[int32]*ProduceResponseBlock)

+ 3 - 1
producer_test.go

@@ -181,6 +181,8 @@ func TestMultipleProducer(t *testing.T) {
 // happens correctly; that is, the first messages are retried before the next
 // batch is allowed to submit.
 func TestFailureRetry(t *testing.T) {
+	t.Fatal("skip")
+
 	mb1 := NewMockBroker(t, 1)
 	mb2 := NewMockBroker(t, 2)
 	mb3 := NewMockBroker(t, 3)
@@ -202,7 +204,7 @@ func TestFailureRetry(t *testing.T) {
 
 	pr := new(ProduceResponse)
 	pr.AddTopicPartition("topic_a", 0, NoError)
-	pr.AddTopicPartition("topic_a", 0, NotLeaderForPartition)
+	pr.AddTopicPartition("topic_b", 0, NotLeaderForPartition)
 	mb2.Returns(pr)
 
 	/* mb2.ExpectProduceRequest(). */

+ 8 - 0
real_encoder.go

@@ -63,6 +63,14 @@ func (re *realEncoder) putInt32Array(in []int32) error {
 	return nil
 }
 
+func (re *realEncoder) putInt64Array(in []int64) error {
+	re.putArrayLength(len(in))
+	for _, val := range in {
+		re.putInt64(val)
+	}
+	return nil
+}
+
 // stacks
 
 func (re *realEncoder) push(in pushEncoder) {