config_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func TestDefaultConfigValidates(t *testing.T) {
  8. config := NewConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. if config.MetricRegistry == nil {
  13. t.Error("Expected non nil metrics.MetricRegistry, got nil")
  14. }
  15. }
  16. func TestInvalidClientIDConfigValidates(t *testing.T) {
  17. config := NewConfig()
  18. config.ClientID = "foo:bar"
  19. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  20. t.Error("Expected invalid ClientID, got ", err)
  21. }
  22. }
  23. func TestEmptyClientIDConfigValidates(t *testing.T) {
  24. config := NewConfig()
  25. config.ClientID = ""
  26. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  27. t.Error("Expected invalid ClientID, got ", err)
  28. }
  29. }
  30. type DummyTokenProvider struct {
  31. }
  32. func (t *DummyTokenProvider) Token() (*AccessToken, error) {
  33. return &AccessToken{Token: "access-token-string"}, nil
  34. }
  35. func TestNetConfigValidates(t *testing.T) {
  36. tests := []struct {
  37. name string
  38. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  39. err string
  40. }{
  41. {
  42. "OpenRequests",
  43. func(cfg *Config) {
  44. cfg.Net.MaxOpenRequests = 0
  45. },
  46. "Net.MaxOpenRequests must be > 0"},
  47. {"DialTimeout",
  48. func(cfg *Config) {
  49. cfg.Net.DialTimeout = 0
  50. },
  51. "Net.DialTimeout must be > 0"},
  52. {"ReadTimeout",
  53. func(cfg *Config) {
  54. cfg.Net.ReadTimeout = 0
  55. },
  56. "Net.ReadTimeout must be > 0"},
  57. {"WriteTimeout",
  58. func(cfg *Config) {
  59. cfg.Net.WriteTimeout = 0
  60. },
  61. "Net.WriteTimeout must be > 0"},
  62. {"SASL.User",
  63. func(cfg *Config) {
  64. cfg.Net.SASL.Enable = true
  65. cfg.Net.SASL.User = ""
  66. },
  67. "Net.SASL.User must not be empty when SASL is enabled"},
  68. {"SASL.Password",
  69. func(cfg *Config) {
  70. cfg.Net.SASL.Enable = true
  71. cfg.Net.SASL.User = "user"
  72. cfg.Net.SASL.Password = ""
  73. },
  74. "Net.SASL.Password must not be empty when SASL is enabled"},
  75. {"SASL.Mechanism - Invalid mechanism type",
  76. func(cfg *Config) {
  77. cfg.Net.SASL.Enable = true
  78. cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
  79. cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
  80. },
  81. "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"},
  82. {"SASL.Mechanism.OAUTHBEARER - Missing token provider",
  83. func(cfg *Config) {
  84. cfg.Net.SASL.Enable = true
  85. cfg.Net.SASL.Mechanism = SASLTypeOAuth
  86. cfg.Net.SASL.TokenProvider = nil
  87. },
  88. "An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider"},
  89. {"SASL.Mechanism SCRAM-SHA-256 - Missing SCRAM client",
  90. func(cfg *Config) {
  91. cfg.Net.SASL.Enable = true
  92. cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA256
  93. cfg.Net.SASL.SCRAMClientGeneratorFunc = nil
  94. cfg.Net.SASL.User = "user"
  95. cfg.Net.SASL.Password = "stong_password"
  96. },
  97. "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
  98. {"SASL.Mechanism SCRAM-SHA-512 - Missing SCRAM client",
  99. func(cfg *Config) {
  100. cfg.Net.SASL.Enable = true
  101. cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  102. cfg.Net.SASL.SCRAMClientGeneratorFunc = nil
  103. cfg.Net.SASL.User = "user"
  104. cfg.Net.SASL.Password = "stong_password"
  105. },
  106. "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
  107. {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field",
  108. func(cfg *Config) {
  109. cfg.Net.SASL.Enable = true
  110. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  111. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  112. cfg.Net.SASL.GSSAPI.Username = "sarama"
  113. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  114. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  115. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  116. },
  117. "Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
  118. "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH"},
  119. {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field",
  120. func(cfg *Config) {
  121. cfg.Net.SASL.Enable = true
  122. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  123. cfg.Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH
  124. cfg.Net.SASL.GSSAPI.Username = "sarama"
  125. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  126. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  127. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  128. },
  129. "Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
  130. " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH"},
  131. {"SASL.Mechanism GSSAPI (Kerberos) - Missing username",
  132. func(cfg *Config) {
  133. cfg.Net.SASL.Enable = true
  134. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  135. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  136. cfg.Net.SASL.GSSAPI.Password = "sarama"
  137. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  138. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  139. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  140. },
  141. "Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used"},
  142. {"SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName",
  143. func(cfg *Config) {
  144. cfg.Net.SASL.Enable = true
  145. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  146. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  147. cfg.Net.SASL.GSSAPI.Username = "sarama"
  148. cfg.Net.SASL.GSSAPI.Password = "sarama"
  149. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  150. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  151. },
  152. "Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used"},
  153. {"SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType",
  154. func(cfg *Config) {
  155. cfg.Net.SASL.Enable = true
  156. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  157. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  158. cfg.Net.SASL.GSSAPI.Username = "sarama"
  159. cfg.Net.SASL.GSSAPI.Password = "sarama"
  160. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  161. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  162. },
  163. "Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH"},
  164. {"SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath",
  165. func(cfg *Config) {
  166. cfg.Net.SASL.Enable = true
  167. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  168. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  169. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  170. cfg.Net.SASL.GSSAPI.Username = "sarama"
  171. cfg.Net.SASL.GSSAPI.Password = "sarama"
  172. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  173. },
  174. "Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used"},
  175. {"SASL.Mechanism GSSAPI (Kerberos) - Missing Realm",
  176. func(cfg *Config) {
  177. cfg.Net.SASL.Enable = true
  178. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  179. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  180. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  181. cfg.Net.SASL.GSSAPI.Username = "sarama"
  182. cfg.Net.SASL.GSSAPI.Password = "sarama"
  183. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  184. },
  185. "Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used"},
  186. }
  187. for i, test := range tests {
  188. c := NewConfig()
  189. test.cfg(c)
  190. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  191. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  192. }
  193. }
  194. }
  195. func TestMetadataConfigValidates(t *testing.T) {
  196. tests := []struct {
  197. name string
  198. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  199. err string
  200. }{
  201. {
  202. "Retry.Max",
  203. func(cfg *Config) {
  204. cfg.Metadata.Retry.Max = -1
  205. },
  206. "Metadata.Retry.Max must be >= 0"},
  207. {"Retry.Backoff",
  208. func(cfg *Config) {
  209. cfg.Metadata.Retry.Backoff = -1
  210. },
  211. "Metadata.Retry.Backoff must be >= 0"},
  212. {"RefreshFrequency",
  213. func(cfg *Config) {
  214. cfg.Metadata.RefreshFrequency = -1
  215. },
  216. "Metadata.RefreshFrequency must be >= 0"},
  217. }
  218. for i, test := range tests {
  219. c := NewConfig()
  220. test.cfg(c)
  221. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  222. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  223. }
  224. }
  225. }
  226. func TestAdminConfigValidates(t *testing.T) {
  227. tests := []struct {
  228. name string
  229. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  230. err string
  231. }{
  232. {"Timeout",
  233. func(cfg *Config) {
  234. cfg.Admin.Timeout = 0
  235. },
  236. "Admin.Timeout must be > 0"},
  237. }
  238. for i, test := range tests {
  239. c := NewConfig()
  240. test.cfg(c)
  241. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  242. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  243. }
  244. }
  245. }
  246. func TestProducerConfigValidates(t *testing.T) {
  247. tests := []struct {
  248. name string
  249. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  250. err string
  251. }{
  252. {
  253. "MaxMessageBytes",
  254. func(cfg *Config) {
  255. cfg.Producer.MaxMessageBytes = 0
  256. },
  257. "Producer.MaxMessageBytes must be > 0"},
  258. {"RequiredAcks",
  259. func(cfg *Config) {
  260. cfg.Producer.RequiredAcks = -2
  261. },
  262. "Producer.RequiredAcks must be >= -1"},
  263. {"Timeout",
  264. func(cfg *Config) {
  265. cfg.Producer.Timeout = 0
  266. },
  267. "Producer.Timeout must be > 0"},
  268. {"Partitioner",
  269. func(cfg *Config) {
  270. cfg.Producer.Partitioner = nil
  271. },
  272. "Producer.Partitioner must not be nil"},
  273. {"Flush.Bytes",
  274. func(cfg *Config) {
  275. cfg.Producer.Flush.Bytes = -1
  276. },
  277. "Producer.Flush.Bytes must be >= 0"},
  278. {"Flush.Messages",
  279. func(cfg *Config) {
  280. cfg.Producer.Flush.Messages = -1
  281. },
  282. "Producer.Flush.Messages must be >= 0"},
  283. {"Flush.Frequency",
  284. func(cfg *Config) {
  285. cfg.Producer.Flush.Frequency = -1
  286. },
  287. "Producer.Flush.Frequency must be >= 0"},
  288. {"Flush.MaxMessages",
  289. func(cfg *Config) {
  290. cfg.Producer.Flush.MaxMessages = -1
  291. },
  292. "Producer.Flush.MaxMessages must be >= 0"},
  293. {"Flush.MaxMessages with Producer.Flush.Messages",
  294. func(cfg *Config) {
  295. cfg.Producer.Flush.MaxMessages = 1
  296. cfg.Producer.Flush.Messages = 2
  297. },
  298. "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
  299. {"Flush.Retry.Max",
  300. func(cfg *Config) {
  301. cfg.Producer.Retry.Max = -1
  302. },
  303. "Producer.Retry.Max must be >= 0"},
  304. {"Flush.Retry.Backoff",
  305. func(cfg *Config) {
  306. cfg.Producer.Retry.Backoff = -1
  307. },
  308. "Producer.Retry.Backoff must be >= 0"},
  309. {"Idempotent Version",
  310. func(cfg *Config) {
  311. cfg.Producer.Idempotent = true
  312. cfg.Version = V0_10_0_0
  313. },
  314. "Idempotent producer requires Version >= V0_11_0_0"},
  315. {"Idempotent with Producer.Retry.Max",
  316. func(cfg *Config) {
  317. cfg.Version = V0_11_0_0
  318. cfg.Producer.Idempotent = true
  319. cfg.Producer.Retry.Max = 0
  320. },
  321. "Idempotent producer requires Producer.Retry.Max >= 1"},
  322. {"Idempotent with Producer.RequiredAcks",
  323. func(cfg *Config) {
  324. cfg.Version = V0_11_0_0
  325. cfg.Producer.Idempotent = true
  326. },
  327. "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
  328. {"Idempotent with Net.MaxOpenRequests",
  329. func(cfg *Config) {
  330. cfg.Version = V0_11_0_0
  331. cfg.Producer.Idempotent = true
  332. cfg.Producer.RequiredAcks = WaitForAll
  333. },
  334. "Idempotent producer requires Net.MaxOpenRequests to be 1"},
  335. }
  336. for i, test := range tests {
  337. c := NewConfig()
  338. test.cfg(c)
  339. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  340. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  341. }
  342. }
  343. }
  344. func TestConsumerConfigValidates(t *testing.T) {
  345. tests := []struct {
  346. name string
  347. cfg func(*Config)
  348. err string
  349. }{
  350. {"ReadCommitted Version",
  351. func(cfg *Config) {
  352. cfg.Version = V0_10_0_0
  353. cfg.Consumer.IsolationLevel = ReadCommitted
  354. },
  355. "ReadCommitted requires Version >= V0_11_0_0",
  356. },
  357. {"Incorrect isolation level",
  358. func(cfg *Config) {
  359. cfg.Version = V0_11_0_0
  360. cfg.Consumer.IsolationLevel = int8(42)
  361. },
  362. "Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted",
  363. },
  364. }
  365. for i, test := range tests {
  366. c := NewConfig()
  367. test.cfg(c)
  368. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  369. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  370. }
  371. }
  372. }
  373. func TestLZ4ConfigValidation(t *testing.T) {
  374. config := NewConfig()
  375. config.Producer.Compression = CompressionLZ4
  376. if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
  377. t.Error("Expected invalid lz4/kafka version error, got ", err)
  378. }
  379. config.Version = V0_10_0_0
  380. if err := config.Validate(); err != nil {
  381. t.Error("Expected lz4 to work, got ", err)
  382. }
  383. }
  384. func TestZstdConfigValidation(t *testing.T) {
  385. config := NewConfig()
  386. config.Producer.Compression = CompressionZSTD
  387. if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" {
  388. t.Error("Expected invalid zstd/kafka version error, got ", err)
  389. }
  390. config.Version = V2_1_0_0
  391. if err := config.Validate(); err != nil {
  392. t.Error("Expected zstd to work, got ", err)
  393. }
  394. }
  395. // This example shows how to integrate with an existing registry as well as publishing metrics
  396. // on the standard output
  397. func ExampleConfig_metrics() {
  398. // Our application registry
  399. appMetricRegistry := metrics.NewRegistry()
  400. appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
  401. appGauge.Update(1)
  402. config := NewConfig()
  403. // Use a prefix registry instead of the default local one
  404. config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
  405. // Simulate a metric created by sarama without starting a broker
  406. saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
  407. saramaGauge.Update(2)
  408. metrics.WriteOnce(appMetricRegistry, os.Stdout)
  409. // Output:
  410. // gauge m1
  411. // value: 1
  412. // gauge sarama.m2
  413. // value: 2
  414. }