
Wrap all godoc at 80 characters

Evan Huus, 10 years ago
commit 93c1f7437d
7 changed files with 254 additions and 167 deletions
  1. async_producer.go (+38 -20)
  2. client.go (+22 -20)
  3. config.go (+119 -69)
  4. consumer.go (+27 -21)
  5. offset_manager.go (+33 -26)
  6. partitioner.go (+8 -5)
  7. sync_producer.go (+7 -6)

async_producer.go (+38 -20)

@@ -17,10 +17,11 @@ import (
 // scope.
 type AsyncProducer interface {
 
-	// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
-	// buffered. The shutdown has completed when both the Errors and Successes channels
-	// have been closed. When calling AsyncClose, you *must* continue to read from those
-	// channels in order to drain the results of any messages in flight.
+	// AsyncClose triggers a shutdown of the producer, flushing any messages it may
+	// have buffered. The shutdown has completed when both the Errors and Successes
+	// channels have been closed. When calling AsyncClose, you *must* continue to
+	// read from those channels in order to drain the results of any messages in
+	// flight.
 	AsyncClose()
 
 	// Close shuts down the producer and flushes any messages it may have buffered.
@@ -29,17 +30,20 @@ type AsyncProducer interface {
 	// underlying client.
 	Close() error
 
-	// Input is the input channel for the user to write messages to that they wish to send.
+	// Input is the input channel for the user to write messages to that they
+	// wish to send.
 	Input() chan<- *ProducerMessage
 
-	// Successes is the success output channel back to the user when AckSuccesses is enabled.
-	// If Return.Successes is true, you MUST read from this channel or the Producer will deadlock.
-	// It is suggested that you send and read messages together in a single select statement.
+	// Successes is the success output channel back to the user when AckSuccesses is
+	// enabled. If Return.Successes is true, you MUST read from this channel or the
+	// Producer will deadlock. It is suggested that you send and read messages together
+	// in a single select statement.
 	Successes() <-chan *ProducerMessage
 
-	// Errors is the error output channel back to the user. You MUST read from this channel
-	// or the Producer will deadlock when the channel is full. Alternatively, you can set
-	// Producer.Return.Errors in your config to false, which prevents errors to be returned.
+	// Errors is the error output channel back to the user. You MUST read from this
+	// channel or the Producer will deadlock when the channel is full. Alternatively,
+	// you can set Producer.Return.Errors in your config to false, which prevents
+	// errors from being returned.
 	Errors() <-chan *ProducerError
 }
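
Since a full Errors (or Successes) channel deadlocks the producer, the pattern the godoc recommends looks roughly like the sketch below: send and read in a single select. The import path and the NewConfig/NewAsyncProducer constructors are assumptions not shown in this diff.

package main

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path for this era
)

func main() {
	config := sarama.NewConfig() // assumed constructor
	// Producer.Return.Errors defaults to enabled, so Errors() must be drained.

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config) // assumed constructor
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	for i := 0; i < 10; i++ {
		// Send and read in one select, as suggested above, so a pending
		// error can never block the send.
		select {
		case producer.Input() <- &sarama.ProducerMessage{
			Topic: "my_topic",
			Value: sarama.StringEncoder("testing 123"),
		}:
		case err := <-producer.Errors():
			log.Println("failed to produce:", err)
		}
	}
}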
 
@@ -107,15 +111,29 @@ const (
 
 // ProducerMessage is the collection of elements passed to the Producer in order to send a message.
 type ProducerMessage struct {
-	Topic string  // The Kafka topic for this message.
-	Key   Encoder // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
-	Value Encoder // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
-
-	// These are filled in by the producer as the message is processed
-	Offset    int64 // Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.
-	Partition int32 // Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.
-
-	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels.  Sarama completely ignores this field and is only to be used for pass-through data.
+	Topic string // The Kafka topic for this message.
+	// The partitioning key for this message. Pre-existing Encoders include
+	// StringEncoder and ByteEncoder.
+	Key Encoder
+	// The actual message to store in Kafka. Pre-existing Encoders include
+	// StringEncoder and ByteEncoder.
+	Value Encoder
+
+	// This field is used to hold arbitrary data you wish to include so it
+	// will be available when receiving on the Successes and Errors channels.
+	// Sarama completely ignores this field; it is only to be used for
+	// pass-through data.
+	Metadata interface{}
+
+	// The fields below are filled in by the producer as the message is processed.
+
+	// Offset is the offset of the message stored on the broker. This is only
+	// guaranteed to be defined if the message was successfully delivered and
+	// RequiredAcks is not NoResponse.
+	Offset int64
+	// Partition is the partition that the message was sent to. This is only
+	// guaranteed to be defined if the message was successfully delivered.
+	Partition int32
 
 	retries int
 	flags   flagSet
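
For illustration, a hypothetical use of the Metadata pass-through field documented above, correlating acks with application-level request IDs. The sendWithID helper, topic, and payload are invented for the sketch, and Producer.Return.Successes must be enabled for the read to work.

package sketch

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path
)

// sendWithID attaches a request ID via the Metadata field and recovers it
// from the ack. Sarama never touches Metadata; it is pure pass-through.
func sendWithID(p sarama.AsyncProducer, requestID string) {
	p.Input() <- &sarama.ProducerMessage{
		Topic:    "audit",
		Value:    sarama.StringEncoder("payload"),
		Metadata: requestID, // echoed back on the Successes/Errors channels
	}

	ack := <-p.Successes() // requires Producer.Return.Successes = true
	log.Printf("request %v stored at partition %d, offset %d",
		ack.Metadata, ack.Partition, ack.Offset)
}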

client.go (+22 -20)

@@ -12,49 +12,51 @@ import (
 // automatically when it passes out of scope. A single client can be safely shared by
 // multiple concurrent Producers and Consumers.
 type Client interface {
-	// Config returns the Config struct of the client. This struct should not be altered after it
-	// has been created.
+	// Config returns the Config struct of the client. This struct should not be
+	// altered after it has been created.
 	Config() *Config
 
-	// Topics returns the set of available topics as retrieved from the cluster metadata.
+	// Topics returns the set of available topics as retrieved from cluster metadata.
 	Topics() ([]string, error)
 
 	// Partitions returns the sorted list of all partition IDs for the given topic.
 	Partitions(topic string) ([]int32, error)
 
-	// WritablePartitions returns the sorted list of all writable partition IDs for the given topic,
-	// where "writable" means "having a valid leader accepting writes".
+	// WritablePartitions returns the sorted list of all writable partition IDs for the
+	// given topic, where "writable" means "having a valid leader accepting writes".
 	WritablePartitions(topic string) ([]int32, error)
 
-	// Leader returns the broker object that is the leader of the current topic/partition, as
-	// determined by querying the cluster metadata.
+	// Leader returns the broker object that is the leader of the current
+	// topic/partition, as determined by querying the cluster metadata.
 	Leader(topic string, partitionID int32) (*Broker, error)
 
 	// Replicas returns the set of all replica IDs for the given partition.
 	Replicas(topic string, partitionID int32) ([]int32, error)
 
 	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
-	// available metadata for those topics. If no topics are provided, it will refresh metadata
-	// for all topics.
+	// available metadata for those topics. If no topics are provided, it will refresh
+	// metadata for all topics.
 	RefreshMetadata(topics ...string) error
 
-	// GetOffset queries the cluster to get the most recent available offset at the given
-	// time on the topic/partition combination. Time should be OffsetOldest for the earliest available
-	// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
+	// GetOffset queries the cluster to get the most recent available offset at the
+	// given time on the topic/partition combination. Time should be OffsetOldest for
+	// the earliest available offset, OffsetNewest for the offset of the message that
+	// will be produced next, or a time in milliseconds since the UNIX epoch.
 	GetOffset(topic string, partitionID int32, time int64) (int64, error)
 
-	// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
-	// value if it's available. You can call RefreshCoordinator to update the cached value.
-	// This function only works on Kafka 0.8.2 and higher.
+	// Coordinator returns the coordinating broker for a consumer group. It will return
+	// a locally cached value if it's available. You can call RefreshCoordinator to
+	// update the cached value. This function only works on Kafka 0.8.2 and higher.
 	Coordinator(consumerGroup string) (*Broker, error)
 
-	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it in local cache.
-	// This function only works on Kafka 0.8.2 and higher.
+	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
+	// in local cache. This function only works on Kafka 0.8.2 and higher.
 	RefreshCoordinator(consumerGroup string) error
 
-	// Close shuts down all broker connections managed by this client. It is required to call this function before
-	// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
-	// using a client before you close the client.
+	// Close shuts down all broker connections managed by this client. It is required
+	// to call this function before a client object passes out of scope, as it will
+	// otherwise leak memory. You must close any Producers or Consumers using a client
+	// before you close the client.
 	Close() error
 
 	// Closed returns true if the client has already had Close called on it
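
A sketch of the offset and lifecycle rules above. NewClient and NewConfig are assumed constructors, and committed stands in for whatever offset the caller last processed.

package sketch

import "github.com/Shopify/sarama" // assumed import path

// partitionLag estimates how far behind a consumer is on one partition.
func partitionLag(brokers []string, topic string, partition int32, committed int64) (int64, error) {
	client, err := sarama.NewClient(brokers, sarama.NewConfig()) // assumed constructors
	if err != nil {
		return 0, err
	}
	defer client.Close() // required, or the client leaks memory

	// OffsetNewest asks for the offset of the message that will be produced next.
	newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return 0, err
	}
	return newest - committed, nil
}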

config.go (+119 -69)

@@ -7,20 +7,28 @@ import (
 
 // Config is used to pass multiple configuration options to Sarama's constructors.
 type Config struct {
-	// Net is the namespace for network-level properties used by the Broker, and shared by the Client/Producer/Consumer.
+	// Net is the namespace for network-level properties used by the Broker, and shared
+	// by the Client/Producer/Consumer.
 	Net struct {
-		MaxOpenRequests int // How many outstanding requests a connection is allowed to have before sending on it blocks (default 5).
-
-		// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
-		DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
-		ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
-		WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
-
-		// NOTE: these config values have no compatibility guarantees; they may change when Kafka releases its
-		// official TLS support in version 0.9.
+		// How many outstanding requests a connection is allowed to have before
+		// sending on it blocks (default 5).
+		MaxOpenRequests int
+
+		// All three of the below configurations are similar to the `socket.timeout.ms`
+		// setting in JVM kafka.
+		DialTimeout  time.Duration // How long to wait for the initial connection (default 30s).
+		ReadTimeout  time.Duration // How long to wait for a response (default 30s).
+		WriteTimeout time.Duration // How long to wait for a transmit (default 30s).
+
+		// NOTE: these config values have no compatibility guarantees; they may change
+		// when Kafka releases its official TLS support in version 0.9.
 		TLS struct {
-			Enable bool        // Whether or not to use TLS when connecting to the broker (defaults to false).
-			Config *tls.Config // The TLS configuration to use for secure connections if enabled (defaults to nil).
+			// Whether or not to use TLS when connecting to the broker
+			// (defaults to false).
+			Enable bool
+			// The TLS configuration to use for secure connections if
+			// enabled (defaults to nil).
+			Config *tls.Config
 		}
 
 		// KeepAlive specifies the keep-alive period for an active network connection.
@@ -28,53 +36,76 @@ type Config struct {
 		KeepAlive time.Duration
 	}
 
-	// Metadata is the namespace for metadata management properties used by the Client, and shared by the Producer/Consumer.
+	// Metadata is the namespace for metadata management properties used by the Client,
+	// and shared by the Producer/Consumer.
 	Metadata struct {
 		Retry struct {
-			Max     int           // The total number of times to retry a metadata request when the cluster is in the middle of a leader election (default 3).
-			Backoff time.Duration // How long to wait for leader election to occur before retrying (default 250ms). Similar to the JVM's `retry.backoff.ms`.
+			// The total number of times to retry a metadata request when the
+			// cluster is in the middle of a leader election (default 3).
+			Max int
+			// How long to wait for leader election to occur before retrying
+			// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
+			Backoff time.Duration
 		}
-		// How frequently to refresh the cluster metadata in the background. Defaults to 10 minutes.
-		// Set to 0 to disable. Similar to `topic.metadata.refresh.interval.ms` in the JVM version.
+		// How frequently to refresh the cluster metadata in the background. Defaults
+		// to 10 minutes. Set to 0 to disable. Similar to
+		// `topic.metadata.refresh.interval.ms` in the JVM version.
 		RefreshFrequency time.Duration
 	}
 
-	// Producer is the namespace for configuration related to producing messages, used by the Producer.
+	// Producer is the namespace for configuration related to producing messages,
+	// used by the Producer.
 	Producer struct {
-		// The maximum permitted size of a message (defaults to 1000000). Should be set equal to or smaller than the broker's `message.max.bytes`.
+		// The maximum permitted size of a message (defaults to 1000000). Should be set
+		// equal to or smaller than the broker's `message.max.bytes`.
 		MaxMessageBytes int
-		// The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
-		// Equivalent to the `request.required.acks` setting of the JVM producer.
+		// The level of acknowledgement reliability needed from the broker (defaults
+		// to WaitForLocal). Equivalent to the `request.required.acks` setting of the
+		// JVM producer.
 		RequiredAcks RequiredAcks
-		// The maximum duration the broker will wait the receipt of the number of RequiredAcks (defaults to 10 seconds).
-		// This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution,
-		// nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.
+		// The maximum duration the broker will wait for receipt of the number of
+		// RequiredAcks (defaults to 10 seconds). This is only relevant when
+		// RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond
+		// resolution; nanoseconds will be truncated. Equivalent to the JVM producer's
+		// `request.timeout.ms` setting.
 		Timeout time.Duration
-		// The type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.
+		// The type of compression to use on messages (defaults to no compression).
+		// Similar to `compression.codec` setting of the JVM producer.
 		Compression CompressionCodec
-		// Generates partitioners for choosing the partition to send messages to (defaults to hashing the message key).
-		// Similar to the `partitioner.class` setting for the JVM producer.
+		// Generates partitioners for choosing the partition to send messages to
+		// (defaults to hashing the message key). Similar to the `partitioner.class`
+		// setting for the JVM producer.
 		Partitioner PartitionerConstructor
 
-		// Return specifies what channels will be populated. If they are set to true, you must read from
-		// the respective channels to prevent deadlock.
+		// Return specifies what channels will be populated. If they are set to true,
+		// you must read from the respective channels to prevent deadlock.
 		Return struct {
-			// If enabled, successfully delivered messages will be returned on the Successes channel (default disabled).
+			// If enabled, successfully delivered messages will be returned on the
+			// Successes channel (default disabled).
 			Successes bool
 
-			// If enabled, messages that failed to deliver will be returned on the Errors channel, including error (default enabled).
+			// If enabled, messages that failed to deliver will be returned on the
+			// Errors channel, including the error (default enabled).
 			Errors bool
 		}
 
-		// The following config options control how often messages are batched up and sent to the broker. By default,
-		// messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed
+		// The following config options control how often messages are batched up and
+		// sent to the broker. By default, messages are sent as fast as possible, and
+		// all messages received while the current batch is in-flight are placed
 		// into the subsequent batch.
 		Flush struct {
-			Bytes     int           // The best-effort number of bytes needed to trigger a flush. Use the global sarama.MaxRequestSize to set a hard upper limit.
-			Messages  int           // The best-effort number of messages needed to trigger a flush. Use `MaxMessages` to set a hard upper limit.
-			Frequency time.Duration // The best-effort frequency of flushes. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
-			// The maximum number of messages the producer will send in a single broker request.
-			// Defaults to 0 for unlimited. Similar to `queue.buffering.max.messages` in the JVM producer.
+			// The best-effort number of bytes needed to trigger a flush. Use the
+			// global sarama.MaxRequestSize to set a hard upper limit.
+			Bytes int
+			// The best-effort number of messages needed to trigger a flush. Use
+			// `MaxMessages` to set a hard upper limit.
+			Messages int
+			// The best-effort frequency of flushes. Equivalent to
+			// `queue.buffering.max.ms` setting of JVM producer.
+			Frequency time.Duration
+			// The maximum number of messages the producer will send in a single broker
+			// request. Defaults to 0 for unlimited. Similar to
+			// `queue.buffering.max.messages` in the JVM producer.
 			MaxMessages int
 		}
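
A sketch of how the Flush knobs above combine. The values are illustrative, not recommendations, and NewConfig is an assumed constructor.

package sketch

import (
	"time"

	"github.com/Shopify/sarama" // assumed import path
)

// batchingConfig flushes every 500ms, or sooner once 64KB or 1000 messages
// have accumulated, never packing more than 5000 messages into one request.
func batchingConfig() *sarama.Config {
	config := sarama.NewConfig() // assumed constructor
	config.Producer.Flush.Frequency = 500 * time.Millisecond
	config.Producer.Flush.Bytes = 64 * 1024
	config.Producer.Flush.Messages = 1000
	config.Producer.Flush.MaxMessages = 5000 // hard cap per broker request
	return config
}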
 
@@ -82,68 +113,87 @@ type Config struct {
 			// The total number of times to retry sending a message (default 3).
 			// Similar to the `message.send.max.retries` setting of the JVM producer.
 			Max int
-			// How long to wait for the cluster to settle between retries (default 100ms).
-			// Similar to the `retry.backoff.ms` setting of the JVM producer.
+			// How long to wait for the cluster to settle between retries
+			// (default 100ms). Similar to the `retry.backoff.ms` setting of the
+			// JVM producer.
 			Backoff time.Duration
 		}
 	}
 
-	// Consumer is the namespace for configuration related to consuming messages, used by the Consumer.
+	// Consumer is the namespace for configuration related to consuming messages, used
+	// by the Consumer.
 	Consumer struct {
 		Retry struct {
-			// How long to wait after a failing to read from a partition before trying again (default 2s).
+			// How long to wait after failing to read from a partition before trying
+			// again (default 2s).
 			Backoff time.Duration
 		}
 
-		// Fetch is the namespace for controlling how many bytes are retrieved by any given request.
+		// Fetch is the namespace for controlling how many bytes are retrieved by any
+		// given request.
 		Fetch struct {
-			// The minimum number of message bytes to fetch in a request - the broker will wait until at least this many are available.
-			// The default is 1, as 0 causes the consumer to spin when no messages are available. Equivalent to the JVM's `fetch.min.bytes`.
+			// The minimum number of message bytes to fetch in a request - the broker
+			// will wait until at least this many are available. The default is 1,
+			// as 0 causes the consumer to spin when no messages are available.
+			// Equivalent to the JVM's `fetch.min.bytes`.
 			Min int32
-			// The default number of message bytes to fetch from the broker in each request (default 32768). This should be larger than the
-			// majority of your messages, or else the consumer will spend a lot of time negotiating sizes and not actually consuming. Similar
-			// to the JVM's `fetch.message.max.bytes`.
+			// The default number of message bytes to fetch from the broker in each
+			// request (default 32768). This should be larger than the majority of
+			// your messages, or else the consumer will spend a lot of time
+			// negotiating sizes and not actually consuming. Similar to the JVM's
+			// `fetch.message.max.bytes`.
 			Default int32
-			// The maximum number of message bytes to fetch from the broker in a single request. Messages larger than this will return
-			// ErrMessageTooLarge and will not be consumable, so you must be sure this is at least as large as your largest message.
-			// Defaults to 0 (no limit). Similar to the JVM's `fetch.message.max.bytes`. The global `sarama.MaxResponseSize` still applies.
+			// The maximum number of message bytes to fetch from the broker in a single
+			// request. Messages larger than this will return ErrMessageTooLarge and
+			// will not be consumable, so you must be sure this is at least as large as
+			// your largest message. Defaults to 0 (no limit). Similar to the JVM's
+			// `fetch.message.max.bytes`. The global `sarama.MaxResponseSize` still
+			// applies.
 			Max int32
 		}
-		// The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it
-		// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
-		// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
-		// Equivalent to the JVM's `fetch.wait.max.ms`.
+		// The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes
+		// to become available before it returns fewer than that anyway. The default is
+		// 250ms, since 0 causes the consumer to spin when no events are available.
+		// 100-500ms is a reasonable range for most cases. Kafka only supports precision
+		// up to milliseconds; nanoseconds will be truncated. Equivalent to the JVM's
+		// `fetch.wait.max.ms`.
 		MaxWaitTime time.Duration
 
-		// The maximum amount of time the consumer expects a message takes to process for the user. If writing to the Messages channel
-		// takes longer than this, that partition will stop fetching more messages until it can proceed again. Note that, since the
-		// Messages channel is buffered, the actual grace time is (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
+		// The maximum amount of time the consumer expects a message to take to
+		// process for the user. If writing to the Messages channel takes longer than
+		// this, that partition will stop fetching more messages until it can proceed
+		// again. Note that, since the Messages channel is buffered, the actual grace
+		// time is (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
 		MaxProcessingTime time.Duration
 
-		// Return specifies what channels will be populated. If they are set to true, you must read from
-		// them to prevent deadlock.
+		// Return specifies what channels will be populated. If they are set to true,
+		// you must read from them to prevent deadlock.
 		Return struct {
-			// If enabled, any errors that occured while consuming are returned on the Errors channel (default disabled).
+			// If enabled, any errors that occurred while consuming are returned on
+			// the Errors channel (default disabled).
 			Errors bool
 		}
 
-		// Offsets specifies configuration for how and when to commit consumed offsets. This currently requires the
-		// manual use of an OffsetManager but will eventually be automated.
+		// Offsets specifies configuration for how and when to commit consumed offsets.
+		// This currently requires the manual use of an OffsetManager but will
+		// eventually be automated.
 		Offsets struct {
 			// How frequently to commit updated offsets. Defaults to 1s.
 			CommitInterval time.Duration
 
-			// The initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest.
-			// Defaults to OffsetNewest.
+			// The initial offset to use if no offset was previously committed. Should
+			// be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
 			Initial int64
 		}
 	}
 
-	// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
-	// Defaults to "sarama", but you should probably set it to something specific to your application.
+	// A user-provided string sent with every request to the brokers for logging,
+	// debugging, and auditing purposes. Defaults to "sarama", but you should
+	// probably set it to something specific to your application.
 	ClientID string
-	// The number of events to buffer in internal and external channels. This permits the producer and consumer to
-	// continue processing some messages in the background while user code is working, greatly improving throughput.
+	// The number of events to buffer in internal and external channels. This
+	// permits the producer and consumer to continue processing some messages
+	// in the background while user code is working, greatly improving throughput.
 	// Defaults to 256.
 	ChannelBufferSize int
 }
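
Pulling the consumer-side options above together in one sketch. The ClientID is a placeholder, NewConfig is assumed, and the values are illustrative.

package sketch

import (
	"time"

	"github.com/Shopify/sarama" // assumed import path
)

// consumerConfig shows the knobs documented above with illustrative values.
func consumerConfig() *sarama.Config {
	config := sarama.NewConfig()          // assumed constructor
	config.ClientID = "inventory-service" // placeholder application name
	config.ChannelBufferSize = 256        // the documented default
	config.Consumer.Return.Errors = true  // errors must then be drained
	config.Consumer.Fetch.Default = 64 * 1024
	config.Consumer.MaxWaitTime = 250 * time.Millisecond
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	return config
}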

consumer.go (+27 -21)

@@ -46,20 +46,23 @@ func (ce ConsumerErrors) Error() string {
 // to properly integrate this functionality at a later date.
 type Consumer interface {
 
-	// Topics returns the set of available topics as retrieved from the cluster metadata.
-	// This method is the same as Client.Topics(), and is provided for convenience.
+	// Topics returns the set of available topics as retrieved from the cluster
+	// metadata. This method is the same as Client.Topics(), and is provided for
+	// convenience.
 	Topics() ([]string, error)
 
 	// Partitions returns the sorted list of all partition IDs for the given topic.
 	// This method is the same as Client.Partitions(), and is provided for convenience.
 	Partitions(topic string) ([]int32, error)
 
-	// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
-	// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
-	// literal offset, or OffsetNewest or OffsetOldest
+	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
+	// the given offset. It will return an error if this Consumer is already consuming
+	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
+	// or OffsetOldest.
 	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
 
-	// Close shuts down the consumer. It must be called after all child PartitionConsumers have already been closed.
+	// Close shuts down the consumer. It must be called after all child
+	// PartitionConsumers have already been closed.
 	Close() error
 }
 
@@ -234,29 +237,32 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
 // or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
 type PartitionConsumer interface {
 
-	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
-	// after which you should wait until the 'messages' and 'errors' channel are drained.
-	// It is required to call this function, or Close before a consumer object passes out of scope,
-	// as it will otherwise leak memory.  You must call this before calling Close on the underlying
-	// client.
+	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will
+	// return immediately, after which you should wait until the 'messages' and
+	// 'errors' channels are drained. It is required to call this function, or
+	// Close, before a consumer object passes out of scope, as it will otherwise
+	// leak memory. You must call this before calling Close on the underlying client.
 	AsyncClose()
 
-	// Close stops the PartitionConsumer from fetching messages. It is required to call this function
-	// (or AsyncClose) before a consumer object passes out of scope, as it will otherwise leak memory. You must
-	// call this before calling Close on the underlying client.
+	// Close stops the PartitionConsumer from fetching messages. It is required to call
+	// this function (or AsyncClose) before a consumer object passes out of scope, as
+	// it will otherwise leak memory. You must call this before calling Close on the
+	// underlying client.
 	Close() error
 
-	// Messages returns the read channel for the messages that are returned by the broker.
+	// Messages returns the read channel for the messages that are returned by
+	// the broker.
 	Messages() <-chan *ConsumerMessage
 
-	// Errors returns a read channel of errors that occured during consuming, if enabled. By default,
-	// errors are logged and not returned over this channel. If you want to implement any custom error
-	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
+	// Errors returns a read channel of errors that occurred during consuming, if
+	// enabled. By default, errors are logged and not returned over this channel.
+	// If you want to implement any custom error handling, set your config's
+	// Consumer.Return.Errors setting to true, and read from this channel.
 	Errors() <-chan *ConsumerError
 
-	// HighWaterMarkOffset returns the high water mark offset of the partition, i.e. the offset that will
-	// be used for the next message that will be produced. You can use this to determine how far behind
-	// the processing is.
+	// HighWaterMarkOffset returns the high water mark offset of the partition,
+	// i.e. the offset that will be used for the next message that will be produced.
+	// You can use this to determine how far behind the processing is.
 	HighWaterMarkOffset() int64
 }
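
A minimal consume loop against the interfaces above. NewConsumer and NewConfig are assumed constructors; the topic and broker address are placeholders.

package main

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, sarama.NewConfig()) // assumed constructors
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close() // only after all PartitionConsumers are closed

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		// HighWaterMarkOffset lets us report how far behind we are.
		lag := pc.HighWaterMarkOffset() - msg.Offset - 1
		log.Printf("offset %d (lag %d): %s", msg.Offset, lag, msg.Value)
	}
}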
 

offset_manager.go (+33 -26)

@@ -9,13 +9,15 @@ import (
 
 // OffsetManager uses Kafka to store and fetch consumed partition offsets.
 type OffsetManager interface {
-	// ManagePartition creates a PartitionOffsetManager on the given topic/partition. It will
-	// return an error if this OffsetManager is already managing the given topic/partition.
+	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
+	// It will return an error if this OffsetManager is already managing the given
+	// topic/partition.
 	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)
 
-	// Close stops the OffsetManager from managing offsets. It is required to call this function
-	// before an OffsetManager object passes out of scope, as it will otherwise
-	// leak memory. You must call this after all the PartitionOffsetManagers are closed.
+	// Close stops the OffsetManager from managing offsets. It is required to call this
+	// function before an OffsetManager object passes out of scope, as it will
+	// otherwise leak memory. You must call this after all the PartitionOffsetManagers
+	// are closed.
 	Close() error
 }
 
@@ -127,36 +129,41 @@ func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManag
 // on a partition offset manager to avoid leaks, it will not be garbage-collected automatically when it passes
 // out of scope.
 type PartitionOffsetManager interface {
-	// NextOffset returns the next offset that should be consumed for the managed partition, accompanied
-	// by metadata which can be used to reconstruct the state of the partition consumer when it resumes.
-	// NextOffset() will return `config.Consumer.Offsets.Initial` and an empty metadata string if no
-	// offset was committed for this partition yet.
+	// NextOffset returns the next offset that should be consumed for the managed
+	// partition, accompanied by metadata which can be used to reconstruct the state
+	// of the partition consumer when it resumes. NextOffset() will return
+	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
+	// was committed for this partition yet.
 	NextOffset() (int64, string)
 
-	// MarkOffset marks the provided offset as processed, alongside a metadata string that represents
-	// the state of the partition consumer at that point in time. The metadata string can be used by
-	// another consumer to restore that state, so it can resume consumption.
+	// MarkOffset marks the provided offset as processed, alongside a metadata string
+	// that represents the state of the partition consumer at that point in time. The
+	// metadata string can be used by another consumer to restore that state, so it
+	// can resume consumption.
 	//
-	// Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately
-	// for efficiency reasons, and it may never be committed if your application crashes. This means that
-	// you may end up processing the same message twice, and your processing should ideally be idempotent.
+	// Note: calling MarkOffset does not necessarily commit the offset to the backend
+	// store immediately for efficiency reasons, and it may never be committed if
+	// your application crashes. This means that you may end up processing the same
+	// message twice, and your processing should ideally be idempotent.
 	MarkOffset(offset int64, metadata string)
 
-	// Errors returns a read channel of errors that occur during offset management, if enabled. By default,
-	// errors are logged and not returned over this channel. If you want to implement any custom error
-	// handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
+	// Errors returns a read channel of errors that occur during offset management, if
+	// enabled. By default, errors are logged and not returned over this channel. If
+	// you want to implement any custom error handling, set your config's
+	// Consumer.Return.Errors setting to true, and read from this channel.
 	Errors() <-chan *ConsumerError
 
-	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will return immediately,
-	// after which you should wait until the 'errors' channel has been drained and closed.
-	// It is required to call this function, or Close before a consumer object passes out of scope,
-	// as it will otherwise leak memory.  You must call this before calling Close on the underlying
-	// client.
+	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
+	// return immediately, after which you should wait until the 'errors' channel has
+	// been drained and closed. It is required to call this function, or Close,
+	// before the PartitionOffsetManager passes out of scope, as it will otherwise
+	// leak memory. You must call this before calling Close on the underlying
+	// client.
 	AsyncClose()
 
-	// Close stops the PartitionOffsetManager from managing offsets. It is required to call this function
-	// (or AsyncClose) before a PartitionOffsetManager object passes out of scope, as it will otherwise
-	// leak memory. You must call this before calling Close on the underlying client.
+	// Close stops the PartitionOffsetManager from managing offsets. It is required to
+	// call this function (or AsyncClose) before a PartitionOffsetManager object
+	// passes out of scope, as it will otherwise leak memory. You must call this
+	// before calling Close on the underlying client.
 	Close() error
 }
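
The NextOffset/MarkOffset cycle above, sketched end to end. NewOffsetManagerFromClient is an assumed constructor of this era, and process is a hypothetical, ideally idempotent handler.

package sketch

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path
)

func consumeWithOffsets(client sarama.Client, consumer sarama.Consumer) error {
	om, err := sarama.NewOffsetManagerFromClient("my-group", client) // assumed constructor
	if err != nil {
		return err
	}
	defer om.Close() // only after all PartitionOffsetManagers are closed

	pom, err := om.ManagePartition("my_topic", 0)
	if err != nil {
		return err
	}
	defer pom.Close()

	offset, metadata := pom.NextOffset() // Offsets.Initial if nothing committed yet
	log.Printf("resuming at offset %d (metadata %q)", offset, metadata)

	pc, err := consumer.ConsumePartition("my_topic", 0, offset)
	if err != nil {
		return err
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		process(msg)                   // hypothetical handler; should be idempotent
		pom.MarkOffset(msg.Offset, "") // committed at most every CommitInterval
	}
	return nil
}

func process(*sarama.ConsumerMessage) {}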
 

partitioner.go (+8 -5)

@@ -11,11 +11,14 @@ import (
 // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
 // as simple default implementations.
 type Partitioner interface {
-	Partition(message *ProducerMessage, numPartitions int32) (int32, error) // Partition takes a message and partition count and chooses a partition
-
-	// RequiresConsistency indicates to the user of the partitioner whether the mapping of key->partition is consistent or not.
-	// Specifically, if a partitioner requires consistency then it must be allowed to choose from all partitions (even ones known to
-	// be unavailable), and its choice must be respected by the caller. The obvious example is the HashPartitioner.
+	// Partition takes a message and partition count and chooses a partition.
+	Partition(message *ProducerMessage, numPartitions int32) (int32, error)
+
+	// RequiresConsistency indicates to the user of the partitioner whether the mapping
+	// of key->partition is consistent or not. Specifically, if a partitioner requires
+	// consistency then it must be allowed to choose from all partitions (even ones
+	// known to be unavailable), and its choice must be respected by the caller. The
+	// obvious example is the HashPartitioner.
 	RequiresConsistency() bool
 }
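
For contrast with RequiresConsistency, a toy Partitioner that makes no key-to-partition promise. The type and constructor names are invented; only the two interface methods are real.

package sketch

import "github.com/Shopify/sarama" // assumed import path

// keyPresencePartitioner routes keyed messages to partition 0 and keyless
// ones to partition 1. The mapping is arbitrary, so it does not require
// consistency, unlike the HashPartitioner mentioned above.
type keyPresencePartitioner struct{}

// NewKeyPresencePartitioner matches the PartitionerConstructor signature,
// so it can be assigned to Config.Producer.Partitioner.
func NewKeyPresencePartitioner(topic string) sarama.Partitioner {
	return keyPresencePartitioner{}
}

func (keyPresencePartitioner) Partition(msg *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	if msg.Key != nil {
		return 0, nil
	}
	return 1 % numPartitions, nil // falls back to 0 on single-partition topics
}

func (keyPresencePartitioner) RequiresConsistency() bool { return false }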
 

sync_producer.go (+7 -6)

@@ -7,14 +7,15 @@ import "sync"
 // it passes out of scope.
 type SyncProducer interface {
 
-	// SendMessage produces a given message, and returns only when it either has succeeded or failed to produce.
-	// It will return the partition and the offset of the produced message, or an error if the message
-	// failed to produce.
+	// SendMessage produces a given message, and returns only when it either has
+	// succeeded or failed to produce. It will return the partition and the offset
+	// of the produced message, or an error if the message failed to produce.
 	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
 
-	// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
-	// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
-	// on the underlying client.
+	// Close shuts down the producer and flushes any messages it may have buffered.
+	// You must call this function before a producer object passes out of scope, as
+	// it may otherwise leak memory. You must call this before calling Close on the
+	// underlying client.
 	Close() error
 }
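
And the synchronous counterpart, sketched under the same assumptions (NewSyncProducer and NewConfig are assumed constructors).

package main

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path
)

func main() {
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close() // required before the producer passes out of scope

	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "my_topic",
		Value: sarama.StringEncoder("testing 123"),
	})
	if err != nil {
		log.Fatalln("failed to send:", err)
	}
	log.Printf("stored at partition %d, offset %d", partition, offset)
}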