|
@@ -75,30 +75,8 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
|
|
|
config = new(ConsumerConfig)
|
|
|
}
|
|
|
|
|
|
- if config.DefaultFetchSize < 0 {
|
|
|
- return nil, ConfigurationError("Invalid DefaultFetchSize")
|
|
|
- } else if config.DefaultFetchSize == 0 {
|
|
|
- config.DefaultFetchSize = 1024
|
|
|
- }
|
|
|
-
|
|
|
- if config.MinFetchSize < 0 {
|
|
|
- return nil, ConfigurationError("Invalid MinFetchSize")
|
|
|
- } else if config.MinFetchSize == 0 {
|
|
|
- config.MinFetchSize = 1
|
|
|
- }
|
|
|
-
|
|
|
- if config.MaxMessageSize < 0 {
|
|
|
- return nil, ConfigurationError("Invalid MaxMessageSize")
|
|
|
- }
|
|
|
-
|
|
|
- if config.MaxWaitTime <= 0 {
|
|
|
- return nil, ConfigurationError("Invalid MaxWaitTime")
|
|
|
- } else if config.MaxWaitTime < 100 {
|
|
|
- Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
|
|
|
- }
|
|
|
-
|
|
|
- if config.EventBufferSize < 0 {
|
|
|
- return nil, ConfigurationError("Invalid EventBufferSize")
|
|
|
+ if err := config.Validate(); err != nil {
|
|
|
+ return nil, err
|
|
|
}
|
|
|
|
|
|
if topic == "" {
|
|
@@ -341,3 +319,33 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
|
|
|
|
|
|
return -1, block.Err
|
|
|
}
|
|
|
+
|
|
|
+func (config *ConsumerConfig) Validate() error {
|
|
|
+ if config.DefaultFetchSize < 0 {
|
|
|
+ return ConfigurationError("Invalid DefaultFetchSize")
|
|
|
+ } else if config.DefaultFetchSize == 0 {
|
|
|
+ config.DefaultFetchSize = 1024
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.MinFetchSize < 0 {
|
|
|
+ return ConfigurationError("Invalid MinFetchSize")
|
|
|
+ } else if config.MinFetchSize == 0 {
|
|
|
+ config.MinFetchSize = 1
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.MaxMessageSize < 0 {
|
|
|
+ return ConfigurationError("Invalid MaxMessageSize")
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.MaxWaitTime <= 0 {
|
|
|
+ return ConfigurationError("Invalid MaxWaitTime")
|
|
|
+ } else if config.MaxWaitTime < 100 {
|
|
|
+ Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
|
|
|
+ }
|
|
|
+
|
|
|
+ if config.EventBufferSize < 0 {
|
|
|
+ return ConfigurationError("Invalid EventBufferSize")
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|