瀏覽代碼

Add DeleteRecords operation to Broker

Diego Ongaro 6 年之前
父節點
當前提交
76a6b90ec2
共有 6 個文件被更改,包括 372 次插入0 次删除
  1. 11 0
      broker.go
  2. 126 0
      delete_records_request.go
  3. 36 0
      delete_records_request_test.go
  4. 158 0
      delete_records_response.go
  5. 39 0
      delete_records_response_test.go
  6. 2 0
      request.go

+ 11 - 0
broker.go

@@ -395,6 +395,17 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon
 	return response, nil
 }
 
+func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
+	response := new(DeleteRecordsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
 	response := new(DescribeAclsResponse)
 

+ 126 - 0
delete_records_request.go

@@ -0,0 +1,126 @@
+package sarama
+
+import (
+	"sort"
+	"time"
+)
+
+// request message format is:
+// [topic] timeout(int32)
+// where topic is:
+//  name(string) [partition]
+// where partition is:
+//  id(int32) offset(int64)
+
+type DeleteRecordsRequest struct {
+	Topics  map[string]*DeleteRecordsRequestTopic
+	Timeout time.Duration
+}
+
+func (d *DeleteRecordsRequest) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(d.Topics)); err != nil {
+		return err
+	}
+	keys := make([]string, 0, len(d.Topics))
+	for topic := range d.Topics {
+		keys = append(keys, topic)
+	}
+	sort.Strings(keys)
+	for _, topic := range keys {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := d.Topics[topic].encode(pe); err != nil {
+			return err
+		}
+	}
+	pe.putInt32(int32(d.Timeout / time.Millisecond))
+
+	return nil
+}
+
+func (d *DeleteRecordsRequest) decode(pd packetDecoder, version int16) error {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		d.Topics = make(map[string]*DeleteRecordsRequestTopic, n)
+		for i := 0; i < n; i++ {
+			topic, err := pd.getString()
+			if err != nil {
+				return err
+			}
+			details := new(DeleteRecordsRequestTopic)
+			if err = details.decode(pd, version); err != nil {
+				return err
+			}
+			d.Topics[topic] = details
+		}
+	}
+
+	timeout, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	d.Timeout = time.Duration(timeout) * time.Millisecond
+
+	return nil
+}
+
+func (d *DeleteRecordsRequest) key() int16 {
+	return 21
+}
+
+func (d *DeleteRecordsRequest) version() int16 {
+	return 0
+}
+
+func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}
+
+type DeleteRecordsRequestTopic struct {
+	PartitionOffsets map[int32]int64 // partition => offset
+}
+
+func (t *DeleteRecordsRequestTopic) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(t.PartitionOffsets)); err != nil {
+		return err
+	}
+	keys := make([]int32, 0, len(t.PartitionOffsets))
+	for partition := range t.PartitionOffsets {
+		keys = append(keys, partition)
+	}
+	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+	for _, partition := range keys {
+		pe.putInt32(partition)
+		pe.putInt64(t.PartitionOffsets[partition])
+	}
+	return nil
+}
+
+func (t *DeleteRecordsRequestTopic) decode(pd packetDecoder, version int16) error {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		t.PartitionOffsets = make(map[int32]int64, n)
+		for i := 0; i < n; i++ {
+			partition, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+			offset, err := pd.getInt64()
+			if err != nil {
+				return err
+			}
+			t.PartitionOffsets[partition] = offset
+		}
+	}
+
+	return nil
+}

+ 36 - 0
delete_records_request_test.go

@@ -0,0 +1,36 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var deleteRecordsRequest = []byte{
+	0, 0, 0, 2,
+	0, 5, 'o', 't', 'h', 'e', 'r',
+	0, 0, 0, 0,
+	0, 5, 't', 'o', 'p', 'i', 'c',
+	0, 0, 0, 2,
+	0, 0, 0, 19,
+	0, 0, 0, 0, 0, 0, 0, 200,
+	0, 0, 0, 20,
+	0, 0, 0, 0, 0, 0, 0, 190,
+	0, 0, 0, 100,
+}
+
+func TestDeleteRecordsRequest(t *testing.T) {
+	req := &DeleteRecordsRequest{
+		Topics: map[string]*DeleteRecordsRequestTopic{
+			"topic": {
+				PartitionOffsets: map[int32]int64{
+					19: 200,
+					20: 190,
+				},
+			},
+			"other": {},
+		},
+		Timeout: 100 * time.Millisecond,
+	}
+
+	testRequest(t, "", req, deleteRecordsRequest)
+}

+ 158 - 0
delete_records_response.go

