config_test.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func TestDefaultConfigValidates(t *testing.T) {
  8. config := NewConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. if config.MetricRegistry == nil {
  13. t.Error("Expected non nil metrics.MetricRegistry, got nil")
  14. }
  15. }
  16. func TestInvalidClientIDConfigValidates(t *testing.T) {
  17. config := NewConfig()
  18. config.ClientID = "foo:bar"
  19. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  20. t.Error("Expected invalid ClientID, got ", err)
  21. }
  22. }
  23. func TestEmptyClientIDConfigValidates(t *testing.T) {
  24. config := NewConfig()
  25. config.ClientID = ""
  26. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  27. t.Error("Expected invalid ClientID, got ", err)
  28. }
  29. }
  30. func TestNetConfigValidates(t *testing.T) {
  31. tests := []struct {
  32. name string
  33. cfg func() *Config // resorting to using a function as a param because of internal composite structs
  34. err string
  35. }{
  36. {
  37. "OpenRequests",
  38. func() *Config {
  39. cfg := NewConfig()
  40. cfg.Net.MaxOpenRequests = 0
  41. return cfg
  42. },
  43. "Net.MaxOpenRequests must be > 0"},
  44. {"DialTimeout",
  45. func() *Config {
  46. cfg := NewConfig()
  47. cfg.Net.DialTimeout = 0
  48. return cfg
  49. },
  50. "Net.DialTimeout must be > 0"},
  51. {"ReadTimeout",
  52. func() *Config {
  53. cfg := NewConfig()
  54. cfg.Net.ReadTimeout = 0
  55. return cfg
  56. },
  57. "Net.ReadTimeout must be > 0"},
  58. {"WriteTimeout",
  59. func() *Config {
  60. cfg := NewConfig()
  61. cfg.Net.WriteTimeout = 0
  62. return cfg
  63. },
  64. "Net.WriteTimeout must be > 0"},
  65. {"KeepAlive",
  66. func() *Config {
  67. cfg := NewConfig()
  68. cfg.Net.KeepAlive = -1
  69. return cfg
  70. },
  71. "Net.KeepAlive must be >= 0"},
  72. {"SASL.User",
  73. func() *Config {
  74. cfg := NewConfig()
  75. cfg.Net.SASL.Enable = true
  76. cfg.Net.SASL.User = ""
  77. return cfg
  78. },
  79. "Net.SASL.User must not be empty when SASL is enabled"},
  80. {"SASL.Password",
  81. func() *Config {
  82. cfg := NewConfig()
  83. cfg.Net.SASL.Enable = true
  84. cfg.Net.SASL.User = "user"
  85. cfg.Net.SASL.Password = ""
  86. return cfg
  87. },
  88. "Net.SASL.Password must not be empty when SASL is enabled"},
  89. }
  90. for i, test := range tests {
  91. if err := test.cfg().Validate(); string(err.(ConfigurationError)) != test.err {
  92. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  93. }
  94. }
  95. }
  96. func TestMetadataConfigValidates(t *testing.T) {
  97. tests := []struct {
  98. name string
  99. cfg func() *Config // resorting to using a function as a param because of internal composite structs
  100. err string
  101. }{
  102. {
  103. "Retry.Max",
  104. func() *Config {
  105. cfg := NewConfig()
  106. cfg.Metadata.Retry.Max = -1
  107. return cfg
  108. },
  109. "Metadata.Retry.Max must be >= 0"},
  110. {"Retry.Backoff",
  111. func() *Config {
  112. cfg := NewConfig()
  113. cfg.Metadata.Retry.Backoff = -1
  114. return cfg
  115. },
  116. "Metadata.Retry.Backoff must be >= 0"},
  117. {"RefreshFrequency",
  118. func() *Config {
  119. cfg := NewConfig()
  120. cfg.Metadata.RefreshFrequency = -1
  121. return cfg
  122. },
  123. "Metadata.RefreshFrequency must be >= 0"},
  124. }
  125. for i, test := range tests {
  126. if err := test.cfg().Validate(); string(err.(ConfigurationError)) != test.err {
  127. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  128. }
  129. }
  130. }
  131. func TestProducerConfigValidates(t *testing.T) {
  132. tests := []struct {
  133. name string
  134. cfg func() *Config // resorting to using a function as a param because of internal composite structs
  135. err string
  136. }{
  137. {
  138. "MaxMessageBytes",
  139. func() *Config {
  140. cfg := NewConfig()
  141. cfg.Producer.MaxMessageBytes = 0
  142. return cfg
  143. },
  144. "Producer.MaxMessageBytes must be > 0"},
  145. {"RequiredAcks",
  146. func() *Config {
  147. cfg := NewConfig()
  148. cfg.Producer.RequiredAcks = -2
  149. return cfg
  150. },
  151. "Producer.RequiredAcks must be >= -1"},
  152. {"Timeout",
  153. func() *Config {
  154. cfg := NewConfig()
  155. cfg.Producer.Timeout = 0
  156. return cfg
  157. },
  158. "Producer.Timeout must be > 0"},
  159. {"Partitioner",
  160. func() *Config {
  161. cfg := NewConfig()
  162. cfg.Producer.Partitioner = nil
  163. return cfg
  164. },
  165. "Producer.Partitioner must not be nil"},
  166. {"Flush.Bytes",
  167. func() *Config {
  168. cfg := NewConfig()
  169. cfg.Producer.Flush.Bytes = -1
  170. return cfg
  171. },
  172. "Producer.Flush.Bytes must be >= 0"},
  173. {"Flush.Messages",
  174. func() *Config {
  175. cfg := NewConfig()
  176. cfg.Producer.Flush.Messages = -1
  177. return cfg
  178. },
  179. "Producer.Flush.Messages must be >= 0"},
  180. {"Flush.Frequency",
  181. func() *Config {
  182. cfg := NewConfig()
  183. cfg.Producer.Flush.Frequency = -1
  184. return cfg
  185. },
  186. "Producer.Flush.Frequency must be >= 0"},
  187. {"Flush.MaxMessages",
  188. func() *Config {
  189. cfg := NewConfig()
  190. cfg.Producer.Flush.MaxMessages = -1
  191. return cfg
  192. },
  193. "Producer.Flush.MaxMessages must be >= 0"},
  194. {"Flush.MaxMessages with Producer.Flush.Messages",
  195. func() *Config {
  196. cfg := NewConfig()
  197. cfg.Producer.Flush.MaxMessages = 1
  198. cfg.Producer.Flush.Messages = 2
  199. return cfg
  200. },
  201. "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
  202. {"Flush.Retry.Max",
  203. func() *Config {
  204. cfg := NewConfig()
  205. cfg.Producer.Retry.Max = -1
  206. return cfg
  207. },
  208. "Producer.Retry.Max must be >= 0"},
  209. {"Flush.Retry.Backoff",
  210. func() *Config {
  211. cfg := NewConfig()
  212. cfg.Producer.Retry.Backoff = -1
  213. return cfg
  214. },
  215. "Producer.Retry.Backoff must be >= 0"},
  216. }
  217. for i, test := range tests {
  218. if err := test.cfg().Validate(); string(err.(ConfigurationError)) != test.err {
  219. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  220. }
  221. }
  222. }
  223. func TestLZ4ConfigValidation(t *testing.T) {
  224. config := NewConfig()
  225. config.Producer.Compression = CompressionLZ4
  226. if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
  227. t.Error("Expected invalid lz4/kakfa version error, got ", err)
  228. }
  229. config.Version = V0_10_0_0
  230. if err := config.Validate(); err != nil {
  231. t.Error("Expected lz4 to work, got ", err)
  232. }
  233. }
  234. // This example shows how to integrate with an existing registry as well as publishing metrics
  235. // on the standard output
  236. func ExampleConfig_metrics() {
  237. // Our application registry
  238. appMetricRegistry := metrics.NewRegistry()
  239. appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
  240. appGauge.Update(1)
  241. config := NewConfig()
  242. // Use a prefix registry instead of the default local one
  243. config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
  244. // Simulate a metric created by sarama without starting a broker
  245. saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
  246. saramaGauge.Update(2)
  247. metrics.WriteOnce(appMetricRegistry, os.Stdout)
  248. // Output:
  249. // gauge m1
  250. // value: 1
  251. // gauge sarama.m2
  252. // value: 2
  253. }