
Various doc and naming tweaks from code review.

Evan Huus 10 years ago
parent
commit
0e95e22dd4
4 changed files with 29 additions and 25 deletions
  1. client.go            + 6 - 6
  2. client_test.go       + 2 - 2
  3. config.go            + 20 - 16
  4. functional_test.go   + 1 - 1
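
The rename is a breaking change for callers: Metadata.Retries becomes Metadata.Retry.Max and Metadata.WaitForElection becomes Metadata.Retry.Backoff. A minimal sketch of updating client code, assuming the usual github.com/Shopify/sarama import path (illustrative only, not part of the commit):

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama" // import path assumed, not shown in the diff
)

func main() {
	config := sarama.NewConfig()

	// Before this commit:
	//   config.Metadata.Retries = 3
	//   config.Metadata.WaitForElection = 250 * time.Millisecond

	// After this commit, the same settings live under the nested Retry struct:
	config.Metadata.Retry.Max = 3
	config.Metadata.Retry.Backoff = 250 * time.Millisecond

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
}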

+ 6 - 6
client.go

@@ -245,13 +245,13 @@ func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
 // RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
 // available metadata for those topics.
 func (client *Client) RefreshTopicMetadata(topics ...string) error {
-	return client.refreshMetadata(topics, client.conf.Metadata.Retries)
+	return client.refreshMetadata(topics, client.conf.Metadata.Retry.Max)
 }
 
 // RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
 func (client *Client) RefreshAllMetadata() error {
 	// Kafka refreshes all when you encode it an empty array...
-	return client.refreshMetadata(make([]string, 0), client.conf.Metadata.Retries)
+	return client.refreshMetadata(make([]string, 0), client.conf.Metadata.Retry.Max)
 }
 
 // GetOffset queries the cluster to get the most recent available offset at the given
@@ -500,8 +500,8 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 					return nil
 				}
 				Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n",
-					client.conf.Metadata.WaitForElection/time.Millisecond, retriesRemaining)
-				time.Sleep(client.conf.Metadata.WaitForElection) // wait for leader election
+					client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
+				time.Sleep(client.conf.Metadata.Retry.Backoff) // wait for leader election
 				return client.refreshMetadata(retry, retriesRemaining-1)
 			}
 
@@ -520,8 +520,8 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 
 	if retriesRemaining > 0 {
 		Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n",
-			client.conf.Metadata.WaitForElection/time.Millisecond, retriesRemaining)
-		time.Sleep(client.conf.Metadata.WaitForElection)
+			client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
+		time.Sleep(client.conf.Metadata.Retry.Backoff)
 		client.resurrectDeadBrokers()
 		return client.refreshMetadata(topics, retriesRemaining-1)
 	}
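
For a rough feel of what the two renamed knobs mean in practice: refreshMetadata sleeps Retry.Backoff before each retry and decrements retriesRemaining, so with the defaults the leaderless-partition path adds at most about Max * Backoff of sleeping on top of the requests themselves. A back-of-the-envelope sketch (names are illustrative, not from the library):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Defaults set in NewConfig after this commit.
	retryMax := 3
	retryBackoff := 250 * time.Millisecond

	// Worst case: every attempt finds a leaderless partition and sleeps
	// before retrying, so the total sleep is roughly Max * Backoff.
	worstCaseSleep := time.Duration(retryMax) * retryBackoff
	fmt.Println(worstCaseSleep) // 750ms
}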

+ 2 - 2
client_test.go

@@ -40,7 +40,7 @@ func TestCachedPartitions(t *testing.T) {
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
-	config.Metadata.Retries = 0
+	config.Metadata.Retry.Max = 0
 	client, err := NewClient([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -98,7 +98,7 @@ func TestClientMetadata(t *testing.T) {
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
-	config.Metadata.Retries = 0
+	config.Metadata.Retry.Max = 0
 	client, err := NewClient([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)

+ 20 - 16
config.go

@@ -16,8 +16,10 @@ type Config struct {
 
 	// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
 	Metadata struct {
-		Retries         int           // How many times to retry a metadata request when a partition is in the middle of leader election (default 3).
-		WaitForElection time.Duration // How long to wait for leader election to finish between retries (default 250ms). Similar to `retry.backoff.ms` in the JVM version.
+		Retry struct {
+			Max     int           // The total number of times to retry a metadata request when the cluster is in the middle of a leader election (default 3).
+			Backoff time.Duration // How long to wait for leader election to occur before retrying (default 250ms). Similar to the JVM's `retry.backoff.ms`.
+		}
 		// How frequently to refresh the cluster metadata in the background. Defaults to 10 minutes.
 		// Set to 0 to disable. Similar to `topic.metadata.refresh.interval.ms` in the JVM version.
 		RefreshFrequency time.Duration
@@ -25,7 +27,7 @@ type Config struct {
 
 	// Producer is the namespace for configuration related to producing messages, used by the Producer.
 	Producer struct {
-		// The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
+		// The maximum permitted size of a message (defaults to 1000000). Should be set equal to or smaller than the broker's `message.max.bytes`.
 		MaxMessageBytes int
 		// The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
 		// Equivalent to the `request.required.acks` setting of the JVM producer.
@@ -46,7 +48,7 @@ type Config struct {
 		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
 		// into the subsequent batch.
 		Flush struct {
-			Bytes     int           // The best-effort number of bytes needed to trigger a flush. Use the gloabl `sarama.MaxRequestSize` to set a hard upper limit.
+			Bytes     int           // The best-effort number of bytes needed to trigger a flush. Use the global sarama.MaxRequestSize to set a hard upper limit.
 			Messages  int           // The best-effort number of messages needed to trigger a flush. Use `MaxMessages` to set a hard upper limit.
 			Frequency time.Duration // The best-effort frequency of flushes. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
 			// The maximum number of messages the producer will send in a single broker request.
@@ -66,19 +68,21 @@ type Config struct {
 
 	// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
 	Consumer struct {
+		// Fetch is the namespace for controlling how many bytes are retrieved by any given request.
 		Fetch struct {
-			// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
+			// The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
 			// The default is 1, as 0 causes the consumer to spin when no messages are available. Equivalent to the JVM's `fetch.min.bytes`.
 			Min int32
-			// The normal maximum amount of data to fetch from the broker in each request (default 32768 bytes). This should be larger than
-			// most messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar to the JVM's
-			// `fetch.message.max.bytes`.
+			// The default number of message bytes to fetch from the broker in each request (default 32768). This should be larger than the
+			// majority of your messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar
+			// to the JVM's `fetch.message.max.bytes`.
 			Default int32
-			// The maximum amount of data to fetch from the broker in a single request. Messages larger than this will return ErrMessageTooLarge.
+			// The maximum number of message bytes to fetch from the broker in a single request. Messages larger than this will return
+			// ErrMessageTooLarge and will not be consumable, so you must be sure this is at least as large as your largest message.
 			// Defaults to 0 (no limit). Similar to the JVM's `fetch.message.max.bytes`. The global `sarama.MaxResponseSize` still applies.
 			Max int32
 		}
-		// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
+		// The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
 		// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
 		// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
 		// Equivalent to the JVM's `fetch.wait.max.ms`.
@@ -103,8 +107,8 @@ func NewConfig() *Config {
 	c.Net.ReadTimeout = 30 * time.Second
 	c.Net.WriteTimeout = 30 * time.Second
 
-	c.Metadata.Retries = 3
-	c.Metadata.WaitForElection = 250 * time.Millisecond
+	c.Metadata.Retry.Max = 3
+	c.Metadata.Retry.Backoff = 250 * time.Millisecond
 	c.Metadata.RefreshFrequency = 10 * time.Minute
 
 	c.Producer.MaxMessageBytes = 1000000
@@ -163,10 +167,10 @@ func (c *Config) Validate() error {
 
 	// validate the Metadata values
 	switch {
-	case c.Metadata.Retries < 0:
-		return ConfigurationError("Invalid Metadata.Retries, must be >= 0")
-	case c.Metadata.WaitForElection <= time.Duration(0):
-		return ConfigurationError("Invalid Metadata.WaitForElection, must be > 0")
+	case c.Metadata.Retry.Max < 0:
+		return ConfigurationError("Invalid Metadata.Retry.Max, must be >= 0")
+	case c.Metadata.Retry.Backoff <= time.Duration(0):
+		return ConfigurationError("Invalid Metadata.Retry.Backoff, must be > 0")
 	case c.Metadata.RefreshFrequency < time.Duration(0):
 		return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
 	}
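
The renamed validation cases can be exercised directly; a small sketch, again assuming the github.com/Shopify/sarama import path:

package main

import (
	"fmt"

	"github.com/Shopify/sarama" // import path assumed
)

func main() {
	config := sarama.NewConfig()
	config.Metadata.Retry.Backoff = 0 // violates the "must be > 0" rule above

	if err := config.Validate(); err != nil {
		fmt.Println(err) // Invalid Metadata.Retry.Backoff, must be > 0
	}
}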

+ 1 - 1
functional_test.go

@@ -45,7 +45,7 @@ func checkKafkaAvailability(t *testing.T) {
 
 func TestFuncConnectionFailure(t *testing.T) {
 	config := NewConfig()
-	config.Metadata.Retries = 1
+	config.Metadata.Retry.Max = 1
 
 	_, err := NewClient([]string{"localhost:9000"}, config)
 	if err != ErrOutOfBrokers {