Browse Source

add CreateTopicsRequest/Response

Robin 8 years ago
parent
commit
b5ace411a0
6 changed files with 401 additions and 0 deletions
  1. 11 0
      broker.go
  2. 174 0
      create_topics_request.go
  3. 50 0
      create_topics_request_test.go
  4. 112 0
      create_topics_response.go
  5. 52 0
      create_topics_response_test.go
  6. 2 0
      request.go

+ 11 - 0
broker.go

@@ -373,6 +373,17 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse,
 	return response, nil
 }
 
+func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
+	response := new(CreateTopicsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 174 - 0
create_topics_request.go

@@ -0,0 +1,174 @@
+package sarama
+
+import (
+	"time"
+)
+
+type CreateTopicsRequest struct {
+	Version int16
+
+	TopicDetails map[string]*TopicDetail
+	Timeout      time.Duration
+	ValidateOnly bool
+}
+
+func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
+		return err
+	}
+	for topic, detail := range c.TopicDetails {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := detail.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	pe.putInt32(int32(c.Timeout / time.Millisecond))
+
+	if c.Version >= 1 {
+		pe.putBool(c.ValidateOnly)
+	}
+
+	return nil
+}
+
+func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	c.TopicDetails = make(map[string]*TopicDetail, n)
+
+	for i := 0; i < n; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		c.TopicDetails[topic] = new(TopicDetail)
+		if err = c.TopicDetails[topic].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	timeout, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	c.Timeout = time.Duration(timeout) * time.Millisecond
+
+	if version >= 1 {
+		c.ValidateOnly, err = pd.getBool()
+		if err != nil {
+			return err
+		}
+
+		c.Version = version
+	}
+
+	return nil
+}
+
+func (c *CreateTopicsRequest) key() int16 {
+	return 19
+}
+
+func (c *CreateTopicsRequest) version() int16 {
+	return c.Version
+}
+
+func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
+	switch c.Version {
+	case 2:
+		return V1_0_0_0
+	case 1:
+		return V0_11_0_0
+	default:
+		return V0_10_1_0
+	}
+}
+
+type TopicDetail struct {
+	NumPartitions     int32
+	ReplicationFactor int16
+	ReplicaAssignment map[int32][]int32
+	ConfigEntries     map[string]*string
+}
+
+func (t *TopicDetail) encode(pe packetEncoder) error {
+	pe.putInt32(t.NumPartitions)
+	pe.putInt16(t.ReplicationFactor)
+
+	if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil {
+		return err
+	}
+	for partition, assignment := range t.ReplicaAssignment {
+		pe.putInt32(partition)
+		if err := pe.putInt32Array(assignment); err != nil {
+			return err
+		}
+	}
+
+	if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil {
+		return err
+	}
+	for configKey, configValue := range t.ConfigEntries {
+		if err := pe.putString(configKey); err != nil {
+			return err
+		}
+		if err := pe.putNullableString(configValue); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) {
+	if t.NumPartitions, err = pd.getInt32(); err != nil {
+		return err
+	}
+	if t.ReplicationFactor, err = pd.getInt16(); err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		t.ReplicaAssignment = make(map[int32][]int32, n)
+		for i := 0; i < n; i++ {
+			replica, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+			if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil {
+				return err
+			}
+		}
+	}
+
+	n, err = pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if n > 0 {
+		t.ConfigEntries = make(map[string]*string, n)
+		for i := 0; i < n; i++ {
+			configKey, err := pd.getString()
+			if err != nil {
+				return err
+			}
+			if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}

+ 50 - 0
create_topics_request_test.go

@@ -0,0 +1,50 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	createTopicsRequestV0 = []byte{
+		0, 0, 0, 1,
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		255, 255, 255, 255,
+		255, 255,
+		0, 0, 0, 1, // 1 replica assignment
+		0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2,
+		0, 0, 0, 1, // 1 config
+		0, 12, 'r', 'e', 't', 'e', 'n', 't', 'i', 'o', 'n', '.', 'm', 's',
+		0, 2, '-', '1',
+		0, 0, 0, 100,
+	}
+
+	createTopicsRequestV1 = append(createTopicsRequestV0, byte(1))
+)
+
+func TestCreateTopicsRequest(t *testing.T) {
+	retention := "-1"
+
+	req := &CreateTopicsRequest{
+		TopicDetails: map[string]*TopicDetail{
+			"topic": {
+				NumPartitions:     -1,
+				ReplicationFactor: -1,
+				ReplicaAssignment: map[int32][]int32{
+					0: []int32{0, 1, 2},
+				},
+				ConfigEntries: map[string]*string{
+					"retention.ms": &retention,
+				},
+			},
+		},
+		Timeout: 100 * time.Millisecond,
+	}
+
+	testRequest(t, "version 0", req, createTopicsRequestV0)
+
+	req.Version = 1
+	req.ValidateOnly = true
+
+	testRequest(t, "version 1", req, createTopicsRequestV1)
+}

+ 112 - 0
create_topics_response.go

@@ -0,0 +1,112 @@
+package sarama
+
+import "time"
+
+type CreateTopicsResponse struct {
+	Version      int16
+	ThrottleTime time.Duration
+	TopicErrors  map[string]*TopicError
+}
+
+func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
+	if c.Version >= 2 {
+		pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
+	}
+
+	if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
+		return err
+	}
+	for topic, topicError := range c.TopicErrors {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := topicError.encode(pe, c.Version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
+	c.Version = version
+
+	if version >= 2 {
+		throttleTime, err := pd.getInt32()
+		if err != nil {
+			return err
+		}
+		c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	c.TopicErrors = make(map[string]*TopicError, n)
+	for i := 0; i < n; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		c.TopicErrors[topic] = new(TopicError)
+		if err := c.TopicErrors[topic].decode(pd, version); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (c *CreateTopicsResponse) key() int16 {
+	return 19
+}
+
+func (c *CreateTopicsResponse) version() int16 {
+	return c.Version
+}
+
+func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
+	switch c.Version {
+	case 2:
+		return V1_0_0_0
+	case 1:
+		return V0_11_0_0
+	default:
+		return V0_10_1_0
+	}
+}
+
+type TopicError struct {
+	Err    KError
+	ErrMsg *string
+}
+
+func (t *TopicError) encode(pe packetEncoder, version int16) error {
+	pe.putInt16(int16(t.Err))
+
+	if version >= 1 {
+		if err := pe.putNullableString(t.ErrMsg); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
+	kErr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	t.Err = KError(kErr)
+
+	if version >= 1 {
+		if t.ErrMsg, err = pd.getNullableString(); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}

+ 52 - 0
create_topics_response_test.go

@@ -0,0 +1,52 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	createTopicsResponseV0 = []byte{
+		0, 0, 0, 1,
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 42,
+	}
+
+	createTopicsResponseV1 = []byte{
+		0, 0, 0, 1,
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 42,
+		0, 3, 'm', 's', 'g',
+	}
+
+	createTopicsResponseV2 = []byte{
+		0, 0, 0, 100,
+		0, 0, 0, 1,
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 42,
+		0, 3, 'm', 's', 'g',
+	}
+)
+
+func TestCreateTopicsResponse(t *testing.T) {
+	resp := &CreateTopicsResponse{
+		TopicErrors: map[string]*TopicError{
+			"topic": &TopicError{
+				Err: ErrInvalidRequest,
+			},
+		},
+	}
+
+	testResponse(t, "version 0", resp, createTopicsResponseV0)
+
+	resp.Version = 1
+	msg := "msg"
+	resp.TopicErrors["topic"].ErrMsg = &msg
+
+	testResponse(t, "version 1", resp, createTopicsResponseV1)
+
+	resp.Version = 2
+	resp.ThrottleTime = 100 * time.Millisecond
+
+	testResponse(t, "version 2", resp, createTopicsResponseV2)
+}

+ 2 - 0
request.go

@@ -114,6 +114,8 @@ func allocateBody(key, version int16) protocolBody {
 		return &SaslHandshakeRequest{}
 	case 18:
 		return &ApiVersionsRequest{}
+	case 19:
+		return &CreateTopicsRequest{}
 	case 37:
 		return &CreatePartitionsRequest{}
 	}