config_test.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func TestDefaultConfigValidates(t *testing.T) {
  8. config := NewConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. if config.MetricRegistry == nil {
  13. t.Error("Expected non nil metrics.MetricRegistry, got nil")
  14. }
  15. }
  16. func TestInvalidClientIDConfigValidates(t *testing.T) {
  17. config := NewConfig()
  18. config.ClientID = "foo:bar"
  19. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  20. t.Error("Expected invalid ClientID, got ", err)
  21. }
  22. }
  23. func TestEmptyClientIDConfigValidates(t *testing.T) {
  24. config := NewConfig()
  25. config.ClientID = ""
  26. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  27. t.Error("Expected invalid ClientID, got ", err)
  28. }
  29. }
  30. func TestNetConfigValidates(t *testing.T) {
  31. tests := []struct {
  32. name string
  33. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  34. err string
  35. }{
  36. {
  37. "OpenRequests",
  38. func(cfg *Config) {
  39. cfg.Net.MaxOpenRequests = 0
  40. },
  41. "Net.MaxOpenRequests must be > 0"},
  42. {"DialTimeout",
  43. func(cfg *Config) {
  44. cfg.Net.DialTimeout = 0
  45. },
  46. "Net.DialTimeout must be > 0"},
  47. {"ReadTimeout",
  48. func(cfg *Config) {
  49. cfg.Net.ReadTimeout = 0
  50. },
  51. "Net.ReadTimeout must be > 0"},
  52. {"WriteTimeout",
  53. func(cfg *Config) {
  54. cfg.Net.WriteTimeout = 0
  55. },
  56. "Net.WriteTimeout must be > 0"},
  57. {"KeepAlive",
  58. func(cfg *Config) {
  59. cfg.Net.KeepAlive = -1
  60. },
  61. "Net.KeepAlive must be >= 0"},
  62. {"SASL.User",
  63. func(cfg *Config) {
  64. cfg.Net.SASL.Enable = true
  65. cfg.Net.SASL.User = ""
  66. },
  67. "Net.SASL.User must not be empty when SASL is enabled"},
  68. {"SASL.Password",
  69. func(cfg *Config) {
  70. cfg.Net.SASL.Enable = true
  71. cfg.Net.SASL.User = "user"
  72. cfg.Net.SASL.Password = ""
  73. },
  74. "Net.SASL.Password must not be empty when SASL is enabled"},
  75. }
  76. for i, test := range tests {
  77. c := NewConfig()
  78. test.cfg(c)
  79. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  80. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  81. }
  82. }
  83. }
  84. func TestMetadataConfigValidates(t *testing.T) {
  85. tests := []struct {
  86. name string
  87. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  88. err string
  89. }{
  90. {
  91. "Retry.Max",
  92. func(cfg *Config) {
  93. cfg.Metadata.Retry.Max = -1
  94. },
  95. "Metadata.Retry.Max must be >= 0"},
  96. {"Retry.Backoff",
  97. func(cfg *Config) {
  98. cfg.Metadata.Retry.Backoff = -1
  99. },
  100. "Metadata.Retry.Backoff must be >= 0"},
  101. {"RefreshFrequency",
  102. func(cfg *Config) {
  103. cfg.Metadata.RefreshFrequency = -1
  104. },
  105. "Metadata.RefreshFrequency must be >= 0"},
  106. }
  107. for i, test := range tests {
  108. c := NewConfig()
  109. test.cfg(c)
  110. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  111. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  112. }
  113. }
  114. }
  115. func TestAdminConfigValidates(t *testing.T) {
  116. tests := []struct {
  117. name string
  118. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  119. err string
  120. }{
  121. {"Timeout",
  122. func(cfg *Config) {
  123. cfg.Admin.Timeout = 0
  124. },
  125. "Admin.Timeout must be > 0"},
  126. }
  127. for i, test := range tests {
  128. c := NewConfig()
  129. test.cfg(c)
  130. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  131. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  132. }
  133. }
  134. }
  135. func TestProducerConfigValidates(t *testing.T) {
  136. tests := []struct {
  137. name string
  138. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  139. err string
  140. }{
  141. {
  142. "MaxMessageBytes",
  143. func(cfg *Config) {
  144. cfg.Producer.MaxMessageBytes = 0
  145. },
  146. "Producer.MaxMessageBytes must be > 0"},
  147. {"RequiredAcks",
  148. func(cfg *Config) {
  149. cfg.Producer.RequiredAcks = -2
  150. },
  151. "Producer.RequiredAcks must be >= -1"},
  152. {"Timeout",
  153. func(cfg *Config) {
  154. cfg.Producer.Timeout = 0
  155. },
  156. "Producer.Timeout must be > 0"},
  157. {"Partitioner",
  158. func(cfg *Config) {
  159. cfg.Producer.Partitioner = nil
  160. },
  161. "Producer.Partitioner must not be nil"},
  162. {"Flush.Bytes",
  163. func(cfg *Config) {
  164. cfg.Producer.Flush.Bytes = -1
  165. },
  166. "Producer.Flush.Bytes must be >= 0"},
  167. {"Flush.Messages",
  168. func(cfg *Config) {
  169. cfg.Producer.Flush.Messages = -1
  170. },
  171. "Producer.Flush.Messages must be >= 0"},
  172. {"Flush.Frequency",
  173. func(cfg *Config) {
  174. cfg.Producer.Flush.Frequency = -1
  175. },
  176. "Producer.Flush.Frequency must be >= 0"},
  177. {"Flush.MaxMessages",
  178. func(cfg *Config) {
  179. cfg.Producer.Flush.MaxMessages = -1
  180. },
  181. "Producer.Flush.MaxMessages must be >= 0"},
  182. {"Flush.MaxMessages with Producer.Flush.Messages",
  183. func(cfg *Config) {
  184. cfg.Producer.Flush.MaxMessages = 1
  185. cfg.Producer.Flush.Messages = 2
  186. },
  187. "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
  188. {"Flush.Retry.Max",
  189. func(cfg *Config) {
  190. cfg.Producer.Retry.Max = -1
  191. },
  192. "Producer.Retry.Max must be >= 0"},
  193. {"Flush.Retry.Backoff",
  194. func(cfg *Config) {
  195. cfg.Producer.Retry.Backoff = -1
  196. },
  197. "Producer.Retry.Backoff must be >= 0"},
  198. {"Idempotent Version",
  199. func(cfg *Config) {
  200. cfg.Producer.Idempotent = true
  201. cfg.Version = V0_10_0_0
  202. },
  203. "Idempotent producer requires Version >= V0_11_0_0"},
  204. {"Idempotent with Producer.Retry.Max",
  205. func(cfg *Config) {
  206. cfg.Version = V0_11_0_0
  207. cfg.Producer.Idempotent = true
  208. cfg.Producer.Retry.Max = 0
  209. },
  210. "Idempotent producer requires Producer.Retry.Max >= 1"},
  211. {"Idempotent with Producer.RequiredAcks",
  212. func(cfg *Config) {
  213. cfg.Version = V0_11_0_0
  214. cfg.Producer.Idempotent = true
  215. },
  216. "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
  217. {"Idempotent with Net.MaxOpenRequests",
  218. func(cfg *Config) {
  219. cfg.Version = V0_11_0_0
  220. cfg.Producer.Idempotent = true
  221. cfg.Producer.RequiredAcks = WaitForAll
  222. },
  223. "Idempotent producer requires Net.MaxOpenRequests to be 1"},
  224. }
  225. for i, test := range tests {
  226. c := NewConfig()
  227. test.cfg(c)
  228. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  229. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  230. }
  231. }
  232. }
  233. func TestLZ4ConfigValidation(t *testing.T) {
  234. config := NewConfig()
  235. config.Producer.Compression = CompressionLZ4
  236. if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
  237. t.Error("Expected invalid lz4/kafka version error, got ", err)
  238. }
  239. config.Version = V0_10_0_0
  240. if err := config.Validate(); err != nil {
  241. t.Error("Expected lz4 to work, got ", err)
  242. }
  243. }
  244. // This example shows how to integrate with an existing registry as well as publishing metrics
  245. // on the standard output
  246. func ExampleConfig_metrics() {
  247. // Our application registry
  248. appMetricRegistry := metrics.NewRegistry()
  249. appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
  250. appGauge.Update(1)
  251. config := NewConfig()
  252. // Use a prefix registry instead of the default local one
  253. config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
  254. // Simulate a metric created by sarama without starting a broker
  255. saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
  256. saramaGauge.Update(2)
  257. metrics.WriteOnce(appMetricRegistry, os.Stdout)
  258. // Output:
  259. // gauge m1
  260. // value: 1
  261. // gauge sarama.m2
  262. // value: 2
  263. }