@@ -150,6 +150,12 @@ func (n *networkTopology) haveRF(replicaCounts map[string]int) bool {
 
 func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 	dcRacks := make(map[string]map[string]struct{}, len(n.dcs))
+	// skipped hosts in a dc
+	skipped := make(map[string][]*HostInfo, len(n.dcs))
+	// number of replicas per dc
+	replicasInDC := make(map[string]int, len(n.dcs))
+	// dc -> racks
+	seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
 
 	for _, h := range tokenRing.hosts {
 		dc := h.DataCenter()
@@ -163,6 +169,11 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 		racks[rack] = struct{}{}
 	}
 
+	for dc, racks := range dcRacks {
+		replicasInDC[dc] = 0
+		seenDCRacks[dc] = make(map[string]struct{}, len(racks))
+	}
+
 	tokens := tokenRing.tokens
 	replicaRing := make(tokenRingReplicas, len(tokens))
 
@@ -172,18 +183,25 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 	}
 
 	for i, th := range tokenRing.tokens {
-		// number of replicas per dc
-		// TODO: recycle these
-		replicasInDC := make(map[string]int, len(n.dcs))
-		// dc -> racks
-		seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
-		// skipped hosts in a dc
-		skipped := make(map[string][]*HostInfo, len(n.dcs))
+		for k, v := range skipped {
+			skipped[k] = v[:0]
+		}
+
+		for dc := range n.dcs {
+			replicasInDC[dc] = 0
+			for rack := range seenDCRacks[dc] {
+				delete(seenDCRacks[dc], rack)
+			}
+		}
 
 		replicas := make([]*HostInfo, 0, totalRF)
-		for j := 0; j < len(tokens) && !n.haveRF(replicasInDC); j++ {
+		for j := 0; j < len(tokens) && (len(replicas) < totalRF && !n.haveRF(replicasInDC)); j++ {
 			// TODO: ensure we dont add the same host twice
-			h := tokens[(i+j)%len(tokens)].host
+			p := i + j
+			if p >= len(tokens) {
+				p -= len(tokens)
+			}
+			h := tokens[p].host
 
 			dc := h.DataCenter()
 			rack := h.Rack()
@@ -202,13 +220,6 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 			} else if _, ok := dcRacks[dc][rack]; !ok {
 				// dont know about this rack
 				continue
-			} else if len(replicas) >= totalRF {
-				if replicasInDC[dc] > rf {
-					panic(fmt.Sprintf("replica overflow. total rf=%d have=%d", totalRF, len(replicas)))
-				}
-
-				// we now have enough replicas
-				break
 			}
 
 			racks := seenDCRacks[dc]
@@ -225,7 +236,7 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 				// new rack
 				racks[rack] = struct{}{}
 				replicas = append(replicas, h)
-				replicasInDC[dc]++
+				r := replicasInDC[dc] + 1
 
 				if len(racks) == len(dcRacks[dc]) {
 					// if we have been through all the racks, drain the rest of the skipped
@@ -233,13 +244,14 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 					// above
 					skippedHosts := skipped[dc]
 					var k int
-					for ; k < len(skippedHosts) && replicasInDC[dc] < rf; k++ {
+					for ; k < len(skippedHosts) && r+k < rf; k++ {
 						sh := skippedHosts[k]
 						replicas = append(replicas, sh)
-						replicasInDC[dc]++
 					}
+					r += k
 					skipped[dc] = skippedHosts[k:]
 				}
+				replicasInDC[dc] = r
 			} else {
 				// already seen this rack, keep hold of this host incase
 				// we dont get enough for rf
@@ -260,7 +272,5 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 		panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens)))
 	}
 
-	sort.Sort(replicaRing)
-
 	return replicaRing
 }
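
The core of this patch is a recycling pattern: the three maps that were rebuilt for every token (note the removed "TODO: recycle these") are now allocated once before the token loop and cleared in place at the top of each iteration, and the skipped-host slices are truncated to length zero so their backing arrays are reused. With the extra len(replicas) < totalRF loop condition, the loop exits as soon as enough replicas are collected, so the old in-body overflow branch and its panic are no longer reachable and are dropped. A minimal, self-contained sketch of the recycling pattern, with hypothetical names:

package main

import "fmt"

func main() {
	// Allocate once, before the hot loop (mirrors replicasInDC,
	// seenDCRacks and skipped in the patch).
	counts := make(map[string]int, 2)
	skipped := make(map[string][]string, 2)

	for i := 0; i < 3; i++ {
		// Clear the map in place instead of make()ing a new one; the
		// Go compiler recognises this range-and-delete loop as a map
		// clear, so the reset allocates nothing.
		for k := range counts {
			delete(counts, k)
		}
		// Truncate each slice to zero length so append can reuse the
		// backing array on the next pass.
		for k, v := range skipped {
			skipped[k] = v[:0]
		}

		counts["dc1"]++
		skipped["dc1"] = append(skipped["dc1"], fmt.Sprintf("host-%d", i))
		fmt.Println(i, counts, skipped)
	}
}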
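The ring lookup is changed in the same spirit: tokens[(i+j)%len(tokens)] becomes a conditional subtraction. Since both i and j stay below len(tokens), i+j is always below 2*len(tokens), so a single subtraction wraps the index and the integer division behind % is avoided. The same trick as a hypothetical standalone helper:

// ringIndex shows the wraparound used in place of (i+j)%n: assuming
// 0 <= i < n and 0 <= j < n, the sum is below 2n, so one conditional
// subtraction gives the same result as the modulo without the division.
func ringIndex(i, j, n int) int {
	p := i + j
	if p >= n {
		p -= n
	}
	return p
}

The final sort.Sort(replicaRing) can go because replicaRing is filled with one entry per ring token while iterating tokenRing.tokens in order, so it appears to come out already sorted.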