Просмотр исходного кода

Allocation-Free Round Robin Policy (fixes #1401) (#1402)

* reduced number of allocations in host picking logic

* a bit of cleanup

* fixes for broken tests

* dropped go vet-suggested code

* fix after merge
AlexeyUzhva 5 лет назад
Родитель
Сommit
aca01c205c
1 измененных файлов с 43 добавлено и 64 удалено
  1. 43 64
      policies.go

+ 43 - 64
policies.go

@@ -6,8 +6,6 @@ package gocql
 
 import (
 	"context"
-	crand "crypto/rand"
-	"encoding/binary"
 	"errors"
 	"fmt"
 	"math"
@@ -335,7 +333,8 @@ func RoundRobinHostPolicy() HostSelectionPolicy {
 }
 
 type roundRobinHostPolicy struct {
-	hosts cowHostList
+	hosts           cowHostList
+	lastUsedHostIdx uint64
 }
 
 func (r *roundRobinHostPolicy) IsLocal(*HostInfo) bool              { return true }
@@ -343,26 +342,9 @@ func (r *roundRobinHostPolicy) KeyspaceChanged(KeyspaceUpdateEvent) {}
 func (r *roundRobinHostPolicy) SetPartitioner(partitioner string)   {}
 func (r *roundRobinHostPolicy) Init(*Session)                       {}
 
-var (
-	randPool = sync.Pool{
-		New: func() interface{} {
-			return rand.New(randSource())
-		},
-	}
-)
-
 func (r *roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost {
-	src := r.hosts.get()
-	hosts := make([]*HostInfo, len(src))
-	copy(hosts, src)
-
-	rand := randPool.Get().(*rand.Rand)
-	defer randPool.Put(rand)
-	rand.Shuffle(len(hosts), func(i, j int) {
-		hosts[i], hosts[j] = hosts[j], hosts[i]
-	})
-
-	return roundRobbin(hosts)
+	nextStartOffset := atomic.AddUint64(&r.lastUsedHostIdx, 1)
+	return roundRobbin(int(nextStartOffset), r.hosts.get())
 }
 
 func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
@@ -811,9 +793,10 @@ func (host selectedHostPoolHost) Mark(err error) {
 }
 
 type dcAwareRR struct {
-	local       string
-	localHosts  cowHostList
-	remoteHosts cowHostList
+	local           string
+	localHosts      cowHostList
+	remoteHosts     cowHostList
+	lastUsedHostIdx uint64
 }
 
 // DCAwareRoundRobinPolicy is a host selection policies which will prioritize and
@@ -850,55 +833,51 @@ func (d *dcAwareRR) RemoveHost(host *HostInfo) {
 func (d *dcAwareRR) HostUp(host *HostInfo)   { d.AddHost(host) }
 func (d *dcAwareRR) HostDown(host *HostInfo) { d.RemoveHost(host) }
 
-var randSeed int64
-
-func init() {
-	p := make([]byte, 8)
-	if _, err := crand.Read(p); err != nil {
-		panic(err)
-	}
-	randSeed = int64(binary.BigEndian.Uint64(p))
-}
-
-func randSource() rand.Source {
-	return rand.NewSource(atomic.AddInt64(&randSeed, 1))
-}
+// This function is supposed to be called in a fashion
+// roundRobbin(offset, hostsPriority1, hostsPriority2, hostsPriority3 ... )
+//
+// E.g. for DC-naive strategy:
+// roundRobbin(offset, allHosts)
+//
+// For tiered and DC-aware strategy:
+// roundRobbin(offset, localHosts, remoteHosts)
+func roundRobbin(shift int, hosts ...[]*HostInfo) NextHost {
+	currentLayer := 0
+	currentlyObserved := 0
 
-func roundRobbin(hosts []*HostInfo) NextHost {
-	var i int
 	return func() SelectedHost {
-		for i < len(hosts) {
-			h := hosts[i]
-			i++
 
-			if h.IsUp() {
-				return (*selectedHost)(h)
+		// iterate over layers
+		for {
+			if currentLayer == len(hosts) {
+				return nil
 			}
-		}
 
-		return nil
-	}
-}
+			currentLayerSize := len(hosts[currentLayer])
 
-func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
-	local := d.localHosts.get()
-	remote := d.remoteHosts.get()
+			// iterate over hosts within a layer
+			for {
+				currentlyObserved++
+				if currentlyObserved > currentLayerSize {
+					currentLayer++
+					currentlyObserved = 0
+					break
+				}
+
+				h := hosts[currentLayer][(shift+currentlyObserved)%currentLayerSize]
 
-	hosts := make([]*HostInfo, len(local)+len(remote))
-	n := copy(hosts, local)
-	copy(hosts[n:], remote)
+				if h.IsUp() {
+					return (*selectedHost)(h)
+				}
 
-	// TODO: use random chose-2 but that will require plumbing information
-	// about connection/host load to here
-	r := randPool.Get().(*rand.Rand)
-	defer randPool.Put(r)
-	for _, l := range [][]*HostInfo{hosts[:len(local)], hosts[len(local):]} {
-		r.Shuffle(len(l), func(i, j int) {
-			l[i], l[j] = l[j], l[i]
-		})
+			}
+		}
 	}
+}
 
-	return roundRobbin(hosts)
+func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost {
+	nextStartOffset := atomic.AddUint64(&d.lastUsedHostIdx, 1)
+	return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get())
 }
 
 // ConvictionPolicy interface is used by gocql to determine if a host should be