Browse Source

Merge pull request #1520 from weeco/add-describe-log-dirs

Add describe log dirs request and response
Vlad Gorodetsky 5 years ago
parent
commit
96122a6f6b

+ 12 - 0
broker.go

@@ -659,6 +659,18 @@ func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsRespon
 	return response, nil
 }
 
+//DescribeLogDirs sends a request to get the broker's log dir paths and sizes
+func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) {
+	response := new(DescribeLogDirsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 83 - 0
describe_log_dirs_request.go

@@ -0,0 +1,83 @@
+package sarama
+
+// DescribeLogDirsRequest is a describe request to get partitions' log size
+type DescribeLogDirsRequest struct {
+	// Version 0 and 1 are equal
+	// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+	Version int16
+
+	// If this is an empty array, all topics will be queried
+	DescribeTopics []DescribeLogDirsRequestTopic
+}
+
+// DescribeLogDirsRequestTopic is a describe request about the log dir of one or more partitions within a Topic
+type DescribeLogDirsRequestTopic struct {
+	Topic        string
+	PartitionIDs []int32
+}
+
+func (r *DescribeLogDirsRequest) encode(pe packetEncoder) error {
+	length := len(r.DescribeTopics)
+	if length == 0 {
+		// In order to query all topics we must send null
+		length = -1
+	}
+
+	if err := pe.putArrayLength(length); err != nil {
+		return err
+	}
+
+	for _, d := range r.DescribeTopics {
+		if err := pe.putString(d.Topic); err != nil {
+			return err
+		}
+
+		if err := pe.putInt32Array(d.PartitionIDs); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *DescribeLogDirsRequest) decode(pd packetDecoder, version int16) error {
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	if n == -1 {
+		n = 0
+	}
+
+	topics := make([]DescribeLogDirsRequestTopic, n)
+	for i := 0; i < n; i++ {
+		topics[i] = DescribeLogDirsRequestTopic{}
+
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+		topics[i].Topic = topic
+
+		pIDs, err := pd.getInt32Array()
+		if err != nil {
+			return err
+		}
+		topics[i].PartitionIDs = pIDs
+	}
+	r.DescribeTopics = topics
+
+	return nil
+}
+
+func (r *DescribeLogDirsRequest) key() int16 {
+	return 35
+}
+
+func (r *DescribeLogDirsRequest) version() int16 {
+	return r.Version
+}
+
+func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion {
+	return V1_0_0_0
+}

+ 31 - 0
describe_log_dirs_request_test.go

@@ -0,0 +1,31 @@
+package sarama
+
+import "testing"
+
+var (
+	emptyDescribeLogDirsRequest = []byte{255, 255, 255, 255} // Empty array (array length -1 sent)
+	topicDescribeLogDirsRequest = []byte{
+		0, 0, 0, 1, // DescribeTopics array, Array length 1
+		0, 6, // Topic name length 6
+		'r', 'a', 'n', 'd', 'o', 'm', // Topic name
+		0, 0, 0, 2, // PartitionIDs int32 array, Array length 2
+		0, 0, 0, 25, // PartitionID 25
+		0, 0, 0, 26, // PartitionID 26
+	}
+)
+
+func TestDescribeLogDirsRequest(t *testing.T) {
+	request := &DescribeLogDirsRequest{
+		Version:        0,
+		DescribeTopics: []DescribeLogDirsRequestTopic{},
+	}
+	testRequest(t, "no topics", request, emptyDescribeLogDirsRequest)
+
+	request.DescribeTopics = []DescribeLogDirsRequestTopic{
+		DescribeLogDirsRequestTopic{
+			Topic:        "random",
+			PartitionIDs: []int32{25, 26},
+		},
+	}
+	testRequest(t, "no topics", request, topicDescribeLogDirsRequest)
+}

+ 219 - 0
describe_log_dirs_response.go

@@ -0,0 +1,219 @@
+package sarama
+
+import "time"
+
+type DescribeLogDirsResponse struct {
+	ThrottleTime time.Duration
+
+	// Version 0 and 1 are equal
+	// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
+	Version int16
+
+	LogDirs []DescribeLogDirsResponseDirMetadata
+}
+
+func (r *DescribeLogDirsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
+
+	if err := pe.putArrayLength(len(r.LogDirs)); err != nil {
+		return err
+	}
+
+	for _, dir := range r.LogDirs {
+		if err := dir.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *DescribeLogDirsResponse) decode(pd packetDecoder, version int16) error {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	// Decode array of DescribeLogDirsResponseDirMetadata
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.LogDirs = make([]DescribeLogDirsResponseDirMetadata, n)
+	for i := 0; i < n; i++ {
+		dir := DescribeLogDirsResponseDirMetadata{}
+		if err := dir.decode(pd, version); err != nil {
+			return err
+		}
+		r.LogDirs[i] = dir
+	}
+
+	return nil
+}
+
+func (r *DescribeLogDirsResponse) key() int16 {
+	return 35
+}
+
+func (r *DescribeLogDirsResponse) version() int16 {
+	return r.Version
+}
+
+func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
+	return V1_0_0_0
+}
+
+type DescribeLogDirsResponseDirMetadata struct {
+	ErrorCode KError
+
+	// The absolute log directory path
+	Path   string
+	Topics []DescribeLogDirsResponseTopic
+}
+
+func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
+	pe.putInt16(int16(r.ErrorCode))
+
+	if err := pe.putString(r.Path); err != nil {
+		return err
+	}
+
+	for _, topic := range r.Topics {
+		if err := topic.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *DescribeLogDirsResponseDirMetadata) decode(pd packetDecoder, version int16) error {
+	errCode, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	r.ErrorCode = KError(errCode)
+
+	path, err := pd.getString()
+	if err != nil {
+		return err
+	}
+	r.Path = path
+
+	// Decode array of DescribeLogDirsResponseTopic
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.Topics = make([]DescribeLogDirsResponseTopic, n)
+	for i := 0; i < n; i++ {
+		t := DescribeLogDirsResponseTopic{}
+
+		if err := t.decode(pd, version); err != nil {
+			return err
+		}
+
+		r.Topics[i] = t
+	}
+
+	return nil
+}
+
+// DescribeLogDirsResponseTopic contains a topic's partitions descriptions
+type DescribeLogDirsResponseTopic struct {
+	Topic      string
+	Partitions []DescribeLogDirsResponsePartition
+}
+
+func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
+	if err := pe.putString(r.Topic); err != nil {
+		return err
+	}
+
+	for _, partition := range r.Partitions {
+		if err := partition.encode(pe); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (r *DescribeLogDirsResponseTopic) decode(pd packetDecoder, version int16) error {
+	t, err := pd.getString()
+	if err != nil {
+		return err
+	}
+	r.Topic = t
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+	r.Partitions = make([]DescribeLogDirsResponsePartition, n)
+	for i := 0; i < n; i++ {
+		p := DescribeLogDirsResponsePartition{}
+		if err := p.decode(pd, version); err != nil {
+			return err
+		}
+		r.Partitions[i] = p
+	}
+
+	return nil
+}
+
+// DescribeLogDirsResponsePartition describes a partition's log directory
+type DescribeLogDirsResponsePartition struct {
+	PartitionID int32
+
+	// The size of the log segments of the partition in bytes.
+	Size int64
+
+	// The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or
+	// current replica's LEO (if it is the future log for the partition)
+	OffsetLag int64
+
+	// True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of
+	// the replica in the future.
+	IsTemporary bool
+}
+
+func (r *DescribeLogDirsResponsePartition) encode(pe packetEncoder) error {
+	pe.putInt32(r.PartitionID)
+	pe.putInt64(r.Size)
+	pe.putInt64(r.OffsetLag)
+	pe.putBool(r.IsTemporary)
+
+	return nil
+}
+
+func (r *DescribeLogDirsResponsePartition) decode(pd packetDecoder, version int16) error {
+	pID, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	r.PartitionID = pID
+
+	size, err := pd.getInt64()
+	if err != nil {
+		return err
+	}
+	r.Size = size
+
+	lag, err := pd.getInt64()
+	if err != nil {
+		return err
+	}
+	r.OffsetLag = lag
+
+	isTemp, err := pd.getBool()
+	if err != nil {
+		return err
+	}
+	r.IsTemporary = isTemp
+
+	return nil
+}

+ 79 - 0
describe_log_dirs_response_test.go

@@ -0,0 +1,79 @@
+package sarama
+
+import (
+	"testing"
+)
+
+var (
+	describeLogDirsResponseEmpty = []byte{
+		0, 0, 0, 0, // no throttle time
+		0, 0, 0, 0, // no log dirs
+	}
+
+	describeLogDirsResponseTwoPartitions = []byte{
+		0, 0, 0, 0, // no throttle time
+		0, 0, 0, 1, // One describe log dir (array length)
+		0, 0, // No error code
+		0, 6, // Character length of path (6 chars)
+		'/', 'k', 'a', 'f', 'k', 'a',
+		0, 0, 0, 1, // One DescribeLogDirsResponseTopic (array length)
+		0, 6, // Character length of "random" topic (6 chars)
+		'r', 'a', 'n', 'd', 'o', 'm', // Topic name
+		0, 0, 0, 2, // Two DescribeLogDirsResponsePartition (array length)
+		0, 0, 0, 25, // PartitionID 25
+		0, 0, 0, 0, 0, 0, 0, 125, // Log Size
+		0, 0, 0, 0, 0, 0, 0, 0, // OffsetLag
+		0,           // IsTemporary = false
+		0, 0, 0, 26, // PartitionID 25
+		0, 0, 0, 0, 0, 0, 0, 100, // Log Size
+		0, 0, 0, 0, 0, 0, 0, 0, // OffsetLag
+		0, // IsTemporary = false
+	}
+)
+
+func TestDescribeLogDirsResponse(t *testing.T) {
+	// Test empty response
+	response := &DescribeLogDirsResponse{
+		LogDirs: []DescribeLogDirsResponseDirMetadata{},
+	}
+	testVersionDecodable(t, "empty", response, describeLogDirsResponseEmpty, 0)
+	if len(response.LogDirs) != 0 {
+		t.Error("Expected no log dirs")
+	}
+
+	response.LogDirs = []DescribeLogDirsResponseDirMetadata{
+		DescribeLogDirsResponseDirMetadata{
+			ErrorCode: 0,
+			Path:      "/kafka",
+			Topics: []DescribeLogDirsResponseTopic{
+				DescribeLogDirsResponseTopic{
+					Topic: "random",
+					Partitions: []DescribeLogDirsResponsePartition{
+						DescribeLogDirsResponsePartition{
+							PartitionID: 25,
+							Size:        125,
+							OffsetLag:   0,
+							IsTemporary: false,
+						},
+						DescribeLogDirsResponsePartition{
+							PartitionID: 26,
+							Size:        100,
+							OffsetLag:   0,
+							IsTemporary: false,
+						},
+					},
+				},
+			},
+		},
+	}
+	testVersionDecodable(t, "two partitions", response, describeLogDirsResponseTwoPartitions, 0)
+	if len(response.LogDirs) != 1 {
+		t.Error("Expected one log dirs")
+	}
+	if len(response.LogDirs[0].Topics) != 1 {
+		t.Error("Expected one topic in log dirs")
+	}
+	if len(response.LogDirs[0].Topics[0].Partitions) != 2 {
+		t.Error("Expected two partitions")
+	}
+}

+ 2 - 0
request.go

@@ -158,6 +158,8 @@ func allocateBody(key, version int16) protocolBody {
 		return &DescribeConfigsRequest{}
 	case 33:
 		return &AlterConfigsRequest{}
+	case 35:
+		return &DescribeLogDirsRequest{}
 	case 36:
 		return &SaslAuthenticateRequest{}
 	case 37: