Merge pull request #1738 from varun06/varun-fix-package-name-as-identifier

fixed variable names that are named the same as some std lib package names
Diego Alvarez 3 years ago
parent
commit
4bc9b8f84a
6 changed files with 29 additions and 30 deletions
  1. .gitignore (+0 -1)
  2. admin.go (+8 -8)
  3. async_producer_test.go (+7 -7)
  4. balance_strategy.go (+3 -3)
  5. consumer.go (+4 -4)
  6. consumer_group.go (+7 -7)

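For context, here is a minimal sketch (not from the repository) of the shadowing problem these renames address: once a local variable is named errors, the imported errors package can no longer be referenced in that scope. The same reasoning applies to the errors counters in async_producer_test.go, the ConsumerErrors value in consumer.go, the copy builtin in balance_strategy.go, and the sync package in consumer_group.go.

package main

import (
	"errors"
	"fmt"
)

func main() {
	base := errors.New("base failure") // the errors package is still reachable here

	// This local shadows the imported "errors" package for the rest of the scope,
	// which is why the change below renames such channels to errChan.
	errors := make(chan error, 1)
	errors <- base

	// errors.New("...") would no longer compile here: "errors" now names the channel.
	fmt.Println(<-errors)
}
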
+ 0 - 1
.gitignore

@@ -27,4 +27,3 @@ coverage.txt
 profile.out
 
 simplest-uncommitted-msg-0.1-jar-with-dependencies.jar
-

+ 8 - 8
admin.go

@@ -802,7 +802,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
 	// Query brokers in parallel, since we have to query *all* brokers
 	brokers := ca.client.Brokers()
 	groupMaps := make(chan map[string]string, len(brokers))
-	errors := make(chan error, len(brokers))
+	errChan := make(chan error, len(brokers))
 	wg := sync.WaitGroup{}
 
 	for _, b := range brokers {
@@ -813,7 +813,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
 
 			response, err := b.ListGroups(&ListGroupsRequest{})
 			if err != nil {
-				errors <- err
+				errChan <- err
 				return
 			}
 
@@ -828,7 +828,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
 
 	wg.Wait()
 	close(groupMaps)
-	close(errors)
+	close(errChan)
 
 	for groupMap := range groupMaps {
 		for group, protocolType := range groupMap {
@@ -837,7 +837,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
 	}
 
 	// Intentionally return only the first error for simplicity
-	err = <-errors
+	err = <-errChan
 	return
 }
 
@@ -893,7 +893,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
 
 	// Query brokers in parallel, since we may have to query multiple brokers
 	logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
-	errors := make(chan error, len(brokerIds))
+	errChan := make(chan error, len(brokerIds))
 	wg := sync.WaitGroup{}
 
 	for _, b := range brokerIds {
@@ -909,7 +909,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
 
 			response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
 			if err != nil {
-				errors <- err
+				errChan <- err
 				return
 			}
 			logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
@@ -920,7 +920,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
 
 	wg.Wait()
 	close(logDirsMaps)
-	close(errors)
+	close(errChan)
 
 	for logDirsMap := range logDirsMaps {
 		for id, logDirs := range logDirsMap {
@@ -929,6 +929,6 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
 	}
 
 	// Intentionally return only the first error for simplicity
-	err = <-errors
+	err = <-errChan
 	return
 }

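Aside: the two functions touched above share a fan-out pattern — one goroutine per broker, results and errors collected on buffered channels sized to the broker count, both channels closed after wg.Wait(), and only the first error surfaced. A condensed sketch of that pattern follows; queryBroker and listAllGroups are hypothetical stand-ins for b.ListGroups and ListConsumerGroups, not Sarama APIs.

package main

import (
	"fmt"
	"sync"
)

// queryBroker is a hypothetical stand-in for a per-broker request such as b.ListGroups.
func queryBroker(id int) (map[string]string, error) {
	return map[string]string{fmt.Sprintf("group-%d", id): "consumer"}, nil
}

func listAllGroups(brokerIDs []int) (map[string]string, error) {
	results := make(chan map[string]string, len(brokerIDs))
	errChan := make(chan error, len(brokerIDs)) // buffered so goroutines never block
	var wg sync.WaitGroup

	for _, id := range brokerIDs {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			m, err := queryBroker(id)
			if err != nil {
				errChan <- err
				return
			}
			results <- m
		}(id)
	}

	wg.Wait()
	close(results)
	close(errChan)

	all := make(map[string]string)
	for m := range results {
		for k, v := range m {
			all[k] = v
		}
	}
	// As in the original, only the first error (if any) is returned.
	return all, <-errChan
}

func main() {
	groups, err := listAllGroups([]int{0, 1, 2})
	fmt.Println(groups, err)
}
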
+ 7 - 7
async_producer_test.go

@@ -1369,7 +1369,7 @@ func ExampleAsyncProducer_select() {
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, os.Interrupt)
 
-	var enqueued, errors int
+	var enqueued, producerErrors int
 ProducerLoop:
 	for {
 		select {
@@ -1377,13 +1377,13 @@ ProducerLoop:
 			enqueued++
 		case err := <-producer.Errors():
 			log.Println("Failed to produce message", err)
-			errors++
+			producerErrors++
 		case <-signals:
 			break ProducerLoop
 		}
 	}
 
-	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
+	log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)
 }
 
 // This example shows how to use the producer with separate goroutines
@@ -1403,8 +1403,8 @@ func ExampleAsyncProducer_goroutines() {
 	signal.Notify(signals, os.Interrupt)
 
 	var (
-		wg                          sync.WaitGroup
-		enqueued, successes, errors int
+		wg                                  sync.WaitGroup
+		enqueued, successes, producerErrors int
 	)
 
 	wg.Add(1)
@@ -1420,7 +1420,7 @@ func ExampleAsyncProducer_goroutines() {
 		defer wg.Done()
 		for err := range producer.Errors() {
 			log.Println(err)
-			errors++
+			producerErrors++
 		}
 	}()
 
@@ -1439,5 +1439,5 @@ ProducerLoop:
 
 	wg.Wait()
 
-	log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
+	log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
 }

+ 3 - 3
balance_strategy.go

@@ -690,11 +690,11 @@ func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumer
 }
 
 func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
-	copy := make(map[string][]topicPartitionAssignment, len(assignment))
+	m := make(map[string][]topicPartitionAssignment, len(assignment))
 	for memberID, subscriptions := range assignment {
-		copy[memberID] = append(subscriptions[:0:0], subscriptions...)
+		m[memberID] = append(subscriptions[:0:0], subscriptions...)
 	}
-	return copy
+	return m
 }
 
 func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {

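Aside: the copy idiom kept by this change relies on the three-index slice expression. subscriptions[:0:0] is a zero-length, zero-capacity slice, so append must allocate a fresh backing array, producing a copy that does not alias the original. A tiny standalone illustration (the int slice is just an example, not repository code):

package main

import "fmt"

func main() {
	src := []int{1, 2, 3}

	// src[:0:0] yields a length-0, capacity-0 slice, so append must allocate
	// a new backing array; dst therefore does not alias src.
	dst := append(src[:0:0], src...)

	dst[0] = 99
	fmt.Println(src, dst) // [1 2 3] [99 2 3]
}
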
+ 4 - 4
consumer.go

@@ -422,13 +422,13 @@ func (child *partitionConsumer) AsyncClose() {
 func (child *partitionConsumer) Close() error {
 	child.AsyncClose()
 
-	var errors ConsumerErrors
+	var consumerErrors ConsumerErrors
 	for err := range child.errors {
-		errors = append(errors, err)
+		consumerErrors = append(consumerErrors, err)
 	}
 
-	if len(errors) > 0 {
-		return errors
+	if len(consumerErrors) > 0 {
+		return consumerErrors
 	}
 	return nil
 }

+ 7 - 7
consumer_group.go

@@ -255,36 +255,36 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
 	}
 
 	// Sync consumer group
-	sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
+	groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
 	if err != nil {
 		_ = coordinator.Close()
 		return nil, err
 	}
-	switch sync.Err {
+	switch groupRequest.Err {
 	case ErrNoError:
 	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
 		c.memberID = ""
 		return c.newSession(ctx, topics, handler, retries)
 	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
 		if retries <= 0 {
-			return nil, sync.Err
+			return nil, groupRequest.Err
 		}
 
 		return c.retryNewSession(ctx, topics, handler, retries, true)
 	case ErrRebalanceInProgress: // retry after backoff
 		if retries <= 0 {
-			return nil, sync.Err
+			return nil, groupRequest.Err
 		}
 
 		return c.retryNewSession(ctx, topics, handler, retries, false)
 	default:
-		return nil, sync.Err
+		return nil, groupRequest.Err
 	}
 
 	// Retrieve and sort claims
 	var claims map[string][]int32
-	if len(sync.MemberAssignment) > 0 {
-		members, err := sync.GetMemberAssignment()
+	if len(groupRequest.MemberAssignment) > 0 {
+		members, err := groupRequest.GetMemberAssignment()
 		if err != nil {
 			return nil, err
 		}