config_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441
  1. package sarama
  2. import (
  3. "os"
  4. "testing"
  5. "github.com/rcrowley/go-metrics"
  6. )
  7. func TestDefaultConfigValidates(t *testing.T) {
  8. config := NewConfig()
  9. if err := config.Validate(); err != nil {
  10. t.Error(err)
  11. }
  12. if config.MetricRegistry == nil {
  13. t.Error("Expected non nil metrics.MetricRegistry, got nil")
  14. }
  15. }
  16. func TestInvalidClientIDConfigValidates(t *testing.T) {
  17. config := NewConfig()
  18. config.ClientID = "foo:bar"
  19. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  20. t.Error("Expected invalid ClientID, got ", err)
  21. }
  22. }
  23. func TestEmptyClientIDConfigValidates(t *testing.T) {
  24. config := NewConfig()
  25. config.ClientID = ""
  26. if err := config.Validate(); string(err.(ConfigurationError)) != "ClientID is invalid" {
  27. t.Error("Expected invalid ClientID, got ", err)
  28. }
  29. }
  30. type DummyTokenProvider struct {
  31. }
  32. func (t *DummyTokenProvider) Token() (*AccessToken, error) {
  33. return &AccessToken{Token: "access-token-string"}, nil
  34. }
  35. func TestNetConfigValidates(t *testing.T) {
  36. tests := []struct {
  37. name string
  38. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  39. err string
  40. }{
  41. {
  42. "OpenRequests",
  43. func(cfg *Config) {
  44. cfg.Net.MaxOpenRequests = 0
  45. },
  46. "Net.MaxOpenRequests must be > 0"},
  47. {"DialTimeout",
  48. func(cfg *Config) {
  49. cfg.Net.DialTimeout = 0
  50. },
  51. "Net.DialTimeout must be > 0"},
  52. {"ReadTimeout",
  53. func(cfg *Config) {
  54. cfg.Net.ReadTimeout = 0
  55. },
  56. "Net.ReadTimeout must be > 0"},
  57. {"WriteTimeout",
  58. func(cfg *Config) {
  59. cfg.Net.WriteTimeout = 0
  60. },
  61. "Net.WriteTimeout must be > 0"},
  62. {"KeepAlive",
  63. func(cfg *Config) {
  64. cfg.Net.KeepAlive = -1
  65. },
  66. "Net.KeepAlive must be >= 0"},
  67. {"SASL.User",
  68. func(cfg *Config) {
  69. cfg.Net.SASL.Enable = true
  70. cfg.Net.SASL.User = ""
  71. },
  72. "Net.SASL.User must not be empty when SASL is enabled"},
  73. {"SASL.Password",
  74. func(cfg *Config) {
  75. cfg.Net.SASL.Enable = true
  76. cfg.Net.SASL.User = "user"
  77. cfg.Net.SASL.Password = ""
  78. },
  79. "Net.SASL.Password must not be empty when SASL is enabled"},
  80. {"SASL.Mechanism - Invalid mechanism type",
  81. func(cfg *Config) {
  82. cfg.Net.SASL.Enable = true
  83. cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
  84. cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
  85. },
  86. "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"},
  87. {"SASL.Mechanism.OAUTHBEARER - Missing token provider",
  88. func(cfg *Config) {
  89. cfg.Net.SASL.Enable = true
  90. cfg.Net.SASL.Mechanism = SASLTypeOAuth
  91. cfg.Net.SASL.TokenProvider = nil
  92. },
  93. "An AccessTokenProvider instance must be provided to Net.SASL.TokenProvider"},
  94. {"SASL.Mechanism SCRAM-SHA-256 - Missing SCRAM client",
  95. func(cfg *Config) {
  96. cfg.Net.SASL.Enable = true
  97. cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA256
  98. cfg.Net.SASL.SCRAMClientGeneratorFunc = nil
  99. cfg.Net.SASL.User = "user"
  100. cfg.Net.SASL.Password = "stong_password"
  101. },
  102. "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
  103. {"SASL.Mechanism SCRAM-SHA-512 - Missing SCRAM client",
  104. func(cfg *Config) {
  105. cfg.Net.SASL.Enable = true
  106. cfg.Net.SASL.Mechanism = SASLTypeSCRAMSHA512
  107. cfg.Net.SASL.SCRAMClientGeneratorFunc = nil
  108. cfg.Net.SASL.User = "user"
  109. cfg.Net.SASL.Password = "stong_password"
  110. },
  111. "A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
  112. {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field",
  113. func(cfg *Config) {
  114. cfg.Net.SASL.Enable = true
  115. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  116. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  117. cfg.Net.SASL.GSSAPI.Username = "sarama"
  118. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  119. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  120. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  121. },
  122. "Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
  123. "mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH"},
  124. {"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field",
  125. func(cfg *Config) {
  126. cfg.Net.SASL.Enable = true
  127. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  128. cfg.Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH
  129. cfg.Net.SASL.GSSAPI.Username = "sarama"
  130. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  131. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  132. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  133. },
  134. "Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
  135. " and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH"},
  136. {"SASL.Mechanism GSSAPI (Kerberos) - Missing username",
  137. func(cfg *Config) {
  138. cfg.Net.SASL.Enable = true
  139. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  140. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  141. cfg.Net.SASL.GSSAPI.Password = "sarama"
  142. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  143. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  144. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  145. },
  146. "Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used"},
  147. {"SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName",
  148. func(cfg *Config) {
  149. cfg.Net.SASL.Enable = true
  150. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  151. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  152. cfg.Net.SASL.GSSAPI.Username = "sarama"
  153. cfg.Net.SASL.GSSAPI.Password = "sarama"
  154. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  155. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  156. },
  157. "Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used"},
  158. {"SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType",
  159. func(cfg *Config) {
  160. cfg.Net.SASL.Enable = true
  161. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  162. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  163. cfg.Net.SASL.GSSAPI.Username = "sarama"
  164. cfg.Net.SASL.GSSAPI.Password = "sarama"
  165. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  166. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  167. },
  168. "Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH"},
  169. {"SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath",
  170. func(cfg *Config) {
  171. cfg.Net.SASL.Enable = true
  172. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  173. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  174. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  175. cfg.Net.SASL.GSSAPI.Username = "sarama"
  176. cfg.Net.SASL.GSSAPI.Password = "sarama"
  177. cfg.Net.SASL.GSSAPI.Realm = "kafka"
  178. },
  179. "Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used"},
  180. {"SASL.Mechanism GSSAPI (Kerberos) - Missing Realm",
  181. func(cfg *Config) {
  182. cfg.Net.SASL.Enable = true
  183. cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
  184. cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
  185. cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
  186. cfg.Net.SASL.GSSAPI.Username = "sarama"
  187. cfg.Net.SASL.GSSAPI.Password = "sarama"
  188. cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
  189. },
  190. "Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used"},
  191. }
  192. for i, test := range tests {
  193. c := NewConfig()
  194. test.cfg(c)
  195. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  196. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  197. }
  198. }
  199. }
  200. func TestMetadataConfigValidates(t *testing.T) {
  201. tests := []struct {
  202. name string
  203. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  204. err string
  205. }{
  206. {
  207. "Retry.Max",
  208. func(cfg *Config) {
  209. cfg.Metadata.Retry.Max = -1
  210. },
  211. "Metadata.Retry.Max must be >= 0"},
  212. {"Retry.Backoff",
  213. func(cfg *Config) {
  214. cfg.Metadata.Retry.Backoff = -1
  215. },
  216. "Metadata.Retry.Backoff must be >= 0"},
  217. {"RefreshFrequency",
  218. func(cfg *Config) {
  219. cfg.Metadata.RefreshFrequency = -1
  220. },
  221. "Metadata.RefreshFrequency must be >= 0"},
  222. }
  223. for i, test := range tests {
  224. c := NewConfig()
  225. test.cfg(c)
  226. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  227. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  228. }
  229. }
  230. }
  231. func TestAdminConfigValidates(t *testing.T) {
  232. tests := []struct {
  233. name string
  234. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  235. err string
  236. }{
  237. {"Timeout",
  238. func(cfg *Config) {
  239. cfg.Admin.Timeout = 0
  240. },
  241. "Admin.Timeout must be > 0"},
  242. }
  243. for i, test := range tests {
  244. c := NewConfig()
  245. test.cfg(c)
  246. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  247. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  248. }
  249. }
  250. }
  251. func TestProducerConfigValidates(t *testing.T) {
  252. tests := []struct {
  253. name string
  254. cfg func(*Config) // resorting to using a function as a param because of internal composite structs
  255. err string
  256. }{
  257. {
  258. "MaxMessageBytes",
  259. func(cfg *Config) {
  260. cfg.Producer.MaxMessageBytes = 0
  261. },
  262. "Producer.MaxMessageBytes must be > 0"},
  263. {"RequiredAcks",
  264. func(cfg *Config) {
  265. cfg.Producer.RequiredAcks = -2
  266. },
  267. "Producer.RequiredAcks must be >= -1"},
  268. {"Timeout",
  269. func(cfg *Config) {
  270. cfg.Producer.Timeout = 0
  271. },
  272. "Producer.Timeout must be > 0"},
  273. {"Partitioner",
  274. func(cfg *Config) {
  275. cfg.Producer.Partitioner = nil
  276. },
  277. "Producer.Partitioner must not be nil"},
  278. {"Flush.Bytes",
  279. func(cfg *Config) {
  280. cfg.Producer.Flush.Bytes = -1
  281. },
  282. "Producer.Flush.Bytes must be >= 0"},
  283. {"Flush.Messages",
  284. func(cfg *Config) {
  285. cfg.Producer.Flush.Messages = -1
  286. },
  287. "Producer.Flush.Messages must be >= 0"},
  288. {"Flush.Frequency",
  289. func(cfg *Config) {
  290. cfg.Producer.Flush.Frequency = -1
  291. },
  292. "Producer.Flush.Frequency must be >= 0"},
  293. {"Flush.MaxMessages",
  294. func(cfg *Config) {
  295. cfg.Producer.Flush.MaxMessages = -1
  296. },
  297. "Producer.Flush.MaxMessages must be >= 0"},
  298. {"Flush.MaxMessages with Producer.Flush.Messages",
  299. func(cfg *Config) {
  300. cfg.Producer.Flush.MaxMessages = 1
  301. cfg.Producer.Flush.Messages = 2
  302. },
  303. "Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set"},
  304. {"Flush.Retry.Max",
  305. func(cfg *Config) {
  306. cfg.Producer.Retry.Max = -1
  307. },
  308. "Producer.Retry.Max must be >= 0"},
  309. {"Flush.Retry.Backoff",
  310. func(cfg *Config) {
  311. cfg.Producer.Retry.Backoff = -1
  312. },
  313. "Producer.Retry.Backoff must be >= 0"},
  314. {"Idempotent Version",
  315. func(cfg *Config) {
  316. cfg.Producer.Idempotent = true
  317. cfg.Version = V0_10_0_0
  318. },
  319. "Idempotent producer requires Version >= V0_11_0_0"},
  320. {"Idempotent with Producer.Retry.Max",
  321. func(cfg *Config) {
  322. cfg.Version = V0_11_0_0
  323. cfg.Producer.Idempotent = true
  324. cfg.Producer.Retry.Max = 0
  325. },
  326. "Idempotent producer requires Producer.Retry.Max >= 1"},
  327. {"Idempotent with Producer.RequiredAcks",
  328. func(cfg *Config) {
  329. cfg.Version = V0_11_0_0
  330. cfg.Producer.Idempotent = true
  331. },
  332. "Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
  333. {"Idempotent with Net.MaxOpenRequests",
  334. func(cfg *Config) {
  335. cfg.Version = V0_11_0_0
  336. cfg.Producer.Idempotent = true
  337. cfg.Producer.RequiredAcks = WaitForAll
  338. },
  339. "Idempotent producer requires Net.MaxOpenRequests to be 1"},
  340. }
  341. for i, test := range tests {
  342. c := NewConfig()
  343. test.cfg(c)
  344. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  345. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  346. }
  347. }
  348. }
  349. func TestConsumerConfigValidates(t *testing.T) {
  350. tests := []struct {
  351. name string
  352. cfg func(*Config)
  353. err string
  354. }{
  355. {"ReadCommitted Version",
  356. func(cfg *Config) {
  357. cfg.Version = V0_10_0_0
  358. cfg.Consumer.IsolationLevel = ReadCommitted
  359. },
  360. "ReadCommitted requires Version >= V0_11_0_0",
  361. },
  362. {"Incorrect isolation level",
  363. func(cfg *Config) {
  364. cfg.Version = V0_11_0_0
  365. cfg.Consumer.IsolationLevel = IsolationLevel(42)
  366. },
  367. "Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted",
  368. },
  369. }
  370. for i, test := range tests {
  371. c := NewConfig()
  372. test.cfg(c)
  373. if err := c.Validate(); string(err.(ConfigurationError)) != test.err {
  374. t.Errorf("[%d]:[%s] Expected %s, Got %s\n", i, test.name, test.err, err)
  375. }
  376. }
  377. }
  378. func TestLZ4ConfigValidation(t *testing.T) {
  379. config := NewConfig()
  380. config.Producer.Compression = CompressionLZ4
  381. if err := config.Validate(); string(err.(ConfigurationError)) != "lz4 compression requires Version >= V0_10_0_0" {
  382. t.Error("Expected invalid lz4/kafka version error, got ", err)
  383. }
  384. config.Version = V0_10_0_0
  385. if err := config.Validate(); err != nil {
  386. t.Error("Expected lz4 to work, got ", err)
  387. }
  388. }
  389. func TestZstdConfigValidation(t *testing.T) {
  390. config := NewConfig()
  391. config.Producer.Compression = CompressionZSTD
  392. if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" {
  393. t.Error("Expected invalid zstd/kafka version error, got ", err)
  394. }
  395. config.Version = V2_1_0_0
  396. if err := config.Validate(); err != nil {
  397. t.Error("Expected zstd to work, got ", err)
  398. }
  399. }
  400. // This example shows how to integrate with an existing registry as well as publishing metrics
  401. // on the standard output
  402. func ExampleConfig_metrics() {
  403. // Our application registry
  404. appMetricRegistry := metrics.NewRegistry()
  405. appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
  406. appGauge.Update(1)
  407. config := NewConfig()
  408. // Use a prefix registry instead of the default local one
  409. config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")
  410. // Simulate a metric created by sarama without starting a broker
  411. saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
  412. saramaGauge.Update(2)
  413. metrics.WriteOnce(appMetricRegistry, os.Stdout)
  414. // Output:
  415. // gauge m1
  416. // value: 1
  417. // gauge sarama.m2
  418. // value: 2
  419. }