Procházet zdrojové kódy

Fetch Request/Response pair updated

Evan Huus před 12 roky
rodič
revize
c1b36d1a4f
5 změnil soubory, kde provedl 93 přidání a 87 odebrání
  1. 12 0
      broker.go
  2. 37 31
      fetch_request.go
  3. 25 37
      fetch_response.go
  4. 9 9
      produce_request.go
  5. 10 10
      produce_response.go

+ 12 - 0
broker.go

@@ -103,6 +103,18 @@ func (b *Broker) Produce(clientID *string, request *ProduceRequest) (*ProduceRes
 	return response, nil
 }
 
+func (b *Broker) Fetch(clientID *string, request *FetchRequest) (*FetchResponse, error) {
+	response := new(FetchResponse)
+
+	err := b.sendAndReceive(clientID, request, response)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(clientID *string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 37 - 31
fetch_request.go

@@ -1,50 +1,56 @@
 package kafka
 
-type fetchRequestPartitionBlock struct {
-	partition   int32
+type fetchRequestPartition struct {
 	fetchOffset int64
 	maxBytes    int32
 }
 
-func (p *fetchRequestPartitionBlock) encode(pe packetEncoder) {
-	pe.putInt32(p.partition)
-	pe.putInt64(p.fetchOffset)
-	pe.putInt32(p.maxBytes)
+func (f *fetchRequestPartition) encode(pe packetEncoder) {
+	pe.putInt64(f.fetchOffset)
+	pe.putInt32(f.maxBytes)
 }
 
-type fetchRequestTopicBlock struct {
-	topic      *string
-	partitions []fetchRequestPartitionBlock
+type FetchRequest struct {
+	MaxWaitTime int32
+	MinBytes    int32
+	partitions  map[*string]map[int32]*fetchRequestPartition
 }
 
-func (p *fetchRequestTopicBlock) encode(pe packetEncoder) {
-	pe.putString(p.topic)
-	pe.putArrayCount(len(p.partitions))
-	for i := range p.partitions {
-		(&p.partitions[i]).encode(pe)
-	}
-}
-
-type fetchRequest struct {
-	maxWaitTime int32
-	minBytes    int32
-	topics      []fetchRequestTopicBlock
-}
-
-func (p *fetchRequest) encode(pe packetEncoder) {
+func (f *FetchRequest) encode(pe packetEncoder) {
 	pe.putInt32(-1) // replica ID is always -1 for clients
-	pe.putInt32(p.maxWaitTime)
-	pe.putInt32(p.minBytes)
-	pe.putArrayCount(len(p.topics))
-	for i := range p.topics {
-		(&p.topics[i]).encode(pe)
+	pe.putInt32(f.MaxWaitTime)
+	pe.putInt32(f.MinBytes)
+	pe.putArrayCount(len(f.partitions))
+	for topic, partitions := range f.partitions {
+		pe.putString(topic)
+		pe.putArrayCount(len(partitions))
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+			block.encode(pe)
+		}
 	}
 }
 
-func (p *fetchRequest) key() int16 {
+func (f *FetchRequest) key() int16 {
 	return 1
 }
 
-func (p *fetchRequest) version() int16 {
+func (f *FetchRequest) version() int16 {
 	return 0
 }
+
+func (f *FetchRequest) AddPartition(topic *string, partition_id int32, fetchOffset int64, maxBytes int32) {
+	if f.partitions == nil {
+		f.partitions = make(map[*string]map[int32]*fetchRequestPartition)
+	}
+
+	if f.partitions[topic] == nil {
+		f.partitions[topic] = make(map[int32]*fetchRequestPartition)
+	}
+
+	tmp := new(fetchRequestPartition)
+	tmp.maxBytes = maxBytes
+	tmp.fetchOffset = fetchOffset
+
+	f.partitions[topic][partition_id] = tmp
+}

+ 25 - 37
fetch_response.go

@@ -1,18 +1,12 @@
 package kafka
 
-type fetchResponsePartitionBlock struct {
-	id                  int32
+type FetchResponseBlock struct {
 	err                 KError
 	highWaterMarkOffset int64
 	msgSet              messageSet
 }
 
-func (pr *fetchResponsePartitionBlock) decode(pd packetDecoder) (err error) {
-	pr.id, err = pd.getInt32()
-	if err != nil {
-		return err
-	}
-
+func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
 	pr.err, err = pd.getError()
 	if err != nil {
 		return err
@@ -37,48 +31,42 @@ func (pr *fetchResponsePartitionBlock) decode(pd packetDecoder) (err error) {
 	return err
 }
 
-type fetchResponseTopicBlock struct {
-	name       *string
-	partitions []fetchResponsePartitionBlock
+type FetchResponse struct {
+	Blocks map[*string]map[int32]*FetchResponseBlock
 }
 
-func (pr *fetchResponseTopicBlock) decode(pd packetDecoder) (err error) {
-	pr.name, err = pd.getString()
-	if err != nil {
-		return err
-	}
-
-	n, err := pd.getArrayCount()
+func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
+	numTopics, err := pd.getArrayCount()
 	if err != nil {
 		return err
 	}
 
-	pr.partitions = make([]fetchResponsePartitionBlock, n)
-	for i := range pr.partitions {
-		err = (&pr.partitions[i]).decode(pd)
+	fr.Blocks = make(map[*string]map[int32]*FetchResponseBlock, numTopics)
+	for i := 0; i < numTopics; i++ {
+		name, err := pd.getString()
 		if err != nil {
 			return err
 		}
-	}
 
-	return nil
-}
+		numBlocks, err := pd.getArrayCount()
+		if err != nil {
+			return err
+		}
 
-type fetchResponse struct {
-	topics []fetchResponseTopicBlock
-}
+		fr.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
 
-func (pr *fetchResponse) decode(pd packetDecoder) (err error) {
-	n, err := pd.getArrayCount()
-	if err != nil {
-		return err
-	}
+		for j := 0; j < numBlocks; j++ {
+			id, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
 
-	pr.topics = make([]fetchResponseTopicBlock, n)
-	for i := range pr.topics {
-		err = (&pr.topics[i]).decode(pd)
-		if err != nil {
-			return err
+			block := new(FetchResponseBlock)
+			err = block.decode(pd)
+			if err != nil {
+				return err
+			}
+			fr.Blocks[name][id] = block
 		}
 	}
 

+ 9 - 9
produce_request.go

@@ -9,14 +9,14 @@ const (
 type ProduceRequest struct {
 	ResponseCondition int16
 	Timeout           int32
-	MsgSets           map[*string]map[int32]*messageSet
+	msgSets           map[*string]map[int32]*messageSet
 }
 
 func (p *ProduceRequest) encode(pe packetEncoder) {
 	pe.putInt16(p.ResponseCondition)
 	pe.putInt32(p.Timeout)
-	pe.putArrayCount(len(p.MsgSets))
-	for topic, partitions := range p.MsgSets {
+	pe.putArrayCount(len(p.msgSets))
+	for topic, partitions := range p.msgSets {
 		pe.putString(topic)
 		pe.putArrayCount(len(partitions))
 		for id, msgSet := range partitions {
@@ -35,19 +35,19 @@ func (p *ProduceRequest) version() int16 {
 }
 
 func (p *ProduceRequest) AddMessage(topic *string, partition int32, msg *Message) {
-	if p.MsgSets == nil {
-		p.MsgSets = make(map[*string]map[int32]*messageSet)
+	if p.msgSets == nil {
+		p.msgSets = make(map[*string]map[int32]*messageSet)
 	}
 
-	if p.MsgSets[topic] == nil {
-		p.MsgSets[topic] = make(map[int32]*messageSet)
+	if p.msgSets[topic] == nil {
+		p.msgSets[topic] = make(map[int32]*messageSet)
 	}
 
-	set := p.MsgSets[topic][partition]
+	set := p.msgSets[topic][partition]
 
 	if set == nil {
 		set = newMessageSet()
-		p.MsgSets[topic][partition] = set
+		p.msgSets[topic][partition] = set
 	}
 
 	set.addMessage(msg)

+ 10 - 10
produce_response.go

@@ -1,11 +1,11 @@
 package kafka
 
-type ProduceResponsePartition struct {
+type ProduceResponseBlock struct {
 	Err    KError
 	Offset int64
 }
 
-func (pr *ProduceResponsePartition) decode(pd packetDecoder) (err error) {
+func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) {
 	pr.Err, err = pd.getError()
 	if err != nil {
 		return err
@@ -20,7 +20,7 @@ func (pr *ProduceResponsePartition) decode(pd packetDecoder) (err error) {
 }
 
 type ProduceResponse struct {
-	Partitions map[*string]map[int32]*ProduceResponsePartition
+	Blocks map[*string]map[int32]*ProduceResponseBlock
 }
 
 func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
@@ -29,32 +29,32 @@ func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	pr.Partitions = make(map[*string]map[int32]*ProduceResponsePartition, numTopics)
+	pr.Blocks = make(map[*string]map[int32]*ProduceResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
 		name, err := pd.getString()
 		if err != nil {
 			return err
 		}
 
-		numPartitions, err := pd.getArrayCount()
+		numBlocks, err := pd.getArrayCount()
 		if err != nil {
 			return err
 		}
 
-		pr.Partitions[name] = make(map[int32]*ProduceResponsePartition, numPartitions)
+		pr.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
 
-		for j := 0; j < numPartitions; j++ {
+		for j := 0; j < numBlocks; j++ {
 			id, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
 
-			partition := new(ProduceResponsePartition)
-			err = partition.decode(pd)
+			block := new(ProduceResponseBlock)
+			err = block.decode(pd)
 			if err != nil {
 				return err
 			}
-			pr.Partitions[name][id] = partition
+			pr.Blocks[name][id] = block
 		}
 	}