|
@@ -13,6 +13,7 @@ import (
|
|
|
type BrokerConfig struct {
|
|
|
MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send.
|
|
|
ReadTimeout time.Duration // How long to wait for a response before timing out and returning an error.
|
|
|
+ WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error.
|
|
|
}
|
|
|
|
|
|
// NewBrokerConfig returns a new broker configuration with sane defaults.
|
|
@@ -20,6 +21,7 @@ func NewBrokerConfig() *BrokerConfig {
|
|
|
return &BrokerConfig{
|
|
|
MaxOpenRequests: 4,
|
|
|
ReadTimeout: 1 * time.Minute,
|
|
|
+ WriteTimeout: 1 * time.Minute,
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -34,6 +36,10 @@ func (config *BrokerConfig) Validate() error {
|
|
|
return ConfigurationError("Invalid ReadTimeout")
|
|
|
}
|
|
|
|
|
|
+ if config.WriteTimeout <= 0 {
|
|
|
+ return ConfigurationError("Invalid WriteTimeout")
|
|
|
+ }
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
@@ -264,6 +270,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
+ err = b.conn.SetWriteDeadline(time.Now().Add(b.conf.WriteTimeout))
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
_, err = b.conn.Write(buf)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -346,9 +357,13 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
|
|
|
func (b *Broker) responseReceiver() {
|
|
|
header := make([]byte, 8)
|
|
|
for response := range b.responses {
|
|
|
- b.conn.SetReadDeadline(time.Now().Add(b.conf.ReadTimeout))
|
|
|
+ err := b.conn.SetReadDeadline(time.Now().Add(b.conf.ReadTimeout))
|
|
|
+ if err != nil {
|
|
|
+ response.errors <- err
|
|
|
+ continue
|
|
|
+ }
|
|
|
|
|
|
- _, err := io.ReadFull(b.conn, header)
|
|
|
+ _, err = io.ReadFull(b.conn, header)
|
|
|
if err != nil {
|
|
|
response.errors <- err
|
|
|
continue
|