Merge pull request #311 from Shopify/unify-config

Unify config
Evan Huus committed 10 years ago (parent commit 20700b5c7c)
12 changed files with 513 additions and 675 deletions
  1. + 27 - 69	broker.go
  2. + 9 - 9	broker_test.go
  3. + 24 - 65	client.go
  4. + 9 - 16	client_test.go
  5. + 223 - 0	config.go
  6. + 10 - 0	config_test.go
  7. + 63 - 140	consumer.go
  8. + 16 - 81	consumer_test.go
  9. + 24 - 31	functional_test.go
  10. + 51 - 129	producer.go
  11. + 41 - 126	producer_test.go
  12. + 16 - 9	sync_producer.go
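
The net effect of the unification, as a minimal sketch: the separate BrokerConfig, ClientConfig, ConsumerConfig, and PartitionConsumerConfig types collapse into a single sarama.Config, and the client ID moves from a constructor argument into that config. The import path, broker address, and field values below are placeholders, not part of this diff:

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	// Before: one config struct per component, plus a client ID argument:
	//   conf := sarama.NewClientConfig()
	//   conf.MetadataRetries = 5
	//   client, err := sarama.NewClient("my-app", addrs, conf)

	// After: a single Config carries the client ID and all namespaced options.
	conf := sarama.NewConfig()
	conf.ClientID = "my-app"                // was NewClient's id argument
	conf.Metadata.Retry.Max = 5             // was ClientConfig.MetadataRetries
	conf.Net.DialTimeout = 10 * time.Second // was BrokerConfig.DialTimeout

	client, err := sarama.NewClient([]string{"localhost:9092"}, conf)
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()
}
```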

+ 27 - 69
broker.go

@@ -10,54 +10,12 @@ import (
 	"time"
 )
 
-// BrokerConfig is used to pass multiple configuration options to Broker.Open.
-type BrokerConfig struct {
-	MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).
-
-	// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
-	DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
-	ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
-	WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
-}
-
-// NewBrokerConfig returns a new broker configuration with sane defaults.
-func NewBrokerConfig() *BrokerConfig {
-	return &BrokerConfig{
-		MaxOpenRequests: 5,
-		DialTimeout:     30 * time.Second,
-		ReadTimeout:     30 * time.Second,
-		WriteTimeout:    30 * time.Second,
-	}
-}
-
-// Validate checks a BrokerConfig instance. This will return a
-// ConfigurationError if the specified values don't make sense.
-func (config *BrokerConfig) Validate() error {
-	if config.MaxOpenRequests <= 0 {
-		return ConfigurationError("Invalid MaxOpenRequests")
-	}
-
-	if config.DialTimeout <= 0 {
-		return ConfigurationError("Invalid DialTimeout")
-	}
-
-	if config.ReadTimeout <= 0 {
-		return ConfigurationError("Invalid ReadTimeout")
-	}
-
-	if config.WriteTimeout <= 0 {
-		return ConfigurationError("Invalid WriteTimeout")
-	}
-
-	return nil
-}
-
 // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
 	id   int32
 	addr string
 
-	conf          *BrokerConfig
+	conf          *Config
 	correlationID int32
 	conn          net.Conn
 	connErr       error
@@ -84,10 +42,10 @@ func NewBroker(addr string) *Broker {
 // waiting for the connection to complete. This means that any subsequent operations on the broker will
 // block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call,
 // follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or
-// AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.
-func (b *Broker) Open(conf *BrokerConfig) error {
+// AlreadyConnected. If conf is nil, the result of NewConfig() is used.
+func (b *Broker) Open(conf *Config) error {
 	if conf == nil {
-		conf = NewBrokerConfig()
+		conf = NewConfig()
 	}
 
 	err := conf.Validate()
@@ -110,7 +68,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 	go withRecover(func() {
 		defer b.lock.Unlock()
 
-		b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.DialTimeout)
+		b.conn, b.connErr = net.DialTimeout("tcp", b.addr, conf.Net.DialTimeout)
 		if b.connErr != nil {
 			b.conn = nil
 			atomic.StoreInt32(&b.opened, 0)
@@ -120,7 +78,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {
 
 		b.conf = conf
 		b.done = make(chan bool)
-		b.responses = make(chan responsePromise, b.conf.MaxOpenRequests-1)
+		b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)
 
 		Logger.Printf("Connected to broker %s\n", b.addr)
 		go withRecover(b.responseReceiver)
@@ -178,10 +136,10 @@ func (b *Broker) Addr() string {
 	return b.addr
 }
 
-func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
+func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -190,10 +148,10 @@ func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*Metada
 	return response, nil
 }
 
-func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
+func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
 	response := new(ConsumerMetadataResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -202,10 +160,10 @@ func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataR
 	return response, nil
 }
 
-func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error) {
+func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
 	response := new(OffsetResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -214,15 +172,15 @@ func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*
 	return response, nil
 }
 
-func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error) {
+func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
 	var response *ProduceResponse
 	var err error
 
 	if request.RequiredAcks == NoResponse {
-		err = b.sendAndReceive(clientID, request, nil)
+		err = b.sendAndReceive(request, nil)
 	} else {
 		response = new(ProduceResponse)
-		err = b.sendAndReceive(clientID, request, response)
+		err = b.sendAndReceive(request, response)
 	}
 
 	if err != nil {
@@ -232,10 +190,10 @@ func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResp
 	return response, nil
 }
 
-func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error) {
+func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
 	response := new(FetchResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -244,10 +202,10 @@ func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse,
 	return response, nil
 }
 
-func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
+func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
 	response := new(OffsetCommitResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -256,10 +214,10 @@ func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*O
 	return response, nil
 }
 
-func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
+func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
 	response := new(OffsetFetchResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -268,7 +226,7 @@ func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*Off
 	return response, nil
 }
 
-func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
+func (b *Broker) send(req requestEncoder, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
@@ -279,13 +237,13 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 		return nil, ErrNotConnected
 	}
 
-	fullRequest := request{b.correlationID, clientID, req}
+	fullRequest := request{b.correlationID, b.conf.ClientID, req}
 	buf, err := encode(&fullRequest)
 	if err != nil {
 		return nil, err
 	}
 
-	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.WriteTimeout))
+	err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
 	if err != nil {
 		return nil, err
 	}
@@ -306,8 +264,8 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 	return &promise, nil
 }
 
-func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res decoder) error {
-	promise, err := b.send(clientID, req, res != nil)
+func (b *Broker) sendAndReceive(req requestEncoder, res decoder) error {
+	promise, err := b.send(req, res != nil)
 
 	if err != nil {
 		return err
@@ -372,7 +330,7 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
 func (b *Broker) responseReceiver() {
 	header := make([]byte, 8)
 	for response := range b.responses {
-		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.ReadTimeout))
+		err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
 		if err != nil {
 			response.errors <- err
 			continue
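
With this change the clientID argument disappears from every Broker request method and is read from Config.ClientID instead. A hedged sketch of driving a Broker directly under the new API (the import path, address, and client ID are placeholders):

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	conf := sarama.NewConfig()
	conf.ClientID = "my-app" // sent with every request; replaces the old clientID parameters

	broker := sarama.NewBroker("localhost:9092") // placeholder address
	if err := broker.Open(conf); err != nil {
		log.Fatalln(err)
	}
	defer broker.Close()

	// Request helpers such as GetMetadata no longer take a clientID argument.
	request := sarama.MetadataRequest{Topics: []string{"myTopic"}}
	response, err := broker.GetMetadata(&request)
	if err != nil {
		log.Fatalln(err)
	}
	log.Println("topics returned:", len(response.Topics))
}
```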

+ 9 - 9
broker_test.go

@@ -13,7 +13,7 @@ func ExampleBroker() error {
 	}
 
 	request := MetadataRequest{Topics: []string{"myTopic"}}
-	response, err := broker.GetMetadata("myClient", &request)
+	response, err := broker.GetMetadata(&request)
 	if err != nil {
 		_ = broker.Close()
 		return err
@@ -80,7 +80,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := MetadataRequest{}
-			response, err := broker.GetMetadata("clientID", &request)
+			response, err := broker.GetMetadata(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -92,7 +92,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ConsumerMetadataRequest{}
-			response, err := broker.GetConsumerMetadata("clientID", &request)
+			response, err := broker.GetConsumerMetadata(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -105,7 +105,7 @@ var brokerTestTable = []struct {
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = NoResponse
-			response, err := broker.Produce("clientID", &request)
+			response, err := broker.Produce(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -118,7 +118,7 @@ var brokerTestTable = []struct {
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = WaitForLocal
-			response, err := broker.Produce("clientID", &request)
+			response, err := broker.Produce(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -130,7 +130,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := FetchRequest{}
-			response, err := broker.Fetch("clientID", &request)
+			response, err := broker.Fetch(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -142,7 +142,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetFetchRequest{}
-			response, err := broker.FetchOffset("clientID", &request)
+			response, err := broker.FetchOffset(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -154,7 +154,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetCommitRequest{}
-			response, err := broker.CommitOffset("clientID", &request)
+			response, err := broker.CommitOffset(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -166,7 +166,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetRequest{}
-			response, err := broker.GetAvailableOffsets("clientID", &request)
+			response, err := broker.GetAvailableOffsets(&request)
 			if err != nil {
 				t.Error(err)
 			}

+ 24 - 65
client.go

@@ -6,54 +6,12 @@ import (
 	"time"
 )
 
-// ClientConfig is used to pass multiple configuration options to NewClient.
-type ClientConfig struct {
-	MetadataRetries            int           // How many times to retry a metadata request when a partition is in the middle of leader election.
-	WaitForElection            time.Duration // How long to wait for leader election to finish between retries.
-	DefaultBrokerConf          *BrokerConfig // Default configuration for broker connections created by this client.
-	BackgroundRefreshFrequency time.Duration // How frequently the client will refresh the cluster metadata in the background. Defaults to 10 minutes. Set to 0 to disable.
-}
-
-// NewClientConfig creates a new ClientConfig instance with sensible defaults
-func NewClientConfig() *ClientConfig {
-	return &ClientConfig{
-		MetadataRetries:            3,
-		WaitForElection:            250 * time.Millisecond,
-		BackgroundRefreshFrequency: 10 * time.Minute,
-	}
-}
-
-// Validate checks a ClientConfig instance. This will return a
-// ConfigurationError if the specified values don't make sense.
-func (config *ClientConfig) Validate() error {
-	if config.MetadataRetries < 0 {
-		return ConfigurationError("Invalid MetadataRetries, must be >= 0")
-	}
-
-	if config.WaitForElection <= time.Duration(0) {
-		return ConfigurationError("Invalid WaitForElection, must be > 0")
-	}
-
-	if config.DefaultBrokerConf != nil {
-		if err := config.DefaultBrokerConf.Validate(); err != nil {
-			return err
-		}
-	}
-
-	if config.BackgroundRefreshFrequency < time.Duration(0) {
-		return ConfigurationError("Invalid BackgroundRefreshFrequency, must be >= 0")
-	}
-
-	return nil
-}
-
 // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
 // You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
 // automatically when it passes out of scope. A single client can be safely shared by
 // multiple concurrent Producers and Consumers.
 type Client struct {
-	id     string
-	config ClientConfig
+	conf   *Config
 	closer chan none
 
 	// the broker addresses given to us through the constructor are not guaranteed to be returned in
@@ -72,17 +30,17 @@ type Client struct {
 	lock                    sync.RWMutex // protects access to the maps, only one since they're always written together
 }
 
-// NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
+// NewClient creates a new Client. It connects to one of the given broker addresses
 // and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
 // be retrieved from any of the given broker addresses, the client is not created.
-func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error) {
+func NewClient(addrs []string, conf *Config) (*Client, error) {
 	Logger.Println("Initializing new client")
 
-	if config == nil {
-		config = NewClientConfig()
+	if conf == nil {
+		conf = NewConfig()
 	}
 
-	if err := config.Validate(); err != nil {
+	if err := conf.Validate(); err != nil {
 		return nil, err
 	}
 
@@ -91,8 +49,8 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 	}
 
 	client := &Client{
-		id:                      id,
-		config:                  *config,
+		conf:                    conf,
 		closer:                  make(chan none),
 		seedBrokerAddrs:         addrs,
 		seedBroker:              NewBroker(addrs[0]),
@@ -101,7 +58,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		metadata:                make(map[string]map[int32]*PartitionMetadata),
 		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
 	}
-	_ = client.seedBroker.Open(config.DefaultBrokerConf)
+	_ = client.seedBroker.Open(conf)
 
 	// do an initial fetch of all cluster metadata by specifying an empty list of topics
 	err := client.RefreshAllMetadata()
@@ -288,13 +245,13 @@ func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
 // RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
 // available metadata for those topics.
 func (client *Client) RefreshTopicMetadata(topics ...string) error {
-	return client.refreshMetadata(topics, client.config.MetadataRetries)
+	return client.refreshMetadata(topics, client.conf.Metadata.Retry.Max)
 }
 
 // RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
 func (client *Client) RefreshAllMetadata() error {
 	// Kafka refreshes all when you encode it an empty array...
-	return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
+	return client.refreshMetadata(make([]string, 0), client.conf.Metadata.Retry.Max)
 }
 
 // GetOffset queries the cluster to get the most recent available offset at the given
@@ -308,7 +265,7 @@ func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTim
 	request := &OffsetRequest{}
 	request.AddBlock(topic, partitionID, where, 1)
 
-	response, err := broker.GetAvailableOffsets(client.id, request)
+	response, err := broker.GetAvailableOffsets(request)
 	if err != nil {
 		return -1, err
 	}
@@ -344,7 +301,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 		client.seedBrokerAddrs = client.seedBrokerAddrs[1:]
 		if len(client.seedBrokerAddrs) > 0 {
 			client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
-			_ = client.seedBroker.Open(client.config.DefaultBrokerConf)
+			_ = client.seedBroker.Open(client.conf)
 		} else {
 			client.seedBroker = nil
 		}
@@ -372,7 +329,7 @@ func (client *Client) resurrectDeadBrokers() {
 	client.deadBrokerAddrs = make(map[string]none)
 
 	client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
-	_ = client.seedBroker.Open(client.config.DefaultBrokerConf)
+	_ = client.seedBroker.Open(client.conf)
 }
 
 func (client *Client) any() *Broker {
@@ -489,11 +446,11 @@ func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, er
 // core metadata update logic
 
 func (client *Client) backgroundMetadataUpdater() {
-	if client.config.BackgroundRefreshFrequency == time.Duration(0) {
+	if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
 		return
 	}
 
-	ticker := time.NewTicker(client.config.BackgroundRefreshFrequency)
+	ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
 	for {
 		select {
 		case <-ticker.C:
@@ -530,7 +487,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 		} else {
 			Logger.Printf("Fetching metadata for all topics from broker %s\n", broker.addr)
 		}
-		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
+		response, err := broker.GetMetadata(&MetadataRequest{Topics: topics})
 
 		switch err.(type) {
 		case nil:
@@ -542,8 +499,9 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 					Logger.Println("Some partitions are leaderless, but we're out of retries")
 					return nil
 				}
-				Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retriesRemaining)
-				time.Sleep(client.config.WaitForElection) // wait for leader election
+				Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n",
+					client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
+				time.Sleep(client.conf.Metadata.Retry.Backoff) // wait for leader election
 				return client.refreshMetadata(retry, retriesRemaining-1)
 			}
 
@@ -561,8 +519,9 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 	Logger.Println("Out of available brokers.")
 
 	if retriesRemaining > 0 {
-		Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retriesRemaining)
-		time.Sleep(client.config.WaitForElection)
+		Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n",
+			client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
+		time.Sleep(client.conf.Metadata.Retry.Backoff)
 		client.resurrectDeadBrokers()
 		return client.refreshMetadata(topics, retriesRemaining-1)
 	}
@@ -584,12 +543,12 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	// If it fails and we do care, whoever tries to use it will get the connection error.
 	for _, broker := range data.Brokers {
 		if client.brokers[broker.ID()] == nil {
-			_ = broker.Open(client.config.DefaultBrokerConf)
+			_ = broker.Open(client.conf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
 		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
 			safeAsyncClose(client.brokers[broker.ID()])
-			_ = broker.Open(client.config.DefaultBrokerConf)
+			_ = broker.Open(client.conf)
 			client.brokers[broker.ID()] = broker
 			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
 		}
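
For the client, the id parameter is gone, the old ClientConfig fields move under Config.Metadata, and the one Config is reused for every broker connection the client opens, replacing DefaultBrokerConf. A sketch under those assumptions (address and topic are placeholders; LatestOffsets is the existing OffsetTime constant used elsewhere in this diff):

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	conf := sarama.NewConfig()
	conf.Metadata.Retry.Max = 5                          // was ClientConfig.MetadataRetries
	conf.Metadata.Retry.Backoff = 500 * time.Millisecond // was ClientConfig.WaitForElection
	conf.Metadata.RefreshFrequency = time.Minute         // was ClientConfig.BackgroundRefreshFrequency

	client, err := sarama.NewClient([]string{"localhost:9092"}, conf)
	if err != nil {
		log.Fatalln(err)
	}
	defer client.Close()

	offset, err := client.GetOffset("my_topic", 0, sarama.LatestOffsets)
	if err != nil {
		log.Fatalln(err)
	}
	log.Println("newest offset:", offset)
}
```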

+ 9 - 16
client_test.go

@@ -12,19 +12,12 @@ func safeClose(t *testing.T, c io.Closer) {
 	}
 }
 
-func TestDefaultClientConfigValidates(t *testing.T) {
-	config := NewClientConfig()
-	if err := config.Validate(); err != nil {
-		t.Error(err)
-	}
-}
-
 func TestSimpleClient(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 
 	seedBroker.Returns(new(MetadataResponse))
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -46,9 +39,9 @@ func TestCachedPartitions(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
-	config := NewClientConfig()
-	config.MetadataRetries = 0
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
+	config := NewConfig()
+	config.Metadata.Retry.Max = 0
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -81,7 +74,7 @@ func TestClientSeedBrokers(t *testing.T) {
 	metadataResponse.AddBroker(discoveredBroker.Addr(), discoveredBroker.BrokerID())
 	seedBroker.Returns(metadataResponse)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -104,9 +97,9 @@ func TestClientMetadata(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
-	config := NewClientConfig()
-	config.MetadataRetries = 0
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
+	config := NewConfig()
+	config.Metadata.Retry.Max = 0
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -176,7 +169,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse2)
 
-	client, err := NewClient("clientID", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 223 - 0
config.go

@@ -0,0 +1,223 @@
+package sarama
+
+import "time"
+
+// Config is used to pass multiple configuration options to Sarama's constructors.
+type Config struct {
+	// Net is the namespace for network-level properties used by the Broker, and shared by the Client/Producer/Consumer.
+	Net struct {
+		MaxOpenRequests int // How many outstanding requests a connection is allowed to have before sending on it blocks (default 5).
+
+		// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
+		DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
+		ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
+		WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
+	}
+
+	// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
+	Metadata struct {
+		Retry struct {
+			Max     int           // The total number of times to retry a metadata request when the cluster is in the middle of a leader election (default 3).
+			Backoff time.Duration // How long to wait for leader election to occur before retrying (default 250ms). Similar to the JVM's `retry.backoff.ms`.
+		}
+		// How frequently to refresh the cluster metadata in the background. Defaults to 10 minutes.
+		// Set to 0 to disable. Similar to `topic.metadata.refresh.interval.ms` in the JVM version.
+		RefreshFrequency time.Duration
+	}
+
+	// Producer is the namespace for configuration related to producing messages, used by the Producer.
+	Producer struct {
+		// The maximum permitted size of a message (defaults to 1000000). Should be set equal to or smaller than the broker's `message.max.bytes`.
+		MaxMessageBytes int
+		// The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
+		// Equivalent to the `request.required.acks` setting of the JVM producer.
+		RequiredAcks RequiredAcks
+	// The maximum duration the broker will wait for the receipt of the number of RequiredAcks (defaults to 10 seconds).
+		// This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution,
+		// nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.
+		Timeout time.Duration
+		// The type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.
+		Compression CompressionCodec
+		// Generates partitioners for choosing the partition to send messages to (defaults to hashing the message key).
+		// Similar to the `partitioner.class` setting for the JVM producer.
+		Partitioner PartitionerConstructor
+		// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
+		AckSuccesses bool
+
+		// The following config options control how often messages are batched up and sent to the broker. By default,
+		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
+		// into the subsequent batch.
+		Flush struct {
+			Bytes     int           // The best-effort number of bytes needed to trigger a flush. Use the global sarama.MaxRequestSize to set a hard upper limit.
+			Messages  int           // The best-effort number of messages needed to trigger a flush. Use `MaxMessages` to set a hard upper limit.
+			Frequency time.Duration // The best-effort frequency of flushes. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
+			// The maximum number of messages the producer will send in a single broker request.
+			// Defaults to 0 for unlimited. Similar to `queue.buffering.max.messages` in the JVM producer.
+			MaxMessages int
+		}
+
+		Retry struct {
+			// The total number of times to retry sending a message (default 3).
+			// Similar to the `message.send.max.retries` setting of the JVM producer.
+			Max int
+			// How long to wait for the cluster to settle between retries (default 100ms).
+			// Similar to the `retry.backoff.ms` setting of the JVM producer.
+			Backoff time.Duration
+		}
+	}
+
+	// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
+	Consumer struct {
+		// Fetch is the namespace for controlling how many bytes are retrieved by any given request.
+		Fetch struct {
+			// The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
+			// The default is 1, as 0 causes the consumer to spin when no messages are available. Equivalent to the JVM's `fetch.min.bytes`.
+			Min int32
+			// The default number of message bytes to fetch from the broker in each request (default 32768). This should be larger than the
+			// majority of your messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar
+			// to the JVM's `fetch.message.max.bytes`.
+			Default int32
+			// The maximum number of message bytes to fetch from the broker in a single request. Messages larger than this will return
+			// ErrMessageTooLarge and will not be consumable, so you must be sure this is at least as large as your largest message.
+			// Defaults to 0 (no limit). Similar to the JVM's `fetch.message.max.bytes`. The global `sarama.MaxResponseSize` still applies.
+			Max int32
+		}
+		// The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
+	// returns fewer than that anyway. The default is 250ms, since 0 causes the consumer to spin when no events are available.
+		// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
+		// Equivalent to the JVM's `fetch.wait.max.ms`.
+		MaxWaitTime time.Duration
+	}
+
+	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
+	// Defaults to "sarama", but you should probably set it to something specific to your application.
+	ClientID string
+	// The number of events to buffer in internal and external channels. This permits the producer and consumer to
+	// continue processing some messages in the background while user code is working, greatly improving throughput.
+	// Defaults to 256.
+	ChannelBufferSize int
+}
+
+// NewConfig returns a new configuration instance with sane defaults.
+func NewConfig() *Config {
+	c := &Config{}
+
+	c.Net.MaxOpenRequests = 5
+	c.Net.DialTimeout = 30 * time.Second
+	c.Net.ReadTimeout = 30 * time.Second
+	c.Net.WriteTimeout = 30 * time.Second
+
+	c.Metadata.Retry.Max = 3
+	c.Metadata.Retry.Backoff = 250 * time.Millisecond
+	c.Metadata.RefreshFrequency = 10 * time.Minute
+
+	c.Producer.MaxMessageBytes = 1000000
+	c.Producer.RequiredAcks = WaitForLocal
+	c.Producer.Timeout = 10 * time.Second
+	c.Producer.Partitioner = NewHashPartitioner
+	c.Producer.Retry.Max = 3
+	c.Producer.Retry.Backoff = 100 * time.Millisecond
+
+	c.Consumer.Fetch.Min = 1
+	c.Consumer.Fetch.Default = 32768
+	c.Consumer.MaxWaitTime = 250 * time.Millisecond
+
+	c.ClientID = "sarama"
+	c.ChannelBufferSize = 256
+
+	return c
+}
+
+// Validate checks a Config instance. It will return a
+// ConfigurationError if the specified values don't make sense.
+func (c *Config) Validate() error {
+	// some configuration values should be warned on but not fail completely, do those first
+	if c.Producer.RequiredAcks > 1 {
+		Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
+	}
+	if c.Producer.MaxMessageBytes >= forceFlushThreshold() {
+		Logger.Println("Producer.MaxMessageBytes is too close to MaxRequestSize; it will be ignored.")
+	}
+	if c.Producer.Flush.Bytes >= forceFlushThreshold() {
+		Logger.Println("Producer.Flush.Bytes is too close to MaxRequestSize; it will be ignored.")
+	}
+	if c.Producer.Timeout%time.Millisecond != 0 {
+		Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
+	}
+	if c.Consumer.MaxWaitTime < 100*time.Millisecond {
+		Logger.Println("Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
+	}
+	if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
+		Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
+	}
+	if c.ClientID == "sarama" {
+		Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
+	}
+
+	// validate Net values
+	switch {
+	case c.Net.MaxOpenRequests <= 0:
+		return ConfigurationError("Invalid Net.MaxOpenRequests, must be > 0")
+	case c.Net.DialTimeout <= 0:
+		return ConfigurationError("Invalid Net.DialTimeout, must be > 0")
+	case c.Net.ReadTimeout <= 0:
+		return ConfigurationError("Invalid Net.ReadTimeout, must be > 0")
+	case c.Net.WriteTimeout <= 0:
+		return ConfigurationError("Invalid Net.WriteTimeout, must be > 0")
+	}
+
+	// validate the Metadata values
+	switch {
+	case c.Metadata.Retry.Max < 0:
+		return ConfigurationError("Invalid Metadata.Retry.Max, must be >= 0")
+	case c.Metadata.Retry.Backoff <= time.Duration(0):
+		return ConfigurationError("Invalid Metadata.Retry.Backoff, must be > 0")
+	case c.Metadata.RefreshFrequency < time.Duration(0):
+		return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
+	}
+
+	// validate the Produce values
+	switch {
+	case c.Producer.MaxMessageBytes <= 0:
+		return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0")
+	case c.Producer.RequiredAcks < -1:
+		return ConfigurationError("Invalid Producer.RequiredAcks, must be >= -1")
+	case c.Producer.Timeout <= 0:
+		return ConfigurationError("Invalid Producer.Timeout, must be > 0")
+	case c.Producer.Partitioner == nil:
+		return ConfigurationError("Invalid Producer.Partitioner, must not be nil")
+	case c.Producer.Flush.Bytes < 0:
+		return ConfigurationError("Invalid Producer.Flush.Bytes, must be >= 0")
+	case c.Producer.Flush.Messages < 0:
+		return ConfigurationError("Invalid Producer.Flush.Messages, must be >= 0")
+	case c.Producer.Flush.Frequency < 0:
+		return ConfigurationError("Invalid Producer.Flush.Frequency, must be >= 0")
+	case c.Producer.Flush.MaxMessages < 0:
+		return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= 0")
+	case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
+		return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set")
+	case c.Producer.Retry.Max < 0:
+		return ConfigurationError("Invalid Producer.Retry.Max, must be >= 0")
+	case c.Producer.Retry.Backoff < 0:
+		return ConfigurationError("Invalid Producer.Retry.Backoff, must be >= 0")
+	}
+
+	// validate the Consume values
+	switch {
+	case c.Consumer.Fetch.Min <= 0:
+		return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0")
+	case c.Consumer.Fetch.Default <= 0:
+		return ConfigurationError("Invalid Consumer.Fetch.Default, must be > 0")
+	case c.Consumer.Fetch.Max < 0:
+		return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
+	case c.Consumer.MaxWaitTime < 1*time.Millisecond:
+		return ConfigurationError("Invalid Consumer.MaxWaitTime, must be >= 1ms")
+	}
+
+	// validate misc shared values
+	switch {
+	case c.ChannelBufferSize < 0:
+		return ConfigurationError("Invalid ChannelBufferSize, must be >= 0")
+	}
+
+	return nil
+}
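
The intended usage pattern for the new type, as a hedged sketch (import path and field values are illustrative only): start from NewConfig's defaults, override what you need, and rely on Validate, which every constructor calls, to catch nonsense early:

```go
package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	conf := sarama.NewConfig() // sane defaults for every namespace

	conf.ClientID = "audit-worker" // something application-specific, per the warning in Validate
	conf.Producer.Compression = sarama.CompressionGZIP
	conf.Producer.Flush.Frequency = 500 * time.Millisecond
	conf.Consumer.Fetch.Default = 64 * 1024 // bytes per fetch request

	// Constructors validate for you, but checking up front gives a clearer
	// failure point; the error is a sarama.ConfigurationError.
	if err := conf.Validate(); err != nil {
		log.Fatalln("invalid config:", err)
	}
}
```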

+ 10 - 0
config_test.go

@@ -0,0 +1,10 @@
+package sarama
+
+import "testing"
+
+func TestDefaultConfigValidates(t *testing.T) {
+	config := NewConfig()
+	if err := config.Validate(); err != nil {
+		t.Error(err)
+	}
+}

+ 63 - 140
consumer.go

@@ -6,101 +6,6 @@ import (
 	"time"
 )
 
-// OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
-type OffsetMethod int
-
-const (
-	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
-	// determined by querying the broker.
-	OffsetMethodNewest OffsetMethod = iota
-	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
-	// determined by querying the broker.
-	OffsetMethodOldest
-	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
-	// offset at which to start, allowing the user to manually specify their desired starting offset.
-	OffsetMethodManual
-)
-
-// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
-type ConsumerConfig struct {
-	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
-	// The default is 1, as 0 causes the consumer to spin when no messages are available.
-	MinFetchSize int32
-	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
-	// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
-	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
-	MaxWaitTime time.Duration
-}
-
-// NewConsumerConfig creates a ConsumerConfig instance with sane defaults.
-func NewConsumerConfig() *ConsumerConfig {
-	return &ConsumerConfig{
-		MinFetchSize: 1,
-		MaxWaitTime:  250 * time.Millisecond,
-	}
-}
-
-// Validate checks a ConsumerConfig instance. It will return a
-// ConfigurationError if the specified value doesn't make sense.
-func (config *ConsumerConfig) Validate() error {
-	if config.MinFetchSize <= 0 {
-		return ConfigurationError("Invalid MinFetchSize")
-	}
-
-	if config.MaxWaitTime < 1*time.Millisecond {
-		return ConfigurationError("Invalid MaxWaitTime, it needs to be at least 1ms")
-	} else if config.MaxWaitTime < 100*time.Millisecond {
-		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
-	} else if config.MaxWaitTime%time.Millisecond != 0 {
-		Logger.Println("ConsumerConfig.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
-	}
-
-	return nil
-}
-
-// PartitionConsumerConfig is used to pass multiple configuration options to AddPartition
-type PartitionConsumerConfig struct {
-	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
-	DefaultFetchSize int32
-	// The maximum permittable message size - messages larger than this will return ErrMessageTooLarge. The default of 0 is
-	// treated as no limit.
-	MaxMessageSize int32
-	// The method used to determine at which offset to begin consuming messages. The default is to start at the most recent message.
-	OffsetMethod OffsetMethod
-	// Interpreted differently according to the value of OffsetMethod.
-	OffsetValue int64
-	// The number of events to buffer in the Messages and Errors channel. Having this non-zero permits the
-	// consumer to continue fetching messages in the background while client code consumes events,
-	// greatly improving throughput. The default is 64.
-	ChannelBufferSize int
-}
-
-// NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.
-func NewPartitionConsumerConfig() *PartitionConsumerConfig {
-	return &PartitionConsumerConfig{
-		DefaultFetchSize:  32768,
-		ChannelBufferSize: 64,
-	}
-}
-
-// Validate checks a PartitionConsumerConfig instance. It will return a
-// ConfigurationError if the specified value doesn't make sense.
-func (config *PartitionConsumerConfig) Validate() error {
-	if config.DefaultFetchSize <= 0 {
-		return ConfigurationError("Invalid DefaultFetchSize")
-	}
-
-	if config.MaxMessageSize < 0 {
-		return ConfigurationError("Invalid MaxMessageSize")
-	}
-
-	if config.ChannelBufferSize < 0 {
-		return ConfigurationError("Invalid ChannelBufferSize")
-	}
-
-	return nil
-}
-
 // ConsumerMessage encapsulates a Kafka message returned by the consumer.
 type ConsumerMessage struct {
 	Key, Value []byte
@@ -130,34 +35,44 @@ func (ce ConsumerErrors) Error() string {
 	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
 }
 
-// Consumer manages PartitionConsumers which process Kafka messages from brokers.
+// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
+// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
+// scope.
 type Consumer struct {
-	client *Client
-	config ConsumerConfig
+	client    *Client
+	conf      *Config
+	ownClient bool
 
 	lock            sync.Mutex
 	children        map[string]map[int32]*PartitionConsumer
 	brokerConsumers map[*Broker]*brokerConsumer
 }
 
-// NewConsumer creates a new consumer attached to the given client.
-func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
-	// Check that we are not dealing with a closed Client before processing any other arguments
-	if client.Closed() {
-		return nil, ErrClosedClient
+// NewConsumer creates a new consumer using the given broker addresses and configuration.
+func NewConsumer(addrs []string, config *Config) (*Consumer, error) {
+	client, err := NewClient(addrs, config)
+	if err != nil {
+		return nil, err
 	}
 
-	if config == nil {
-		config = NewConsumerConfig()
+	c, err := NewConsumerFromClient(client)
+	if err != nil {
+		return nil, err
 	}
+	c.ownClient = true
+	return c, nil
+}
 
-	if err := config.Validate(); err != nil {
-		return nil, err
+// NewConsumerFromClient creates a new consumer using the given client.
+func NewConsumerFromClient(client *Client) (*Consumer, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
+	if client.Closed() {
+		return nil, ErrClosedClient
 	}
 
 	c := &Consumer{
 		client:          client,
-		config:          *config,
+		conf:            client.conf,
 		children:        make(map[string]map[int32]*PartitionConsumer),
 		brokerConsumers: make(map[*Broker]*brokerConsumer),
 	}
@@ -165,30 +80,40 @@ func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
 	return c, nil
 }
 
-// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given configuration. It will
-// return an error if this Consumer is already consuming on the given topic/partition.
-func (c *Consumer) ConsumePartition(topic string, partition int32, config *PartitionConsumerConfig) (*PartitionConsumer, error) {
-	if config == nil {
-		config = NewPartitionConsumerConfig()
+// Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed.
+func (c *Consumer) Close() error {
+	if c.ownClient {
+		return c.client.Close()
 	}
+	return nil
+}
 
-	if err := config.Validate(); err != nil {
-		return nil, err
-	}
+const (
+	// OffsetNewest causes the consumer to start at the most recent available offset, as
+	// determined by querying the broker.
+	OffsetNewest int64 = -1
+	// OffsetOldest causes the consumer to start at the oldest available offset, as
+	// determined by querying the broker.
+	OffsetOldest int64 = -2
+)
 
+// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
+// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
+// literal offset, or OffsetNewest or OffsetOldest
+func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64) (*PartitionConsumer, error) {
 	child := &PartitionConsumer{
 		consumer:  c,
-		config:    *config,
+		conf:      c.conf,
 		topic:     topic,
 		partition: partition,
-		messages:  make(chan *ConsumerMessage, config.ChannelBufferSize),
-		errors:    make(chan *ConsumerError, config.ChannelBufferSize),
+		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
+		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
 		trigger:   make(chan none, 1),
 		dying:     make(chan none),
-		fetchSize: config.DefaultFetchSize,
+		fetchSize: c.conf.Consumer.Fetch.Default,
 	}
 
-	if err := child.chooseStartingOffset(); err != nil {
+	if err := child.chooseStartingOffset(offset); err != nil {
 		return nil, err
 	}
 
@@ -281,7 +206,7 @@ func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
 // You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.
 type PartitionConsumer struct {
 	consumer  *Consumer
-	config    PartitionConsumerConfig
+	conf      *Config
 	topic     string
 	partition int32
 
@@ -353,22 +278,20 @@ func (child *PartitionConsumer) dispatch() error {
 	return nil
 }
 
-func (child *PartitionConsumer) chooseStartingOffset() (err error) {
+func (child *PartitionConsumer) chooseStartingOffset(offset int64) (err error) {
 	var where OffsetTime
 
-	switch child.config.OffsetMethod {
-	case OffsetMethodManual:
-		if child.config.OffsetValue < 0 {
-			return ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL")
-		}
-		child.offset = child.config.OffsetValue
-		return nil
-	case OffsetMethodNewest:
+	switch offset {
+	case OffsetNewest:
 		where = LatestOffsets
-	case OffsetMethodOldest:
+	case OffsetOldest:
 		where = EarliestOffset
 	default:
-		return ConfigurationError("Invalid OffsetMethod")
+		if offset < 0 {
+			return ConfigurationError("Invalid offset")
+		}
+		child.offset = offset
+		return nil
 	}
 
 	child.offset, err = child.consumer.client.GetOffset(child.topic, child.partition, where)
@@ -548,15 +471,15 @@ func (w *brokerConsumer) abort(err error) {
 
 func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	request := &FetchRequest{
-		MinBytes:    w.consumer.config.MinFetchSize,
-		MaxWaitTime: int32(w.consumer.config.MaxWaitTime / time.Millisecond),
+		MinBytes:    w.consumer.conf.Consumer.Fetch.Min,
+		MaxWaitTime: int32(w.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
 	}
 
 	for child := range w.subscriptions {
 		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
 	}
 
-	return w.broker.Fetch(w.consumer.client.id, request)
+	return w.broker.Fetch(request)
 }
 
 func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchResponseBlock) {
@@ -577,14 +500,14 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 		// We got no messages. If we got a trailing one then we need to ask for more data.
 		// Otherwise we just poll again and wait for one to be produced...
 		if block.MsgSet.PartialTrailingMessage {
-			if child.config.MaxMessageSize > 0 && child.fetchSize == child.config.MaxMessageSize {
+			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
 				// we can't ask for more data, we've hit the configured limit
 				child.sendError(ErrMessageTooLarge)
 				child.offset++ // skip this one so we can keep processing future messages
 			} else {
 				child.fetchSize *= 2
-				if child.config.MaxMessageSize > 0 && child.fetchSize > child.config.MaxMessageSize {
-					child.fetchSize = child.config.MaxMessageSize
+				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
+					child.fetchSize = child.conf.Consumer.Fetch.Max
 				}
 			}
 		}
@@ -593,7 +516,7 @@ func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchRe
 	}
 	}
 
 
 	// we got messages, reset our fetch size in case it was increased for a previous request
 	// we got messages, reset our fetch size in case it was increased for a previous request
-	child.fetchSize = child.config.DefaultFetchSize
+	child.fetchSize = child.conf.Consumer.Fetch.Default
 
 
 	incomplete := false
 	incomplete := false
 	atLeastOne := false
 	atLeastOne := false

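The fetch-size bookkeeping in handleResponse is driven by three knobs on the unified config. A sketch of tuning them; the numeric values here are hypothetical, not defaults:

package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func fetchTunedConfig() *sarama.Config {
	conf := sarama.NewConfig()
	// Fetch.Min becomes the FetchRequest's MinBytes; MaxWaitTime bounds how
	// long the broker may hold the request open trying to satisfy it.
	conf.Consumer.Fetch.Min = 1
	conf.Consumer.MaxWaitTime = 250 * time.Millisecond
	// Per-partition fetchSize starts at (and resets to) Fetch.Default, doubles
	// on a partial trailing message, and is capped at Fetch.Max; a message that
	// still doesn't fit at the cap surfaces as ErrMessageTooLarge and is skipped.
	conf.Consumer.Fetch.Default = 32 * 1024 // hypothetical tuning values
	conf.Consumer.Fetch.Max = 1024 * 1024
	return conf
}

func main() {
	if _, err := sarama.NewConsumer([]string{"localhost:9092"}, fetchTunedConfig()); err != nil {
		panic(err)
	}
}
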
+ 16 - 81
consumer_test.go

@@ -7,20 +7,6 @@ import (
 	"time"
 )

-func TestDefaultConsumerConfigValidates(t *testing.T) {
-	config := NewConsumerConfig()
-	if err := config.Validate(); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
-	config := NewPartitionConsumerConfig()
-	if err := config.Validate(); err != nil {
-		t.Error(err)
-	}
-}
-
 func TestConsumerOffsetManual(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
@@ -36,21 +22,12 @@ func TestConsumerOffsetManual(t *testing.T) {
 		leader.Returns(fetchResponse)
 	}

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

-	master, err := NewConsumer(client, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewPartitionConsumerConfig()
-	config.OffsetMethod = OffsetMethodManual
-	config.OffsetValue = 1234
-	consumer, err := master.ConsumePartition("my_topic", 0, config)
+	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -68,7 +45,6 @@ func TestConsumerOffsetManual(t *testing.T) {
 	}

 	safeClose(t, consumer)
-	safeClose(t, client)
 	leader.Close()
 }

@@ -89,27 +65,19 @@ func TestConsumerLatestOffset(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
 	leader.Returns(fetchResponse)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 	seedBroker.Close()

-	master, err := NewConsumer(client, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewPartitionConsumerConfig()
-	config.OffsetMethod = OffsetMethodNewest
-	consumer, err := master.ConsumePartition("my_topic", 0, config)
+	consumer, err := master.ConsumePartition("my_topic", 0, OffsetNewest)
 	if err != nil {
 		t.Fatal(err)
 	}

 	leader.Close()
 	safeClose(t, consumer)
-	safeClose(t, client)

 	// we deliver one message, so it should be one higher than we return in the OffsetResponse
 	if consumer.offset != 0x010102 {
@@ -138,20 +106,12 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
 	leader.Returns(fetchResponse)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	master, err := NewConsumer(client, nil)
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

-	config := NewPartitionConsumerConfig()
-	config.OffsetMethod = OffsetMethodManual
-	config.OffsetValue = 2
-	consumer, err := master.ConsumePartition("my_topic", 0, config)
+	consumer, err := master.ConsumePartition("my_topic", 0, 2)

 	message := <-consumer.Messages()
 	if message.Offset != 3 {
@@ -161,7 +121,6 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	leader.Close()
 	seedBroker.Close()
 	safeClose(t, consumer)
-	safeClose(t, client)
 }

 func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
@@ -178,24 +137,15 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	seedBroker.Returns(metadataResponse)

 	// launch test goroutines
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

-	master, err := NewConsumer(client, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewPartitionConsumerConfig()
-	config.OffsetMethod = OffsetMethodManual
-	config.OffsetValue = 0
-
 	// we expect to end up (eventually) consuming exactly ten messages on each partition
 	var wg sync.WaitGroup
 	for i := 0; i < 2; i++ {
-		consumer, err := master.ConsumePartition("my_topic", int32(i), config)
+		consumer, err := master.ConsumePartition("my_topic", int32(i), 0)
 		if err != nil {
 			t.Error(err)
 		}
@@ -291,30 +241,22 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	leader1.Close()
 	leader0.Close()
 	seedBroker.Close()
-	safeClose(t, client)
 }

 func ExampleConsumerWithSelect() {
-	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
+	master, err := NewConsumer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
-		fmt.Println("> connected")
+		fmt.Println("> master consumer ready")
 	}
 	defer func() {
-		if err := client.Close(); err != nil {
+		if err := master.Close(); err != nil {
 			panic(err)
 		}
 	}()

-	master, err := NewConsumer(client, nil)
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> master consumer ready")
-	}
-
-	consumer, err := master.ConsumePartition("my_topic", 0, nil)
+	consumer, err := master.ConsumePartition("my_topic", 0, 0)
 	if err != nil {
 		panic(err)
 	} else {
@@ -344,26 +286,19 @@ consumerLoop:
 }

 func ExampleConsumerWithGoroutines() {
-	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
+	master, err := NewConsumer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
-		fmt.Println("> connected")
+		fmt.Println("> master consumer ready")
 	}
 	defer func() {
-		if err := client.Close(); err != nil {
+		if err := master.Close(); err != nil {
 			panic(err)
 		}
 	}()

-	master, err := NewConsumer(client, nil)
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> master consumer ready")
-	}
-
-	consumer, err := master.ConsumePartition("my_topic", 0, nil)
+	consumer, err := master.ConsumePartition("my_topic", 0, 0)
 	if err != nil {
 		panic(err)
 	} else {

+ 24 - 31
functional_test.go

@@ -44,59 +44,54 @@ func checkKafkaAvailability(t *testing.T) {
 }

 func TestFuncConnectionFailure(t *testing.T) {
-	config := NewClientConfig()
-	config.MetadataRetries = 1
+	config := NewConfig()
+	config.Metadata.Retry.Max = 1

-	_, err := NewClient("test", []string{"localhost:9000"}, config)
+	_, err := NewClient([]string{"localhost:9000"}, config)
 	if err != ErrOutOfBrokers {
 		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 	}
 }

 func TestFuncProducing(t *testing.T) {
-	config := NewProducerConfig()
+	config := NewConfig()
 	testProducingMessages(t, config)
 }

 func TestFuncProducingGzip(t *testing.T) {
-	config := NewProducerConfig()
-	config.Compression = CompressionGZIP
+	config := NewConfig()
+	config.Producer.Compression = CompressionGZIP
 	testProducingMessages(t, config)
 }

 func TestFuncProducingSnappy(t *testing.T) {
-	config := NewProducerConfig()
-	config.Compression = CompressionSnappy
+	config := NewConfig()
+	config.Producer.Compression = CompressionSnappy
 	testProducingMessages(t, config)
 }

 func TestFuncProducingNoResponse(t *testing.T) {
-	config := NewProducerConfig()
-	config.RequiredAcks = NoResponse
+	config := NewConfig()
+	config.Producer.RequiredAcks = NoResponse
 	testProducingMessages(t, config)
 }

 func TestFuncProducingFlushing(t *testing.T) {
-	config := NewProducerConfig()
-	config.FlushMsgCount = TestBatchSize / 8
-	config.FlushFrequency = 250 * time.Millisecond
+	config := NewConfig()
+	config.Producer.Flush.Messages = TestBatchSize / 8
+	config.Producer.Flush.Frequency = 250 * time.Millisecond
 	testProducingMessages(t, config)
 }

 func TestFuncMultiPartitionProduce(t *testing.T) {
 	checkKafkaAvailability(t)
-	client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer safeClose(t, client)

-	config := NewProducerConfig()
-	config.FlushFrequency = 50 * time.Millisecond
-	config.FlushMsgCount = 200
+	config := NewConfig()
 	config.ChannelBufferSize = 20
-	config.AckSuccesses = true
-	producer, err := NewProducer(client, config)
+	config.Producer.Flush.Frequency = 50 * time.Millisecond
+	config.Producer.Flush.Messages = 200
+	config.Producer.AckSuccesses = true
+	producer, err := NewProducer([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -124,27 +119,25 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	}
 }

-func testProducingMessages(t *testing.T, config *ProducerConfig) {
+func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)

-	client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
+	config.Producer.AckSuccesses = true
+	client, err := NewClient([]string{kafkaAddr}, config)
 	if err != nil {
 		t.Fatal(err)
 	}

-	master, err := NewConsumer(client, nil)
+	master, err := NewConsumerFromClient(client)
 	if err != nil {
 		t.Fatal(err)
 	}
-	consumerConfig := NewPartitionConsumerConfig()
-	consumerConfig.OffsetMethod = OffsetMethodNewest
-	consumer, err := master.ConsumePartition("single_partition", 0, consumerConfig)
+	consumer, err := master.ConsumePartition("single_partition", 0, OffsetNewest)
 	if err != nil {
 		t.Fatal(err)
 	}

-	config.AckSuccesses = true
-	producer, err := NewProducer(client, config)
+	producer, err := NewProducerFromClient(client)
 	if err != nil {
 		t.Fatal(err)
 	}

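For callers porting to the unified Config, the renames exercised by these tests line up roughly as follows (a sketch; the values are the ones used in the tests above, and the broker address is a placeholder):

package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func migratedConfig() *sarama.Config {
	conf := sarama.NewConfig() // replaces NewClientConfig/NewProducerConfig/NewConsumerConfig
	conf.Metadata.Retry.Max = 1                           // was ClientConfig.MetadataRetries
	conf.ChannelBufferSize = 20                           // same name, now on the shared Config
	conf.Producer.Compression = sarama.CompressionSnappy  // was ProducerConfig.Compression
	conf.Producer.RequiredAcks = sarama.NoResponse        // was ProducerConfig.RequiredAcks
	conf.Producer.Flush.Messages = 200                    // was ProducerConfig.FlushMsgCount
	conf.Producer.Flush.Frequency = 50 * time.Millisecond // was ProducerConfig.FlushFrequency
	return conf
}

func main() {
	if _, err := sarama.NewClient([]string{"localhost:9092"}, migratedConfig()); err != nil {
		panic(err)
	}
}
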
+ 51 - 129
producer.go

@@ -12,97 +12,6 @@ func forceFlushThreshold() int {
 	return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
 }

-// ProducerConfig is used to pass multiple configuration options to NewProducer.
-//
-// Some of these configuration settings match settings with the JVM producer, but some of
-// these are implementation specific and have no equivalent in the JVM producer.
-type ProducerConfig struct {
-	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to hash). Similar to the `partitioner.class` setting for the JVM producer.
-	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal). Equivalent to the `request.required.acks` setting of the JVM producer.
-	Timeout           time.Duration          // The maximum duration the broker will wait the receipt of the number of RequiredAcks (defaults to 10 seconds). This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.
-	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.
-	FlushMsgCount     int                    // The number of messages needed to trigger a flush. This is a best effort; the number of messages may be more or less. Use `MaxMessagesPerReq` to set a hard upper limit.
-	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued. The frequency is a best effort, and the actual frequency can be more or less. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
-	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered. This is a best effort; the number of bytes may be more or less. Use the gloabl `sarama.MaxRequestSize` to set a hard upper limit.
-	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
-	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
-	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies. Similar to `queue.buffering.max.messages` in the JVM producer.
-	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines (defaults to 256).
-	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 100ms). Similar to the retry.backoff.ms setting of the JVM producer.
-	MaxRetries        int                    // The total number of times to retry sending a message (defaults to 3). Similar to the message.send.max.retries setting of the JVM producer.
-}
-
-// NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
-func NewProducerConfig() *ProducerConfig {
-	return &ProducerConfig{
-		Partitioner:       NewHashPartitioner,
-		RequiredAcks:      WaitForLocal,
-		MaxMessageBytes:   1000000,
-		ChannelBufferSize: 256,
-		RetryBackoff:      100 * time.Millisecond,
-		Timeout:           10 * time.Second,
-		MaxRetries:        3,
-	}
-}
-
-// Validate checks a ProducerConfig instance. It will return a
-// ConfigurationError if the specified value doesn't make sense.
-func (config *ProducerConfig) Validate() error {
-	if config.RequiredAcks < -1 {
-		return ConfigurationError("Invalid RequiredAcks")
-	} else if config.RequiredAcks > 1 {
-		Logger.Println("ProducerConfig.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
-	}
-
-	if config.Timeout < 0 {
-		return ConfigurationError("Invalid Timeout")
-	} else if config.Timeout%time.Millisecond != 0 {
-		Logger.Println("ProducerConfig.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
-	}
-
-	if config.RequiredAcks == WaitForAll && config.Timeout == 0 {
-		return ConfigurationError("If you WaitForAll you must specify a non-zero timeout to wait.")
-	}
-
-	if config.FlushMsgCount < 0 {
-		return ConfigurationError("Invalid FlushMsgCount")
-	}
-
-	if config.FlushByteCount < 0 {
-		return ConfigurationError("Invalid FlushByteCount")
-	} else if config.FlushByteCount >= forceFlushThreshold() {
-		Logger.Println("ProducerConfig.FlushByteCount too close to MaxRequestSize; it will be ignored.")
-	}
-
-	if config.FlushFrequency < 0 {
-		return ConfigurationError("Invalid FlushFrequency")
-	}
-
-	if config.Partitioner == nil {
-		return ConfigurationError("No partitioner set")
-	}
-
-	if config.MaxMessageBytes <= 0 {
-		return ConfigurationError("Invalid MaxMessageBytes")
-	} else if config.MaxMessageBytes >= forceFlushThreshold() {
-		Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
-	}
-
-	if config.MaxMessagesPerReq < 0 || (config.MaxMessagesPerReq > 0 && config.MaxMessagesPerReq < config.FlushMsgCount) {
-		return ConfigurationError("Invalid MaxMessagesPerReq, must be non-negative and >= FlushMsgCount if set")
-	}
-
-	if config.RetryBackoff < 0 {
-		return ConfigurationError("Invalid RetryBackoff")
-	}
-
-	if config.MaxRetries < 0 {
-		return ConfigurationError("Invalid MaxRetries")
-	}
-
-	return nil
-}
-
 // Producer publishes Kafka messages. It routes messages to the correct broker
 // for the provided topic-partition, refreshing metadata as appropriate, and
 // parses responses for errors. You must read from the Errors() channel or the
@@ -111,8 +20,9 @@ func (config *ProducerConfig) Validate() error {
 // scope (this is in addition to calling Close on the underlying client, which
 // is still necessary).
 type Producer struct {
-	client *Client
-	config ProducerConfig
+	client    *Client
+	conf      *Config
+	ownClient bool

 	errors                    chan *ProducerError
 	input, successes, retries chan *ProducerMessage
@@ -121,25 +31,31 @@ type Producer struct {
 	brokerLock sync.Mutex
 }

-// NewProducer creates a new Producer using the given client.
-func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
-	// Check that we are not dealing with a closed Client before processing
-	// any other arguments
-	if client.Closed() {
-		return nil, ErrClosedClient
+// NewProducer creates a new Producer using the given broker addresses and configuration.
+func NewProducer(addrs []string, conf *Config) (*Producer, error) {
+	client, err := NewClient(addrs, conf)
+	if err != nil {
+		return nil, err
 	}

-	if config == nil {
-		config = NewProducerConfig()
+	p, err := NewProducerFromClient(client)
+	if err != nil {
+		return nil, err
 	}
+	p.ownClient = true
+	return p, nil
+}

-	if err := config.Validate(); err != nil {
-		return nil, err
+// NewProducerFromClient creates a new Producer using the given client.
+func NewProducerFromClient(client *Client) (*Producer, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
+	if client.Closed() {
+		return nil, ErrClosedClient
 	}

 	p := &Producer{
 		client:    client,
-		config:    *config,
+		conf:      client.conf,
 		errors:    make(chan *ProducerError),
 		input:     make(chan *ProducerMessage),
 		successes: make(chan *ProducerMessage),
@@ -226,7 +142,7 @@ func (p *Producer) Errors() <-chan *ProducerError {
 	return p.errors
 }

-// Successes is the success output channel back to the user when AckSuccesses is configured.
+// Successes is the success output channel back to the user when AckSuccesses is configured.
 // If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
 // It is suggested that you send and read messages together in a single select statement.
 func (p *Producer) Successes() <-chan *ProducerMessage {
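
Because Successes() must be drained once AckSuccesses is set, a send typically pairs Input() with Successes() and Errors() in one select, as the comment suggests. A minimal sketch (broker address and topic are placeholders; nil-ing the send channel keeps the message from being queued twice):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	conf := sarama.NewConfig()
	conf.Producer.AckSuccesses = true // Successes() must now be drained

	producer, err := sarama.NewProducer([]string{"localhost:9092"}, conf)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("hello")}
	input := producer.Input()
	for delivered := false; !delivered; {
		select {
		case input <- msg:
			input = nil // queued exactly once; now just wait for the ack
		case <-producer.Successes():
			fmt.Println("> message delivered")
			delivered = true
		case perr := <-producer.Errors():
			panic(perr)
		}
	}
}
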
@@ -245,7 +161,7 @@ func (p *Producer) Input() chan<- *ProducerMessage {
 func (p *Producer) Close() error {
 	p.AsyncClose()

-	if p.config.AckSuccesses {
+	if p.conf.Producer.AckSuccesses {
 		go withRecover(func() {
 			for _ = range p.successes {
 			}
@@ -296,8 +212,8 @@ func (p *Producer) topicDispatcher() {
 			break
 		}

-		if (p.config.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.config.MaxMessageBytes) ||
-			(msg.byteSize() > p.config.MaxMessageBytes) {
+		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
+			(msg.byteSize() > p.conf.Producer.MaxMessageBytes) {

 			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
@@ -306,7 +222,7 @@ func (p *Producer) topicDispatcher() {
 		handler := handlers[msg.Topic]
 		if handler == nil {
 			p.retries <- &ProducerMessage{flags: ref}
-			newHandler := make(chan *ProducerMessage, p.config.ChannelBufferSize)
+			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
 			topic := msg.Topic // block local because go's closure semantics suck
 			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
 			handler = newHandler
@@ -326,6 +242,12 @@ func (p *Producer) topicDispatcher() {
 		p.returnError(msg, ErrShuttingDown)
 	}

+	if p.ownClient {
+		err := p.client.Close()
+		if err != nil {
+			p.errors <- &ProducerError{Err: err}
+		}
+	}
 	close(p.errors)
 	close(p.successes)
 }
@@ -334,7 +256,7 @@ func (p *Producer) topicDispatcher() {
 // partitions messages, then dispatches them by partition
 func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
 	handlers := make(map[int32]chan *ProducerMessage)
-	partitioner := p.config.Partitioner()
+	partitioner := p.conf.Producer.Partitioner()

 	for msg := range input {
 		if msg.retries == 0 {
@@ -348,7 +270,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage
 		handler := handlers[msg.partition]
 		if handler == nil {
 			p.retries <- &ProducerMessage{flags: ref}
-			newHandler := make(chan *ProducerMessage, p.config.ChannelBufferSize)
+			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
 			topic := msg.Topic         // block local because go's closure semantics suck
 			partition := msg.partition // block local because go's closure semantics suck
 			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
@@ -401,7 +323,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 	retryState := make([]struct {
 		buf          []*ProducerMessage
 		expectChaser bool
-	}, p.config.MaxRetries+1)
+	}, p.conf.Producer.Retry.Max+1)

 	for msg := range input {
 		if msg.retries > highWatermark {
@@ -414,7 +336,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 			Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
 			p.unrefBrokerProducer(leader)
 			output = nil
-			time.Sleep(p.config.RetryBackoff)
+			time.Sleep(p.conf.Producer.Retry.Backoff)
 		} else if highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			if msg.retries < highWatermark {
@@ -469,7 +391,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 		if output == nil {
 			if err := breaker.Run(doUpdate); err != nil {
 				p.returnError(msg, err)
-				time.Sleep(p.config.RetryBackoff)
+				time.Sleep(p.conf.Producer.Retry.Backoff)
 				continue
 			}
 			Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
@@ -488,8 +410,8 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
 func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
 	var ticker *time.Ticker
 	var timer <-chan time.Time
-	if p.config.FlushFrequency > 0 {
-		ticker = time.NewTicker(p.config.FlushFrequency)
+	if p.conf.Producer.Flush.Frequency > 0 {
+		ticker = time.NewTicker(p.conf.Producer.Flush.Frequency)
 		timer = ticker.C
 	}

@@ -508,8 +430,8 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage
 			}

 			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
-				(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) ||
-				(p.config.MaxMessagesPerReq > 0 && len(buffer) >= p.config.MaxMessagesPerReq) {
+				(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
+				(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
 				Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
 				flusher <- buffer
 				buffer = nil
@@ -520,8 +442,8 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage
 			buffer = append(buffer, msg)
 			bytesAccumulated += msg.byteSize()

-			if len(buffer) >= p.config.FlushMsgCount ||
-				(p.config.FlushByteCount > 0 && bytesAccumulated >= p.config.FlushByteCount) {
+			if len(buffer) >= p.conf.Producer.Flush.Messages ||
+				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				doFlush = flusher
 			}
 		case <-timer:
@@ -585,7 +507,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 			continue
 		}

-		response, err := broker.Produce(p.client.id, request)
+		response, err := broker.Produce(request)

 		switch err.(type) {
 		case nil:
@@ -603,7 +525,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {

 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			if p.config.AckSuccesses {
+			if p.conf.Producer.AckSuccesses {
 				p.returnSuccesses(batch)
 			}
 			continue
@@ -623,7 +545,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 				switch block.Err {
 				case ErrNoError:
 					// All the messages for this topic-partition were delivered successfully!
-					if p.config.AckSuccesses {
+					if p.conf.Producer.AckSuccesses {
 						for i := range msgs {
 							msgs[i].offset = block.Offset + int64(i)
 						}
@@ -733,7 +655,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *ProducerMessage

 func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {

-	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: int32(p.config.Timeout / time.Millisecond)}
+	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
 	empty := true

 	for topic, partitionSet := range batch {
@@ -756,7 +678,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 					}
 				}

-				if p.config.Compression != CompressionNone && setSize+msg.byteSize() > p.config.MaxMessageBytes {
+				if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
 					// compression causes message-sets to be wrapped as single messages, which have tighter
 					// size requirements, so we have to respect those limits
 					valBytes, err := encode(setToSend)
@@ -764,7 +686,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 						Logger.Println(err) // if this happens, it's basically our fault.
 						panic(err)
 					}
-					req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
+					req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
 					setToSend = new(MessageSet)
 					setSize = 0
 				}
@@ -774,7 +696,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 				empty = false
 			}

-			if p.config.Compression == CompressionNone {
+			if p.conf.Producer.Compression == CompressionNone {
 				req.AddSet(topic, partition, setToSend)
 			} else {
 				valBytes, err := encode(setToSend)
@@ -782,7 +704,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
 					Logger.Println(err) // if this happens, it's basically our fault.
 					panic(err)
 				}
-				req.AddMessage(topic, partition, &Message{Codec: p.config.Compression, Key: nil, Value: valBytes})
+				req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
 			}
 		}
 	}
@@ -821,7 +743,7 @@ func (p *Producer) retryMessages(batch []*ProducerMessage, err error) {
 		if msg == nil {
 			continue
 		}
-		if msg.retries >= p.config.MaxRetries {
+		if msg.retries >= p.conf.Producer.Retry.Max {
 			p.returnError(msg, err)
 		} else {
 			msg.retries++

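The new ownClient flag means a Producer built via NewProducer closes its private client when its dispatcher shuts down, while NewProducerFromClient leaves the client's lifetime to the caller. A sketch of the shared-client arrangement this enables (the address is a placeholder):

package main

import "github.com/Shopify/sarama"

func main() {
	// One client backing both halves; we created it, so we close it
	// (ownClient is only set on the NewProducer convenience path).
	client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	producer, err := sarama.NewProducerFromClient(client)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// ... wire up producer.Input()/Errors() and consumer.ConsumePartition here ...
}
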
+ 41 - 126
producer_test.go

@@ -28,13 +28,6 @@ func closeProducer(t *testing.T, p *Producer) {
 	wg.Wait()
 }

-func TestDefaultProducerConfigValidates(t *testing.T) {
-	config := NewProducerConfig()
-	if err := config.Validate(); err != nil {
-		t.Error(err)
-	}
-}
-
 func TestSyncProducer(t *testing.T) {
 	seedBroker := newMockBroker(t, 1)
 	leader := newMockBroker(t, 2)
@@ -50,12 +43,7 @@ func TestSyncProducer(t *testing.T) {
 		leader.Returns(prodSuccess)
 	}

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	producer, err := NewSyncProducer(client, nil)
+	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -74,7 +62,6 @@ func TestSyncProducer(t *testing.T) {
 	}

 	safeClose(t, producer)
-	safeClose(t, client)
 	leader.Close()
 	seedBroker.Close()
 }
@@ -92,14 +79,9 @@ func TestConcurrentSyncProducer(t *testing.T) {
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewProducerConfig()
-	config.FlushMsgCount = 100
-	producer, err := NewSyncProducer(client, config)
+	config := NewConfig()
+	config.Producer.Flush.Messages = 100
+	producer, err := NewSyncProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -122,7 +104,6 @@ func TestConcurrentSyncProducer(t *testing.T) {
 	wg.Wait()

 	safeClose(t, producer)
-	safeClose(t, client)
 	leader.Close()
 	seedBroker.Close()
 }
@@ -140,15 +121,10 @@ func TestProducer(t *testing.T) {
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	producer, err := NewProducer(client, config)
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -174,7 +150,6 @@ func TestProducer(t *testing.T) {
 	}

 	closeProducer(t, producer)
-	safeClose(t, client)
 	leader.Close()
 	seedBroker.Close()
 }
@@ -194,15 +169,10 @@ func TestProducerMultipleFlushes(t *testing.T) {
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewProducerConfig()
-	config.FlushMsgCount = 5
-	config.AckSuccesses = true
-	producer, err := NewProducer(client, config)
+	config := NewConfig()
+	config.Producer.Flush.Messages = 5
+	config.Producer.AckSuccesses = true
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -227,7 +197,6 @@ func TestProducerMultipleFlushes(t *testing.T) {
 	}

 	closeProducer(t, producer)
-	safeClose(t, client)
 	leader.Close()
 	seedBroker.Close()
 }
@@ -252,16 +221,11 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
 	leader1.Returns(prodResponse1)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewProducerConfig()
-	config.FlushMsgCount = 5
-	config.AckSuccesses = true
-	config.Partitioner = NewRoundRobinPartitioner
-	producer, err := NewProducer(client, config)
+	config := NewConfig()
+	config.Producer.Flush.Messages = 5
+	config.Producer.AckSuccesses = true
+	config.Producer.Partitioner = NewRoundRobinPartitioner
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -284,7 +248,6 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	}

 	closeProducer(t, producer)
-	safeClose(t, client)
 	leader1.Close()
 	leader0.Close()
 	seedBroker.Close()
@@ -300,16 +263,11 @@ func TestProducerFailureRetry(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	config.RetryBackoff = 0
-	producer, err := NewProducer(client, config)
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Backoff = 0
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -365,7 +323,6 @@ func TestProducerFailureRetry(t *testing.T) {

 	leader2.Close()
 	closeProducer(t, producer)
-	safeClose(t, client)
 }

 func TestProducerBrokerBounce(t *testing.T) {
@@ -378,16 +335,11 @@ func TestProducerBrokerBounce(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	config.RetryBackoff = 0
-	producer, err := NewProducer(client, config)
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Backoff = 0
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -419,7 +371,6 @@ func TestProducerBrokerBounce(t *testing.T) {
 	leader.Close()

 	closeProducer(t, producer)
-	safeClose(t, client)
 }

 func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
@@ -432,17 +383,12 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	config.MaxRetries = 3
-	config.RetryBackoff = 0
-	producer, err := NewProducer(client, config)
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Max = 3
+	config.Producer.Retry.Backoff = 0
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -480,7 +426,6 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	leader2.Close()

 	closeProducer(t, producer)
-	safeClose(t, client)
 }

 func TestProducerMultipleRetries(t *testing.T) {
@@ -493,17 +438,12 @@ func TestProducerMultipleRetries(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)

-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	config := NewProducerConfig()
-	config.FlushMsgCount = 10
-	config.AckSuccesses = true
-	config.MaxRetries = 4
-	config.RetryBackoff = 0
-	producer, err := NewProducer(client, config)
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.AckSuccesses = true
+	config.Producer.Retry.Max = 4
+	config.Producer.Retry.Backoff = 0
+	producer, err := NewProducer([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -565,23 +505,10 @@ func TestProducerMultipleRetries(t *testing.T) {
 	leader1.Close()
 	leader2.Close()
 	closeProducer(t, producer)
-	safeClose(t, client)
 }

 func ExampleProducer() {
-	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> connected")
-	}
-	defer func() {
-		if err := client.Close(); err != nil {
-			panic(err)
-		}
-	}()
-
-	producer, err := NewProducer(client, nil)
+	producer, err := NewProducer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	}
@@ -602,19 +529,7 @@ func ExampleProducer() {
 }

 func ExampleSyncProducer() {
-	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
-	if err != nil {
-		panic(err)
-	} else {
-		fmt.Println("> connected")
-	}
-	defer func() {
-		if err := client.Close(); err != nil {
-			panic(err)
-		}
-	}()
-
-	producer, err := NewSyncProducer(client, nil)
+	producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	}

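The retry knobs these tests set to zero now live under Producer.Retry: a message is retried up to Retry.Max times, and the leader dispatcher sleeps Retry.Backoff before re-resolving a leader. A sketch with hypothetical values (the tests above use a zero backoff only to keep the mock brokers fast):

package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	conf := sarama.NewConfig()
	conf.Producer.Retry.Max = 4                          // was ProducerConfig.MaxRetries
	conf.Producer.Retry.Backoff = 250 * time.Millisecond // was ProducerConfig.RetryBackoff

	producer, err := sarama.NewProducer([]string{"localhost:9092"}, conf)
	if err != nil {
		panic(err)
	}
	defer producer.Close()
}
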
+ 16 - 9
sync_producer.go

@@ -10,26 +10,33 @@ type SyncProducer struct {
 	wg       sync.WaitGroup
 }

-// NewSyncProducer creates a new SyncProducer using the given client  and configuration.
-func NewSyncProducer(client *Client, config *ProducerConfig) (*SyncProducer, error) {
-	if config == nil {
-		config = NewProducerConfig()
+// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
+func NewSyncProducer(addrs []string, config *Config) (*SyncProducer, error) {
+	p, err := NewProducer(addrs, config)
+	if err != nil {
+		return nil, err
 	}
-	config.AckSuccesses = true
-
-	prod, err := NewProducer(client, config)
+	return newSyncProducerFromProducer(p), nil
+}

+// NewSyncProducerFromClient creates a new SyncProducer using the given client.
+func NewSyncProducerFromClient(client *Client) (*SyncProducer, error) {
+	p, err := NewProducerFromClient(client)
 	if err != nil {
 		return nil, err
 	}
+	return newSyncProducerFromProducer(p), nil
+}

-	sp := &SyncProducer{producer: prod}
+func newSyncProducerFromProducer(p *Producer) *SyncProducer {
+	p.conf.Producer.AckSuccesses = true
+	sp := &SyncProducer{producer: p}

 	sp.wg.Add(2)
 	go withRecover(sp.handleSuccesses)
 	go withRecover(sp.handleErrors)

-	return sp, nil
+	return sp
 }

 // SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type.
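
Taken together, the synchronous path is now a one-liner to construct. A minimal sketch, assuming SendMessage keeps the (partition, offset, error) return shape and that the address and topic are placeholders:

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// AckSuccesses is forced on by newSyncProducerFromProducer, so each
	// SendMessage blocks until the broker acks (or returns an error).
	partition, offset, err := producer.SendMessage("my_topic", nil, sarama.StringEncoder("hello world"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("> stored on partition %d at offset %d\n", partition, offset)
}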