package sarama

// OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.
type OffsetMethod int

const (
	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
	// offset at which to start, allowing the user to manually specify their desired starting offset.
	OffsetMethodManual OffsetMethod = iota
	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
	// determined by querying the broker.
	OffsetMethodNewest
	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
	// determined by querying the broker.
	OffsetMethodOldest
)

// ConsumerConfig is used to pass multiple configuration options to NewConsumer.
// Use NewConsumerConfig to obtain an instance pre-populated with defaults; the zero value
// of this struct does not pass Validate.
type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request.
	// NewConsumerConfig sets this to 1024 bytes; Validate rejects values <= 0.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// NewConsumerConfig sets this to 1 to prevent the consumer from spinning when no messages are available;
	// Validate rejects values <= 0.
	MinFetchSize int32
	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit. Validate rejects negative values.
	MaxMessageSize int32
	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyways. 100-500ms is a reasonable range for most cases; NewConsumerConfig sets 250.
	// Validate rejects values <= 0 and logs a warning for values below 100, since very low values cause the
	// Consumer to spin when no events are available.
	MaxWaitTime int32

	// The method used to determine at which offset to begin consuming messages.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod. With OffsetMethodManual it is the
	// starting offset and must be >= 0; NewConsumer does not read it for the other methods.
	OffsetValue int64

	// The number of events to buffer in the Events channel. Setting this can let the
	// consumer continue fetching messages in the background while local code consumes events,
	// greatly improving throughput. Validate rejects negative values.
	EventBufferSize int
}

// ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or
// a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.
type ConsumerEvent struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	Err        error
}

// Consumer processes Kafka messages from a given topic and partition.
// You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when
// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
type Consumer struct {
	client *Client

	topic     string
	partition int32
	group     string
	config    ConsumerConfig // private copy of the configuration passed to NewConsumer

	offset int64   // offset placed in the next fetch request
	broker *Broker // broker currently used for fetch and offset requests

	// stopper is closed by Close to ask the fetching goroutine to stop; done is closed by that
	// goroutine once it has stopped.
	stopper, done chan bool
	events        chan *ConsumerEvent // channel exposed read-only through Events
}

// NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
// part of the named consumer group.
func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) { if config == nil { config = NewConsumerConfig() } if err := config.Validate(); err != nil { return nil, err } if topic == "" { return nil, ConfigurationError("Empty topic") } broker, err := client.Leader(topic, partition) if err != nil { return nil, err } c := &Consumer{ client: client, topic: topic, partition: partition, group: group, config: *config, broker: broker, stopper: make(chan bool), done: make(chan bool), events: make(chan *ConsumerEvent, config.EventBufferSize), } switch config.OffsetMethod { case OffsetMethodManual: if config.OffsetValue < 0 { return nil, ConfigurationError("OffsetValue cannot be < 0 when OffsetMethod is MANUAL") } c.offset = config.OffsetValue case OffsetMethodNewest: c.offset, err = c.getOffset(LatestOffsets, true) if err != nil { return nil, err } case OffsetMethodOldest: c.offset, err = c.getOffset(EarliestOffset, true) if err != nil { return nil, err } default: return nil, ConfigurationError("Invalid OffsetMethod") } go withRecover(c.fetchMessages) return c, nil } // Events returns the read channel for any events (messages or errors) that might be returned by the broker. func (c *Consumer) Events() <-chan *ConsumerEvent { return c.events } // Close stops the consumer from fetching messages. It is required to call this function before // a consumer object passes out of scope, as it will otherwise leak memory. You must call this before // calling Close on the underlying client. func (c *Consumer) Close() error { close(c.stopper) <-c.done return nil } // helper function for safely sending an error on the errors channel // if it returns true, the error was sent (or was nil) // if it returns false, the stopper channel signaled that your goroutine should return! 
func (c *Consumer) sendError(err error) bool { if err == nil { return true } select { case <-c.stopper: close(c.events) close(c.done) return false case c.events <- &ConsumerEvent{Err: err, Topic: c.topic, Partition: c.partition}: return true } } func (c *Consumer) fetchMessages() { fetchSize := c.config.DefaultFetchSize for { request := new(FetchRequest) request.MinBytes = c.config.MinFetchSize request.MaxWaitTime = c.config.MaxWaitTime request.AddBlock(c.topic, c.partition, c.offset, fetchSize) response, err := c.broker.Fetch(c.client.id, request) switch { case err == nil: break case err == EncodingError: if c.sendError(err) { continue } else { return } default: Logger.Printf("Unexpected error processing FetchRequest; disconnecting broker %s: %s\n", c.broker.addr, err) c.client.disconnectBroker(c.broker) for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) { if !c.sendError(err) { return } } continue } block := response.GetBlock(c.topic, c.partition) if block == nil { if c.sendError(IncompleteResponse) { continue } else { return } } switch block.Err { case NoError: break case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable: err = c.client.RefreshTopicMetadata(c.topic) if c.sendError(err) { for c.broker, err = c.client.Leader(c.topic, c.partition); err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) { if !c.sendError(err) { return } } continue } else { return } default: if c.sendError(block.Err) { continue } else { return } } if len(block.MsgSet.Messages) == 0 { // We got no messages. If we got a trailing one then we need to ask for more data. // Otherwise we just poll again and wait for one to be produced... 
if block.MsgSet.PartialTrailingMessage { if c.config.MaxMessageSize == 0 { fetchSize *= 2 } else { if fetchSize == c.config.MaxMessageSize { if c.sendError(MessageTooLarge) { continue } else { return } } else { fetchSize *= 2 if fetchSize > c.config.MaxMessageSize { fetchSize = c.config.MaxMessageSize } } } } select { case <-c.stopper: close(c.events) close(c.done) return default: continue } } else { fetchSize = c.config.DefaultFetchSize } for _, msgBlock := range block.MsgSet.Messages { for _, msg := range msgBlock.Messages() { select { case <-c.stopper: close(c.events) close(c.done) return case c.events <- &ConsumerEvent{Key: msg.Msg.Key, Value: msg.Msg.Value, Offset: msg.Offset, Topic: c.topic, Partition: c.partition}: c.offset++ } } } } } func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) { request := &OffsetRequest{} request.AddBlock(c.topic, c.partition, where, 1) response, err := c.broker.GetAvailableOffsets(c.client.id, request) switch err { case nil: break case EncodingError: return -1, err default: if !retry { return -1, err } Logger.Printf("Unexpected error processing OffsetRequest; disconnecting broker %s: %s\n", c.broker.addr, err) c.client.disconnectBroker(c.broker) c.broker, err = c.client.Leader(c.topic, c.partition) if err != nil { return -1, err } return c.getOffset(where, false) } block := response.GetBlock(c.topic, c.partition) if block == nil { return -1, IncompleteResponse } switch block.Err { case NoError: if len(block.Offsets) < 1 { return -1, IncompleteResponse } return block.Offsets[0], nil case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable: if !retry { return -1, block.Err } err = c.client.RefreshTopicMetadata(c.topic) if err != nil { return -1, err } c.broker, err = c.client.Leader(c.topic, c.partition) if err != nil { return -1, err } return c.getOffset(where, false) } return -1, block.Err } // Creates a ConsumerConfig instance with sane defaults. 
// The defaults are a DefaultFetchSize of 1024, a MinFetchSize of 1 and a MaxWaitTime of 250;
// every other field keeps its zero value.
func NewConsumerConfig() *ConsumerConfig {
	defaults := new(ConsumerConfig)
	defaults.DefaultFetchSize = 1024
	defaults.MinFetchSize = 1
	defaults.MaxWaitTime = 250
	return defaults
}

// Validate checks a ConsumerConfig instance. It returns a ConfigurationError naming the first
// field whose value doesn't make sense, or nil when all fields are acceptable. A MaxWaitTime
// that is positive but below 100 is accepted, but a warning is written to Logger.
func (config *ConsumerConfig) Validate() error {
	switch {
	case config.DefaultFetchSize <= 0:
		return ConfigurationError("Invalid DefaultFetchSize")
	case config.MinFetchSize <= 0:
		return ConfigurationError("Invalid MinFetchSize")
	case config.MaxMessageSize < 0:
		return ConfigurationError("Invalid MaxMessageSize")
	case config.MaxWaitTime <= 0:
		return ConfigurationError("Invalid MaxWaitTime")
	}

	// MaxWaitTime is known to be positive here; warn when it is suspiciously small.
	if config.MaxWaitTime < 100 {
		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
	}

	if config.EventBufferSize < 0 {
		return ConfigurationError("Invalid EventBufferSize")
	}

	return nil
}