|
|
@@ -8,10 +8,15 @@ type Server struct {
|
|
|
}
|
|
|
|
|
|
type RequestHandler interface {
|
|
|
- Produce(*ProduceRequest) *ProduceResponse
|
|
|
- Fetch(*FetchRequest) *FetchResponse
|
|
|
Metadata(*MetadataRequest) *MetadataResponse
|
|
|
Offset(*OffsetRequest) *OffsetResponse
|
|
|
+
|
|
|
+ Produce(*ProduceRequest) *ProduceResponse
|
|
|
+ Fetch(*FetchRequest) *FetchResponse
|
|
|
+
|
|
|
+ ConsumerMetadata(*ConsumerMetadataRequest) *ConsumerMetadataResponse
|
|
|
+ CommitOffset(*OffsetCommitRequest) *OffsetCommitResponse
|
|
|
+ FetchOffset(*OffsetFetchRequest) *OffsetFetchResponse
|
|
|
}
|
|
|
|
|
|
func NewServer(addr string, handler RequestHandler) (*Server, error) {
|
|
|
@@ -52,6 +57,14 @@ func (s *Server) handleConn(conn net.Conn) {
|
|
|
|
|
|
var responseBody encoder
|
|
|
switch body := req.body.(type) {
|
|
|
+ case *MetadataRequest:
|
|
|
+ if r := s.handler.Metadata(body); r != nil {
|
|
|
+ responseBody = r
|
|
|
+ }
|
|
|
+ case *OffsetRequest:
|
|
|
+ if r := s.handler.Offset(body); r != nil {
|
|
|
+ responseBody = r
|
|
|
+ }
|
|
|
case *ProduceRequest:
|
|
|
if r := s.handler.Produce(body); r != nil {
|
|
|
responseBody = r
|
|
|
@@ -60,22 +73,23 @@ func (s *Server) handleConn(conn net.Conn) {
|
|
|
if r := s.handler.Fetch(body); r != nil {
|
|
|
responseBody = r
|
|
|
}
|
|
|
- case *MetadataRequest:
|
|
|
- if r := s.handler.Metadata(body); r != nil {
|
|
|
+ case *ConsumerMetadataRequest:
|
|
|
+ if r := s.handler.ConsumerMetadata(body); r != nil {
|
|
|
responseBody = r
|
|
|
}
|
|
|
- case *OffsetRequest:
|
|
|
- if r := s.handler.Offset(body); r != nil {
|
|
|
+ case *OffsetCommitRequest:
|
|
|
+ if r := s.handler.CommitOffset(body); r != nil {
|
|
|
+ responseBody = r
|
|
|
+ }
|
|
|
+ case *OffsetFetchRequest:
|
|
|
+ if r := s.handler.FetchOffset(body); r != nil {
|
|
|
responseBody = r
|
|
|
}
|
|
|
default:
|
|
|
- Logger.Println("Unhandled request type")
|
|
|
- Logger.Println(conn.Close())
|
|
|
- return
|
|
|
}
|
|
|
|
|
|
if responseBody == nil {
|
|
|
- Logger.Println("nil response, aborting connection")
|
|
|
+ Logger.Println("nil response or unhandled request type, aborting connection")
|
|
|
Logger.Println(conn.Close())
|
|
|
return
|
|
|
}
|