@@ -0,0 +1,158 @@
+package sarama
+
+import (
+	"sort"
+	"time"
+)
+
+// response message format is:
+// throttleMs(int32) [topic]
+// where topic is:
+//  name(string) [partition]
+// where partition is:
+//  id(int32) low_watermark(int64) error_code(int16)
+
+type DeleteRecordsResponse struct {
+	Version      int16
+	ThrottleTime time.Duration
+	Topics       map[string]*DeleteRecordsResponseTopic
+}
+
+func (d *DeleteRecordsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
+
+	if err := pe.putArrayLength(len(d.Topics)); err != nil {
+		return err
+	}
+	keys := make([]string, 0, len(d.Topics))
+	for topic := range d.Topics {
+		keys = append(keys, topic)
+	}
+	sort.Strings(keys)
+	for _, topic := range keys {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := d.Topics[topic].encode(pe); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (d *DeleteRecordsResponse) decode(pd packetDecoder, version int16) error {
+	d.Version = version
+
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		d.Topics = make(map[string]*DeleteRecordsResponseTopic, n)
+		for i := 0; i < n; i++ {
+			topic, err := pd.getString()
+			if err != nil {
+				return err
+			}
+			details := new(DeleteRecordsResponseTopic)
+			if err = details.decode(pd, version); err != nil {
+				return err
+			}
+			d.Topics[topic] = details
+		}
+	}
+
+	return nil
+}
+
+func (d *DeleteRecordsResponse) key() int16 {
+	return 21
+}
+
+func (d *DeleteRecordsResponse) version() int16 {
+	return 0
+}
+
+func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}
+
+type DeleteRecordsResponseTopic struct {
+	Partitions map[int32]*DeleteRecordsResponsePartition
+}
+
+func (t *DeleteRecordsResponseTopic) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(t.Partitions)); err != nil {
+		return err
+	}
+	keys := make([]int32, 0, len(t.Partitions))
+	for partition := range t.Partitions {
+		keys = append(keys, partition)
+	}
+	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
+	for _, partition := range keys {
+		pe.putInt32(partition)
+		if err := t.Partitions[partition].encode(pe); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (t *DeleteRecordsResponseTopic) decode(pd packetDecoder, version int16) error {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		t.Partitions = make(map[int32]*DeleteRecordsResponsePartition, n)
+		for i := 0; i < n; i++ {
+			partition, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+			details := new(DeleteRecordsResponsePartition)
+			if err = details.decode(pd, version); err != nil {
+				return err
+			}
+			t.Partitions[partition] = details
+		}
+	}
+
+	return nil
+}
+
+type DeleteRecordsResponsePartition struct {
+	LowWatermark int64
+	Err          KError
+}
+
+func (t *DeleteRecordsResponsePartition) encode(pe packetEncoder) error {
+	pe.putInt64(t.LowWatermark)
+	pe.putInt16(int16(t.Err))
+	return nil
+}
+
+func (t *DeleteRecordsResponsePartition) decode(pd packetDecoder, version int16) error {
+	lowWatermark, err := pd.getInt64()
+	if err != nil {
+		return err
+	}
+	t.LowWatermark = lowWatermark
+
+	kErr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	t.Err = KError(kErr)
+
+	return nil
+}

+ 39 - 0
delete_records_response_test.go

@@ -0,0 +1,39 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var deleteRecordsResponse = []byte{
+	0, 0, 0, 100,
+	0, 0, 0, 2,
+	0, 5, 'o', 't', 'h', 'e', 'r',
+	0, 0, 0, 0,
+	0, 5, 't', 'o', 'p', 'i', 'c',
+	0, 0, 0, 2,
+	0, 0, 0, 19,
+	0, 0, 0, 0, 0, 0, 0, 200,
+	0, 0,
+	0, 0, 0, 20,
+	255, 255, 255, 255, 255, 255, 255, 255,
+	0, 3,
+}
+
+func TestDeleteRecordsResponse(t *testing.T) {
+	resp := &DeleteRecordsResponse{
+		Version:      0,
+		ThrottleTime: 100 * time.Millisecond,
+		Topics: map[string]*DeleteRecordsResponseTopic{
+			"topic": {
+				Partitions: map[int32]*DeleteRecordsResponsePartition{
+					19: {LowWatermark: 200, Err: 0},
+					20: {LowWatermark: -1, Err: 3},
+				},
+			},
+			"other": {},
+		},
+	}
+
+	testResponse(t, "", resp, deleteRecordsResponse)
+}

+ 2 - 0
request.go

@@ -118,6 +118,8 @@ func allocateBody(key, version int16) protocolBody {
 		return &CreateTopicsRequest{}
 	case 20:
 		return &DeleteTopicsRequest{}
+	case 21:
+		return &DeleteRecordsRequest{}
 	case 22:
 		return &InitProducerIDRequest{}
 	case 24: