config_test.go 8.1 KB


  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func TestDefaultConfigValidates(t *testing.T) {
  8. config := NewConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. if config.MetricRegistry == nil {
  13. t.Error("Expected non nil metrics.MetricRegistry, got nil")
  14. }
  15. }
  16. func TestInvalidClientIDConfigValidates(t *testing.T) {
  17. config := NewConfig()
  18. config.ClientID = "foo:bar"
  19. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  20. t.Error("Expected invalid ClientID, got ", err)
  21. }
  22. }
  23. func TestEmptyClientIDConfigValidates(t *testing.T) {
  24. config := NewConfig()
  25. config.ClientID = ""
  26. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  27. t.Error("Expected invalid ClientID, got ", err)
  28. }
  29. }
  30. type DummyTokenProvider struct {
  31. }
  32. func (t *DummyTokenProvider) Token() (string, error) {
  33. return "access-token-string", nil
  34. }
  35. func TestNetConfigValidates(t *testing.T) {
  36. tests := []struct {
  37. name string
  38. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  39. err string
  40. }{
  41. {
  42. "OpenRequests",
  43. func(cfg *Config) {
  44. cfg.Net.MaxOpenRequests = 0
  45. },
  46. "Net.MaxOpenRequests must be > 0"},
  47. {"DialTimeout",
  48. func(cfg *Config) {
  49. cfg.Net.DialTimeout = 0
  50. },
  51. "Net.DialTimeout must be > 0"},
  52. {"ReadTimeout",
  53. func(cfg *Config) {
  54. cfg.Net.ReadTimeout = 0
  55. },
  56. "Net.ReadTimeout must be > 0"},
  57. {"WriteTimeout",
  58. func(cfg *Config) {
  59. cfg.Net.WriteTimeout = 0
  60. },
  61. "Net.WriteTimeout must be > 0"},
  62. {"KeepAlive",
  63. func(cfg *Config) {
  64. cfg.Net.KeepAlive = -1
  65. },
  66. "Net.KeepAlive must be >= 0"},
  67. {"SASL.User",
  68. func(cfg *Config) {
  69. cfg.Net.SASL.Enable = true
  70. cfg.Net.SASL.User = ""
  71. },
  72. "Net.SASL.User must not be empty when SASL is enabled"},
  73. {"SASL.Password",
  74. func(cfg *Config) {
  75. cfg.Net.SASL.Enable = true
  76. cfg.Net.SASL.User = "user"
  77. cfg.Net.SASL.Password = ""
  78. },
  79. "Net.SASL.Password must not be empty when SASL is enabled"},
  80. {"SASL.Mechanism - Invalid mechanism type",
  81. func(cfg *Config) {
  82. cfg.Net.SASL.Enable = true
  83. cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
  84. cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
  85. },
  86. "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER` and `PLAIN`"},
  87. {"SASL.Mechanism.OAUTHBEARER - Missing token provider",
  88. func(cfg *Config) {
  89. cfg.Net.SASL.Enable = true
  90. cfg.Net.SASL.Mechanism = SASLTypeOAuth
  91. cfg.Net.SASL.TokenProvider = nil
  92. },
  93. "An AccessTokenProvider instance must be provided to Net.SASL.User.TokenProvider"},
  94. }
  95. for i, test := range tests {
  96. c := NewConfig()
  97. test.cfg(c)
  98. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  99. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  100. }
  101. }
  102. }
  103. func TestMetadataConfigValidates(t *testing.T) {
  104. tests := []struct {
  105. name string
  106. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  107. err string
  108. }{
  109. {
  110. "Retry.Max",
  111. func(cfg *Config) {
  112. cfg.Metadata.Retry.Max = -1
  113. },
  114. "Metadata.Retry.Max must be >= 0"},
  115. {"Retry.Backoff",
  116. func(cfg *Config) {
  117. cfg.Metadata.Retry.Backoff = -1
  118. },
  119. "Metadata.Retry.Backoff must be >= 0"},
  120. {"RefreshFrequency",
  121. func(cfg *Config) {
  122. cfg.Metadata.RefreshFrequency = -1
  123. },
  124. "Metadata.RefreshFrequency must be >= 0"},
  125. }
  126. for i, test := range tests {
  127. c := NewConfig()
  128. test.cfg(c)
  129. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  130. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  131. }
  132. }
  133. }
  134. func TestAdminConfigValidates(t *testing.T) {
  135. tests := []struct {
  136. name string
  137. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  138. err string
  139. }{
  140. {"Timeout",
  141. func(cfg *Config) {
  142. cfg.Admin.Timeout = 0
  143. },
  144. "Admin.Timeout must be > 0"},
  145. }
  146. for i, test := range tests {
  147. c := NewConfig()
  148. test.cfg(c)
  149. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  150. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  151. }
  152. }
  153. }
  154. func TestProducerConfigValidates(t *testing.T) {
  155. tests := []struct {
  156. name string
  157. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  158. err string
  159. }{
  160. {
  161. "MaxMessageBytes",
  162. func(cfg *Config) {
  163. cfg.Producer.MaxMessageBytes = 0
  164. },
  165. "Producer.MaxMessageBytes must be > 0"},
  166. {"RequiredAcks",
  167. func(cfg *Config) {
  168. cfg.Producer.RequiredAcks = -2
  169. },
  170. "Producer.RequiredAcks must be >= -1"},
  171. {"Timeout",
  172. func(cfg *Config) {
  173. cfg.Producer.Timeout = 0
  174. },
  175. "Producer.Timeout must be > 0"},
  176. {"Partitioner",
  177. func(cfg *Config) {
  178. cfg.Producer.Partitioner = nil
  179. },
  180. "Producer.Partitioner must not be nil"},
  181. {"Flush.Bytes",
  182. func(cfg *Config) {
  183. cfg.Producer.Flush.Bytes = -1
  184. },
  185. "Producer.Flush.Bytes must be >= 0"},
  186. {"Flush.Messages",
  187. func(cfg *Config) {
  188. cfg.Producer.Flush.Messages = -1
  189. },
  190. "Producer.Flush.Messages must be >= 0"},
  191. {"Flush.Frequency",
  192. func(cfg *Config) {
  193. cfg.Producer.Flush.Frequency = -1
  194. },
  195. "Producer.Flush.Frequency must be >= 0"},
  196. {"Flush.MaxMessages",
  197. func(cfg *Config) {
  198. cfg.Producer.Flush.MaxMessages = -1
  199. },
  200. "Producer.Flush.MaxMessages must be >= 0"},
  201. {"Flush.MaxMessages with Producer.Flush.Messages",
  202. func(cfg *Config) {
  203. cfg.Producer.Flush.MaxMessages = 1
  204. cfg.Producer.Flush.Messages = 2
  205. },
  206. "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
  207. {"Flush.Retry.Max",
  208. func(cfg *Config) {
  209. cfg.Producer.Retry.Max = -1
  210. },
  211. "Producer.Retry.Max must be >= 0"},
  212. {"Flush.Retry.Backoff",
  213. func(cfg *Config) {
  214. cfg.Producer.Retry.Backoff = -1
  215. },
  216. "Producer.Retry.Backoff must be >= 0"},
  217. {"Idempotent Version",
  218. func(cfg *Config) {
  219. cfg.Producer.Idempotent = true
  220. cfg.Version = V0_10_0_0
  221. },
  222. "Idempotent producer requires Version >= V0_11_0_0"},
  223. {"Idempotent with Producer.Retry.Max",
  224. func(cfg *Config) {
  225. cfg.Version = V0_11_0_0
  226. cfg.Producer.Idempotent = true
  227. cfg.Producer.Retry.Max = 0
  228. },
  229. "Idempotent producer requires Producer.Retry.Max >= 1"},
  230. {"Idempotent with Producer.RequiredAcks",
  231. func(cfg *Config) {
  232. cfg.Version = V0_11_0_0
  233. cfg.Producer.Idempotent = true
  234. },
  235. "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
  236. {"Idempotent with Net.MaxOpenRequests",
  237. func(cfg *Config) {
  238. cfg.Version = V0_11_0_0
  239. cfg.Producer.Idempotent = true
  240. cfg.Producer.RequiredAcks = WaitForAll
  241. },
  242. "Idempotent producer requires Net.MaxOpenRequests to be 1"},
  243. }
  244. for i, test := range tests {
  245. c := NewConfig()
  246. test.cfg(c)
  247. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  248. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  249. }
  250. }
  251. }
  252. func TestLZ4ConfigValidation(t *testing.T) {
  253. config := NewConfig()
  254. config.Producer.Compression = CompressionLZ4
  255. if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
  256. t.Error("Expected invalid lz4/kafka version error, got ", err)
  257. }
  258. config.Version = V0_10_0_0
  259. if err := config.Validate(); err != nil {
  260. t.Error("Expected lz4 to work, got ", err)
  261. }
  262. }
  263. // This example shows how to integrate with an existing registry as well as publishing metrics
  264. // on the standard output
  265. func ExampleConfig_metrics() {
  266. // Our application registry
  267. appMetricRegistry := metrics.NewRegistry()
  268. appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
  269. appGauge.Update(1)
  270. config := NewConfig()
  271. // Use a prefix registry instead of the default local one
  272. config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
  273. // Simulate a metric created by sarama without starting a broker
  274. saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
  275. saramaGauge.Update(2)
  276. metrics.WriteOnce(appMetricRegistry, os.Stdout)
  277. // Output:
  278. // gauge m1
  279. // value: 1
  280. // gauge sarama.m2
  281. // value: 2
  282. }