multiproducer_test.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package sarama
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "testing"
  6. "time"
  7. )
  8. func TestSimpleMultiProducer(t *testing.T) {
  9. responses := make(chan []byte, 1)
  10. extraResponses := make(chan []byte)
  11. mockBroker := NewMockBroker(t, responses)
  12. mockExtra := NewMockBroker(t, extraResponses)
  13. defer mockBroker.Close()
  14. defer mockExtra.Close()
  15. // return the extra mock as another available broker
  16. response := []byte{
  17. 0x00, 0x00, 0x00, 0x01,
  18. 0x00, 0x00, 0x00, 0x01,
  19. 0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
  20. 0x00, 0x00, 0x00, 0x00,
  21. 0x00, 0x00, 0x00, 0x01,
  22. 0x00, 0x00,
  23. 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
  24. 0x00, 0x00, 0x00, 0x01,
  25. 0x00, 0x00,
  26. 0x00, 0x00, 0x00, 0x00,
  27. 0x00, 0x00, 0x00, 0x01,
  28. 0x00, 0x00, 0x00, 0x00,
  29. 0x00, 0x00, 0x00, 0x00}
  30. binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
  31. responses <- response
  32. go func() {
  33. msg := []byte{
  34. 0x00, 0x00, 0x00, 0x01,
  35. 0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
  36. 0x00, 0x00, 0x00, 0x01,
  37. 0x00, 0x00, 0x00, 0x00,
  38. 0x00, 0x00,
  39. 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
  40. binary.BigEndian.PutUint64(msg[23:], 0)
  41. extraResponses <- msg
  42. }()
  43. client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
  44. if err != nil {
  45. t.Fatal(err)
  46. }
  47. producer, err := NewMultiProducer(client, &MultiProducerConfig{
  48. RequiredAcks: WaitForLocal,
  49. MaxBufferTime: 1000000, // "never"
  50. // So that we flush once, after the 10th message.
  51. MaxBufferBytes: uint32((len("ABC THE MESSAGE") * 10) - 1),
  52. })
  53. defer producer.Close()
  54. for i := 0; i < 10; i++ {
  55. err = producer.SendMessage("my_topic", nil, StringEncoder("ABC THE MESSAGE"))
  56. if err != nil {
  57. t.Error(err)
  58. }
  59. }
  60. select {
  61. case err = <-producer.Errors():
  62. if err != nil {
  63. t.Error(err)
  64. }
  65. case <-time.After(1 * time.Second):
  66. t.Error(fmt.Errorf("Message was never received"))
  67. }
  68. select {
  69. case <-producer.Errors():
  70. t.Error(fmt.Errorf("too many values returned"))
  71. default:
  72. }
  73. // TODO: This doesn't really test that we ONLY flush once.
  74. // For example, change the MaxBufferBytes to be much lower.
  75. }
  76. func TestMultipleMultiProducer(t *testing.T) {
  77. // TODO: Submit events to 3 different topics on 2 different brokers.
  78. // Need to figure out how the request format works to return the broker
  79. // info for those two new brokers, and how to return multiple blocks in
  80. // a ProduceRespose
  81. }