Kaynağa Gözat

Offset request/response

Evan Huus 12 yıl önce
ebeveyn
işleme
d53fb36f6a
6 değiştirilmiş dosya ile 150 ekleme ve 2 silme
  1. 13 1
      broker.go
  2. 1 1
      metadata_cache.go
  3. 57 0
      offset_request.go
  4. 58 0
      offset_response.go
  5. 1 0
      packet_decoder.go
  6. 20 0
      real_decoder.go

+ 13 - 1
broker.go

@@ -81,7 +81,7 @@ func (b *Broker) ID() int32 {
 	return b.id
 }
 
-func (b *Broker) RequestMetadata(clientID *string, request *MetadataRequest) (*MetadataResponse, error) {
+func (b *Broker) GetMetadata(clientID *string, request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 
 	err := b.sendAndReceive(clientID, request, response)
@@ -93,6 +93,18 @@ func (b *Broker) RequestMetadata(clientID *string, request *MetadataRequest) (*M
 	return response, nil
 }
 
+func (b *Broker) GetOffset(clientID *string, request *OffsetRequest) (*OffsetResponse, error) {
+	response := new(OffsetResponse)
+
+	err := b.sendAndReceive(clientID, request, response)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) Produce(clientID *string, request *ProduceRequest) (*ProduceResponse, error) {
 	var response *ProduceResponse
 	if request.ResponseCondition != NO_RESPONSE {

+ 1 - 1
metadata_cache.go

@@ -132,7 +132,7 @@ func (mc *metadataCache) update(data *MetadataResponse) error {
 
 func (mc *metadataCache) refreshTopics(topics []*string) error {
 	for broker := mc.any(); broker != nil; broker = mc.any() {
-		response, err := broker.RequestMetadata(mc.client.id, &MetadataRequest{topics})
+		response, err := broker.GetMetadata(mc.client.id, &MetadataRequest{topics})
 
 		switch err.(type) {
 		case nil:

+ 57 - 0
offset_request.go

@@ -1 +1,58 @@
 package kafka
+
+// Special values accepted by Kafka for the 'time' parameter of OffsetRequest.AddBlock().
+const (
+	LATEST_OFFSET    int64 = -1
+	EARLIEST_OFFSETS int64 = -2
+)
+
+type offsetRequestBlock struct {
+	time       int64
+	maxOffsets int32
+}
+
+func (r *offsetRequestBlock) encode(pe packetEncoder) {
+	pe.putInt64(r.time)
+	pe.putInt32(r.maxOffsets)
+}
+
+type OffsetRequest struct {
+	blocks map[*string]map[int32]*offsetRequestBlock
+}
+
+func (r *OffsetRequest) encode(pe packetEncoder) {
+	pe.putInt32(-1) // replica ID is always -1 for clients
+	pe.putArrayCount(len(r.blocks))
+	for topic, partitions := range r.blocks {
+		pe.putString(topic)
+		pe.putArrayCount(len(partitions))
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+			block.encode(pe)
+		}
+	}
+}
+
+func (r *OffsetRequest) key() int16 {
+	return 2
+}
+
+func (r *OffsetRequest) version() int16 {
+	return 0
+}
+
+func (r *OffsetRequest) AddBlock(topic *string, partition_id int32, time int64, maxOffsets int32) {
+	if r.blocks == nil {
+		r.blocks = make(map[*string]map[int32]*offsetRequestBlock)
+	}
+
+	if r.blocks[topic] == nil {
+		r.blocks[topic] = make(map[int32]*offsetRequestBlock)
+	}
+
+	tmp := new(offsetRequestBlock)
+	tmp.time = time
+	tmp.maxOffsets = maxOffsets
+
+	r.blocks[topic][partition_id] = tmp
+}

+ 58 - 0
offset_response.go

@@ -1 +1,59 @@
 package kafka
+
+type OffsetResponseBlock struct {
+	err     KError
+	offsets []int64
+}
+
+func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
+	r.err, err = pd.getError()
+	if err != nil {
+		return err
+	}
+
+	r.offsets, err = pd.getInt64Array()
+
+	return err
+}
+
+type OffsetResponse struct {
+	Blocks map[*string]map[int32]*OffsetResponseBlock
+}
+
+func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
+	numTopics, err := pd.getArrayCount()
+	if err != nil {
+		return err
+	}
+
+	r.Blocks = make(map[*string]map[int32]*OffsetResponseBlock, numTopics)
+	for i := 0; i < numTopics; i++ {
+		name, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		numBlocks, err := pd.getArrayCount()
+		if err != nil {
+			return err
+		}
+
+		r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
+
+		for j := 0; j < numBlocks; j++ {
+			id, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+
+			block := new(OffsetResponseBlock)
+			err = block.decode(pd)
+			if err != nil {
+				return err
+			}
+			r.Blocks[name][id] = block
+		}
+	}
+
+	return nil
+}

+ 1 - 0
packet_decoder.go

@@ -11,6 +11,7 @@ type packetDecoder interface {
 
 	// arrays
 	getInt32Array() ([]int32, error)
+	getInt64Array() ([]int64, error)
 	getArrayCount() (int, error)
 
 	// misc

+ 20 - 0
real_decoder.go

@@ -75,6 +75,26 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	return ret, nil
 }
 
+func (rd *realDecoder) getInt64Array() ([]int64, error) {
+	if rd.remaining() < 4 {
+		return nil, DecodingError("Insufficient data in getInt64Array.")
+	}
+	n := int(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+	rd.off += 4
+
+	var ret []int64 = nil
+	if rd.remaining() < 8*n {
+		return nil, DecodingError("Insufficient data in getInt64Array.")
+	} else if n > 0 {
+		ret = make([]int64, n)
+		for i := range ret {
+			ret[i] = int64(binary.BigEndian.Uint64(rd.raw[rd.off:]))
+			rd.off += 8
+		}
+	}
+	return ret, nil
+}
+
 func (rd *realDecoder) getArrayCount() (int, error) {
 	if rd.remaining() < 4 {
 		return -1, DecodingError("Insufficient data in getArrayCount.")