|
@@ -10,13 +10,14 @@ import (
|
|
|
|
|
|
|
|
// ConsumerMessage encapsulates a Kafka message returned by the consumer.
|
|
// ConsumerMessage encapsulates a Kafka message returned by the consumer.
|
|
|
type ConsumerMessage struct {
|
|
type ConsumerMessage struct {
|
|
|
- Key, Value []byte
|
|
|
|
|
- Topic string
|
|
|
|
|
- Partition int32
|
|
|
|
|
- Offset int64
|
|
|
|
|
|
|
+ Headers []*RecordHeader // only set if kafka is version 0.11+
|
|
|
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
|
|
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
|
|
|
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
|
|
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
|
|
|
- Headers []*RecordHeader // only set if kafka is version 0.11+
|
|
|
|
|
|
|
+
|
|
|
|
|
+ Key, Value []byte
|
|
|
|
|
+ Topic string
|
|
|
|
|
+ Partition int32
|
|
|
|
|
+ Offset int64
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// ConsumerError is what is provided to the user when an error occurs.
|
|
// ConsumerError is what is provided to the user when an error occurs.
|
|
@@ -75,12 +76,11 @@ type Consumer interface {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type consumer struct {
|
|
type consumer struct {
|
|
|
- client Client
|
|
|
|
|
- conf *Config
|
|
|
|
|
-
|
|
|
|
|
- lock sync.Mutex
|
|
|
|
|
|
|
+ conf *Config
|
|
|
children map[string]map[int32]*partitionConsumer
|
|
children map[string]map[int32]*partitionConsumer
|
|
|
brokerConsumers map[*Broker]*brokerConsumer
|
|
brokerConsumers map[*Broker]*brokerConsumer
|
|
|
|
|
+ client Client
|
|
|
|
|
+ lock sync.Mutex
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// NewConsumer creates a new consumer using the given broker addresses and configuration.
|
|
// NewConsumer creates a new consumer using the given broker addresses and configuration.
|
|
@@ -258,7 +258,7 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
|
|
|
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
|
|
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
|
|
|
//
|
|
//
|
|
|
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
|
|
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
|
|
|
-// consumer tear-down & return imediately. Continue to loop, servicing the Messages channel until the teardown process
|
|
|
|
|
|
|
+// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
|
|
|
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
|
|
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
|
|
|
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
|
|
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
|
|
|
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
|
|
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
|
|
@@ -295,24 +295,22 @@ type PartitionConsumer interface {
|
|
|
|
|
|
|
|
type partitionConsumer struct {
|
|
type partitionConsumer struct {
|
|
|
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
|
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
|
|
|
- consumer *consumer
|
|
|
|
|
- conf *Config
|
|
|
|
|
- topic string
|
|
|
|
|
- partition int32
|
|
|
|
|
|
|
|
|
|
|
|
+ consumer *consumer
|
|
|
|
|
+ conf *Config
|
|
|
broker *brokerConsumer
|
|
broker *brokerConsumer
|
|
|
messages chan *ConsumerMessage
|
|
messages chan *ConsumerMessage
|
|
|
errors chan *ConsumerError
|
|
errors chan *ConsumerError
|
|
|
feeder chan *FetchResponse
|
|
feeder chan *FetchResponse
|
|
|
|
|
|
|
|
trigger, dying chan none
|
|
trigger, dying chan none
|
|
|
- responseResult error
|
|
|
|
|
closeOnce sync.Once
|
|
closeOnce sync.Once
|
|
|
-
|
|
|
|
|
- fetchSize int32
|
|
|
|
|
- offset int64
|
|
|
|
|
-
|
|
|
|
|
- retries int32
|
|
|
|
|
|
|
+ topic string
|
|
|
|
|
+ partition int32
|
|
|
|
|
+ responseResult error
|
|
|
|
|
+ fetchSize int32
|
|
|
|
|
+ offset int64
|
|
|
|
|
+ retries int32
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
|
|
var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
|
|
@@ -335,9 +333,8 @@ func (child *partitionConsumer) computeBackoff() time.Duration {
|
|
|
if child.conf.Consumer.Retry.BackoffFunc != nil {
|
|
if child.conf.Consumer.Retry.BackoffFunc != nil {
|
|
|
retries := atomic.AddInt32(&child.retries, 1)
|
|
retries := atomic.AddInt32(&child.retries, 1)
|
|
|
return child.conf.Consumer.Retry.BackoffFunc(int(retries))
|
|
return child.conf.Consumer.Retry.BackoffFunc(int(retries))
|
|
|
- } else {
|
|
|
|
|
- return child.conf.Consumer.Retry.Backoff
|
|
|
|
|
}
|
|
}
|
|
|
|
|
+ return child.conf.Consumer.Retry.Backoff
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (child *partitionConsumer) dispatcher() {
|
|
func (child *partitionConsumer) dispatcher() {
|
|
@@ -529,7 +526,8 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
|
|
func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
|
|
|
- var messages []*ConsumerMessage
|
|
|
|
|
|
|
+ messages := make([]*ConsumerMessage, 0, len(batch.Records))
|
|
|
|
|
+
|
|
|
for _, rec := range batch.Records {
|
|
for _, rec := range batch.Records {
|
|
|
offset := batch.FirstOffset + rec.OffsetDelta
|
|
offset := batch.FirstOffset + rec.OffsetDelta
|
|
|
if offset < child.offset {
|
|
if offset < child.offset {
|
|
@@ -625,15 +623,13 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
|
|
|
return messages, nil
|
|
return messages, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// brokerConsumer
|
|
|
|
|
-
|
|
|
|
|
type brokerConsumer struct {
|
|
type brokerConsumer struct {
|
|
|
consumer *consumer
|
|
consumer *consumer
|
|
|
broker *Broker
|
|
broker *Broker
|
|
|
input chan *partitionConsumer
|
|
input chan *partitionConsumer
|
|
|
newSubscriptions chan []*partitionConsumer
|
|
newSubscriptions chan []*partitionConsumer
|
|
|
- wait chan none
|
|
|
|
|
subscriptions map[*partitionConsumer]none
|
|
subscriptions map[*partitionConsumer]none
|
|
|
|
|
+ wait chan none
|
|
|
acks sync.WaitGroup
|
|
acks sync.WaitGroup
|
|
|
refs int
|
|
refs int
|
|
|
}
|
|
}
|
|
@@ -655,14 +651,14 @@ func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
|
|
|
return bc
|
|
return bc
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
|
|
|
|
|
+// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
|
|
|
|
|
+// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
|
|
|
|
|
+// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
|
|
|
|
|
+// so the main goroutine can block waiting for work if it has none.
|
|
|
func (bc *brokerConsumer) subscriptionManager() {
|
|
func (bc *brokerConsumer) subscriptionManager() {
|
|
|
var buffer []*partitionConsumer
|
|
var buffer []*partitionConsumer
|
|
|
|
|
|
|
|
- // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
|
|
|
|
|
- // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
|
|
|
|
|
- // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
|
|
|
|
|
- // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
|
|
|
|
|
- // so the main goroutine can block waiting for work if it has none.
|
|
|
|
|
for {
|
|
for {
|
|
|
if len(buffer) > 0 {
|
|
if len(buffer) > 0 {
|
|
|
select {
|
|
select {
|
|
@@ -695,10 +691,10 @@ done:
|
|
|
close(bc.newSubscriptions)
|
|
close(bc.newSubscriptions)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
|
|
|
func (bc *brokerConsumer) subscriptionConsumer() {
|
|
func (bc *brokerConsumer) subscriptionConsumer() {
|
|
|
<-bc.wait // wait for our first piece of work
|
|
<-bc.wait // wait for our first piece of work
|
|
|
|
|
|
|
|
- // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
|
|
|
|
|
for newSubscriptions := range bc.newSubscriptions {
|
|
for newSubscriptions := range bc.newSubscriptions {
|
|
|
bc.updateSubscriptions(newSubscriptions)
|
|
bc.updateSubscriptions(newSubscriptions)
|
|
|
|
|
|
|
@@ -744,8 +740,8 @@ func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsu
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+//handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
|
|
|
func (bc *brokerConsumer) handleResponses() {
|
|
func (bc *brokerConsumer) handleResponses() {
|
|
|
- // handles the response codes left for us by our subscriptions, and abandons ones that have been closed
|
|
|
|
|
for child := range bc.subscriptions {
|
|
for child := range bc.subscriptions {
|
|
|
result := child.responseResult
|
|
result := child.responseResult
|
|
|
child.responseResult = nil
|
|
child.responseResult = nil
|