Quellcode durchsuchen

Unify configuration structures into config.go

Also take the opportunity to rename and reorganize some values, and to document
some equivalents with the JVM version.
Evan Huus vor 10 Jahren
Ursprung
Commit
616ac5daa4
11 geänderte Dateien mit 371 neuen und 450 gelöschten Zeilen
  1. 8 50
      broker.go
  2. 21 60
      client.go
  3. 4 11
      client_test.go
  4. 213 0
      config.go
  5. 10 0
      config_test.go
  6. 24 112
      consumer.go
  7. 6 32
      consumer_test.go
  8. 19 21
      functional_test.go
  9. 30 121
      producer.go
  10. 32 39
      producer_test.go
  11. 4 4
      sync_producer.go

+ 8 - 50
broker.go

@@ -10,54 +10,12 @@ import (
 	"time"
 )
 
-// BrokerConfig is used to pass multiple configuration options to Broker.Open.
-type BrokerConfig struct {
-	MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).
-
-	// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
-	DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
-	ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
-	WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
-}
-
-// NewBrokerConfig returns a new broker configuration with sane defaults.
-func NewBrokerConfig() *BrokerConfig {
-	return &BrokerConfig{
-		MaxOpenRequests: 5,
-		DialTimeout:     30 * time.Second,
-		ReadTimeout:     30 * time.Second,
-		WriteTimeout:    30 * time.Second,
-	}
-}
-
-// Validate checks a BrokerConfig instance. This will return a
-// ConfigurationError if the specified values don't make sense.
-func (config *BrokerConfig) Validate() error {
-	if config.MaxOpenRequests <= 0 {
-		return ConfigurationError("Invalid MaxOpenRequests")
-	}
-
-	if config.DialTimeout <= 0 {
-		return ConfigurationError("Invalid DialTimeout")
-	}
-
-	if config.ReadTimeout <= 0 {
-		return ConfigurationError("Invalid ReadTimeout")
-	}
-
-	if config.WriteTimeout <= 0 {
-		return ConfigurationError("Invalid WriteTimeout")
-	}
-
-	return nil
-}
-
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
 	id   int32
 	addr string
 
-	conf          *BrokerConfig
+	conf          *Config
 	correlationID int32
 	conn          net.Conn
 	connErr       error
