server.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package sarama
  2. import "net"
  3. type Server struct {
  4. ln net.Listener
  5. handler RequestHandler
  6. }
  7. type RequestHandler interface {
  8. Produce(*ProduceRequest) *ProduceResponse
  9. Fetch(*FetchRequest) *FetchResponse
  10. Metadata(*MetadataRequest) *MetadataResponse
  11. Offset(*OffsetRequest) *OffsetResponse
  12. }
  13. func NewServer(addr string, handler RequestHandler) (*Server, error) {
  14. ln, err := net.Listen("tcp", addr)
  15. if err != nil {
  16. return nil, err
  17. }
  18. s := &Server{ln: ln, handler: handler}
  19. go s.acceptConns()
  20. return s, nil
  21. }
  22. func (s *Server) Close() error {
  23. return s.ln.Close()
  24. }
  25. func (s *Server) acceptConns() {
  26. for {
  27. conn, err := s.ln.Accept()
  28. switch err {
  29. case nil:
  30. go s.handleConn(conn)
  31. default:
  32. Logger.Println(err)
  33. return
  34. }
  35. }
  36. }
  37. func (s *Server) handleConn(conn net.Conn) {
  38. for {
  39. req, err := decodeRequest(conn)
  40. if err != nil {
  41. Logger.Println(err)
  42. Logger.Println(conn.Close())
  43. return
  44. }
  45. var responseBody encoder
  46. switch body := req.body.(type) {
  47. case *ProduceRequest:
  48. if r := s.handler.Produce(body); r != nil {
  49. responseBody = r
  50. }
  51. case *FetchRequest:
  52. if r := s.handler.Fetch(body); r != nil {
  53. responseBody = r
  54. }
  55. case *MetadataRequest:
  56. if r := s.handler.Metadata(body); r != nil {
  57. responseBody = r
  58. }
  59. case *OffsetRequest:
  60. if r := s.handler.Offset(body); r != nil {
  61. responseBody = r
  62. }
  63. default:
  64. Logger.Println("Unhandled request type")
  65. Logger.Println(conn.Close())
  66. return
  67. }
  68. if responseBody == nil {
  69. Logger.Println("nil response, aborting connection")
  70. Logger.Println(conn.Close())
  71. return
  72. }
  73. responseBuf, err := encode(responseBody)
  74. if err != nil {
  75. Logger.Println(err)
  76. Logger.Println(conn.Close())
  77. return
  78. }
  79. responseHeader, err := encode(&responseHeader{
  80. length: int32(len(responseBuf) + 4),
  81. correlationID: req.correlationID,
  82. })
  83. if err != nil {
  84. Logger.Println(err)
  85. Logger.Println(conn.Close())
  86. return
  87. }
  88. if _, err := conn.Write(responseHeader); err != nil {
  89. Logger.Println(err)
  90. Logger.Println(conn.Close())
  91. return
  92. }
  93. if _, err := conn.Write(responseBuf); err != nil {
  94. Logger.Println(err)
  95. Logger.Println(conn.Close())
  96. return
  97. }
  98. }
  99. }