|
@@ -73,24 +73,8 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
|
|
|
config = new(ProducerConfig)
|
|
|
}
|
|
|
|
|
|
- if config.RequiredAcks < -1 {
|
|
|
- return nil, ConfigurationError("Invalid RequiredAcks")
|
|
|
- }
|
|
|
-
|
|
|
- if config.Timeout < 0 {
|
|
|
- return nil, ConfigurationError("Invalid Timeout")
|
|
|
- }
|
|
|
-
|
|
|
- if config.Partitioner == nil {
|
|
|
- config.Partitioner = NewRandomPartitioner()
|
|
|
- }
|
|
|
-
|
|
|
- if config.MaxBufferedBytes == 0 {
|
|
|
- return nil, ConfigurationError("Invalid MaxBufferedBytes")
|
|
|
- }
|
|
|
-
|
|
|
- if config.MaxBufferTime == 0 {
|
|
|
- return nil, ConfigurationError("Invalid MaxBufferTime")
|
|
|
+ if err := config.Validate(); err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
return &Producer{
|
|
@@ -463,3 +447,31 @@ func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
|
|
|
|
|
|
return partitions[choice], nil
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+func (config *ProducerConfig) Validate() error {
|
|
|
+ if config.RequiredAcks < -1 {
|
|
|
+ return ConfigurationError("Invalid RequiredAcks")
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.Timeout < 0 {
|
|
|
+ return ConfigurationError("Invalid Timeout")
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.Partitioner == nil {
|
|
|
+ config.Partitioner = NewRandomPartitioner()
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.MaxBufferedBytes == 0 {
|
|
|
+ return ConfigurationError("Invalid MaxBufferedBytes")
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.MaxBufferTime == 0 {
|
|
|
+ return ConfigurationError("Invalid MaxBufferTime")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|