Преглед на файлове

checkpoint, moving connection logic to broker

Evan Huus преди 12 години
родител
ревизия
8fb9932d87
променени са 3 файла, в които са добавени 159 реда и са изтрити 110 реда
  1. 119 0
      broker.go
  2. 34 0
      brokerManager.go
  3. 6 110
      client.go

+ 119 - 0
broker.go

@@ -1,9 +1,44 @@
 package kafka
 
+import (
+	"encoding/binary"
+	"errors"
+	"math"
+	"net"
+)
+
 type broker struct {
 	nodeId int32
 	host   *string
 	port   int32
+
+	client         *Client
+	correlation_id int32
+	conn           net.Conn
+	requests       chan reqResPair
+	responses      chan reqResPair
+}
+
+type reqResPair struct {
+	correlation_id int32
+	packets        chan []byte
+}
+
+func newBroker(host string, port int32) (b *broker, err error) {
+	b = new(broker)
+	b.nodeId = -1 // don't know it yet
+	b.host = &host
+	b.port = port
+	err = b.connect()
+	if err != nil {
+		return nil, err
+	}
+	return b, nil
+}
+
+func (b *broker) connect() (err error) {
+	// TODO
+	return nil
 }
 
 func (b *broker) encode(pe packetEncoder) {
@@ -30,3 +65,87 @@ func (b *broker) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (b *broker) sendRequestLoop() {
+	var request reqResPair
+	var n int
+	var err error
+	var buf []byte
+	for {
+		request = <-b.requests
+		buf = <-request.packets
+		n, err = b.conn.Write(buf)
+		if err != nil || n != len(buf) {
+			close(b.requests)
+			return
+		}
+		b.responses <- request
+	}
+}
+
+func (b *broker) rcvResponseLoop() {
+	var response reqResPair
+	var n int
+	var length int32
+	var err error
+	var buf []byte
+	header := make([]byte, 4)
+	for {
+		response = <-b.responses
+		n, err = b.conn.Read(header)
+		if err != nil || n != 4 {
+			close(b.responses)
+			return
+		}
+		length = int32(binary.BigEndian.Uint32(header))
+		if length <= 4 || length > 2*math.MaxUint16 {
+			close(b.responses)
+			return
+		}
+
+		n, err = b.conn.Read(header)
+		if err != nil || n != 4 {
+			close(b.responses)
+			return
+		}
+		if response.correlation_id != int32(binary.BigEndian.Uint32(header)) {
+			close(b.responses)
+			return
+		}
+
+		buf = make([]byte, length-4)
+		n, err = b.conn.Read(buf)
+		if err != nil || n != int(length-4) {
+			close(b.responses)
+			return
+		}
+
+		response.packets <- buf
+		close(response.packets)
+	}
+}
+
+func (b *broker) sendRequest(api API, body encoder) (chan []byte, error) {
+	var prepEnc prepEncoder
+	var realEnc realEncoder
+
+	req := request{api, b.correlation_id, b.client.id, body}
+
+	req.encode(&prepEnc)
+	if prepEnc.err {
+		return nil, errors.New("kafka encoding error")
+	}
+
+	realEnc.raw = make([]byte, prepEnc.length+4)
+	realEnc.putInt32(int32(prepEnc.length))
+	req.encode(&realEnc)
+
+	// we buffer one packet so that we can send our packet to the request queue without
+	// blocking, and so that the responses can be sent to us async if we want them
+	request := reqResPair{b.correlation_id, make(chan []byte, 1)}
+
+	request.packets <- realEnc.raw
+	b.requests <- request
+	b.correlation_id++
+	return request.packets, nil
+}

+ 34 - 0
brokerManager.go

@@ -0,0 +1,34 @@
+package kafka
+
+import "sync"
+
+type brokerKey struct {
+	topic     string
+	partition int32
+}
+
+type brokerManager struct {
+	defaultBroker *broker
+	leaders       map[brokerKey]*broker
+	leadersLock   sync.RWMutex
+}
+
+func newBrokerManager(host string, port int32) (bm *brokerManager, err error) {
+	bm = new(brokerManager)
+	bm.defaultBroker, err = newBroker(host, port)
+	if err != nil {
+		return nil, err
+	}
+	bm.leaders = make(map[brokerKey]*broker)
+	return bm, nil
+}
+
+func (bm *brokerManager) lookupLeader(topic string, partition int32) *broker {
+	bm.leadersLock.RLock()
+	tmp := bm.leaders[brokerKey{topic, partition}]
+	bm.leadersLock.RUnlock()
+	return tmp
+}
+
+func (bm *brokerManager) refreshTopic(topic string) {
+}

+ 6 - 110
client.go

@@ -1,12 +1,5 @@
 package kafka
 
-import (
-	"encoding/binary"
-	"errors"
-	"math"
-	"net"
-)
-
 type API struct {
 	key     int16
 	version int16
@@ -24,110 +17,13 @@ var (
 )
 
 type Client struct {
-	addr           string
-	id             *string
-	correlation_id int32
-	conn           net.Conn
-	requests       chan reqResPair
-	responses      chan reqResPair
+	id      *string
+	brokers *brokerManager
 }
 
-type reqResPair struct {
-	correlation_id int32
-	packets        chan []byte
-}
-
-func NewClient(addr string) (client *Client, err error) {
-	conn, err := net.Dial("tcp", addr)
-	if err != nil {
-		return nil, err
-	}
-	client = &Client{addr: addr, conn: conn, requests: make(chan reqResPair), responses: make(chan reqResPair)}
-	go client.sendRequestLoop()
-	go client.rcvResponseLoop()
+func NewClient(id *string, host string, port int32) (client *Client, err error) {
+	client = new(Client)
+	client.id = id
+	client.brokers, err = newBrokerManager(host, port)
 	return client, err
 }
-
-func (client *Client) sendRequestLoop() {
-	var request reqResPair
-	var n int
-	var err error
-	var buf []byte
-	for {
-		request = <-client.requests
-		buf = <-request.packets
-		n, err = client.conn.Write(buf)
-		if err != nil || n != len(buf) {
-			close(client.requests)
-			return
-		}
-		client.responses <- request
-	}
-}
-
-func (client *Client) rcvResponseLoop() {
-	var response reqResPair
-	var n int
-	var length int32
-	var err error
-	var buf []byte
-	header := make([]byte, 4)
-	for {
-		response = <-client.responses
-		n, err = client.conn.Read(header)
-		if err != nil || n != 4 {
-			close(client.responses)
-			return
-		}
-		length = int32(binary.BigEndian.Uint32(header))
-		if length <= 4 || length > 2*math.MaxUint16 {
-			close(client.responses)
-			return
-		}
-
-		n, err = client.conn.Read(header)
-		if err != nil || n != 4 {
-			close(client.responses)
-			return
-		}
-		if response.correlation_id != int32(binary.BigEndian.Uint32(header)) {
-			close(client.responses)
-			return
-		}
-
-		buf = make([]byte, length-4)
-		n, err = client.conn.Read(buf)
-		if err != nil || n != int(length-4) {
-			close(client.responses)
-			return
-		}
-
-		response.packets <- buf
-		close(response.packets)
-	}
-}
-
-func (client *Client) sendRequest(api API, body encoder) (chan []byte, error) {
-	var prepEnc prepEncoder
-	var realEnc realEncoder
-
-	req := request{api, client.correlation_id, client.id, body}
-
-	req.encode(&prepEnc)
-	if prepEnc.err {
-		return nil, errors.New("kafka encoding error")
-	}
-
-	realEnc.raw = make([]byte, prepEnc.length+4)
-	realEnc.putInt32(int32(prepEnc.length))
-	req.encode(&realEnc)
-
-	// we buffer one packet so that we can send our packet to the request queue without
-	// blocking, and so that the responses can be sent to us async if we want them
-	request := reqResPair{client.correlation_id, make(chan []byte, 1)}
-
-	request.packets <- realEnc.raw
-	client.requests <- request
-	client.correlation_id++
-	return request.packets, nil
-}