Преглед изворни кода

add DeleteTopicsRequest/Response

Robin пре 6 година
родитељ
комит
620549d3ed
6 измењених фајлова са 190 додато и 0 уклоњено
  1. 11 0
      broker.go
  2. 41 0
      delete_topics_request.go
  3. 22 0
      delete_topics_request_test.go
  4. 78 0
      delete_topics_response.go
  5. 36 0
      delete_topics_response_test.go
  6. 2 0
      request.go

+ 11 - 0
broker.go

@@ -384,6 +384,17 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon
 	return response, nil
 }
 
+func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
+	response := new(DeleteTopicsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 41 - 0
delete_topics_request.go

@@ -0,0 +1,41 @@
+package sarama
+
+import "time"
+
+type DeleteTopicsRequest struct {
+	Topics  []string
+	Timeout time.Duration
+}
+
+func (d *DeleteTopicsRequest) encode(pe packetEncoder) error {
+	if err := pe.putStringArray(d.Topics); err != nil {
+		return err
+	}
+	pe.putInt32(int32(d.Timeout / time.Millisecond))
+
+	return nil
+}
+
+func (d *DeleteTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
+	if d.Topics, err = pd.getStringArray(); err != nil {
+		return err
+	}
+	timeout, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	d.Timeout = time.Duration(timeout) * time.Millisecond
+	return nil
+}
+
+func (d *DeleteTopicsRequest) key() int16 {
+	return 20
+}
+
+func (d *DeleteTopicsRequest) version() int16 {
+	return 0
+}
+
+func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion {
+	return V0_10_1_0
+}

+ 22 - 0
delete_topics_request_test.go

@@ -0,0 +1,22 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var deleteTopicsRequest = []byte{
+	0, 0, 0, 2,
+	0, 5, 't', 'o', 'p', 'i', 'c',
+	0, 5, 'o', 't', 'h', 'e', 'r',
+	0, 0, 0, 100,
+}
+
+func TestDeleteTopicsRequest(t *testing.T) {
+	req := &DeleteTopicsRequest{
+		Topics:  []string{"topic", "other"},
+		Timeout: 100 * time.Millisecond,
+	}
+
+	testRequest(t, "", req, deleteTopicsRequest)
+}

+ 78 - 0
delete_topics_response.go

@@ -0,0 +1,78 @@
+package sarama
+
+import "time"
+
+type DeleteTopicsResponse struct {
+	Version         int16
+	ThrottleTime    time.Duration
+	TopicErrorCodes map[string]KError
+}
+
+func (d *DeleteTopicsResponse) encode(pe packetEncoder) error {
+	if d.Version >= 1 {
+		pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
+	}
+
+	if err := pe.putArrayLength(len(d.TopicErrorCodes)); err != nil {
+		return err
+	}
+	for topic, errorCode := range d.TopicErrorCodes {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		pe.putInt16(int16(errorCode))
+	}
+
+	return nil
+}
+
+func (d *DeleteTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
+	if version >= 1 {
+		throttleTime, err := pd.getInt32()
+		if err != nil {
+			return err
+		}
+		d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+		d.Version = version
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	d.TopicErrorCodes = make(map[string]KError, n)
+
+	for i := 0; i < n; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		errorCode, err := pd.getInt16()
+		if err != nil {
+			return err
+		}
+
+		d.TopicErrorCodes[topic] = KError(errorCode)
+	}
+
+	return nil
+}
+
+func (d *DeleteTopicsResponse) key() int16 {
+	return 20
+}
+
+func (d *DeleteTopicsResponse) version() int16 {
+	return d.Version
+}
+
+func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion {
+	switch d.Version {
+	case 1:
+		return V0_11_0_0
+	default:
+		return V0_10_1_0
+	}
+}

+ 36 - 0
delete_topics_response_test.go

@@ -0,0 +1,36 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	deleteTopicsResponseV0 = []byte{
+		0, 0, 0, 1,
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 0,
+	}
+
+	deleteTopicsResponseV1 = []byte{
+		0, 0, 0, 100,
+		0, 0, 0, 1,
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 0,
+	}
+)
+
+func TestDeleteTopicsResponse(t *testing.T) {
+	resp := &DeleteTopicsResponse{
+		TopicErrorCodes: map[string]KError{
+			"topic": ErrNoError,
+		},
+	}
+
+	testResponse(t, "version 0", resp, deleteTopicsResponseV0)
+
+	resp.Version = 1
+	resp.ThrottleTime = 100 * time.Millisecond
+
+	testResponse(t, "version 1", resp, deleteTopicsResponseV1)
+}

+ 2 - 0
request.go

@@ -116,6 +116,8 @@ func allocateBody(key, version int16) protocolBody {
 		return &ApiVersionsRequest{}
 	case 19:
 		return &CreateTopicsRequest{}
+	case 20:
+		return &DeleteTopicsRequest{}
 	case 37:
 		return &CreatePartitionsRequest{}
 	}