server.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package sarama
  2. import "net"
  3. type Server struct {
  4. ln net.Listener
  5. handler RequestHandler
  6. }
  7. type RequestHandler interface {
  8. Metadata(*MetadataRequest) *MetadataResponse
  9. Offset(*OffsetRequest) *OffsetResponse
  10. Produce(*ProduceRequest) *ProduceResponse
  11. Fetch(*FetchRequest) *FetchResponse
  12. ConsumerMetadata(*ConsumerMetadataRequest) *ConsumerMetadataResponse
  13. CommitOffset(*OffsetCommitRequest) *OffsetCommitResponse
  14. FetchOffset(*OffsetFetchRequest) *OffsetFetchResponse
  15. }
  16. func NewServer(addr string, handler RequestHandler) (*Server, error) {
  17. ln, err := net.Listen("tcp", addr)
  18. if err != nil {
  19. return nil, err
  20. }
  21. s := &Server{ln: ln, handler: handler}
  22. go s.acceptConns()
  23. return s, nil
  24. }
  25. func (s *Server) Close() error {
  26. return s.ln.Close()
  27. }
  28. func (s *Server) acceptConns() {
  29. for {
  30. conn, err := s.ln.Accept()
  31. switch err {
  32. case nil:
  33. go s.handleConn(conn)
  34. default:
  35. Logger.Println(err)
  36. return
  37. }
  38. }
  39. }
  40. func (s *Server) handleConn(conn net.Conn) {
  41. for {
  42. req, err := decodeRequest(conn)
  43. if err != nil {
  44. Logger.Println(err)
  45. Logger.Println(conn.Close())
  46. return
  47. }
  48. var responseBody encoder
  49. switch body := req.body.(type) {
  50. case *MetadataRequest:
  51. if r := s.handler.Metadata(body); r != nil {
  52. responseBody = r
  53. }
  54. case *OffsetRequest:
  55. if r := s.handler.Offset(body); r != nil {
  56. responseBody = r
  57. }
  58. case *ProduceRequest:
  59. if r := s.handler.Produce(body); r != nil {
  60. responseBody = r
  61. }
  62. case *FetchRequest:
  63. if r := s.handler.Fetch(body); r != nil {
  64. responseBody = r
  65. }
  66. case *ConsumerMetadataRequest:
  67. if r := s.handler.ConsumerMetadata(body); r != nil {
  68. responseBody = r
  69. }
  70. case *OffsetCommitRequest:
  71. if r := s.handler.CommitOffset(body); r != nil {
  72. responseBody = r
  73. }
  74. case *OffsetFetchRequest:
  75. if r := s.handler.FetchOffset(body); r != nil {
  76. responseBody = r
  77. }
  78. default:
  79. }
  80. if responseBody == nil {
  81. Logger.Println("nil response or unhandled request type, aborting connection")
  82. Logger.Println(conn.Close())
  83. return
  84. }
  85. responseBuf, err := encode(responseBody)
  86. if err != nil {
  87. Logger.Println(err)
  88. Logger.Println(conn.Close())
  89. return
  90. }
  91. responseHeader, err := encode(&responseHeader{
  92. length: int32(len(responseBuf) + 4),
  93. correlationID: req.correlationID,
  94. })
  95. if err != nil {
  96. Logger.Println(err)
  97. Logger.Println(conn.Close())
  98. return
  99. }
  100. if _, err := conn.Write(responseHeader); err != nil {
  101. Logger.Println(err)
  102. Logger.Println(conn.Close())
  103. return
  104. }
  105. if _, err := conn.Write(responseBuf); err != nil {
  106. Logger.Println(err)
  107. Logger.Println(conn.Close())
  108. return
  109. }
  110. }
  111. }