Prechádzať zdrojové kódy

fix bug deleteRecord (#1425)

evazca 6 rokov pred
rodič
commit
6fce0f9c8d
3 zmenil súbory, kde vykonal 144 pridanie a 24 odobranie
  1. 40 19
      admin.go
  2. 82 5
      admin_test.go
  3. 22 0
      errors.go

+ 40 - 19
admin.go

@@ -374,29 +374,50 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
 	if topic == "" {
 		return ErrInvalidTopic
 	}
-
-	topics := make(map[string]*DeleteRecordsRequestTopic)
-	topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
-	request := &DeleteRecordsRequest{
-		Topics:  topics,
-		Timeout: ca.conf.Admin.Timeout,
-	}
-
-	b, err := ca.Controller()
-	if err != nil {
-		return err
+	partitionPerBroker := make(map[*Broker][]int32)
+	for partition := range partitionOffsets {
+		broker, err := ca.client.Leader(topic, partition)
+		if err != nil {
+			return err
+		}
+		if _, ok := partitionPerBroker[broker]; ok {
+			partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
+		} else {
+			partitionPerBroker[broker] = []int32{partition}
+		}
 	}
+	errs := make([]error, 0)
+	for broker, partitions := range partitionPerBroker {
+		topics := make(map[string]*DeleteRecordsRequestTopic)
+		recordsToDelete := make(map[int32]int64)
+		for _, p := range partitions {
+			recordsToDelete[p] = partitionOffsets[p]
+		}
+		topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
+		request := &DeleteRecordsRequest{
+			Topics:  topics,
+			Timeout: ca.conf.Admin.Timeout,
+		}
 
-	rsp, err := b.DeleteRecords(request)
-	if err != nil {
-		return err
+		rsp, err := broker.DeleteRecords(request)
+		if err != nil {
+			errs = append(errs, err)
+		} else {
+			deleteRecordsResponseTopic, ok := rsp.Topics[topic]
+			if !ok {
+				errs = append(errs, ErrIncompleteResponse)
+			} else {
+				for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
+					if deleteRecordsResponsePartition.Err != ErrNoError {
+						errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
+					}
+				}
+			}
+		}
 	}
-
-	_, ok := rsp.Topics[topic]
-	if !ok {
-		return ErrIncompleteResponse
+	if len(errs) > 0 {
+		return ErrDeleteRecords{MultiError{&errs}}
 	}
-
 	//todo since we are dealing with couple of partitions it would be good if we return slice of errors
 	//for each partition instead of one error
 	return nil

+ 82 - 5
admin_test.go

@@ -333,13 +333,17 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
 }
 
 func TestClusterAdminDeleteRecords(t *testing.T) {
+	topicName := "my_topic"
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()
 
 	seedBroker.SetHandlerByMap(map[string]MockResponse{
 		"MetadataRequest": NewMockMetadataResponse(t).
 			SetController(seedBroker.BrokerID()).
-			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetLeader(topicName, 1, 1).
+			SetLeader(topicName, 2, 1).
+			SetLeader(topicName, 3, 1),
 		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
 	})
 
@@ -350,12 +354,70 @@ func TestClusterAdminDeleteRecords(t *testing.T) {
 		t.Fatal(err)
 	}
 
+	partitionOffsetFake := make(map[int32]int64)
+	partitionOffsetFake[4] = 1000
+	errFake := admin.DeleteRecords(topicName, partitionOffsetFake)
+	if errFake == nil {
+		t.Fatal(err)
+	}
+
+	partitionOffset := make(map[int32]int64)
+	partitionOffset[1] = 1000
+	partitionOffset[2] = 1000
+	partitionOffset[3] = 1000
+
+	err = admin.DeleteRecords(topicName, partitionOffset)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminDeleteRecordsWithInCorrectBroker(t *testing.T) {
+	topicName := "my_topic"
+	seedBroker := NewMockBroker(t, 1)
+	secondBroker := NewMockBroker(t, 2)
+	defer seedBroker.Close()
+	defer secondBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(secondBroker.Addr(), secondBroker.brokerID).
+			SetLeader(topicName, 1, 1).
+			SetLeader(topicName, 2, 1).
+			SetLeader(topicName, 3, 2),
+		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
+	})
+
+	secondBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(secondBroker.Addr(), secondBroker.brokerID).
+			SetLeader(topicName, 1, 1).
+			SetLeader(topicName, 2, 1).
+			SetLeader(topicName, 3, 2),
+		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
 	partitionOffset := make(map[int32]int64)
 	partitionOffset[1] = 1000
 	partitionOffset[2] = 1000
 	partitionOffset[3] = 1000
 
-	err = admin.DeleteRecords("my_topic", partitionOffset)
+	err = admin.DeleteRecords(topicName, partitionOffset)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -367,13 +429,17 @@ func TestClusterAdminDeleteRecords(t *testing.T) {
 }
 
 func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
+	topicName := "my_topic"
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()
 
 	seedBroker.SetHandlerByMap(map[string]MockResponse{
 		"MetadataRequest": NewMockMetadataResponse(t).
 			SetController(seedBroker.BrokerID()).
-			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetLeader(topicName, 1, 1).
+			SetLeader(topicName, 2, 1).
+			SetLeader(topicName, 3, 1),
 		"DeleteRecordsRequest": NewMockDeleteRecordsResponse(t),
 	})
 
@@ -389,10 +455,21 @@ func TestClusterAdminDeleteRecordsWithDiffVersion(t *testing.T) {
 	partitionOffset[2] = 1000
 	partitionOffset[3] = 1000
 
-	err = admin.DeleteRecords("my_topic", partitionOffset)
-	if err != ErrUnsupportedVersion {
+	err = admin.DeleteRecords(topicName, partitionOffset)
+	if !strings.HasPrefix(err.Error(), "kafka server: failed to delete records") {
 		t.Fatal(err)
 	}
+	deleteRecordsError, ok := err.(ErrDeleteRecords)
+
+	if !ok {
+		t.Fatal(err)
+	}
+
+	for _, err := range *deleteRecordsError.Errors {
+		if err != ErrUnsupportedVersion {
+			t.Fatal(err)
+		}
+	}
 
 	err = admin.Close()
 	if err != nil {

+ 22 - 0
errors.go

@@ -81,6 +81,28 @@ func (err ConfigurationError) Error() string {
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
 type KError int16
 
+// MultiError is used to contain multi error.
+type MultiError struct {
+	Errors *[]error
+}
+
+func (mErr MultiError) Error() string {
+	var errString = ""
+	for _, err := range *mErr.Errors {
+		errString += err.Error() + ","
+	}
+	return errString
+}
+
+// ErrDeleteRecords is the type of error returned when fail to delete the required records
+type ErrDeleteRecords struct {
+	MultiError
+}
+
+func (err ErrDeleteRecords) Error() string {
+	return "kafka server: failed to delete records " + err.MultiError.Error()
+}
+
 // Numeric error codes returned by the Kafka server.
 const (
 	ErrNoError                            KError = 0