Checkpoint work-in-progress on a fat client

I expect to revert this almost immediately - a fat client looks like it's going
to be *VERY* fat, so I'm going to finish the thin part first. However, I'm
committing it so the idea is here when somebody wants to come back to it.
Evan Huus, 12 years ago
commit 1251b949b1
10 changed files with 112 additions and 23 deletions:

  1. broker.go (+5 -5)
  2. broker_manager.go (+6 -3)
  3. client.go (+62 -2)
  4. partition_request.go (+6 -0)
  5. partition_response.go (+7 -0)
  6. producer.go (+3 -0)
  7. request.go (+8 -7)
  8. response.go (+2 -6)
  9. topic_request.go (+6 -0)
  10. topic_response.go (+7 -0)

broker.go (+5 -5)

@@ -164,14 +164,14 @@ func (b *broker) rcvResponseLoop() {
 	}
 }
 
-func (b *broker) sendRequest(clientID *string, body requestEncoder) (*responsePromise, error) {
-	req := request{b.correlation_id, clientID, body}
+func (b *broker) sendRequest(req request) (*responsePromise, error) {
+	req.correlation_id = b.correlation_id
 	packet, err := buildBytes(&req)
 	if err != nil {
 		return nil, err
 	}
 
-	sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, body.expectResponse()}
+	sendRequest := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, req.expectResponse()}
 
 	b.requests <- sendRequest
 	sendRequest.response.packets <- *packet // we cheat to avoid poofing up more channels than necessary
@@ -181,8 +181,8 @@ func (b *broker) sendRequest(clientID *string, body requestEncoder) (*responsePromise, error) {
 
 // returns true if there was a response, even if there was an error decoding it (in
 // which case it will also return an error of some sort)
-func (b *broker) sendAndReceive(clientID *string, req requestEncoder, res decoder) (bool, error) {
-	responseChan, err := b.sendRequest(clientID, req)
+func (b *broker) sendAndReceive(req request, res decoder) (bool, error) {
+	responseChan, err := b.sendRequest(req)
 	if err != nil {
 		return false, err
 	}
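
For orientation, a minimal sketch of a call site under the new signature: the caller now builds the whole request itself (correlation id left at zero, since sendRequest overwrites it) rather than passing the client id and body separately. metadataRequest and metadataResponse are hypothetical placeholders for any requestBody/decoder pair, not types from this commit.

// Sketch only, not part of this commit. metadataRequest and metadataResponse
// are hypothetical types assumed to satisfy requestBody and decoder.
func fetchExample(b *broker, clientID *string) (*metadataResponse, error) {
	res := new(metadataResponse)
	// correlation_id is left as zero here; sendRequest stamps the real one
	req := request{0, clientID, new(metadataRequest)}
	gotResponse, err := b.sendAndReceive(req, res)
	if err != nil || !gotResponse {
		return nil, err
	}
	return res, nil
}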

broker_manager.go (+6 -3)

@@ -27,7 +27,7 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManager, err error) {
 	bm.partitions = make(map[string]map[int32]*partitionMetadata)
 
 	// do an initial fetch of all cluster metadata by specifying an empty list of topics
-	err = bm.refreshTopics(make([]*string, 0))
+	//err = bm.refreshTopics(make([]*string, 0))
 	if err != nil {
 		return nil, err
 	}
@@ -35,6 +35,8 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManager, err error) {
 	return bm, nil
 }
 
+/*
+
 func (bm *brokerManager) terminateBroker(id int32) {
 	bm.lock.Lock()
 	delete(bm.brokers, id)
@@ -104,7 +106,7 @@ func (bm *brokerManager) partitionsForTopic(topic string) ([]int32, error) {
 	return partitions, nil
 }
 
-func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestEncoder, res responseDecoder) (bool, error) {
+func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestBody, res response) (bool, error) {
 	b, err := bm.getValidLeader(topic, partition)
 	if err != nil {
 		return false, err
@@ -163,7 +165,7 @@ func (bm *brokerManager) getDefault() *broker {
 	return bm.defaultBroker
 }
 
-func (bm *brokerManager) sendToAny(req requestEncoder, res decoder) (bool, error) {
+func (bm *brokerManager) sendToAny(req requestBody, res decoder) (bool, error) {
 	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
 		gotResponse, err := b.sendAndReceive(bm.client.id, req, res)
 		switch err.(type) {
@@ -216,3 +218,4 @@ func (bm *brokerManager) refreshTopic(topic string) error {
 	tmp[0] = &topic
 	return bm.refreshTopics(tmp)
 }
+*/
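
The routing layer above is disabled wholesale while the surrounding API shifts. When it comes back, its helpers will have to wrap the body in a request themselves, since sendAndReceive no longer takes the client id. A rough sketch, not part of this commit, of how sendToAny might be adapted, assuming the commented-out block is re-enabled largely as-is:

// Sketch only; assumes the commented-out brokerManager methods are re-enabled.
func (bm *brokerManager) sendToAnySketch(body requestBody, res decoder) (bool, error) {
	// correlation_id stays zero here; the broker stamps it in sendRequest
	req := request{0, bm.client.id, body}
	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
		gotResponse, err := b.sendAndReceive(req, res)
		if _, isEncoding := err.(EncodingError); err == nil || isEncoding {
			// encoding errors are the caller's problem; anything else means a bad broker
			return gotResponse, err
		}
		bm.terminateBroker(b.id)
	}
	return false, nil // out of brokers; real code would return a dedicated error here
}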

client.go (+62 -2)

@@ -2,15 +2,75 @@ package kafka
 
 type Client struct {
 	id      *string
-	brokers *brokerManager
+	manager *brokerManager
 }
 
 func NewClient(id *string, host string, port int32) (client *Client, err error) {
 	client = new(Client)
 	client.id = id
-	client.brokers, err = newBrokerManager(client, host, port)
+	client.manager, err = newBrokerManager(client, host, port)
 	if err != nil {
 		return nil, err
 	}
 	return client, nil
 }
+
+func (c *Client) todo(broker *broker, body requestBody, res response) error {
+	// the correlation id gets filled in by the broker
+	req := request{0, c.id, body}
+	gotResponse, err := broker.sendAndReceive(req, res)
+	switch err.(type) {
+	case EncodingError:
+		return err
+	case nil:
+		// no error, did we get a response?
+		if !gotResponse {
+			return nil
+		}
+	default:
+		// broker error, so discard that broker
+		// TODO c.manager.terminateBroker(b.id)
+		return err
+	}
+
+	// we successfully got and parsed a response, so check for stale brokers and other errors
+	toRetry := make(map[*string]map[int32]bool)
+	for _, topic := range res.topics() {
+		for _, partition := range topic.partitions() {
+			switch partition.err() {
+			case NO_ERROR:
+				continue
+			case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION:
+				if toRetry[topic.name()] == nil {
+					toRetry[topic.name()] = make(map[int32]bool)
+				}
+				toRetry[topic.name()][partition.id()] = true
+			default:
+				return partition.err()
+			}
+		}
+	}
+
+	if len(toRetry) == 0 {
+		return nil
+	}
+
+	// refresh necessary metadata
+	toRefresh := make([]*string, 0, len(toRetry))
+	for name := range toRetry {
+		toRefresh = append(toRefresh, name)
+	}
+	// TODO c.manager.refreshTopics(toRefresh)
+
+	// now retry the request chunks that failed
+	for _, topic := range body.topics() {
+		if toRetry[topic.name()] != nil {
+			for _, partition := range topic.partitions() {
+				if toRetry[topic.name()][partition.id()] {
+				}
+			}
+		}
+	}
+
+	return nil
+}
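
todo is the heart of the fat-client idea: send once, inspect per-partition errors, refresh metadata for anything stale, then retry the failed chunks (the refresh and retry steps are still stubbed out in this checkpoint). Below is a minimal sketch of how a caller might drive it, assuming hypothetical metadataRequest and metadataResponse types that satisfy requestBody and response, and assuming getDefault in broker_manager.go is re-enabled.

// Sketch only, not part of this commit; all of the names noted above are assumptions.
func (c *Client) exampleMetadataCall() error {
	b := c.manager.getDefault()
	if b == nil {
		return nil // no broker available; real code would surface an error
	}
	return c.todo(b, new(metadataRequest), new(metadataResponse))
}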

partition_request.go (+6 -0)

@@ -0,0 +1,6 @@
+package kafka
+
+type partitionRequest interface {
+	encoder
+	id() int32
+}

partition_response.go (+7 -0)

@@ -0,0 +1,7 @@
+package kafka
+
+type partitionResponse interface {
+	decoder
+	id() int32
+	err() KError
+}
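
partitionResponse is what keeps the client-side error handling generic: every reply can report a per-partition KError. As a small illustration (a sketch, not part of this commit), the retriable-or-not check that client.go applies inline could be pulled out like this:

// Sketch only: mirrors the error classification used in client.go's todo.
func isRetriable(p partitionResponse) bool {
	switch p.err() {
	case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION:
		return true
	default:
		return false
	}
}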

producer.go (+3 -0)

@@ -17,6 +17,7 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 }
 
 func (p *Producer) SendMessage(key, value encoder) error {
+	/*
 	partitions, err := p.client.brokers.partitionsForTopic(p.topic)
 	if err != nil {
 		return err
@@ -42,6 +43,8 @@ func (p *Producer) SendMessage(key, value encoder) error {
 	_, err = p.client.brokers.sendToPartition(p.topic, partition, request, &produceResponse{})
 
 	return err
+	*/
+	return nil
 }
 
 type encodableString string

request.go (+8 -7)

@@ -1,20 +1,17 @@
 package kafka
 
-type requestAPI interface {
+type requestBody interface {
+	encoder
 	key() int16
 	version() int16
 	expectResponse() bool
-}
-
-type requestEncoder interface {
-	encoder
-	requestAPI
+	topics() []topicRequest
 }
 
 type request struct {
 	correlation_id int32
 	id             *string
-	body           requestEncoder
+	body           requestBody
 }
 
 func (r *request) encode(pe packetEncoder) {
@@ -26,3 +23,7 @@ func (r *request) encode(pe packetEncoder) {
 	r.body.encode(pe)
 	pe.pop()
 }
+
+func (r *request) expectResponse() bool {
+	return r.body.expectResponse()
+}
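
With requestAPI and requestEncoder folded into the single requestBody interface, a concrete request type now supplies its wire encoding and its routing metadata in one place. A purely illustrative implementation follows; pingRequest is hypothetical, and the encoder method signature is assumed to match the encode(packetEncoder) form that request itself uses above.

// Sketch only: a do-nothing request type illustrating what requestBody now demands.
type pingRequest struct{}

func (p *pingRequest) encode(pe packetEncoder) {} // nothing on the wire
func (p *pingRequest) key() int16              { return 0 } // hypothetical API key, for illustration only
func (p *pingRequest) version() int16          { return 0 }
func (p *pingRequest) expectResponse() bool    { return false }
func (p *pingRequest) topics() []topicRequest  { return nil } // no per-topic payload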

response.go (+2 -6)

@@ -1,10 +1,6 @@
 package kafka
 
-type responseAPI interface {
-	staleTopics() []*string
-}
-
-type responseDecoder interface {
+type response interface {
 	decoder
-	responseAPI
+	topics() []topicResponse
 }

topic_request.go (+6 -0)

@@ -0,0 +1,6 @@
+package kafka
+
+type topicRequest interface {
+	name() *string
+	partitions() []partitionRequest
+}

topic_response.go (+7 -0)

@@ -0,0 +1,7 @@
+package kafka
+
+type topicResponse interface {
+	decoder
+	name() *string
+	partitions() []partitionResponse
+}
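
Taken together, response, topicResponse, and partitionResponse let the client walk any reply without knowing its concrete type. A short traversal sketch (not part of this commit), keyed by topic name value so entries gathered from different *string pointers still compare equal:

// Sketch only: flatten any response into a map of per-partition errors.
func collectErrors(res response) map[string]map[int32]KError {
	out := make(map[string]map[int32]KError)
	for _, topic := range res.topics() {
		name := *topic.name()
		if out[name] == nil {
			out[name] = make(map[int32]KError)
		}
		for _, partition := range topic.partitions() {
			out[name][partition.id()] = partition.err()
		}
	}
	return out
}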