Browse Source

policies: check available streams on conn

RoundRobbin conn pool should check the available streams on a
connection before returning it.
Chris Bannister 10 năm trước cách đây
mục cha
commit
f6027742da
2 tập tin đã thay đổi với 20 bổ sung12 xóa
  1. 14 6
      policies.go
  2. 6 6
      policies_test.go

+ 14 - 6
policies.go

@@ -354,12 +354,20 @@ func (r *roundRobinConnPolicy) SetConns(conns []*Conn) {
 }
 
 func (r *roundRobinConnPolicy) Pick(qry *Query) *Conn {
-	pos := atomic.AddUint32(&r.pos, 1)
-	var conn *Conn
+	pos := int(atomic.AddUint32(&r.pos, 1) - 1)
 	r.mu.RLock()
-	if len(r.conns) > 0 {
-		conn = r.conns[pos%uint32(len(r.conns))]
+	defer r.mu.RUnlock()
+
+	if len(r.conns) == 0 {
+		return nil
 	}
-	r.mu.RUnlock()
-	return conn
+
+	for i := 0; i < len(r.conns); i++ {
+		conn := r.conns[(pos+i)%len(r.conns)]
+		if conn.AvailableStreams() > 0 {
+			return conn
+		}
+	}
+
+	return nil
 }

+ 6 - 6
policies_test.go

@@ -6,6 +6,7 @@ package gocql
 
 import (
 	"fmt"
+	"github.com/gocql/gocql/internal/streams"
 	"testing"
 
 	"github.com/hailocab/go-hostpool"
@@ -147,8 +148,8 @@ func TestHostPoolHostPolicy(t *testing.T) {
 func TestRoundRobinConnPolicy(t *testing.T) {
 	policy := RoundRobinConnPolicy()()
 
-	conn0 := &Conn{}
-	conn1 := &Conn{}
+	conn0 := &Conn{streams: streams.New(1)}
+	conn1 := &Conn{streams: streams.New(1)}
 	conn := []*Conn{
 		conn0,
 		conn1,
@@ -156,14 +157,13 @@ func TestRoundRobinConnPolicy(t *testing.T) {
 
 	policy.SetConns(conn)
 
-	// the first conn selected is actually at [1], but this is ok for RR
-	if actual := policy.Pick(nil); actual != conn1 {
+	if actual := policy.Pick(nil); actual != conn0 {
 		t.Error("Expected conn1")
 	}
-	if actual := policy.Pick(nil); actual != conn0 {
+	if actual := policy.Pick(nil); actual != conn1 {
 		t.Error("Expected conn0")
 	}
-	if actual := policy.Pick(nil); actual != conn1 {
+	if actual := policy.Pick(nil); actual != conn0 {
 		t.Error("Expected conn1")
 	}
 }