config_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func TestDefaultConfigValidates(t *testing.T) {
  8. config := NewConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. if config.MetricRegistry == nil {
  13. t.Error("Expected non nil metrics.MetricRegistry, got nil")
  14. }
  15. }
  16. func TestInvalidClientIDConfigValidates(t *testing.T) {
  17. config := NewConfig()
  18. config.ClientID = "foo:bar"
  19. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  20. t.Error("Expected invalid ClientID, got ", err)
  21. }
  22. }
  23. func TestEmptyClientIDConfigValidates(t *testing.T) {
  24. config := NewConfig()
  25. config.ClientID = ""
  26. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  27. t.Error("Expected invalid ClientID, got ", err)
  28. }
  29. }
  30. func TestNetConfigValidates(t *testing.T) {
  31. tests := []struct {
  32. name string
  33. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  34. err string
  35. }{
  36. {
  37. "OpenRequests",
  38. func(cfg *Config) {
  39. cfg.Net.MaxOpenRequests = 0
  40. },
  41. "Net.MaxOpenRequests must be > 0"},
  42. {"DialTimeout",
  43. func(cfg *Config) {
  44. cfg.Net.DialTimeout = 0
  45. },
  46. "Net.DialTimeout must be > 0"},
  47. {"ReadTimeout",
  48. func(cfg *Config) {
  49. cfg.Net.ReadTimeout = 0
  50. },
  51. "Net.ReadTimeout must be > 0"},
  52. {"WriteTimeout",
  53. func(cfg *Config) {
  54. cfg.Net.WriteTimeout = 0
  55. },
  56. "Net.WriteTimeout must be > 0"},
  57. {"KeepAlive",
  58. func(cfg *Config) {
  59. cfg.Net.KeepAlive = -1
  60. },
  61. "Net.KeepAlive must be >= 0"},
  62. {"SASL.User",
  63. func(cfg *Config) {
  64. cfg.Net.SASL.Enable = true
  65. cfg.Net.SASL.User = ""
  66. },
  67. "Net.SASL.User must not be empty when SASL is enabled"},
  68. {"SASL.Password",
  69. func(cfg *Config) {
  70. cfg.Net.SASL.Enable = true
  71. cfg.Net.SASL.User = "user"
  72. cfg.Net.SASL.Password = ""
  73. },
  74. "Net.SASL.Password must not be empty when SASL is enabled"},
  75. }
  76. for i, test := range tests {
  77. c := NewConfig()
  78. test.cfg(c)
  79. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  80. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  81. }
  82. }
  83. }
  84. func TestMetadataConfigValidates(t *testing.T) {
  85. tests := []struct {
  86. name string
  87. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  88. err string
  89. }{
  90. {
  91. "Retry.Max",
  92. func(cfg *Config) {
  93. cfg.Metadata.Retry.Max = -1
  94. },
  95. "Metadata.Retry.Max must be >= 0"},
  96. {"Retry.Backoff",
  97. func(cfg *Config) {
  98. cfg.Metadata.Retry.Backoff = -1
  99. },
  100. "Metadata.Retry.Backoff must be >= 0"},
  101. {"RefreshFrequency",
  102. func(cfg *Config) {
  103. cfg.Metadata.RefreshFrequency = -1
  104. },
  105. "Metadata.RefreshFrequency must be >= 0"},
  106. }
  107. for i, test := range tests {
  108. c := NewConfig()
  109. test.cfg(c)
  110. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  111. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  112. }
  113. }
  114. }
  115. func TestAdminConfigValidates(t *testing.T) {
  116. tests := []struct {
  117. name string
  118. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  119. err string
  120. }{
  121. {"Timeout",
  122. func(cfg *Config) {
  123. cfg.Admin.Timeout = 0
  124. },
  125. "Admin.Timeout must be > 0"},
  126. }
  127. for i, test := range tests {
  128. c := NewConfig()
  129. test.cfg(c)
  130. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  131. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  132. }
  133. }
  134. }
  135. func TestProducerConfigValidates(t *testing.T) {
  136. tests := []struct {
  137. name string
  138. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  139. err string
  140. }{
  141. {
  142. "MaxMessageBytes",
  143. func(cfg *Config) {
  144. cfg.Producer.MaxMessageBytes = 0
  145. },
  146. "Producer.MaxMessageBytes must be > 0"},
  147. {"RequiredAcks",
  148. func(cfg *Config) {
  149. cfg.Producer.RequiredAcks = -2
  150. },
  151. "Producer.RequiredAcks must be >= -1"},
  152. {"Timeout",
  153. func(cfg *Config) {
  154. cfg.Producer.Timeout = 0
  155. },
  156. "Producer.Timeout must be > 0"},
  157. {"Partitioner",
  158. func(cfg *Config) {
  159. cfg.Producer.Partitioner = nil
  160. },
  161. "Producer.Partitioner must not be nil"},
  162. {"Flush.Bytes",
  163. func(cfg *Config) {
  164. cfg.Producer.Flush.Bytes = -1
  165. },
  166. "Producer.Flush.Bytes must be >= 0"},
  167. {"Flush.Messages",
  168. func(cfg *Config) {
  169. cfg.Producer.Flush.Messages = -1
  170. },
  171. "Producer.Flush.Messages must be >= 0"},
  172. {"Flush.Frequency",
  173. func(cfg *Config) {
  174. cfg.Producer.Flush.Frequency = -1
  175. },
  176. "Producer.Flush.Frequency must be >= 0"},
  177. {"Flush.MaxMessages",
  178. func(cfg *Config) {
  179. cfg.Producer.Flush.MaxMessages = -1
  180. },
  181. "Producer.Flush.MaxMessages must be >= 0"},
  182. {"Flush.MaxMessages with Producer.Flush.Messages",
  183. func(cfg *Config) {
  184. cfg.Producer.Flush.MaxMessages = 1
  185. cfg.Producer.Flush.Messages = 2
  186. },
  187. "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
  188. {"Flush.Retry.Max",
  189. func(cfg *Config) {
  190. cfg.Producer.Retry.Max = -1
  191. },
  192. "Producer.Retry.Max must be >= 0"},
  193. {"Flush.Retry.Backoff",
  194. func(cfg *Config) {
  195. cfg.Producer.Retry.Backoff = -1
  196. },
  197. "Producer.Retry.Backoff must be >= 0"},
  198. }
  199. for i, test := range tests {
  200. c := NewConfig()
  201. test.cfg(c)
  202. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  203. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  204. }
  205. }
  206. }
  207. func TestLZ4ConfigValidation(t *testing.T) {
  208. config := NewConfig()
  209. config.Producer.Compression = CompressionLZ4
  210. if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
  211. t.Error("Expected invalid lz4/kakfa version error, got ", err)
  212. }
  213. config.Version = V0_10_0_0
  214. if err := config.Validate(); err != nil {
  215. t.Error("Expected lz4 to work, got ", err)
  216. }
  217. }
  218. // This example shows how to integrate with an existing registry as well as publishing metrics
  219. // on the standard output
  220. func ExampleConfig_metrics() {
  221. // Our application registry
  222. appMetricRegistry := metrics.NewRegistry()
  223. appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
  224. appGauge.Update(1)
  225. config := NewConfig()
  226. // Use a prefix registry instead of the default local one
  227. config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
  228. // Simulate a metric created by sarama without starting a broker
  229. saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
  230. saramaGauge.Update(2)
  231. metrics.WriteOnce(appMetricRegistry, os.Stdout)
  232. // Output:
  233. // gauge m1
  234. // value: 1
  235. // gauge sarama.m2
  236. // value: 2
  237. }