@@ -84,10 +42,10 @@ func NewBroker(addr string) *Broker {
 // waiting for the connection to complete. This means that any subsequent operations on the broker will
 // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
 // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
-// AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.
-func (b *Broker) Open(conf *BrokerConfig) error {
+// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
+func (b *Broker) Open(conf *Config) error {
 	if conf == nil {
-		conf = NewBrokerConfig()
+		conf = NewConfig()
 	}
 
 	err := conf.Validate()
@@ -110,7 +68,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 	go withRecover(func() {
 		defer b.lock.Unlock()
 
-		b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout)
+		b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.Net.DialTimeout)
 		if b.connErr != nil {
 			b.conn = nil
 			atomic.StoreInt32(&b.opened, 0)
@@ -120,7 +78,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 
 		b.conf = conf
 		b.done = make(chan bool)
-		b.responses = make(chan responsePromise, b.conf.MaxOpenRequests-1)
+		b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
 
 		Logger.Printf("Connected to broker %s\n", b.addr)
 		go withRecover(b.responseReceiver)
@@ -285,7 +243,7 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 		return nil, err
 	}
 
-	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.WriteTimeout))
+	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
 	if err != nil {
 		return nil, err
 	}
@@ -372,7 +330,7 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
 func (b *Broker) responseReceiver() {
 	header := make([]byte, 8)
 	for response := range b.responses {
-		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.ReadTimeout))
+		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
 		if err != nil {
 			response.errors <- err
 			continue

+ 21 - 60
client.go

@@ -6,54 +6,13 @@ import (
 	"time"
 )
 
-// ClientConfig is used to pass multiple configuration options to NewClient.
-type ClientConfig struct {
-	MetadataRetries            int           // How many times to retry a metadata request when a partition is in the middle of leader election.
-	WaitForElection            time.Duration // How long to wait for leader election to finish between retries.
-	DefaultBrokerConf          *BrokerConfig // Default configuration for broker connections created by this client.
-	BackgroundRefreshFrequency time.Duration // How frequently the client will refresh the cluster metadata in the background. Defaults to 10 minutes. Set to 0 to disable.
-}
-
-// NewClientConfig creates a new ClientConfig instance with sensible defaults
-func NewClientConfig() *ClientConfig {
-	return &ClientConfig{
-		MetadataRetries:            3,
-		WaitForElection:            250 * time.Millisecond,
-		BackgroundRefreshFrequency: 10 * time.Minute,
-	}
-}
-
-// Validate checks a ClientConfig instance. This will return a
-// ConfigurationError if the specified values don't make sense.
-func (config *ClientConfig) Validate() error {
-	if config.MetadataRetries < 0 {
-		return ConfigurationError("Invalid MetadataRetries, must be >= 0")
-	}
-
-	if config.WaitForElection <= time.Duration(0) {
-		return ConfigurationError("Invalid WaitForElection, must be > 0")
-	}
-
-	if config.DefaultBrokerConf != nil {
-		if err := config.DefaultBrokerConf.Validate(); err != nil {
-			return err
-		}
-	}
-
-	if config.BackgroundRefreshFrequency < time.Duration(0) {
-		return ConfigurationError("Invalid BackgroundRefreshFrequency, must be >= 0")
-	}
-
-	return nil
-}
-
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
 // You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
 // automatically when it passes out of scope. A single client can be safely shared by
 // multiple concurrent Producers and Consumers.
 type Client struct {
 	id     string
-	config ClientConfig
+	conf   *Config
 	closer chan none
 
 	// the broker addresses given to us through the constructor are not guaranteed to be returned in
@@ -75,14 +34,14 @@ type Client struct {
 // NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
 // and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
 // be retrieved from any of the given broker addresses, the client is not created.
-func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error) {
+func NewClient(id string, addrs []string, conf *Config) (*Client, error) {
 	Logger.Println("Initializing new client")
 
-	if config == nil {
-		config = NewClientConfig()
+	if conf == nil {
+		conf = NewConfig()
 	}
 
-	if err := config.Validate(); err != nil {
+	if err := conf.Validate(); err != nil {
 		return nil, err
 	}
 
@@ -92,7 +51,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 
 	client := &Client{
 		id:                      id,
-		config:                  *config,
+		conf:                    conf,
 		closer:                  make(chan none),
 		seedBrokerAddrs:         addrs,
 		seedBroker:              NewBroker(addrs[0]),
@@ -101,7 +60,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		metadata:                make(map[string]map[int32]*PartitionMetadata),
 		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
 	}
-	_ = client.seedBroker.Open(config.DefaultBrokerConf)
+	_ = client.seedBroker.Open(conf)
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err := client.RefreshAllMetadata()
@@ -288,13 +247,13 @@ func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
 // RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
 // available metadata for those topics.
 func (client *Client) RefreshTopicMetadata(topics ...string) error {
-	return client.refreshMetadata(topics, client.config.MetadataRetries)
+	return client.refreshMetadata(topics, client.conf.Metadata.Retries)
 }
 
 // RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
 func (client *Client) RefreshAllMetadata() error {
 	// Kafka refreshes all when you encode it an empty array...
-	return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
+	return client.refreshMetadata(make([]string, 0), client.conf.Metadata.Retries)
 }
 
 // GetOffset queries the cluster to get the most recent available offset at the given
@@ -344,7 +303,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 		client.seedBrokerAddrs = client.seedBrokerAddrs[1:]
 		if len(client.seedBrokerAddrs) > 0 {
 			client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
-			_ = client.seedBroker.Open(client.config.DefaultBrokerConf)
+			_ = client.seedBroker.Open(client.conf)
 		} else {
 			client.seedBroker = nil
 		}
@@ -372,7 +331,7 @@ func (client *Client) resurrectDeadBrokers() {
 	client.deadBrokerAddrs = make(map[string]none)
 
 	client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
-	_ = client.seedBroker.Open(client.config.DefaultBrokerConf)
+	_ = client.seedBroker.Open(client.conf)
 }
 
 func (client *Client) any() *Broker {
@@ -489,11 +448,11 @@ func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, er
 // core metadata update logic
 
 func (client *Client) backgroundMetadataUpdater() {
-	if client.config.BackgroundRefreshFrequency == time.Duration(0) {
+	if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
 		return
 	}
 
-	ticker := time.NewTicker(client.config.BackgroundRefreshFrequency)
+	ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
 	for {
 		select {
 		case <-ticker.C:
@@ -542,8 +501,9 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 					Logger.Println("Some partitions are leaderless, but we're out of retries")
 					return nil
 				}
-				Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retriesRemaining)
-				time.Sleep(client.config.WaitForElection) // wait for leader election
+				Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n",
+					client.conf.Metadata.WaitForElection/time.Millisecond, retriesRemaining)
+				time.Sleep(client.conf.Metadata.WaitForElection) // wait for leader election
 				return client.refreshMetadata(retry, retriesRemaining-1)
 			}
 
@@ -561,8 +521,9 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 	Logger.Println("Out of available brokers.")
 
 	if retriesRemaining > 0 {
-		Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retriesRemaining)
-		time.Sleep(client.config.WaitForElection)
+		Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n",
+			client.conf.Metadata.WaitForElection/time.Millisecond, retriesRemaining)
+		time.Sleep(client.conf.Metadata.WaitForElection)
 		client.resurrectDeadBrokers()
 		return client.refreshMetadata(topics, retriesRemaining-1)
 	}
@@ -584,12 +545,12 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	// If it fails and we do care, whoever tries to use it will get the connection error.
 	for _, broker := range data.Brokers {
 		if client.brokers[broker.ID()] == nil {
-			_ = broker.Open(client.config.DefaultBrokerConf)
+			_ = broker.Open(client.conf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
 		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
 			safeAsyncClose(client.brokers[broker.ID()])
-			_ = broker.Open(client.config.DefaultBrokerConf)
+			_ = broker.Open(client.conf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
 		}

+ 4 - 11
client_test.go

@@ -12,13 +12,6 @@ func safeClose(t *testing.T, c io.Closer) {
 	}
 }
 
-func TestDefaultClientConfigValidates(t *testing.T) {
-	config := NewClientConfig()
-	if err := config.Validate(); err != nil {
-		t.Error(err)
-	}
-}
-
 func TestSimpleClient(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 
@@ -46,8 +39,8 @@ func TestCachedPartitions(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
-	config := NewClientConfig()
-	config.MetadataRetries = 0
+	config := NewConfig()
+	config.Metadata.Retries = 0
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
@@ -104,8 +97,8 @@ func TestClientMetadata(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
-	config := NewClientConfig()
-	config.MetadataRetries = 0
+	config := NewConfig()
+	config.Metadata.Retries = 0
 	client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)

+ 213 - 0
config.go

@@ -0,0 +1,213 @@
+package sarama
+
+import "time"
+
+// Config is used to pass multiple configuration options to Sarama's constructors.
+type Config struct {
+	// Net is the namespace for network-level properties used by the Broker, and shared by the Client/Producer/Consumer.
+	Net struct {
+		MaxOpenRequests int // How many outstanding requests a connection is allowed to have before sending on it blocks (default 5).
+
+		// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
+		DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
+		ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
+		WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
+	}
+
+	// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
+	Metadata struct {
+		Retries         int           // How many times to retry a metadata request when a partition is in the middle of leader election (default 3).
+		WaitForElection time.Duration // How long to wait for leader election to finish between retries (default 250ms). Similar to `retry.backoff.ms` in the JVM version.
+		// How frequently to refresh the cluster metadata in the background. Defaults to 10 minutes.
+		// Set to 0 to disable. Similar to `topic.metadata.refresh.interval.ms` in the JVM version.
+		RefreshFrequency time.Duration
+	}
+
+	// Producer is the namespace for configuration related to producing messages, used by the Producer.
+	Producer struct {
+		// The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
+		MaxMessageBytes int
+		// The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
+		// Equivalent to the `request.required.acks` setting of the JVM producer.
+		RequiredAcks RequiredAcks
+		// The maximum duration the broker will wait the receipt of the number of RequiredAcks (defaults to 10 seconds).
+		// This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution,
+		// nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.
+		Timeout time.Duration
+		// The type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.
+		Compression CompressionCodec
+		// Generates partitioners for choosing the partition to send messages to (defaults to hashing the message key).
+		// Similar to the `partitioner.class` setting for the JVM producer.
+		Partitioner PartitionerConstructor
+		// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
+		AckSuccesses bool
+
+		// The following config options control how often messages are batched up and sent to the broker. By default,
+		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
+		// into the subsequent batch.
+		Flush struct {
+			Bytes     int           // The best-effort number of bytes needed to trigger a flush. Use the gloabl `sarama.MaxRequestSize` to set a hard upper limit.
+			Messages  int           // The best-effort number of messages needed to trigger a flush. Use `MaxMessages` to set a hard upper limit.
+			Frequency time.Duration // The best-effort frequency of flushes. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
+			// The maximum number of messages the producer will send in a single broker request.
+			// Defaults to 0 for unlimited. Similar to `queue.buffering.max.messages` in the JVM producer.
+			MaxMessages int
+		}
+
+		Retry struct {
+			// The total number of times to retry sending a message (default 3).
+			// Similar to the `message.send.max.retries` setting of the JVM producer.
+			Max int
+			// How long to wait for the cluster to settle between retries (default 100ms).
+			// Similar to the `retry.backoff.ms` setting of the JVM producer.
+			Backoff time.Duration
+		}
+	}
+
+	// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
+	Consumer struct {
+		Fetch struct {
+			// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
+			// The default is 1, as 0 causes the consumer to spin when no messages are available. Equivalent to the JVM's `fetch.min.bytes`.
+			Min int32
+			// The normal maximum amount of data to fetch from the broker in each request (default 32768 bytes). This should be larger than
+			// most messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar to the JVM's
+			// `fetch.message.max.bytes`.
+			Default int32
+			// The maximum amount of data to fetch from the broker in a single request. Messages larger than this will return ErrMessageTooLarge.
+			// Defaults to 0 (no limit). Similar to the JVM's `fetch.message.max.bytes`. The global `sarama.MaxResponseSize` still applies.
+			Max int32
+		}
+		// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
+		// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
+		// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
+		// Equivalent to the JVM's `fetch.wait.max.ms`.
+		MaxWaitTime time.Duration
+	}
+
+	// The number of events to buffer in internal and external channels. This permits the producer and consumer to
+	// continue processing some messages in the background while user code is working, greatly improving throughput.
+	// Defaults to 256.
+	ChannelBufferSize int
+}
+
+// NewConfig returns a new configuration instance with sane defaults.
+func NewConfig() *Config {
+	c := &Config{}
+
+	c.Net.MaxOpenRequests = 5
+	c.Net.DialTimeout = 30 * time.Second
+	c.Net.ReadTimeout = 30 * time.Second
+	c.Net.WriteTimeout = 30 * time.Second
+
+	c.Metadata.Retries = 3
+	c.Metadata.WaitForElection = 250 * time.Millisecond
+	c.Metadata.RefreshFrequency = 10 * time.Minute
+
+	c.Producer.MaxMessageBytes = 1000000
+	c.Producer.RequiredAcks = WaitForLocal
+	c.Producer.Timeout = 10 * time.Second
+	c.Producer.Partitioner = NewHashPartitioner
+	c.Producer.Retry.Max = 3
+	c.Producer.Retry.Backoff = 100 * time.Millisecond
+
+	c.Consumer.Fetch.Min = 1
+	c.Consumer.Fetch.Default = 32768
+	c.Consumer.MaxWaitTime = 250 * time.Millisecond
+
+	c.ChannelBufferSize = 256
+
+	return c
+}
+
+// Validate checks a Config instance. It will return a
+// ConfigurationError if the specified values don't make sense.
+func (c *Config) Validate() error {
+	// some configuration values should be warned on but not fail completely, do those first
+	if c.Producer.RequiredAcks > 1 {
+		Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
+	}
+	if c.Producer.MaxMessageBytes >= forceFlushThreshold() {
+		Logger.Println("Producer.MaxMessageBytes is too close to MaxRequestSize; it will be ignored.")
+	}
+	if c.Producer.Flush.Bytes >= forceFlushThreshold() {
+		Logger.Println("Producer.Flush.Bytes is too close to MaxRequestSize; it will be ignored.")
+	}
+	if c.Producer.Timeout%time.Millisecond != 0 {
+		Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
+	}
+	if c.Consumer.MaxWaitTime < 100*time.Millisecond {
+		Logger.Println("Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
+	}
+	if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
+		Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
+	}
+
+	// validate Net values
+	switch {
+	case c.Net.MaxOpenRequests <= 0:
+		return ConfigurationError("Invalid Net.MaxOpenRequests, must be > 0")
+	case c.Net.DialTimeout <= 0:
+		return ConfigurationError("Invalid Net.DialTimeout, must be > 0")
+	case c.Net.ReadTimeout <= 0:
+		return ConfigurationError("Invalid Net.ReadTimeout, must be > 0")
+	case c.Net.WriteTimeout <= 0:
+		return ConfigurationError("Invalid Net.WriteTimeout, must be > 0")
+	}
+
+	// validate the Metadata values
+	switch {
+	case c.Metadata.Retries < 0:
+		return ConfigurationError("Invalid Metadata.Retries, must be >= 0")
+	case c.Metadata.WaitForElection <= time.Duration(0):
+		return ConfigurationError("Invalid Metadata.WaitForElection, must be > 0")
+	case c.Metadata.RefreshFrequency < time.Duration(0):
+		return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
+	}
+
+	// validate the Produce values
+	switch {
+	case c.Producer.MaxMessageBytes <= 0:
+		return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0")
+	case c.Producer.RequiredAcks < -1:
+		return ConfigurationError("Invalid Producer.RequiredAcks, must be >= -1")
+	case c.Producer.Timeout <= 0:
+		return ConfigurationError("Invalid Producer.Timeout, must be > 0")
+	case c.Producer.Partitioner == nil:
+		return ConfigurationError("Invalid Producer.Partitioner, must not be nil")
+	case c.Producer.Flush.Bytes < 0:
+		return ConfigurationError("Invalid Producer.Flush.Bytes, must be >= 0")
+	case c.Producer.Flush.Messages < 0:
+		return ConfigurationError("Invalid Producer.Flush.Messages, must be >= 0")
+	case c.Producer.Flush.Frequency < 0:
+		return ConfigurationError("Invalid Producer.Flush.Frequency, must be >= 0")
+	case c.Producer.Flush.MaxMessages < 0:
+		return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= 0")
+	case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
+		return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set")
+	case c.Producer.Retry.Max < 0:
+		return ConfigurationError("Invalid Producer.MaxRetries, must be >= 0")
+	case c.Producer.Retry.Backoff < 0:
+		return ConfigurationError("Invalid Producer.RetryBackoff, must be >= 0")
+	}
+
+	// validate the Consume values
+	switch {
+	case c.Consumer.Fetch.Min <= 0:
+		return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0")
+	case c.Consumer.Fetch.Default <= 0:
+		return ConfigurationError("Invalid Consumer.Fetch.Default, must be > 0")
+	case c.Consumer.Fetch.Max < 0:
+		return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
+	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
+		return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms")
+	}
+
+	// validate misc shared values
+	switch {
+	case c.ChannelBufferSize < 0:
+		return ConfigurationError("Invalid ChannelBufferSize, must be >= 0")
+	}
+
+	return nil
+}

+ 10 - 0
config_test.go

@@ -0,0 +1,10 @@
+package sarama
+
+import "testing"
+
+func TestDefaultConfigValidates(t *testing.T) {
+	config := NewConfig()
+	if err := config.Validate(); err != nil {
+		t.Error(err)
+	}
+}

+ 24 - 112
consumer.go

@@ -6,7 +6,7 @@ import (
 	"time"
 )
 
-// OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
+// OffsetMethod is passed to ConsumePartition to tell the consumer how to determine the starting offset.
 type OffsetMethod int
 
 const (
@@ -16,91 +16,11 @@ const (
 	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
 	// determined by querying the broker.
 	OffsetMethodOldest
-	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
+	// OffsetMethodManual causes the consumer to interpret the offset value as the
 	// offset at which to start, allowing the user to manually specify their desired starting offset.
 	OffsetMethodManual
 )
 
-// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
-type ConsumerConfig struct {
-	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
-	// The default is 1, as 0 causes the consumer to spin when no messages are available.
-	MinFetchSize int32
-	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
-	// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
-	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
-	MaxWaitTime time.Duration
-}
-
-// NewConsumerConfig creates a ConsumerConfig instance with sane defaults.
-func NewConsumerConfig() *ConsumerConfig {
-	return &ConsumerConfig{
-		MinFetchSize: 1,
-		MaxWaitTime:  250 * time.Millisecond,
-	}
-}
-
-// Validate checks a ConsumerConfig instance. It will return a
-// ConfigurationError if the specified value doesn't make sense.
-func (config *ConsumerConfig) Validate() error {
-	if config.MinFetchSize <= 0 {
-		return ConfigurationError("Invalid MinFetchSize")
-	}
-
-	if config.MaxWaitTime < 1*time.Millisecond {
-		return ConfigurationError("Invalid MaxWaitTime, it needs to be at least 1ms")
-	} else if config.MaxWaitTime < 100*time.Millisecond {
-		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
-	} else if config.MaxWaitTime%time.Millisecond != 0 {
-		Logger.Println("ConsumerConfig.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
-	}
-
-	return nil
-}
-
-// PartitionConsumerConfig is used to pass multiple configuration options to AddPartition
-type PartitionConsumerConfig struct {
-	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
-	DefaultFetchSize int32
-	// The maximum permittable message size - messages larger than this will return ErrMessageTooLarge. The default of 0 is
-	// treated as no limit.
-	MaxMessageSize int32
-	// The method used to determine at which offset to begin consuming messages. The default is to start at the most recent message.
-	OffsetMethod OffsetMethod
-	// Interpreted differently according to the value of OffsetMethod.
-	OffsetValue int64
-	// The number of events to buffer in the Messages and Errors channel. Having this non-zero permits the
-	// consumer to continue fetching messages in the background while client code consumes events,
-	// greatly improving throughput. The default is 64.
-	ChannelBufferSize int
-}
-
-// NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.
-func NewPartitionConsumerConfig() *PartitionConsumerConfig {
-	return &PartitionConsumerConfig{
-		DefaultFetchSize:  32768,
-		ChannelBufferSize: 64,
-	}
-}
-
-// Validate checks a PartitionConsumerConfig instance. It will return a
-// ConfigurationError if the specified value doesn't make sense.
-func (config *PartitionConsumerConfig) Validate() error {
-	if config.DefaultFetchSize <= 0 {
-		return ConfigurationError("Invalid DefaultFetchSize")
-	}
-
-	if config.MaxMessageSize < 0 {
-		return ConfigurationError("Invalid MaxMessageSize")
-	}
-
-	if config.ChannelBufferSize < 0 {
-		return ConfigurationError("Invalid ChannelBufferSize")
-	}
-
-	return nil
-}
-
 // ConsumerMessage encapsulates a Kafka message returned by the consumer.
 type ConsumerMessage struct {
 	Key, Value []byte
@@ -133,7 +53,7 @@ func (ce ConsumerErrors) Error() string {
 // Consumer manages PartitionConsumers which process Kafka messages from brokers.
 type Consumer struct {
 	client *Client
-	config ConsumerConfig
+	conf   *Config
 
 	lock            sync.Mutex
 	children        map[string]map[int32]*PartitionConsumer
@@ -141,14 +61,14 @@ type Consumer struct {
 }
 
 // NewConsumer creates a new consumer attached to the given client.
-func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
+func NewConsumer(client *Client, config *Config) (*Consumer, error) {
 	// Check that we are not dealing with a closed Client before processing any other arguments
 	if client.Closed() {
 		return nil, ErrClosedClient
 	}
 
 	if config == nil {
-		config = NewConsumerConfig()
+		config = NewConfig()
 	}
 
 	if err := config.Validate(); err != nil {
@@ -157,7 +77,7 @@ func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
 
 	c := &Consumer{
 		client:          client,
-		config:          *config,
+		conf:            config,
 		children:        make(map[string]map[int32]*PartitionConsumer),
 		brokerConsumers: make(map[*Broker]*brokerConsumer),
 	}
@@ -167,28 +87,20 @@ func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
 
 // ConsumePartition creates a PartitionConsumer on the given topic/partition with the given configuration. It will
 // return an error if this Consumer is already consuming on the given topic/partition.
-func (c *Consumer) ConsumePartition(topic string, partition int32, config *PartitionConsumerConfig) (*PartitionConsumer, error) {
-	if config == nil {
-		config = NewPartitionConsumerConfig()
-	}
-
-	if err := config.Validate(); err != nil {
-		return nil, err
-	}
-
+func (c *Consumer) ConsumePartition(topic string, partition int32, method OffsetMethod, offset int64) (*PartitionConsumer, error) {
 	child := &PartitionConsumer{
 		consumer:  c,
-		config:    *config,
+		conf:      c.conf,
 		topic:     topic,
 		partition: partition,
-		messages:  make(chan *ConsumerMessage, config.ChannelBufferSize),
-		errors:    make(chan *ConsumerError, config.ChannelBufferSize),
+		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
+		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
 		trigger:   make(chan none, 1),
 		dying:     make(chan none),
-		fetchSize: config.DefaultFetchSize,
+		fetchSize: c.conf.Consumer.Fetch.Default,
 	}
 
-	if err := child.chooseStartingOffset(); err != nil {
+	if err := child.chooseStartingOffset(method, offset); err != nil {
 		return nil, err
 	}
 
@@ -281,7 +193,7 @@ func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
 // You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.
 type PartitionConsumer struct {
 	consumer  *Consumer
-	config    PartitionConsumerConfig
+	conf      *Config
 	topic     string
 	partition int32
 
@@ -353,15 +265,15 @@ func (child *PartitionConsumer) dispatch() error {
 	return nil
 }
 
-func (child *PartitionConsumer) chooseStartingOffset() (err error) {
+func (child *PartitionConsumer) chooseStartingOffset(method OffsetMethod, offset int64) (err error) {
 	var where OffsetTime
 
-	switch child.config.OffsetMethod {
+	switch method {
 	case OffsetMethodManual:
-		if child.config.OffsetValue < 0 {
-			return ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
+		if offset < 0 {
+			return ConfigurationError("offset must be >= 0 when the method is manual")
 		}
-		child.offset = child.config.OffsetValue
+		child.offset = offset
 		return nil
 	case OffsetMethodNewest:
 		where = LatestOffsets
@@ -548,8 +460,8 @@ func (w *brokerConsumer) abort(err error) {
 
 func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	request := &FetchRequest{
-		MinBytes:    w.consumer.config.MinFetchSize,
-		MaxWaitTime: int32(w.consumer.config.MaxWaitTime / time.Millisecond),
+		MinBytes:    w.consumer.conf.Consumer.Fetch.Min,
+		MaxWaitTime: int32(w.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
 	}
 
 	for child := range w.subscriptions {
@@ -577,14 +489,14 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 		// We got no messages. If we got a trailing one then we need to ask for more data.
 		// Otherwise we just poll again and wait for one to be produced...
 		if block.MsgSet.PartialTrailingMessage {
-			if child.config.MaxMessageSize > 0 && child.fetchSize == child.config.MaxMessageSize {
+			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
 				// we can't ask for more data, we've hit the configured limit
 				child.sendError(ErrMessageTooLarge)
 				child.offset++ // skip this one so we can keep processing future messages
 			} else {
 				child.fetchSize *= 2
-				if child.config.MaxMessageSize > 0 && child.fetchSize > child.config.MaxMessageSize {
-					child.fetchSize = child.config.MaxMessageSize
+				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
+					child.fetchSize = child.conf.Consumer.Fetch.Max
 				}
 			}
 		}
@@ -593,7 +505,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 	}
 
 	// we got messages, reset our fetch size in case it was increased for a previous request
-	child.fetchSize = child.config.DefaultFetchSize
+	child.fetchSize = child.conf.Consumer.Fetch.Default
 
 	incomplete := false
 	atLeastOne := false

+ 6 - 32
consumer_test.go

@@ -7,20 +7,6 @@ import (
 	"time"
 )
 
-func TestDefaultConsumerConfigValidates(t *testing.T) {
-	config := NewConsumerConfig()
-	if err := config.Validate(); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
-	config := NewPartitionConsumerConfig()
-	if err := config.Validate(); err != nil {
-		t.Error(err)
-	}
-}
-
 func TestConsumerOffsetManual(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
@@ -47,10 +33,7 @@ func TestConsumerOffsetManual(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewPartitionConsumerConfig()
-	config.OffsetMethod = OffsetMethodManual
-	config.OffsetValue = 1234
-	consumer, err := master.ConsumePartition("my_topic", 0, config)
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodManual, 1234)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -100,9 +83,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewPartitionConsumerConfig()
-	config.OffsetMethod = OffsetMethodNewest
-	consumer, err := master.ConsumePartition("my_topic", 0, config)
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodNewest, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -148,10 +129,7 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewPartitionConsumerConfig()
-	config.OffsetMethod = OffsetMethodManual
-	config.OffsetValue = 2
-	consumer, err := master.ConsumePartition("my_topic", 0, config)
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodManual, 2)
 
 	message := <-consumer.Messages()
 	if message.Offset != 3 {
@@ -188,14 +166,10 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewPartitionConsumerConfig()
-	config.OffsetMethod = OffsetMethodManual
-	config.OffsetValue = 0
-
 	// we expect to end up (eventually) consuming exactly ten messages on each partition
 	var wg sync.WaitGroup
 	for i := 0; i < 2; i++ {
-		consumer, err := master.ConsumePartition("my_topic", int32(i), config)
+		consumer, err := master.ConsumePartition("my_topic", int32(i), OffsetMethodManual, 0)
 		if err != nil {
 			t.Error(err)
 		}
@@ -314,7 +288,7 @@ func ExampleConsumerWithSelect() {
 		fmt.Println("> master consumer ready")
 	}
 
-	consumer, err := master.ConsumePartition("my_topic", 0, nil)
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodManual, 0)
 	if err != nil {
 		panic(err)
 	} else {
@@ -363,7 +337,7 @@ func ExampleConsumerWithGoroutines() {
 		fmt.Println("> master consumer ready")
 	}
 
-	consumer, err := master.ConsumePartition("my_topic", 0, nil)
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetMethodManual, 0)
 	if err != nil {
 		panic(err)
 	} else {

+ 19 - 21
functional_test.go

@@ -44,8 +44,8 @@ func checkKafkaAvailability(t *testing.T) {
 }
 
 func TestFuncConnectionFailure(t *testing.T) {
-	config := NewClientConfig()
-	config.MetadataRetries = 1
+	config := NewConfig()
+	config.Metadata.Retries = 1
 
 	_, err := NewClient("test", []string{"localhost:9000"}, config)
 	if err != ErrOutOfBrokers {
@@ -54,32 +54,32 @@ func TestFuncConnectionFailure(t *testing.T) {
 }
 
 func TestFuncProducing(t *testing.T) {
-	config := NewProducerConfig()
+	config := NewConfig()
 	testProducingMessages(t, config)
 }
 
 func TestFuncProducingGzip(t *testing.T) {
-	config := NewProducerConfig()
-	config.Compression = CompressionGZIP
+	config := NewConfig()
+	config.Producer.Compression = CompressionGZIP
 	testProducingMessages(t, config)
 }
 
 func TestFuncProducingSnappy(t *testing.T) {
-	config := NewProducerConfig()
-	config.Compression = CompressionSnappy
+	config := NewConfig()
+	config.Producer.Compression = CompressionSnappy
 	testProducingMessages(t, config)
 }
 
 func TestFuncProducingNoResponse(t *testing.T) {
-	config := NewProducerConfig()
-	config.RequiredAcks = NoResponse
+	config := NewConfig()
+	config.Producer.RequiredAcks = NoResponse
 	testProducingMessages(t, config)
 }
 
 func TestFuncProducingFlushing(t *testing.T) {
-	config := NewProducerConfig()
-	config.FlushMsgCount = TestBatchSize / 8
-	config.FlushFrequency = 250 * time.Millisecond
+	config := NewConfig()
+	config.Producer.Flush.Messages = TestBatchSize / 8
+	config.Producer.Flush.Frequency = 250 * time.Millisecond
 	testProducingMessages(t, config)
 }
 
@@ -91,11 +91,11 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	}
 	defer safeClose(t, client)
 
-	config := NewProducerConfig()
-	config.FlushFrequency = 50 * time.Millisecond
-	config.FlushMsgCount = 200
+	config := NewConfig()
 	config.ChannelBufferSize = 20
-	config.AckSuccesses = true
+	config.Producer.Flush.Frequency = 50 * time.Millisecond
+	config.Producer.Flush.Messages = 200
+	config.Producer.AckSuccesses = true
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -124,7 +124,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	}
 }
 
-func testProducingMessages(t *testing.T, config *ProducerConfig) {
+func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)
 
 	client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
@@ -136,14 +136,12 @@ func testProducingMessages(t *testing.T, config *ProducerConfig) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	consumerConfig := NewPartitionConsumerConfig()
-	consumerConfig.OffsetMethod = OffsetMethodNewest
-	consumer, err := master.ConsumePartition("single_partition", 0, consumerConfig)
+	consumer, err := master.ConsumePartition("single_partition", 0, OffsetMethodNewest, 0)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	config.AckSuccesses = true
+	config.Producer.AckSuccesses = true
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)

+ 30 - 121
producer.go

@@ -12,97 +12,6 @@ func forceFlushThreshold() int {
 	return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
 }
 
-// ProducerConfig is used to pass multiple configuration options to NewProducer.
-//
-// Some of these configuration settings match settings with the JVM producer, but some of
-// these are implementation specific and have no equivalent in the JVM producer.
-type ProducerConfig struct {
-	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to hash). Similar to the `partitioner.class` setting for the JVM producer.
-	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal). Equivalent to the `request.required.acks` setting of the JVM producer.
-	Timeout           time.Duration          // The maximum duration the broker will wait the receipt of the number of RequiredAcks (defaults to 10 seconds). This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.
-	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.
-	FlushMsgCount     int                    // The number of messages needed to trigger a flush. This is a best effort; the number of messages may be more or less. Use `MaxMessagesPerReq` to set a hard upper limit.
-	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued. The frequency is a best effort, and the actual frequency can be more or less. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
-	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered. This is a best effort; the number of bytes may be more or less. Use the gloabl `sarama.MaxRequestSize` to set a hard upper limit.
-	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
-	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
-	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies. Similar to `queue.buffering.max.messages` in the JVM producer.
-	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines (defaults to 256).
-	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 100ms). Similar to the retry.backoff.ms setting of the JVM producer.
-	MaxRetries        int                    // The total number of times to retry sending a message (defaults to 3). Similar to the message.send.max.retries setting of the JVM producer.
-}
-
-// NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
-func NewProducerConfig() *ProducerConfig {
-	return &ProducerConfig{
-		Partitioner:       NewHashPartitioner,
-		RequiredAcks:      WaitForLocal,
-		MaxMessageBytes:   1000000,
-		ChannelBufferSize: 256,
-		RetryBackoff:      100 * time.Millisecond,
-		Timeout:           10 * time.Second,
-		MaxRetries:        3,
-	}
-}
-
-// Validate checks a ProducerConfig instance. It will return a
-// ConfigurationError if the specified value doesn't make sense.
-func (config *ProducerConfig) Validate() error {
-	if config.RequiredAcks < -1 {
-		return ConfigurationError("Invalid RequiredAcks")
-	} else if config.RequiredAcks > 1 {
-		Logger.Println("ProducerConfig.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
-	}
-
-	if config.Timeout < 0 {
-		return ConfigurationError("Invalid Timeout")
-	} else if config.Timeout%time.Millisecond != 0 {
-		Logger.Println("ProducerConfig.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
-	}
-
-	if config.RequiredAcks == WaitForAll && config.Timeout == 0 {
-		return ConfigurationError("If you WaitForAll you must specify a non-zero timeout to wait.")
-	}
-
-	if config.FlushMsgCount < 0 {
-		return ConfigurationError("Invalid FlushMsgCount")
-	}
-
-	if config.FlushByteCount < 0 {
-		return ConfigurationError("Invalid FlushByteCount")
-	} else if config.FlushByteCount >= forceFlushThreshold() {
-		Logger.Println("ProducerConfig.FlushByteCount too close to MaxRequestSize; it will be ignored.")
-	}
-
-	if config.FlushFrequency < 0 {
-		return ConfigurationError("Invalid FlushFrequency")
-	}
-
-	if config.Partitioner == nil {
-		return ConfigurationError("No partitioner set")
-	}
-
-	if config.MaxMessageBytes <= 0 {
-		return ConfigurationError("Invalid MaxMessageBytes")
-	} else if config.MaxMessageBytes >= forceFlushThreshold() {
-		Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
-	}
-
-	if config.MaxMessagesPerReq < 0 || (config.MaxMessagesPerReq > 0 && config.MaxMessagesPerReq < config.FlushMsgCount) {
-		return ConfigurationError("Invalid MaxMessagesPerReq, must be non-negative and >= FlushMsgCount if set")
-	}
-
-	if config.RetryBackoff < 0 {
-		return ConfigurationError("Invalid RetryBackoff")
-	}
-
-	if config.MaxRetries < 0 {
-		return ConfigurationError("Invalid MaxRetries")
-	}
-
-	return nil
-}
-
 // Producer publishes Kafka messages. It routes messages to the correct broker
 // for the provided topic-partition, refreshing metadata as appropriate, and
 // parses responses for errors. You must read from the Errors() channel or the
@@ -112,7 +21,7 @@ func (config *ProducerConfig) Validate() error {
 // is still necessary).
 type Producer struct {
 	client *Client
-	config ProducerConfig
+	conf   *Config
 
 	errors                    chan *ProducerError
 	input, successes, retries chan *ProducerMessage
@@ -122,24 +31,24 @@ type Producer struct {
 }
 
 // NewProducer creates a new Producer using the given client.
-func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
+func NewProducer(client *Client, conf *Config) (*Producer, error) {
 	// Check that we are not dealing with a closed Client before processing
 	// any other arguments
 	if client.Closed() {
 		return nil, ErrClosedClient
 	}
 
-	if config == nil {
-		config = NewProducerConfig()
+	if conf == nil {
+		conf = NewConfig()
 	}
 
-	if err := config.Validate(); err != nil {
+	if err := conf.Validate(); err != nil {
 		return nil, err
 	}
 
 	p := &Producer{
 		client:    client,
-		config:    *config,
+		conf:      conf,
 		errors:    make(chan *ProducerError),
 		input:     make(chan *ProducerMessage),
 		successes: make(chan *ProducerMessage),
@@ -226,7 +135,7 @@ func (p *Producer) Errors() <-chan *ProducerError {
 	return p.errors
 }
 
-// Successes is the success output channel back to the user when AckSuccesses is configured.
+// Successes is the success output channel back to the user when AckSuccesses is confured.
 // If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
 // It is suggested that you send and read messages together in a single select statement.
 func (p *Producer) Successes() <-chan *ProducerMessage {
@@ -245,7 +154,7 @@ func (p *Producer) Input() chan<- *ProducerMessage {
 func (p *Producer) Close() error {
 	p.AsyncClose()
 
-	if p.config.AckSuccesses {
+	if p.conf.Producer.AckSuccesses {
 		go withRecover(func() {
 			for _ = range p.successes {
 			}
@@ -296,8 +205,8 @@ func (p *Producer) topicDispatcher() {
 			break
 		}
 
-		if (p.config.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.config.MaxMessageBytes) ||
-			(msg.byteSize() > p.config.MaxMessageBytes) {
+		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
+			(msg.byteSize() > p.conf.Producer.MaxMessageBytes) {
 
 			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
@@ -306,7 +215,7 @@ func (p *Producer) topicDispatcher() {
 		handler := handlers[msg.Topic]
 		if handler == nil {
 			p.retries <- &ProducerMessage{flags: ref}
-			newHandler := make(chan *ProducerMessage, p.config.ChannelBufferSize)
+			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
 			topic := msg.Topic // block local because go's closure semantics suck
 			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
 			handler = newHandler
@@ -334,7 +243,7 @@ func (p *Producer) topicDispatcher() {
 // partitions messages, then dispatches them by partition
 func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
 	handlers := make(map[int32]chan *ProducerMessage)
-	partitioner := p.config.Partitioner()
+	partitioner := p.conf.Producer.Partitioner()
 
 	for msg := range input {
 		if msg.retries == 0 {
@@ -348,7 +257,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage
 		handler := handlers[msg.partition]
 		if handler == nil {
 			p.retries <- &ProducerMessage{flags: ref}
-			newHandler := make(chan *ProducerMessage, p.config.ChannelBufferSize)
+			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
 			topic := msg.Topic         // block local because go's closure semantics suck
 			partition := msg.partition // block local because go's closure semantics suck
 			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
@@ -401,7 +310,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 	retryState := make([]struct {
 		buf          []*ProducerMessage
 		expectChaser bool
-	}, p.config.MaxRetries+1)
+	}, p.conf.Producer.Retry.Max+1)
 
 	for msg := range input {
 		if msg.retries > highWatermark {
@@ -414,7 +323,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 			Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
 			p.unrefBrokerProducer(leader)
 			output = nil
-			time.Sleep(p.config.RetryBackoff)
+			time.Sleep(p.conf.Producer.Retry.Backoff)
 		} else if highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			if msg.retries < highWatermark {
@@ -469,7 +378,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 		if output == nil {
 			if err := breaker.Run(doUpdate); err != nil {
 				p.returnError(msg, err)
-				time.Sleep(p.config.RetryBackoff)
+				time.Sleep(p.conf.Producer.Retry.Backoff)
 				continue
 			}
 			Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
@@ -488,8 +397,8 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
 	var ticker *time.Ticker
 	var timer <-chan time.Time
-	if p.config.FlushFrequency > 0 {
-		ticker = time.NewTicker(p.config.FlushFrequency)
+	if p.conf.Producer.Flush.Frequency > 0 {
+		ticker = time.NewTicker(p.conf.Producer.Flush.Frequency)
 		timer = ticker.C
 	}
 
@@ -508,8 +417,8 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage
 			}
 
 			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
-				(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) ||
-				(p.config.MaxMessagesPerReq > 0 && len(buffer) >= p.config.MaxMessagesPerReq) {
+				(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
+				(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
 				Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
 				flusher <- buffer
 				buffer = nil
@@ -520,8 +429,8 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage
 			buffer = append(buffer, msg)
 			bytesAccumulated += msg.byteSize()
 
-			if len(buffer) >= p.config.FlushMsgCount ||
-				(p.config.FlushByteCount > 0 && bytesAccumulated >= p.config.FlushByteCount) {
+			if len(buffer) >= p.conf.Producer.Flush.Messages ||
+				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				doFlush = flusher
 			}
 		case <-timer:
@@ -603,7 +512,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			if p.config.AckSuccesses {
+			if p.conf.Producer.AckSuccesses {
 				p.returnSuccesses(batch)
 			}
 			continue
@@ -623,7 +532,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 				switch block.Err {
 				case ErrNoError:
 					// All the messages for this topic-partition were delivered successfully!
-					if p.config.AckSuccesses {
+					if p.conf.Producer.AckSuccesses {
 						for i := range msgs {
 							msgs[i].offset = block.Offset + int64(i)
 						}
@@ -733,7 +642,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *ProducerMessage
 
 func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
 
-	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: int32(p.config.Timeout / time.Millisecond)}
+	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
 	empty := true
 
 	for topic, partitionSet := range batch {
@@ -756,7 +665,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 					}
 				}
 
-				if p.config.Compression != CompressionNone && setSize+msg.byteSize() > p.config.MaxMessageBytes {
+				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
 					// compression causes message-sets to be wrapped as single messages, which have tighter
 					// size requirements, so we have to respect those limits
 					valBytes, err := encode(setToSend)
@@ -764,7 +673,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 						Logger.Println(err) // if this happens, it's basically our fault.
 						panic(err)
 					}
-					req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
+					req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
 					setToSend = new(MessageSet)
 					setSize = 0
 				}
@@ -774,7 +683,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 				empty = false
 			}
 
-			if p.config.Compression == CompressionNone {
+			if p.conf.Producer.Compression == CompressionNone {
 				req.AddSet(topic, partition, setToSend)
 			} else {
 				valBytes, err := encode(setToSend)
@@ -782,7 +691,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 					Logger.Println(err) // if this happens, it's basically our fault.
 					panic(err)
 				}
-				req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
+				req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
 			}
 		}
 	}
@@ -821,7 +730,7 @@ func (p *Producer) retryMessages(batch []*ProducerMessage, err error) {
 		if msg == nil {
 			continue
 		}
-		if msg.retries >= p.config.MaxRetries {
+		if msg.retries >= p.conf.Producer.Retry.Max {
 			p.returnError(msg, err)
 		} else {
 			msg.retries++

+ 32 - 39
producer_test.go

@@ -28,13 +28,6 @@ func closeProducer(t *testing.T, p *Producer) {
 	wg.Wait()
 }
 
-func TestDefaultProducerConfigValidates(t *testing.T) {
-	config := NewProducerConfig()
-	if err := config.Validate(); err != nil {
-		t.Error(err)
-	}
-}
-
 func TestSyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
@@ -97,8 +90,8 @@ func TestConcurrentSyncProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewProducerConfig()
-	config.FlushMsgCount = 100
+	config := NewConfig()
+	config.Producer.Flush.Messages = 100
 	producer, err := NewSyncProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -145,9 +138,9 @@ func TestProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -199,9 +192,9 @@ func TestProducerMultipleFlushes(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewProducerConfig()
-	config.FlushMsgCount = 5
-	config.AckSuccesses = true
+	config := NewConfig()
+	config.Producer.Flush.Messages = 5
+	config.Producer.AckSuccesses = true
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -257,10 +250,10 @@ func TestProducerMultipleBrokers(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewProducerConfig()
-	config.FlushMsgCount = 5
-	config.AckSuccesses = true
-	config.Partitioner = NewRoundRobinPartitioner
+	config := NewConfig()
+	config.Producer.Flush.Messages = 5
+	config.Producer.AckSuccesses = true
+	config.Producer.Partitioner = NewRoundRobinPartitioner
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -305,10 +298,10 @@ func TestProducerFailureRetry(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	config.RetryBackoff = 0
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -383,10 +376,10 @@ func TestProducerBrokerBounce(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	config.RetryBackoff = 0
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -437,11 +430,11 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	config.MaxRetries = 3
-	config.RetryBackoff = 0
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Max = 3
+	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -498,11 +491,11 @@ func TestProducerMultipleRetries(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	config.MaxRetries = 4
-	config.RetryBackoff = 0
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Max = 4
+	config.Producer.Retry.Backoff = 0
 	producer, err := NewProducer(client, config)
 	if err != nil {
 		t.Fatal(err)
@@ -569,7 +562,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 }
 
 func ExampleProducer() {
-	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
+	client, err := NewClient("client_id", []string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
@@ -602,7 +595,7 @@ func ExampleProducer() {
 }
 
 func ExampleSyncProducer() {
-	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
+	client, err := NewClient("client_id", []string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {

+ 4 - 4
sync_producer.go

@@ -10,12 +10,12 @@ type SyncProducer struct {
 	wg       sync.WaitGroup
 }
 
-// NewSyncProducer creates a new SyncProducer using the given client  and configuration.
-func NewSyncProducer(client *Client, config *ProducerConfig) (*SyncProducer, error) {
+// NewSyncProducer creates a new SyncProducer using the given client and configuration.
+func NewSyncProducer(client *Client, config *Config) (*SyncProducer, error) {
 	if config == nil {
-		config = NewProducerConfig()
+		config = NewConfig()
 	}
-	config.AckSuccesses = true
+	config.Producer.AckSuccesses = true
 
 	prod, err := NewProducer(client, config)