|
|
@@ -7,21 +7,22 @@ import (
|
|
|
|
|
|
// Config is used to pass multiple configuration options to Sarama's constructors.
|
|
|
type Config struct {
|
|
|
- // Net is the namespace for network-level properties used by the Broker, and shared
|
|
|
- // by the Client/Producer/Consumer.
|
|
|
+ // Net is the namespace for network-level properties used by the Broker, and
|
|
|
+ // shared by the Client/Producer/Consumer.
|
|
|
Net struct {
|
|
|
// How many outstanding requests a connection is allowed to have before
|
|
|
// sending on it blocks (default 5).
|
|
|
MaxOpenRequests int
|
|
|
|
|
|
- // All three of the below configurations are similar to the `socket.timeout.ms`
|
|
|
- // setting in JVM kafka.
|
|
|
- DialTimeout time.Duration // How long to wait for the initial connection (default 30s).
|
|
|
- ReadTimeout time.Duration // How long to wait for a response (default 30s).
|
|
|
- WriteTimeout time.Duration // How long to wait for a transmit (default 30s).
|
|
|
+ // All three of the below configurations are similar to the
|
|
|
+ // `socket.timeout.ms` setting in JVM kafka. All of them default
|
|
|
+ // to 30 seconds.
|
|
|
+ DialTimeout time.Duration // How long to wait for the initial connection.
|
|
|
+ ReadTimeout time.Duration // How long to wait for a response.
|
|
|
+ WriteTimeout time.Duration // How long to wait for a transmit.
|
|
|
|
|
|
- // NOTE: these config values have no compatibility guarantees; they may change
|
|
|
- // when Kafka releases its official TLS support in version 0.9.
|
|
|
+ // NOTE: these config values have no compatibility guarantees; they may
|
|
|
+ // change when Kafka releases its official TLS support in version 0.9.
|
|
|
TLS struct {
|
|
|
// Whether or not to use TLS when connecting to the broker
|
|
|
// (defaults to false).
|
|
|
@@ -36,8 +37,8 @@ type Config struct {
|
|
|
KeepAlive time.Duration
|
|
|
}
|
|
|
|
|
|
- // Metadata is the namespace for metadata management properties used by the Client,
|
|
|
- // and shared by the Producer/Consumer.
|
|
|
+ // Metadata is the namespace for metadata management properties used by the
|
|
|
+ // Client, and shared by the Producer/Consumer.
|
|
|
Metadata struct {
|
|
|
Retry struct {
|
|
|
// The total number of times to retry a metadata request when the
|
|
|
@@ -47,8 +48,8 @@ type Config struct {
|
|
|
// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
|
|
|
Backoff time.Duration
|
|
|
}
|
|
|
- // How frequently to refresh the cluster metadata in the background. Defaults
|
|
|
- // to 10 minutes. Set to 0 to disable. Similar to
|
|
|
+ // How frequently to refresh the cluster metadata in the background.
|
|
|
+ // Defaults to 10 minutes. Set to 0 to disable. Similar to
|
|
|
// `topic.metadata.refresh.interval.ms` in the JVM version.
|
|
|
RefreshFrequency time.Duration
|
|
|
}
|
|
|
@@ -56,8 +57,8 @@ type Config struct {
|
|
|
// Producer is the namespace for configuration related to producing messages,
|
|
|
// used by the Producer.
|
|
|
Producer struct {
|
|
|
- // The maximum permitted size of a message (defaults to 1000000). Should be set
|
|
|
- // equal to or smaller than the broker's `message.max.bytes`.
|
|
|
+ // The maximum permitted size of a message (defaults to 1000000). Should be
|
|
|
+ // set equal to or smaller than the broker's `message.max.bytes`.
|
|
|
MaxMessageBytes int
|
|
|
// The level of acknowledgement reliability needed from the broker (defaults
|
|
|
// to WaitForLocal). Equivalent to the `request.required.acks` setting of the
|
|
|
@@ -65,9 +66,9 @@ type Config struct {
|
|
|
RequiredAcks RequiredAcks
|
|
|
// The maximum duration the broker will wait the receipt of the number of
|
|
|
// RequiredAcks (defaults to 10 seconds). This is only relevant when
|
|
|
- // RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond
|
|
|
- // resolution, nanoseconds will be truncated. Equivalent to the JVM producer's
|
|
|
- // `request.timeout.ms` setting.
|
|
|
+ // RequiredAcks is set to WaitForAll or a number > 1. Only supports
|
|
|
+ // millisecond resolution, nanoseconds will be truncated. Equivalent to
|
|
|
+ // the JVM producer's `request.timeout.ms` setting.
|
|
|
Timeout time.Duration
|
|
|
// The type of compression to use on messages (defaults to no compression).
|
|
|
// Similar to `compression.codec` setting of the JVM producer.
|
|
|
@@ -103,8 +104,8 @@ type Config struct {
|
|
|
// The best-effort frequency of flushes. Equivalent to
|
|
|
// `queue.buffering.max.ms` setting of JVM producer.
|
|
|
Frequency time.Duration
|
|
|
- // The maximum number of messages the producer will send in a single broker
|
|
|
- // request. Defaults to 0 for unlimited. Similar to
|
|
|
+ // The maximum number of messages the producer will send in a single
|
|
|
+ // broker request. Defaults to 0 for unlimited. Similar to
|
|
|
// `queue.buffering.max.messages` in the JVM producer.
|
|
|
MaxMessages int
|
|
|
}
|
|
|
@@ -120,12 +121,12 @@ type Config struct {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // Consumer is the namespace for configuration related to consuming messages, used
|
|
|
- // by the Consumer.
|
|
|
+ // Consumer is the namespace for configuration related to consuming messages,
|
|
|
+ // used by the Consumer.
|
|
|
Consumer struct {
|
|
|
Retry struct {
|
|
|
- // How long to wait after a failing to read from a partition before trying
|
|
|
- // again (default 2s).
|
|
|
+ // How long to wait after a failing to read from a partition before
|
|
|
+ // trying again (default 2s).
|
|
|
Backoff time.Duration
|
|
|
}
|
|
|
|
|
|
@@ -143,26 +144,26 @@ type Config struct {
|
|
|
// negotiating sizes and not actually consuming. Similar to the JVM's
|
|
|
// `fetch.message.max.bytes`.
|
|
|
Default int32
|
|
|
- // The maximum number of message bytes to fetch from the broker in a single
|
|
|
- // request. Messages larger than this will return ErrMessageTooLarge and
|
|
|
- // will not be consumable, so you must be sure this is at least as large as
|
|
|
- // your largest message. Defaults to 0 (no limit). Similar to the JVM's
|
|
|
- // `fetch.message.max.bytes`. The global `sarama.MaxResponseSize` still
|
|
|
- // applies.
|
|
|
+ // The maximum number of message bytes to fetch from the broker in a
|
|
|
+ // single request. Messages larger than this will return
|
|
|
+ // ErrMessageTooLarge and will not be consumable, so you must be sure
|
|
|
+ // this is at least as large as your largest message. Defaults to 0
|
|
|
+ // (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
|
|
|
+ // global `sarama.MaxResponseSize` still applies.
|
|
|
Max int32
|
|
|
}
|
|
|
- // The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes
|
|
|
- // to become available before it returns fewer than that anyways. The default is
|
|
|
- // 250ms, since 0 causes the consumer to spin when no events are available.
|
|
|
- // 100-500ms is a reasonable range for most cases. Kafka only supports precision
|
|
|
- // up to milliseconds; nanoseconds will be truncated. Equivalent to the JVM's
|
|
|
- // `fetch.wait.max.ms`.
|
|
|
+ // The maximum amount of time the broker will wait for Consumer.Fetch.Min
|
|
|
+ // bytes to become available before it returns fewer than that anyways. The
|
|
|
+ // default is 250ms, since 0 causes the consumer to spin when no events are
|
|
|
+ // available. 100-500ms is a reasonable range for most cases. Kafka only
|
|
|
+ // supports precision up to milliseconds; nanoseconds will be truncated.
|
|
|
+ // Equivalent to the JVM's `fetch.wait.max.ms`.
|
|
|
MaxWaitTime time.Duration
|
|
|
|
|
|
- // The maximum amount of time the consumer expects a message takes to process for
|
|
|
- // the user. If writing to the Messages channel takes longer than this, that
|
|
|
- // partition will stop fetching more messages until it can proceed again. Note
|
|
|
- // that, since the Messages channel is buffered, the actual grace time is
|
|
|
+ // The maximum amount of time the consumer expects a message takes to process
|
|
|
+ // for the user. If writing to the Messages channel takes longer than this,
|
|
|
+ // that partition will stop fetching more messages until it can proceed again.
|
|
|
+ // Note that, since the Messages channel is buffered, the actual grace time is
|
|
|
// (MaxProcessingTime * ChanneBufferSize). Defaults to 100ms.
|
|
|
MaxProcessingTime time.Duration
|
|
|
|
|
|
@@ -174,15 +175,15 @@ type Config struct {
|
|
|
Errors bool
|
|
|
}
|
|
|
|
|
|
- // Offsets specifies configuration for how and when to commit consumed offsets.
|
|
|
- // This currently requires the manual use of an OffsetManager but will
|
|
|
- // eventually be automated.
|
|
|
+ // Offsets specifies configuration for how and when to commit consumed
|
|
|
+ // offsets. This currently requires the manual use of an OffsetManager
|
|
|
+ // but will eventually be automated.
|
|
|
Offsets struct {
|
|
|
// How frequently to commit updated offsets. Defaults to 1s.
|
|
|
CommitInterval time.Duration
|
|
|
|
|
|
- // The initial offset to use if no offset was previously committed. Should
|
|
|
- // be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
|
|
|
+ // The initial offset to use if no offset was previously committed.
|
|
|
+ // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
|
|
|
Initial int64
|
|
|
}
|
|
|
}
|