|
@@ -512,41 +512,42 @@ type ConnSelectionPolicy interface {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
type roundRobinConnPolicy struct {
|
|
type roundRobinConnPolicy struct {
|
|
|
|
|
+ // pos is still used to evenly distribute queries amongst connections.
|
|
|
pos uint32
|
|
pos uint32
|
|
|
- mu sync.RWMutex
|
|
|
|
|
- conns []*Conn
|
|
|
|
|
|
|
+ conns atomic.Value // *[]*Conn
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func RoundRobinConnPolicy() func() ConnSelectionPolicy {
|
|
func RoundRobinConnPolicy() func() ConnSelectionPolicy {
|
|
|
return func() ConnSelectionPolicy {
|
|
return func() ConnSelectionPolicy {
|
|
|
- return &roundRobinConnPolicy{}
|
|
|
|
|
|
|
+ p := &roundRobinConnPolicy{}
|
|
|
|
|
+ var conns []*Conn
|
|
|
|
|
+ p.conns.Store(&conns)
|
|
|
|
|
+ return p
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (r *roundRobinConnPolicy) SetConns(conns []*Conn) {
|
|
func (r *roundRobinConnPolicy) SetConns(conns []*Conn) {
|
|
|
- r.mu.Lock()
|
|
|
|
|
- r.conns = conns
|
|
|
|
|
- r.mu.Unlock()
|
|
|
|
|
|
|
+ // NOTE: we do not need to lock here due to the conneciton pool is already
|
|
|
|
|
+ // holding its own mutex over the conn seleciton policy
|
|
|
|
|
+ r.conns.Store(&conns)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (r *roundRobinConnPolicy) Pick(qry *Query) *Conn {
|
|
func (r *roundRobinConnPolicy) Pick(qry *Query) *Conn {
|
|
|
- pos := int(atomic.AddUint32(&r.pos, 1) - 1)
|
|
|
|
|
-
|
|
|
|
|
- r.mu.RLock()
|
|
|
|
|
- defer r.mu.RUnlock()
|
|
|
|
|
-
|
|
|
|
|
- if len(r.conns) == 0 {
|
|
|
|
|
|
|
+ conns := *(r.conns.Load().(*[]*Conn))
|
|
|
|
|
+ if len(conns) == 0 {
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ pos := int(atomic.AddUint32(&r.pos, 1) - 1)
|
|
|
|
|
+
|
|
|
var (
|
|
var (
|
|
|
leastBusyConn *Conn
|
|
leastBusyConn *Conn
|
|
|
streamsAvailable int
|
|
streamsAvailable int
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
// find the conn which has the most available streams, this is racy
|
|
// find the conn which has the most available streams, this is racy
|
|
|
- for i := 0; i < len(r.conns); i++ {
|
|
|
|
|
- conn := r.conns[(pos+i)%len(r.conns)]
|
|
|
|
|
|
|
+ for i := 0; i < len(conns); i++ {
|
|
|
|
|
+ conn := conns[(pos+i)%len(conns)]
|
|
|
if streams := conn.AvailableStreams(); streams > streamsAvailable {
|
|
if streams := conn.AvailableStreams(); streams > streamsAvailable {
|
|
|
leastBusyConn = conn
|
|
leastBusyConn = conn
|
|
|
streamsAvailable = streams
|
|
streamsAvailable = streams
|