config_test.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func TestDefaultConfigValidates(t *testing.T) {
  8. config := NewConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. if config.MetricRegistry == nil {
  13. t.Error("Expected non nil metrics.MetricRegistry, got nil")
  14. }
  15. }
  16. func TestInvalidClientIDConfigValidates(t *testing.T) {
  17. config := NewConfig()
  18. config.ClientID = "foo:bar"
  19. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  20. t.Error("Expected invalid ClientID, got ", err)
  21. }
  22. }
  23. func TestEmptyClientIDConfigValidates(t *testing.T) {
  24. config := NewConfig()
  25. config.ClientID = ""
  26. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  27. t.Error("Expected invalid ClientID, got ", err)
  28. }
  29. }
  30. func TestNetConfigValidates(t *testing.T) {
  31. tests := []struct {
  32. name string
  33. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  34. err string
  35. }{
  36. {
  37. "OpenRequests",
  38. func(cfg *Config) {
  39. cfg.Net.MaxOpenRequests = 0
  40. },
  41. "Net.MaxOpenRequests must be > 0"},
  42. {"DialTimeout",
  43. func(cfg *Config) {
  44. cfg.Net.DialTimeout = 0
  45. },
  46. "Net.DialTimeout must be > 0"},
  47. {"ReadTimeout",
  48. func(cfg *Config) {
  49. cfg.Net.ReadTimeout = 0
  50. },
  51. "Net.ReadTimeout must be > 0"},
  52. {"WriteTimeout",
  53. func(cfg *Config) {
  54. cfg.Net.WriteTimeout = 0
  55. },
  56. "Net.WriteTimeout must be > 0"},
  57. {"KeepAlive",
  58. func(cfg *Config) {
  59. cfg.Net.KeepAlive = -1
  60. },
  61. "Net.KeepAlive must be >= 0"},
  62. {"SASL.User",
  63. func(cfg *Config) {
  64. cfg.Net.SASL.Enable = true
  65. cfg.Net.SASL.User = ""
  66. },
  67. "Net.SASL.User must not be empty when SASL is enabled"},
  68. {"SASL.Password",
  69. func(cfg *Config) {
  70. cfg.Net.SASL.Enable = true
  71. cfg.Net.SASL.User = "user"
  72. cfg.Net.SASL.Password = ""
  73. },
  74. "Net.SASL.Password must not be empty when SASL is enabled"},
  75. }
  76. for i, test := range tests {
  77. c := NewConfig()
  78. test.cfg(c)
  79. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  80. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  81. }
  82. }
  83. }
  84. func TestMetadataConfigValidates(t *testing.T) {
  85. tests := []struct {
  86. name string
  87. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  88. err string
  89. }{
  90. {
  91. "Retry.Max",
  92. func(cfg *Config) {
  93. cfg.Metadata.Retry.Max = -1
  94. },
  95. "Metadata.Retry.Max must be >= 0"},
  96. {"Retry.Backoff",
  97. func(cfg *Config) {
  98. cfg.Metadata.Retry.Backoff = -1
  99. },
  100. "Metadata.Retry.Backoff must be >= 0"},
  101. {"RefreshFrequency",
  102. func(cfg *Config) {
  103. cfg.Metadata.RefreshFrequency = -1
  104. },
  105. "Metadata.RefreshFrequency must be >= 0"},
  106. }
  107. for i, test := range tests {
  108. c := NewConfig()
  109. test.cfg(c)
  110. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  111. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  112. }
  113. }
  114. }
  115. func TestProducerConfigValidates(t *testing.T) {
  116. tests := []struct {
  117. name string
  118. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  119. err string
  120. }{
  121. {
  122. "MaxMessageBytes",
  123. func(cfg *Config) {
  124. cfg.Producer.MaxMessageBytes = 0
  125. },
  126. "Producer.MaxMessageBytes must be > 0"},
  127. {"RequiredAcks",
  128. func(cfg *Config) {
  129. cfg.Producer.RequiredAcks = -2
  130. },
  131. "Producer.RequiredAcks must be >= -1"},
  132. {"Timeout",
  133. func(cfg *Config) {
  134. cfg.Producer.Timeout = 0
  135. },
  136. "Producer.Timeout must be > 0"},
  137. {"Partitioner",
  138. func(cfg *Config) {
  139. cfg.Producer.Partitioner = nil
  140. },
  141. "Producer.Partitioner must not be nil"},
  142. {"Flush.Bytes",
  143. func(cfg *Config) {
  144. cfg.Producer.Flush.Bytes = -1
  145. },
  146. "Producer.Flush.Bytes must be >= 0"},
  147. {"Flush.Messages",
  148. func(cfg *Config) {
  149. cfg.Producer.Flush.Messages = -1
  150. },
  151. "Producer.Flush.Messages must be >= 0"},
  152. {"Flush.Frequency",
  153. func(cfg *Config) {
  154. cfg.Producer.Flush.Frequency = -1
  155. },
  156. "Producer.Flush.Frequency must be >= 0"},
  157. {"Flush.MaxMessages",
  158. func(cfg *Config) {
  159. cfg.Producer.Flush.MaxMessages = -1
  160. },
  161. "Producer.Flush.MaxMessages must be >= 0"},
  162. {"Flush.MaxMessages with Producer.Flush.Messages",
  163. func(cfg *Config) {
  164. cfg.Producer.Flush.MaxMessages = 1
  165. cfg.Producer.Flush.Messages = 2
  166. },
  167. "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
  168. {"Flush.Retry.Max",
  169. func(cfg *Config) {
  170. cfg.Producer.Retry.Max = -1
  171. },
  172. "Producer.Retry.Max must be >= 0"},
  173. {"Flush.Retry.Backoff",
  174. func(cfg *Config) {
  175. cfg.Producer.Retry.Backoff = -1
  176. },
  177. "Producer.Retry.Backoff must be >= 0"},
  178. }
  179. for i, test := range tests {
  180. c := NewConfig()
  181. test.cfg(c)
  182. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  183. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  184. }
  185. }
  186. }
  187. func TestLZ4ConfigValidation(t *testing.T) {
  188. config := NewConfig()
  189. config.Producer.Compression = CompressionLZ4
  190. if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
  191. t.Error("Expected invalid lz4/kakfa version error, got ", err)
  192. }
  193. config.Version = V0_10_0_0
  194. if err := config.Validate(); err != nil {
  195. t.Error("Expected lz4 to work, got ", err)
  196. }
  197. }
  198. // This example shows how to integrate with an existing registry as well as publishing metrics
  199. // on the standard output
  200. func ExampleConfig_metrics() {
  201. // Our application registry
  202. appMetricRegistry := metrics.NewRegistry()
  203. appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
  204. appGauge.Update(1)
  205. config := NewConfig()
  206. // Use a prefix registry instead of the default local one
  207. config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
  208. // Simulate a metric created by sarama without starting a broker
  209. saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
  210. saramaGauge.Update(2)
  211. metrics.WriteOnce(appMetricRegistry, os.Stdout)
  212. // Output:
  213. // gauge m1
  214. // value: 1
  215. // gauge sarama.m2
  216. // value: 2
  217. }