|
|
@@ -11,29 +11,35 @@ import (
|
|
|
|
|
|
// BrokerConfig is used to pass multiple configuration options to Broker.Open.
|
|
|
type BrokerConfig struct {
|
|
|
- MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send.
|
|
|
- DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error.
|
|
|
- ReadTimeout time.Duration // How long to wait for a response before timing out and returning an error.
|
|
|
- WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error.
|
|
|
+ MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).
|
|
|
+
|
|
|
+ // All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
|
|
|
+ DialTimeout time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
|
|
|
+ ReadTimeout time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
|
|
|
+ WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
|
|
|
}
|
|
|
|
|
|
// NewBrokerConfig returns a new broker configuration with sane defaults.
|
|
|
func NewBrokerConfig() *BrokerConfig {
|
|
|
return &BrokerConfig{
|
|
|
- MaxOpenRequests: 4,
|
|
|
- DialTimeout: 1 * time.Minute,
|
|
|
- ReadTimeout: 1 * time.Minute,
|
|
|
- WriteTimeout: 1 * time.Minute,
|
|
|
+ MaxOpenRequests: 5,
|
|
|
+ DialTimeout: 30 * time.Second,
|
|
|
+ ReadTimeout: 30 * time.Second,
|
|
|
+ WriteTimeout: 30 * time.Second,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Validate checks a BrokerConfig instance. This will return a
|
|
|
// ConfigurationError if the specified values don't make sense.
|
|
|
func (config *BrokerConfig) Validate() error {
|
|
|
- if config.MaxOpenRequests < 0 {
|
|
|
+ if config.MaxOpenRequests <= 0 {
|
|
|
return ConfigurationError("Invalid MaxOpenRequests")
|
|
|
}
|
|
|
|
|
|
+ if config.DialTimeout <= 0 {
|
|
|
+ return ConfigurationError("Invalid DialTimeout")
|
|
|
+ }
|
|
|
+
|
|
|
if config.ReadTimeout <= 0 {
|
|
|
return ConfigurationError("Invalid ReadTimeout")
|
|
|
}
|
|
|
@@ -107,7 +113,7 @@ func (b *Broker) Open(conf *BrokerConfig) error {
|
|
|
|
|
|
b.conf = conf
|
|
|
b.done = make(chan bool)
|
|
|
- b.responses = make(chan responsePromise, b.conf.MaxOpenRequests)
|
|
|
+ b.responses = make(chan responsePromise, b.conf.MaxOpenRequests-1)
|
|
|
|
|
|
Logger.Printf("Connected to broker %s\n", b.addr)
|
|
|
go withRecover(b.responseReceiver)
|