Jelajahi Sumber

allow tokenAware policy to handle keyspace with skipped dcs (#1417)

Keyspace could define replication that doesn't cover all datacenters.
More over: such case certainly happens when new data is added to cluster.

This commit fixes handling of this situation in networkTopology replicaMap
by skipping tokens from excess datacenters.

fixes #1349
Yura Sokolov 5 tahun lalu
induk
melakukan
964d7011f6
3 mengubah file dengan 78 tambahan dan 62 penghapusan
  1. 1 0
      AUTHORS
  2. 11 6
      topology.go
  3. 66 56
      topology_test.go

+ 1 - 0
AUTHORS

@@ -113,3 +113,4 @@ Thomas Meson <zllak@hycik.org>
 Martin Sucha <martin.sucha@kiwi.com>; <git@mm.ms47.eu>
 Pavel Buchinchik <p.buchinchik@gmail.com>
 Rintaro Okamura <rintaro.okamura@gmail.com>
+Yura Sokolov <y.sokolov@joom.com>; <funny.falcon@gmail.com>

+ 11 - 6
topology.go

@@ -171,7 +171,7 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 	}
 
 	tokens := tokenRing.tokens
-	replicaRing := make(tokenRingReplicas, len(tokens))
+	replicaRing := make(tokenRingReplicas, 0, len(tokens))
 
 	var totalRF int
 	for _, rf := range n.dcs {
@@ -179,6 +179,11 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 	}
 
 	for i, th := range tokenRing.tokens {
+		if rf := n.dcs[th.host.DataCenter()]; rf == 0 {
+			// skip this token since no replica in this datacenter.
+			continue
+		}
+
 		for k, v := range skipped {
 			skipped[k] = v[:0]
 		}
@@ -202,9 +207,9 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 			dc := h.DataCenter()
 			rack := h.Rack()
 
-			rf, ok := n.dcs[dc]
-			if !ok {
-				// skip this DC, dont know about it
+			rf := n.dcs[dc]
+			if rf == 0 {
+				// skip this DC, dont know about it or replication factor is zero
 				continue
 			} else if replicasInDC[dc] >= rf {
 				if replicasInDC[dc] > rf {
@@ -261,10 +266,10 @@ func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
 			panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress()))
 		}
 
-		replicaRing[i] = hostTokens{th.token, replicas}
+		replicaRing = append(replicaRing, hostTokens{th.token, replicas})
 	}
 
-	if len(replicaRing) != len(tokens) {
+	if len(n.dcs) == len(dcRacks) && len(replicaRing) != len(tokens) {
 		panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens)))
 	}
 

+ 66 - 56
topology_test.go

@@ -90,76 +90,86 @@ func TestPlacementStrategy_NetworkStrategy(t *testing.T) {
 	}
 	sort.Sort(&tokenRing{tokens: tokens})
 
-	strat := &networkTopology{
-		dcs: map[string]int{
-			"dc1": 1,
-			"dc2": 2,
-			"dc3": 3,
+	strats := []*networkTopology{
+		&networkTopology{
+			dcs: map[string]int{
+				"dc1": 1,
+				"dc2": 2,
+				"dc3": 3,
+			},
+		},
+		&networkTopology{
+			dcs: map[string]int{
+				"dc2": 2,
+				"dc3": 3,
+			},
 		},
 	}
 
-	var expReplicas int
-	for _, rf := range strat.dcs {
-		expReplicas += rf
-	}
-
-	tokenReplicas := strat.replicaMap(&tokenRing{hosts: hosts, tokens: tokens})
-	if len(tokenReplicas) != len(tokens) {
-		t.Fatalf("expected replica map to have %d items but has %d", len(tokens), len(tokenReplicas))
-	}
-	if !sort.IsSorted(tokenReplicas) {
-		t.Fatal("replica map was not sorted by token")
-	}
+	for _, strat := range strats {
+		var expReplicas int
+		for _, rf := range strat.dcs {
+			expReplicas += rf
+		}
 
-	for token, replicas := range tokenReplicas {
-		if len(replicas.hosts) != expReplicas {
-			t.Fatalf("expected to have %d replicas got %d for token=%v", expReplicas, len(replicas.hosts), token)
+		tokenReplicas := strat.replicaMap(&tokenRing{hosts: hosts, tokens: tokens})
+		if needTokens := hostsPerDC * len(strat.dcs); len(tokenReplicas) != needTokens {
+			t.Fatalf("expected replica map to have %d items but has %d", needTokens, len(tokenReplicas))
+		}
+		if !sort.IsSorted(tokenReplicas) {
+			t.Fatal("replica map was not sorted by token")
 		}
-	}
 
-	for dc, rf := range strat.dcs {
-		dcTokens := dcRing[dc]
-		for i, th := range dcTokens {
-			token := th.token
-			allReplicas := tokenReplicas.replicasFor(token)
-			if allReplicas.token != token {
-				t.Fatalf("token %v not in replica map", token)
+		for token, replicas := range tokenReplicas {
+			if len(replicas.hosts) != expReplicas {
+				t.Fatalf("expected to have %d replicas got %d for token=%v", expReplicas, len(replicas.hosts), token)
 			}
+		}
 
-			var replicas []*HostInfo
-			for _, replica := range allReplicas.hosts {
-				if replica.dataCenter == dc {
-					replicas = append(replicas, replica)
+		for dc, rf := range strat.dcs {
+			dcTokens := dcRing[dc]
+			for i, th := range dcTokens {
+				token := th.token
+				allReplicas := tokenReplicas.replicasFor(token)
+				if allReplicas.token != token {
+					t.Fatalf("token %v not in replica map", token)
 				}
-			}
 
-			if len(replicas) != rf {
-				t.Fatalf("expected %d replicas in dc %q got %d", rf, dc, len(replicas))
-			}
+				var replicas []*HostInfo
+				for _, replica := range allReplicas.hosts {
+					if replica.dataCenter == dc {
+						replicas = append(replicas, replica)
+					}
+				}
+
+				if len(replicas) != rf {
+					t.Fatalf("expected %d replicas in dc %q got %d", rf, dc, len(replicas))
+				}
 
-			var lastRack string
-			for j, replica := range replicas {
-				// expected is in the next rack
-				var exp *HostInfo
-				if lastRack == "" {
-					// primary, first replica
-					exp = dcTokens[(i+j)%len(dcTokens)].host
-				} else {
-					for k := 0; k < len(dcTokens); k++ {
-						// walk around the ring from i + j to find the next host the
-						// next rack
-						p := (i + j + k) % len(dcTokens)
-						h := dcTokens[p].host
-						if h.rack != lastRack {
-							exp = h
-							break
+				var lastRack string
+				for j, replica := range replicas {
+					// expected is in the next rack
+					var exp *HostInfo
+					if lastRack == "" {
+						// primary, first replica
+						exp = dcTokens[(i+j)%len(dcTokens)].host
+					} else {
+						for k := 0; k < len(dcTokens); k++ {
+							// walk around the ring from i + j to find the next host the
+							// next rack
+							p := (i + j + k) % len(dcTokens)
+							h := dcTokens[p].host
+							if h.rack != lastRack {
+								exp = h
+								break
+							}
+						}
+						if exp.rack == lastRack {
+							panic("no more racks")
 						}
 					}
-					if exp.rack == lastRack {
-						panic("no more racks")
-					}
+					lastRack = replica.rack
 				}
-				lastRack = replica.rack
 			}
 		}
 	}