Browse Source

Proof-of-concept exposing a kafka server API

Used for kafka-cache: https://github.com/eapache/kafka-cache
Evan Huus 10 years ago
parent
commit
4636efcdb8
5 changed files with 185 additions and 67 deletions
  1. 26 26
      fetch_request.go
  2. 24 24
      offset_request.go
  3. 17 17
      produce_request.go
  4. 6 0
      response_header.go
  5. 112 0
      server.go

+ 26 - 26
fetch_request.go

@@ -1,21 +1,21 @@
 package sarama
 
-type fetchRequestBlock struct {
-	fetchOffset int64
-	maxBytes    int32
+type FetchRequestBlock struct {
+	FetchOffset int64
+	MaxBytes    int32
 }
 
-func (f *fetchRequestBlock) encode(pe packetEncoder) error {
-	pe.putInt64(f.fetchOffset)
-	pe.putInt32(f.maxBytes)
+func (f *FetchRequestBlock) encode(pe packetEncoder) error {
+	pe.putInt64(f.FetchOffset)
+	pe.putInt32(f.MaxBytes)
 	return nil
 }
 
-func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) {
-	if f.fetchOffset, err = pd.getInt64(); err != nil {
+func (f *FetchRequestBlock) decode(pd packetDecoder) (err error) {
+	if f.FetchOffset, err = pd.getInt64(); err != nil {
 		return err
 	}
-	if f.maxBytes, err = pd.getInt32(); err != nil {
+	if f.MaxBytes, err = pd.getInt32(); err != nil {
 		return err
 	}
 	return nil
@@ -24,27 +24,27 @@ func (f *fetchRequestBlock) decode(pd packetDecoder) (err error) {
 type FetchRequest struct {
 	MaxWaitTime int32
 	MinBytes    int32
-	blocks      map[string]map[int32]*fetchRequestBlock
+	Blocks      map[string]map[int32]*FetchRequestBlock
 }
 
 func (f *FetchRequest) encode(pe packetEncoder) (err error) {
 	pe.putInt32(-1) // replica ID is always -1 for clients
 	pe.putInt32(f.MaxWaitTime)
 	pe.putInt32(f.MinBytes)
-	err = pe.putArrayLength(len(f.blocks))
+	err = pe.putArrayLength(len(f.Blocks))
 	if err != nil {
 		return err
 	}
-	for topic, blocks := range f.blocks {
+	for topic, Blocks := range f.Blocks {
 		err = pe.putString(topic)
 		if err != nil {
 			return err
 		}
-		err = pe.putArrayLength(len(blocks))
+		err = pe.putArrayLength(len(Blocks))
 		if err != nil {
 			return err
 		}
-		for partition, block := range blocks {
+		for partition, block := range Blocks {
 			pe.putInt32(partition)
 			err = block.encode(pe)
 			if err != nil {
@@ -72,7 +72,7 @@ func (f *FetchRequest) decode(pd packetDecoder) (err error) {
 	if topicCount == 0 {
 		return nil
 	}
-	f.blocks = make(map[string]map[int32]*fetchRequestBlock)
+	f.Blocks = make(map[string]map[int32]*FetchRequestBlock)
 	for i := 0; i < topicCount; i++ {
 		topic, err := pd.getString()
 		if err != nil {
@@ -82,17 +82,17 @@ func (f *FetchRequest) decode(pd packetDecoder) (err error) {
 		if err != nil {
 			return err
 		}
-		f.blocks[topic] = make(map[int32]*fetchRequestBlock)
+		f.Blocks[topic] = make(map[int32]*FetchRequestBlock)
 		for j := 0; j < partitionCount; j++ {
 			partition, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
-			fetchBlock := &fetchRequestBlock{}
+			fetchBlock := &FetchRequestBlock{}
 			if err = fetchBlock.decode(pd); err != nil {
 				return nil
 			}
-			f.blocks[topic][partition] = fetchBlock
+			f.Blocks[topic][partition] = fetchBlock
 		}
 	}
 	return nil
@@ -107,17 +107,17 @@ func (f *FetchRequest) version() int16 {
 }
 
 func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
-	if f.blocks == nil {
-		f.blocks = make(map[string]map[int32]*fetchRequestBlock)
+	if f.Blocks == nil {
+		f.Blocks = make(map[string]map[int32]*FetchRequestBlock)
 	}
 
-	if f.blocks[topic] == nil {
-		f.blocks[topic] = make(map[int32]*fetchRequestBlock)
+	if f.Blocks[topic] == nil {
+		f.Blocks[topic] = make(map[int32]*FetchRequestBlock)
 	}
 
-	tmp := new(fetchRequestBlock)
-	tmp.maxBytes = maxBytes
-	tmp.fetchOffset = fetchOffset
+	tmp := new(FetchRequestBlock)
+	tmp.MaxBytes = maxBytes
+	tmp.FetchOffset = fetchOffset
 
-	f.blocks[topic][partitionID] = tmp
+	f.Blocks[topic][partitionID] = tmp
 }

+ 24 - 24
offset_request.go

@@ -1,37 +1,37 @@
 package sarama
 
-type offsetRequestBlock struct {
-	time       int64
-	maxOffsets int32
+type OffsetRequestBlock struct {
+	Time       int64
+	MaxOffsets int32
 }
 
-func (r *offsetRequestBlock) encode(pe packetEncoder) error {
-	pe.putInt64(int64(r.time))
-	pe.putInt32(r.maxOffsets)
+func (r *OffsetRequestBlock) encode(pe packetEncoder) error {
+	pe.putInt64(int64(r.Time))
+	pe.putInt32(r.MaxOffsets)
 	return nil
 }
 
-func (r *offsetRequestBlock) decode(pd packetDecoder) (err error) {
-	if r.time, err = pd.getInt64(); err != nil {
+func (r *OffsetRequestBlock) decode(pd packetDecoder) (err error) {
+	if r.Time, err = pd.getInt64(); err != nil {
 		return err
 	}
-	if r.maxOffsets, err = pd.getInt32(); err != nil {
+	if r.MaxOffsets, err = pd.getInt32(); err != nil {
 		return err
 	}
 	return nil
 }
 
 type OffsetRequest struct {
-	blocks map[string]map[int32]*offsetRequestBlock
+	Blocks map[string]map[int32]*OffsetRequestBlock
 }
 
 func (r *OffsetRequest) encode(pe packetEncoder) error {
 	pe.putInt32(-1) // replica ID is always -1 for clients
-	err := pe.putArrayLength(len(r.blocks))
+	err := pe.putArrayLength(len(r.Blocks))
 	if err != nil {
 		return err
 	}
-	for topic, partitions := range r.blocks {
+	for topic, partitions := range r.Blocks {
 		err = pe.putString(topic)
 		if err != nil {
 			return err
@@ -62,7 +62,7 @@ func (r *OffsetRequest) decode(pd packetDecoder) error {
 	if blockCount == 0 {
 		return nil
 	}
-	r.blocks = make(map[string]map[int32]*offsetRequestBlock)
+	r.Blocks = make(map[string]map[int32]*OffsetRequestBlock)
 	for i := 0; i < blockCount; i++ {
 		topic, err := pd.getString()
 		if err != nil {
@@ -72,17 +72,17 @@ func (r *OffsetRequest) decode(pd packetDecoder) error {
 		if err != nil {
 			return err
 		}
-		r.blocks[topic] = make(map[int32]*offsetRequestBlock)
+		r.Blocks[topic] = make(map[int32]*OffsetRequestBlock)
 		for j := 0; j < partitionCount; j++ {
 			partition, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
-			block := &offsetRequestBlock{}
+			block := &OffsetRequestBlock{}
 			if err := block.decode(pd); err != nil {
 				return err
 			}
-			r.blocks[topic][partition] = block
+			r.Blocks[topic][partition] = block
 		}
 	}
 	return nil
@@ -97,17 +97,17 @@ func (r *OffsetRequest) version() int16 {
 }
 
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
-	if r.blocks == nil {
-		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
+	if r.Blocks == nil {
+		r.Blocks = make(map[string]map[int32]*OffsetRequestBlock)
 	}
 
-	if r.blocks[topic] == nil {
-		r.blocks[topic] = make(map[int32]*offsetRequestBlock)
+	if r.Blocks[topic] == nil {
+		r.Blocks[topic] = make(map[int32]*OffsetRequestBlock)
 	}
 
-	tmp := new(offsetRequestBlock)
-	tmp.time = time
-	tmp.maxOffsets = maxOffsets
+	tmp := new(OffsetRequestBlock)
+	tmp.Time = time
+	tmp.MaxOffsets = maxOffsets
 
-	r.blocks[topic][partitionID] = tmp
+	r.Blocks[topic][partitionID] = tmp
 }

+ 17 - 17
produce_request.go

@@ -19,17 +19,17 @@ const (
 type ProduceRequest struct {
 	RequiredAcks RequiredAcks
 	Timeout      int32
-	msgSets      map[string]map[int32]*MessageSet
+	MsgSets      map[string]map[int32]*MessageSet
 }
 
 func (p *ProduceRequest) encode(pe packetEncoder) error {
 	pe.putInt16(int16(p.RequiredAcks))
 	pe.putInt32(p.Timeout)
-	err := pe.putArrayLength(len(p.msgSets))
+	err := pe.putArrayLength(len(p.MsgSets))
 	if err != nil {
 		return err
 	}
-	for topic, partitions := range p.msgSets {
+	for topic, partitions := range p.MsgSets {
 		err = pe.putString(topic)
 		if err != nil {
 			return err
@@ -70,7 +70,7 @@ func (p *ProduceRequest) decode(pd packetDecoder) error {
 	if topicCount == 0 {
 		return nil
 	}
-	p.msgSets = make(map[string]map[int32]*MessageSet)
+	p.MsgSets = make(map[string]map[int32]*MessageSet)
 	for i := 0; i < topicCount; i++ {
 		topic, err := pd.getString()
 		if err != nil {
@@ -80,7 +80,7 @@ func (p *ProduceRequest) decode(pd packetDecoder) error {
 		if err != nil {
 			return err
 		}
-		p.msgSets[topic] = make(map[int32]*MessageSet)
+		p.MsgSets[topic] = make(map[int32]*MessageSet)
 		for j := 0; j < partitionCount; j++ {
 			partition, err := pd.getInt32()
 			if err != nil {
@@ -99,7 +99,7 @@ func (p *ProduceRequest) decode(pd packetDecoder) error {
 			if err != nil {
 				return err
 			}
-			p.msgSets[topic][partition] = msgSet
+			p.MsgSets[topic][partition] = msgSet
 		}
 	}
 	return nil
@@ -114,32 +114,32 @@ func (p *ProduceRequest) version() int16 {
 }
 
 func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
-	if p.msgSets == nil {
-		p.msgSets = make(map[string]map[int32]*MessageSet)
+	if p.MsgSets == nil {
+		p.MsgSets = make(map[string]map[int32]*MessageSet)
 	}
 
-	if p.msgSets[topic] == nil {
-		p.msgSets[topic] = make(map[int32]*MessageSet)
+	if p.MsgSets[topic] == nil {
+		p.MsgSets[topic] = make(map[int32]*MessageSet)
 	}
 
-	set := p.msgSets[topic][partition]
+	set := p.MsgSets[topic][partition]
 
 	if set == nil {
 		set = new(MessageSet)
-		p.msgSets[topic][partition] = set
+		p.MsgSets[topic][partition] = set
 	}
 
 	set.addMessage(msg)
 }
 
 func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
-	if p.msgSets == nil {
-		p.msgSets = make(map[string]map[int32]*MessageSet)
+	if p.MsgSets == nil {
+		p.MsgSets = make(map[string]map[int32]*MessageSet)
 	}
 
-	if p.msgSets[topic] == nil {
-		p.msgSets[topic] = make(map[int32]*MessageSet)
+	if p.MsgSets[topic] == nil {
+		p.MsgSets[topic] = make(map[int32]*MessageSet)
 	}
 
-	p.msgSets[topic][partition] = set
+	p.MsgSets[topic][partition] = set
 }

+ 6 - 0
response_header.go

@@ -19,3 +19,9 @@ func (r *responseHeader) decode(pd packetDecoder) (err error) {
 	r.correlationID, err = pd.getInt32()
 	return err
 }
+
+func (r *responseHeader) encode(pe packetEncoder) (err error) {
+	pe.putInt32(r.length)
+	pe.putInt32(r.correlationID)
+	return nil
+}

+ 112 - 0
server.go

@@ -0,0 +1,112 @@
+package sarama
+
+import "net"
+
+type Server struct {
+	ln      net.Listener
+	handler RequestHandler
+}
+
+type RequestHandler interface {
+	Produce(*ProduceRequest) *ProduceResponse
+	Fetch(*FetchRequest) *FetchResponse
+	Metadata(*MetadataRequest) *MetadataResponse
+	Offset(*OffsetRequest) *OffsetResponse
+}
+
+func NewServer(addr string, handler RequestHandler) (*Server, error) {
+	ln, err := net.Listen("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+	s := &Server{ln: ln, handler: handler}
+	go s.acceptConns()
+	return s, nil
+}
+
+func (s *Server) Close() error {
+	return s.ln.Close()
+}
+
+func (s *Server) acceptConns() {
+	for {
+		conn, err := s.ln.Accept()
+		switch err {
+		case nil:
+			go s.handleConn(conn)
+		default:
+			Logger.Println(err)
+			return
+		}
+	}
+}
+
+func (s *Server) handleConn(conn net.Conn) {
+	for {
+		req, err := decodeRequest(conn)
+		if err != nil {
+			Logger.Println(err)
+			conn.Close()
+			return
+		}
+
+		var responseBody encoder
+		switch body := req.body.(type) {
+		case *ProduceRequest:
+			if r := s.handler.Produce(body); r != nil {
+				responseBody = r
+			}
+		case *FetchRequest:
+			if r := s.handler.Fetch(body); r != nil {
+				responseBody = r
+			}
+		case *MetadataRequest:
+			if r := s.handler.Metadata(body); r != nil {
+				responseBody = r
+			}
+		case *OffsetRequest:
+			if r := s.handler.Offset(body); r != nil {
+				responseBody = r
+			}
+		default:
+			Logger.Println("Unhandled request type")
+			conn.Close()
+			return
+		}
+
+		if responseBody == nil {
+			Logger.Println("nil response, aborting connection")
+			conn.Close()
+			return
+		}
+
+		responseBuf, err := encode(responseBody)
+		if err != nil {
+			Logger.Println(err)
+			conn.Close()
+			return
+		}
+
+		responseHeader, err := encode(&responseHeader{
+			length:        int32(len(responseBuf) + 4),
+			correlationID: req.correlationID,
+		})
+		if err != nil {
+			Logger.Println(err)
+			conn.Close()
+			return
+		}
+
+		if _, err := conn.Write(responseHeader); err != nil {
+			Logger.Println(err)
+			conn.Close()
+			return
+		}
+
+		if _, err := conn.Write(responseBuf); err != nil {
+			Logger.Println(err)
+			conn.Close()
+			return
+		}
+	}
+}