config_test.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func TestDefaultConfigValidates(t *testing.T) {
  8. config := NewConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. if config.MetricRegistry == nil {
  13. t.Error("Expected non nil metrics.MetricRegistry, got nil")
  14. }
  15. }
  16. func TestInvalidClientIDConfigValidates(t *testing.T) {
  17. config := NewConfig()
  18. config.ClientID = "foo:bar"
  19. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  20. t.Error("Expected invalid ClientID, got ", err)
  21. }
  22. }
  23. func TestEmptyClientIDConfigValidates(t *testing.T) {
  24. config := NewConfig()
  25. config.ClientID = ""
  26. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  27. t.Error("Expected invalid ClientID, got ", err)
  28. }
  29. }
  30. type DummyTokenProvider struct {
  31. }
  32. func (t *DummyTokenProvider) Token() (*AccessToken, error) {
  33. return &AccessToken{Token: "access-token-string"}, nil
  34. }
  35. func TestNetConfigValidates(t *testing.T) {
  36. tests := []struct {
  37. name string
  38. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  39. err string
  40. }{
  41. {
  42. "OpenRequests",
  43. func(cfg *Config) {
  44. cfg.Net.MaxOpenRequests = 0
  45. },
  46. "Net.MaxOpenRequests must be > 0"},
  47. {"DialTimeout",
  48. func(cfg *Config) {
  49. cfg.Net.DialTimeout = 0
  50. },
  51. "Net.DialTimeout must be > 0"},
  52. {"ReadTimeout",
  53. func(cfg *Config) {
  54. cfg.Net.ReadTimeout = 0
  55. },
  56. "Net.ReadTimeout must be > 0"},
  57. {"WriteTimeout",
  58. func(cfg *Config) {
  59. cfg.Net.WriteTimeout = 0
  60. },
  61. "Net.WriteTimeout must be > 0"},
  62. {"KeepAlive",
  63. func(cfg *Config) {
  64. cfg.Net.KeepAlive = -1
  65. },
  66. "Net.KeepAlive must be >= 0"},
  67. {"SASL.User",
  68. func(cfg *Config) {
  69. cfg.Net.SASL.Enable = true
  70. cfg.Net.SASL.User = ""
  71. },
  72. "Net.SASL.User must not be empty when SASL is enabled"},
  73. {"SASL.Password",
  74. func(cfg *Config) {
  75. cfg.Net.SASL.Enable = true
  76. cfg.Net.SASL.User = "user"
  77. cfg.Net.SASL.Password = ""
  78. },
  79. "Net.SASL.Password must not be empty when SASL is enabled"},
  80. {"SASL.Mechanism - Invalid mechanism type",
  81. func(cfg *Config) {
  82. cfg.Net.SASL.Enable = true
  83. cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
  84. cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
  85. },
  86. "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256` and `SCRAM-SHA-512`"},
  87. {"SASL.Mechanism.OAUTHBEARER - Missing token provider",
  88. func(cfg *Config) {
  89. cfg.Net.SASL.Enable = true
  90. cfg.Net.SASL.Mechanism = SASLTypeOAuth
  91. cfg.Net.SASL.TokenProvider = nil
  92. },
  93. "An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider"},
  94. {"SASL.Mechanism SCRAM-SHA-256 - Missing SCRAM client",
  95. func(cfg *Config) {
  96. cfg.Net.SASL.Enable = true
  97. cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA256
  98. cfg.Net.SASL.SCRAMClient = nil
  99. cfg.Net.SASL.User = "user"
  100. cfg.Net.SASL.Password = "stong_password"
  101. },
  102. "A SCRAMClient instance must be provided to Net.SASL.SCRAMClient"},
  103. {"SASL.Mechanism SCRAM-SHA-512 - Missing SCRAM client",
  104. func(cfg *Config) {
  105. cfg.Net.SASL.Enable = true
  106. cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  107. cfg.Net.SASL.SCRAMClient = nil
  108. cfg.Net.SASL.User = "user"
  109. cfg.Net.SASL.Password = "stong_password"
  110. },
  111. "A SCRAMClient instance must be provided to Net.SASL.SCRAMClient"},
  112. }
  113. for i, test := range tests {
  114. c := NewConfig()
  115. test.cfg(c)
  116. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  117. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  118. }
  119. }
  120. }
  121. func TestMetadataConfigValidates(t *testing.T) {
  122. tests := []struct {
  123. name string
  124. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  125. err string
  126. }{
  127. {
  128. "Retry.Max",
  129. func(cfg *Config) {
  130. cfg.Metadata.Retry.Max = -1
  131. },
  132. "Metadata.Retry.Max must be >= 0"},
  133. {"Retry.Backoff",
  134. func(cfg *Config) {
  135. cfg.Metadata.Retry.Backoff = -1
  136. },
  137. "Metadata.Retry.Backoff must be >= 0"},
  138. {"RefreshFrequency",
  139. func(cfg *Config) {
  140. cfg.Metadata.RefreshFrequency = -1
  141. },
  142. "Metadata.RefreshFrequency must be >= 0"},
  143. }
  144. for i, test := range tests {
  145. c := NewConfig()
  146. test.cfg(c)
  147. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  148. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  149. }
  150. }
  151. }
  152. func TestAdminConfigValidates(t *testing.T) {
  153. tests := []struct {
  154. name string
  155. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  156. err string
  157. }{
  158. {"Timeout",
  159. func(cfg *Config) {
  160. cfg.Admin.Timeout = 0
  161. },
  162. "Admin.Timeout must be > 0"},
  163. }
  164. for i, test := range tests {
  165. c := NewConfig()
  166. test.cfg(c)
  167. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  168. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  169. }
  170. }
  171. }
  172. func TestProducerConfigValidates(t *testing.T) {
  173. tests := []struct {
  174. name string
  175. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  176. err string
  177. }{
  178. {
  179. "MaxMessageBytes",
  180. func(cfg *Config) {
  181. cfg.Producer.MaxMessageBytes = 0
  182. },
  183. "Producer.MaxMessageBytes must be > 0"},
  184. {"RequiredAcks",
  185. func(cfg *Config) {
  186. cfg.Producer.RequiredAcks = -2
  187. },
  188. "Producer.RequiredAcks must be >= -1"},
  189. {"Timeout",
  190. func(cfg *Config) {
  191. cfg.Producer.Timeout = 0
  192. },
  193. "Producer.Timeout must be > 0"},
  194. {"Partitioner",
  195. func(cfg *Config) {
  196. cfg.Producer.Partitioner = nil
  197. },
  198. "Producer.Partitioner must not be nil"},
  199. {"Flush.Bytes",
  200. func(cfg *Config) {
  201. cfg.Producer.Flush.Bytes = -1
  202. },
  203. "Producer.Flush.Bytes must be >= 0"},
  204. {"Flush.Messages",
  205. func(cfg *Config) {
  206. cfg.Producer.Flush.Messages = -1
  207. },
  208. "Producer.Flush.Messages must be >= 0"},
  209. {"Flush.Frequency",
  210. func(cfg *Config) {
  211. cfg.Producer.Flush.Frequency = -1
  212. },
  213. "Producer.Flush.Frequency must be >= 0"},
  214. {"Flush.MaxMessages",
  215. func(cfg *Config) {
  216. cfg.Producer.Flush.MaxMessages = -1
  217. },
  218. "Producer.Flush.MaxMessages must be >= 0"},
  219. {"Flush.MaxMessages with Producer.Flush.Messages",
  220. func(cfg *Config) {
  221. cfg.Producer.Flush.MaxMessages = 1
  222. cfg.Producer.Flush.Messages = 2
  223. },
  224. "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
  225. {"Flush.Retry.Max",
  226. func(cfg *Config) {
  227. cfg.Producer.Retry.Max = -1
  228. },
  229. "Producer.Retry.Max must be >= 0"},
  230. {"Flush.Retry.Backoff",
  231. func(cfg *Config) {
  232. cfg.Producer.Retry.Backoff = -1
  233. },
  234. "Producer.Retry.Backoff must be >= 0"},
  235. {"Idempotent Version",
  236. func(cfg *Config) {
  237. cfg.Producer.Idempotent = true
  238. cfg.Version = V0_10_0_0
  239. },
  240. "Idempotent producer requires Version >= V0_11_0_0"},
  241. {"Idempotent with Producer.Retry.Max",
  242. func(cfg *Config) {
  243. cfg.Version = V0_11_0_0
  244. cfg.Producer.Idempotent = true
  245. cfg.Producer.Retry.Max = 0
  246. },
  247. "Idempotent producer requires Producer.Retry.Max >= 1"},
  248. {"Idempotent with Producer.RequiredAcks",
  249. func(cfg *Config) {
  250. cfg.Version = V0_11_0_0
  251. cfg.Producer.Idempotent = true
  252. },
  253. "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
  254. {"Idempotent with Net.MaxOpenRequests",
  255. func(cfg *Config) {
  256. cfg.Version = V0_11_0_0
  257. cfg.Producer.Idempotent = true
  258. cfg.Producer.RequiredAcks = WaitForAll
  259. },
  260. "Idempotent producer requires Net.MaxOpenRequests to be 1"},
  261. }
  262. for i, test := range tests {
  263. c := NewConfig()
  264. test.cfg(c)
  265. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  266. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  267. }
  268. }
  269. }
  270. func TestConsumerConfigValidates(t *testing.T) {
  271. tests := []struct {
  272. name string
  273. cfg func(*Config)
  274. err string
  275. }{
  276. {"ReadCommitted Version",
  277. func(cfg *Config) {
  278. cfg.Version = V0_10_0_0
  279. cfg.Consumer.IsolationLevel = ReadCommitted
  280. },
  281. "ReadCommitted requires Version >= V0_11_0_0",
  282. },
  283. {"Incorrect isolation level",
  284. func(cfg *Config) {
  285. cfg.Version = V0_11_0_0
  286. cfg.Consumer.IsolationLevel = IsolationLevel(42)
  287. },
  288. "Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted",
  289. },
  290. }
  291. for i, test := range tests {
  292. c := NewConfig()
  293. test.cfg(c)
  294. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  295. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  296. }
  297. }
  298. }
  299. func TestLZ4ConfigValidation(t *testing.T) {
  300. config := NewConfig()
  301. config.Producer.Compression = CompressionLZ4
  302. if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
  303. t.Error("Expected invalid lz4/kafka version error, got ", err)
  304. }
  305. config.Version = V0_10_0_0
  306. if err := config.Validate(); err != nil {
  307. t.Error("Expected lz4 to work, got ", err)
  308. }
  309. }
  310. // This example shows how to integrate with an existing registry as well as publishing metrics
  311. // on the standard output
  312. func ExampleConfig_metrics() {
  313. // Our application registry
  314. appMetricRegistry := metrics.NewRegistry()
  315. appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
  316. appGauge.Update(1)
  317. config := NewConfig()
  318. // Use a prefix registry instead of the default local one
  319. config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
  320. // Simulate a metric created by sarama without starting a broker
  321. saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
  322. saramaGauge.Update(2)
  323. metrics.WriteOnce(appMetricRegistry, os.Stdout)
  324. // Output:
  325. // gauge m1
  326. // value: 1
  327. // gauge sarama.m2
  328. // value: 2
  329. }