Преглед изворни кода

Move the clientID into the config object

Evan Huus пре 11 година
родитељ
комит
0abdf8c4f0
10 измењених фајлова са 65 додато и 61 уклоњено
  1. 19 19
      broker.go
  2. 9 9
      broker_test.go
  3. 4 6
      client.go
  4. 5 5
      client_test.go
  5. 6 0
      config.go
  6. 1 1
      consumer.go
  7. 6 6
      consumer_test.go
  8. 3 3
      functional_test.go
  9. 1 1
      producer.go
  10. 11 11
      producer_test.go

+ 19 - 19
broker.go

@@ -136,10 +136,10 @@ func (b *Broker) Addr() string {
 	return b.addr
 }
 
-func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
+func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -148,10 +148,10 @@ func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*Metada
 	return response, nil
 }
 
-func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
+func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) {
 	response := new(ConsumerMetadataResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -160,10 +160,10 @@ func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataR
 	return response, nil
 }
 
-func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error) {
+func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) {
 	response := new(OffsetResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -172,15 +172,15 @@ func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*
 	return response, nil
 }
 
-func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error) {
+func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
 	var response *ProduceResponse
 	var err error
 
 	if request.RequiredAcks == NoResponse {
-		err = b.sendAndReceive(clientID, request, nil)
+		err = b.sendAndReceive(request, nil)
 	} else {
 		response = new(ProduceResponse)
-		err = b.sendAndReceive(clientID, request, response)
+		err = b.sendAndReceive(request, response)
 	}
 
 	if err != nil {
@@ -190,10 +190,10 @@ func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResp
 	return response, nil
 }
 
-func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error) {
+func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) {
 	response := new(FetchResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -202,10 +202,10 @@ func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse,
 	return response, nil
 }
 
-func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
+func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) {
 	response := new(OffsetCommitResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -214,10 +214,10 @@ func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*O
 	return response, nil
 }
 
-func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
+func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
 	response := new(OffsetFetchResponse)
 
-	err := b.sendAndReceive(clientID, request, response)
+	err := b.sendAndReceive(request, response)
 
 	if err != nil {
 		return nil, err
@@ -226,7 +226,7 @@ func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*Off
 	return response, nil
 }
 
-func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool) (*responsePromise, error) {
+func (b *Broker) send(req requestEncoder, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
@@ -237,7 +237,7 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 		return nil, ErrNotConnected
 	}
 
-	fullRequest := request{b.correlationID, clientID, req}
+	fullRequest := request{b.correlationID, b.conf.ClientID, req}
 	buf, err := encode(&fullRequest)
 	if err != nil {
 		return nil, err
@@ -264,8 +264,8 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 	return &promise, nil
 }
 
-func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res decoder) error {
-	promise, err := b.send(clientID, req, res != nil)
+func (b *Broker) sendAndReceive(req requestEncoder, res decoder) error {
+	promise, err := b.send(req, res != nil)
 
 	if err != nil {
 		return err

+ 9 - 9
broker_test.go

@@ -13,7 +13,7 @@ func ExampleBroker() error {
 	}
 
 	request := MetadataRequest{Topics: []string{"myTopic"}}
-	response, err := broker.GetMetadata("myClient", &request)
+	response, err := broker.GetMetadata(&request)
 	if err != nil {
 		_ = broker.Close()
 		return err
@@ -80,7 +80,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := MetadataRequest{}
-			response, err := broker.GetMetadata("clientID", &request)
+			response, err := broker.GetMetadata(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -92,7 +92,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 't', 0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ConsumerMetadataRequest{}
-			response, err := broker.GetConsumerMetadata("clientID", &request)
+			response, err := broker.GetConsumerMetadata(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -105,7 +105,7 @@ var brokerTestTable = []struct {
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = NoResponse
-			response, err := broker.Produce("clientID", &request)
+			response, err := broker.Produce(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -118,7 +118,7 @@ var brokerTestTable = []struct {
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
 			request.RequiredAcks = WaitForLocal
-			response, err := broker.Produce("clientID", &request)
+			response, err := broker.Produce(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -130,7 +130,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := FetchRequest{}
-			response, err := broker.Fetch("clientID", &request)
+			response, err := broker.Fetch(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -142,7 +142,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetFetchRequest{}
-			response, err := broker.FetchOffset("clientID", &request)
+			response, err := broker.FetchOffset(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -154,7 +154,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetCommitRequest{}
-			response, err := broker.CommitOffset("clientID", &request)
+			response, err := broker.CommitOffset(&request)
 			if err != nil {
 				t.Error(err)
 			}
@@ -166,7 +166,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := OffsetRequest{}
-			response, err := broker.GetAvailableOffsets("clientID", &request)
+			response, err := broker.GetAvailableOffsets(&request)
 			if err != nil {
 				t.Error(err)
 			}

+ 4 - 6
client.go

@@ -11,7 +11,6 @@ import (
 // automatically when it passes out of scope. A single client can be safely shared by
 // multiple concurrent Producers and Consumers.
 type Client struct {
-	id     string
 	conf   *Config
 	closer chan none
 
@@ -31,10 +30,10 @@ type Client struct {
 	lock                    sync.RWMutex // protects access to the maps, only one since they're always written together
 }
 
-// NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
+// NewClient creates a new Client. It connects to one of the given broker addresses
 // and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
 // be retrieved from any of the given broker addresses, the client is not created.
-func NewClient(id string, addrs []string, conf *Config) (*Client, error) {
+func NewClient(addrs []string, conf *Config) (*Client, error) {
 	Logger.Println("Initializing new client")
 
 	if conf == nil {
@@ -50,7 +49,6 @@ func NewClient(id string, addrs []string, conf *Config) (*Client, error) {
 	}
 
 	client := &Client{
-		id:                      id,
 		conf:                    conf,
 		closer:                  make(chan none),
 		seedBrokerAddrs:         addrs,
@@ -267,7 +265,7 @@ func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTim
 	request := &OffsetRequest{}
 	request.AddBlock(topic, partitionID, where, 1)
 
-	response, err := broker.GetAvailableOffsets(client.id, request)
+	response, err := broker.GetAvailableOffsets(request)
 	if err != nil {
 		return -1, err
 	}
@@ -489,7 +487,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 		} else {
 			Logger.Printf("Fetching metadata for all topics from broker %s\n", broker.addr)
 		}
-		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
+		response, err := broker.GetMetadata(&MetadataRequest{Topics: topics})
 
 		switch err.(type) {
 		case nil:

+ 5 - 5
client_test.go

@@ -17,7 +17,7 @@ func TestSimpleClient(t *testing.T) {
 
 	seedBroker.Returns(new(MetadataResponse))
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -41,7 +41,7 @@ func TestCachedPartitions(t *testing.T) {
 
 	config := NewConfig()
 	config.Metadata.Retries = 0
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -74,7 +74,7 @@ func TestClientSeedBrokers(t *testing.T) {
 	metadataResponse.AddBroker(discoveredBroker.Addr(), discoveredBroker.BrokerID())
 	seedBroker.Returns(metadataResponse)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -99,7 +99,7 @@ func TestClientMetadata(t *testing.T) {
 
 	config := NewConfig()
 	config.Metadata.Retries = 0
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -169,7 +169,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse2)
 
-	client, err := NewClient("clientID", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 6 - 0
config.go

@@ -85,6 +85,9 @@ type Config struct {
 		MaxWaitTime time.Duration
 	}
 
+	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
+	// Defaults to "sarama", but you should probably set it to something specific to your application.
+	ClientID string
 	// The number of events to buffer in internal and external channels. This permits the producer and consumer to
 	// continue processing some messages in the background while user code is working, greatly improving throughput.
 	// Defaults to 256.
@@ -142,6 +145,9 @@ func (c *Config) Validate() error {
 	if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
 		Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
 	}
+	if c.ClientID == "sarama" {
+		Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
+	}
 
 	// validate Net values
 	switch {

+ 1 - 1
consumer.go

@@ -468,7 +468,7 @@ func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
 	}
 
-	return w.broker.Fetch(w.consumer.client.id, request)
+	return w.broker.Fetch(request)
 }
 
 func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchResponseBlock) {

+ 6 - 6
consumer_test.go

@@ -22,7 +22,7 @@ func TestConsumerOffsetManual(t *testing.T) {
 		leader.Returns(fetchResponse)
 	}
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 
 	if err != nil {
 		t.Fatal(err)
@@ -72,7 +72,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
 	leader.Returns(fetchResponse)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -119,7 +119,7 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
 	leader.Returns(fetchResponse)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -156,7 +156,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	seedBroker.Returns(metadataResponse)
 
 	// launch test goroutines
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -269,7 +269,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 }
 
 func ExampleConsumerWithSelect() {
-	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
+	client, err := NewClient([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
@@ -318,7 +318,7 @@ consumerLoop:
 }
 
 func ExampleConsumerWithGoroutines() {
-	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
+	client, err := NewClient([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {

+ 3 - 3
functional_test.go

@@ -47,7 +47,7 @@ func TestFuncConnectionFailure(t *testing.T) {
 	config := NewConfig()
 	config.Metadata.Retries = 1
 
-	_, err := NewClient("test", []string{"localhost:9000"}, config)
+	_, err := NewClient([]string{"localhost:9000"}, config)
 	if err != ErrOutOfBrokers {
 		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 	}
@@ -85,7 +85,7 @@ func TestFuncProducingFlushing(t *testing.T) {
 
 func TestFuncMultiPartitionProduce(t *testing.T) {
 	checkKafkaAvailability(t)
-	client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
+	client, err := NewClient([]string{kafkaAddr}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -127,7 +127,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 func testProducingMessages(t *testing.T, config *Config) {
 	checkKafkaAvailability(t)
 
-	client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
+	client, err := NewClient([]string{kafkaAddr}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 1 - 1
producer.go

@@ -494,7 +494,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
 			continue
 		}
 
-		response, err := broker.Produce(p.client.id, request)
+		response, err := broker.Produce(request)
 
 		switch err.(type) {
 		case nil:

+ 11 - 11
producer_test.go

@@ -43,7 +43,7 @@ func TestSyncProducer(t *testing.T) {
 		leader.Returns(prodSuccess)
 	}
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -85,7 +85,7 @@ func TestConcurrentSyncProducer(t *testing.T) {
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -133,7 +133,7 @@ func TestProducer(t *testing.T) {
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
 	leader.Returns(prodSuccess)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -187,7 +187,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 	leader.Returns(prodSuccess)
 	leader.Returns(prodSuccess)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -245,7 +245,7 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
 	leader1.Returns(prodResponse1)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -293,7 +293,7 @@ func TestProducerFailureRetry(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -371,7 +371,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -425,7 +425,7 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -486,7 +486,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -562,7 +562,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 }
 
 func ExampleProducer() {
-	client, err := NewClient("client_id", []string{"localhost:9092"}, nil)
+	client, err := NewClient([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
@@ -595,7 +595,7 @@ func ExampleProducer() {
 }
 
 func ExampleSyncProducer() {
-	client, err := NewClient("client_id", []string{"localhost:9092"}, nil)
+	client, err := NewClient([]string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {