
Add NonLocalReplicasFallback option to TokenAwareHostPolicy (#1328)

* Add NonLocalReplicasFallback option to TokenAwareHostPolicy
* Add a test for TokenAware and DCAwareRR host policies used together

Token-aware policy with DC-aware RR as fallback behaves in a way
that might surprise some users. This combination first queries the
replica selected by token in the local DC and then, because the
remaining replicas for that token live in other DCs, falls back
to non-replica hosts in the local DC.

Moreover, for retry count > replication factor, the host selected
by the token-aware policy is only queried once.

This option allows falling back to replicas selected by token in
remote DCs before falling back to other nodes in the local DC.
This is particularly useful when used with
{'class': 'NetworkTopologyStrategy', 'a': '1', 'b': '1', 'c': '1'}
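
For illustration, the option is enabled like this (the contact point,
DC name, and keyspace below are examples, not part of this change):

	cluster := gocql.NewCluster("10.0.0.1")
	cluster.Keyspace = "myKeyspace"
	cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(
		gocql.DCAwareRoundRobinPolicy("local"), // fallback policy; "local" is the local DC name
		gocql.NonLocalReplicasFallback(),       // the option added by this commit
	)
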
Martin Sucha 6 years ago
parent commit e163eff7a8
4 changed files with 456 additions and 12 deletions
  1. common_test.go (+7 -0)
  2. policies.go (+35 -12)
  3. policies_test.go (+408 -0)
  4. session.go (+6 -0)

+ 7 - 0
common_test.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log"
 	"net"
+	"reflect"
 	"strings"
 	"sync"
 	"testing"
@@ -232,6 +233,12 @@ func assertEqual(t *testing.T, description string, expected, actual interface{})
 	}
 }
 
+func assertDeepEqual(t *testing.T, description string, expected, actual interface{}) {
+	if !reflect.DeepEqual(expected, actual) {
+		t.Errorf("expected %s to be (%+v) but was (%+v) instead", description, expected, actual)
+	}
+}
+
 func assertNil(t *testing.T, description string, actual interface{}) {
 	if actual != nil {
 		t.Errorf("expected %s to be (nil) but was (%+v) instead", description, actual)

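For context, a hypothetical use of the new helper (values are illustrative):
assertDeepEqual compares nested values with reflect.DeepEqual, unlike
assertEqual, which relies on ==.

	expected := map[string][]int{"a": {1, 2}}
	actual := map[string][]int{"a": {1, 2}}
	assertDeepEqual(t, "replica map", expected, actual) // passes: contents are deeply equal
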
+ 35 - 12
policies.go

@@ -387,6 +387,18 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) {
 	}
 }
 
+// NonLocalReplicasFallback enables fallback to replicas that are not considered local.
+//
+// A TokenAwareHostPolicy used with a DCAwareRoundRobinPolicy fallback first selects replicas by partition key in the
+// local DC, then falls back to other nodes in the local DC. Enabling NonLocalReplicasFallback causes
+// TokenAwareHostPolicy to first select replicas by partition key in the local DC, then replicas by partition key in
+// remote DCs, and only then fall back to other nodes in the local DC.
+func NonLocalReplicasFallback() func(policy *tokenAwareHostPolicy) {
+	return func(t *tokenAwareHostPolicy) {
+		t.nonLocalReplicasFallback = true
+	}
+}
+
 // TokenAwareHostPolicy is a token aware host selection policy, where hosts are
 // selected based on the partition key, so queries are sent to the host which
 // owns the partition. Fallback is used when routing information is not available.
@@ -410,8 +422,11 @@ type clusterMeta struct {
 
 type tokenAwareHostPolicy struct {
-	fallback    HostSelectionPolicy
-	session     *Session
+	fallback            HostSelectionPolicy
+	getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error)
+	getKeyspaceName     func() string
+
 	shuffleReplicas          bool
+	nonLocalReplicasFallback bool
 
 	// mu protects writes to hosts, partitioner, metadata.
 	// reads can be unlocked as long as they are not used for updating state later.
@@ -422,7 +437,8 @@ type tokenAwareHostPolicy struct {
 }
 
 func (t *tokenAwareHostPolicy) Init(s *Session) {
-	t.session = s
+	t.getKeyspaceMetadata = s.KeyspaceMetadata
+	t.getKeyspaceName = func() string { return s.cfg.Keyspace }
 }
 
 func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool {
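
Replacing the *Session field with two function fields is what allows the
unit tests below to run without a live session; they stub both dependencies
directly, as in this excerpt from policies_test.go:

	policyInternal := policy.(*tokenAwareHostPolicy)
	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
		return nil, errors.New("not initialized")
	}
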
@@ -443,7 +459,7 @@ func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) {
 func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace string) {
 	newReplicas := make(map[string]map[token][]*HostInfo, len(meta.replicas))
 
-	ks, err := t.session.KeyspaceMetadata(keyspace)
+	ks, err := t.getKeyspaceMetadata(keyspace)
 	if err == nil {
 		strat := getStrategy(ks)
 		if strat != nil {
@@ -472,9 +488,7 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) {
 		t.partitioner = partitioner
 		meta := t.getMetadataForUpdate()
 		meta.resetTokenRing(t.partitioner, t.hosts.get())
-		if t.session != nil { // disable for unit tests
-			t.updateReplicas(meta, t.session.cfg.Keyspace)
-		}
+		t.updateReplicas(meta, t.getKeyspaceName())
 		t.metadata.Store(meta)
 	}
 }
@@ -484,9 +498,7 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
 	if t.hosts.add(host) {
 		meta := t.getMetadataForUpdate()
 		meta.resetTokenRing(t.partitioner, t.hosts.get())
-		if t.session != nil { // disable for unit tests
-			t.updateReplicas(meta, t.session.cfg.Keyspace)
-		}
+		t.updateReplicas(meta, t.getKeyspaceName())
 		t.metadata.Store(meta)
 	}
 	t.mu.Unlock()
@@ -499,9 +511,7 @@ func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
 	if t.hosts.remove(host.ConnectAddress()) {
 		meta := t.getMetadataForUpdate()
 		meta.resetTokenRing(t.partitioner, t.hosts.get())
-		if t.session != nil { // disable for unit tests
-			t.updateReplicas(meta, t.session.cfg.Keyspace)
-		}
+		t.updateReplicas(meta, t.getKeyspaceName())
 		t.metadata.Store(meta)
 	}
 	t.mu.Unlock()
@@ -597,6 +607,7 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
 	var (
 		fallbackIter NextHost
 		i            int
+		j            int
 	)
 
 	used := make(map[*HostInfo]bool, len(replicas))
@@ -611,6 +622,18 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost {
 			}
 		}
 
+		if t.nonLocalReplicasFallback {
+			for j < len(replicas) {
+				h := replicas[j]
+				j++
+
+				if h.IsUp() && !t.fallback.IsLocal(h) {
+					used[h] = true
+					return (*selectedHost)(h)
+				}
+			}
+		}
+
 		if fallbackIter == nil {
 			// fallback
 			fallbackIter = t.fallback.Pick(qry)

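Taken together, the Pick changes yield hosts in the order sketched below.
This is a condensed outline, not the literal implementation; the real code
walks the phases lazily inside the returned NextHost closure and skips
already-returned replicas via the used map.

	func pickOrder(t *tokenAwareHostPolicy, replicas []*HostInfo) []*HostInfo {
		var order []*HostInfo
		// 1. up replicas that the fallback policy considers local
		for _, h := range replicas {
			if h.IsUp() && t.fallback.IsLocal(h) {
				order = append(order, h)
			}
		}
		// 2. only with NonLocalReplicasFallback: up replicas in remote DCs
		if t.nonLocalReplicasFallback {
			for _, h := range replicas {
				if h.IsUp() && !t.fallback.IsLocal(h) {
					order = append(order, h)
				}
			}
		}
		// 3. the fallback policy then supplies any remaining hosts
		return order
	}
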
+ 408 - 0
policies_test.go

@@ -5,6 +5,7 @@
 package gocql
 
 import (
+	"errors"
 	"fmt"
 	"net"
 	"testing"
@@ -55,8 +56,14 @@ func TestHostPolicy_RoundRobin(t *testing.T) {
 // round-robin host selection policy fallback.
 func TestHostPolicy_TokenAware(t *testing.T) {
 	policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
+	policyInternal := policy.(*tokenAwareHostPolicy)
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
 
 	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
 
 	iter := policy.Pick(nil)
 	if iter == nil {
@@ -91,6 +98,32 @@ func TestHostPolicy_TokenAware(t *testing.T) {
 
 	policy.SetPartitioner("OrderedPartitioner")
 
+	policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
+		if keyspaceName != "myKeyspace" {
+			return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
+		}
+		return &KeyspaceMetadata{
+			Name: "myKeyspace",
+			StrategyClass: "SimpleStrategy",
+			StrategyOptions: map[string]interface{}{
+				"class": "SimpleStrategy",
+				"replication_factor": 2,
+			},
+		}, nil
+	}
+	policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
+
+	// The SimpleStrategy above should generate the following replicas.
+	// They are handy to have as a reference here.
+	assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
+		"myKeyspace": {
+			orderedToken("00"): {hosts[0], hosts[1]},
+			orderedToken("25"): {hosts[1], hosts[2]},
+			orderedToken("50"): {hosts[2], hosts[3]},
+			orderedToken("75"): {hosts[3], hosts[0]},
+		},
+	}, policyInternal.getMetadataReadOnly().replicas)
+
 	// now the token ring is configured
 	query.RoutingKey([]byte("20"))
 	iter = policy.Pick(query)
@@ -177,6 +210,11 @@ func TestHostPolicy_RoundRobin_NilHostInfo(t *testing.T) {
 
 func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
 	policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
+	policyInternal := policy.(*tokenAwareHostPolicy)
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
 
 	hosts := [...]*HostInfo{
 		{connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
@@ -190,6 +228,7 @@ func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
 	policy.SetPartitioner("OrderedPartitioner")
 
 	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
 	query.RoutingKey([]byte("20"))
 
 	iter := policy.Pick(query)
@@ -408,3 +447,372 @@ func TestHostPolicy_DCAwareRR(t *testing.T) {
 	}
 
 }
+
+
+// Tests of the token-aware host selection policy implementation with a
+// DC aware round-robin host selection policy fallback
+// with {"class": "NetworkTopologyStrategy", "a": 1, "b": 1, "c": 1} replication.
+func TestHostPolicy_TokenAware_DCAwareRR(t *testing.T) {
+	policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
+	policyInternal := policy.(*tokenAwareHostPolicy)
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
+
+	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
+
+	iter := policy.Pick(nil)
+	if iter == nil {
+		t.Fatal("host iterator was nil")
+	}
+	actual := iter()
+	if actual != nil {
+		t.Fatalf("expected nil from iterator, but was %v", actual)
+	}
+
+	// set the hosts
+	hosts := [...]*HostInfo{
+		{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
+		{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
+		{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
+		{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
+		{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
+		{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
+		{hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
+		{hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
+		{hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
+		{hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
+		{hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
+		{hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
+	}
+	for _, host := range hosts {
+		policy.AddHost(host)
+	}
+
+	// the token ring is not set up without the partitioner, but the fallback
+	// should work
+	if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
+	}
+
+	query.RoutingKey([]byte("30"))
+	if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+
+	policy.SetPartitioner("OrderedPartitioner")
+
+
+	policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
+		if keyspaceName != "myKeyspace" {
+			return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
+		}
+		return &KeyspaceMetadata{
+			Name: "myKeyspace",
+			StrategyClass: "NetworkTopologyStrategy",
+			StrategyOptions: map[string]interface{}{
+				"class": "NetworkTopologyStrategy",
+				"local": 1,
+				"remote1": 1,
+				"remote2": 1,
+			},
+		}, nil
+	}
+	policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
+
+	// The NetworkTopologyStrategy above should generate the following replicas.
+	// They are handy to have as a reference here.
+	assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
+		"myKeyspace": {
+			orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
+			orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
+			orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
+			orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
+			orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
+			orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
+			orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
+			orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
+			orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
+			orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
+			orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
+			orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
+		},
+	}, policyInternal.getMetadataReadOnly().replicas)
+
+	// now the token ring is configured
+	query.RoutingKey([]byte("23"))
+	iter = policy.Pick(query)
+	// first should be the host with matching token from the local DC
+	if actual := iter(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+	// the rest follow DCAwareRR over the local DC only, starting with host 7 since the fallback was already used twice above
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+	// and it starts to repeat now without host 4...
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+}
+
+// Tests of the token-aware host selection policy implementation with a
+// DC aware round-robin host selection policy fallback
+// with {"class": "NetworkTopologyStrategy", "a": 2, "b": 2, "c": 2} replication.
+func TestHostPolicy_TokenAware_DCAwareRR2(t *testing.T) {
+	policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
+	policyInternal := policy.(*tokenAwareHostPolicy)
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
+
+	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
+
+	iter := policy.Pick(nil)
+	if iter == nil {
+		t.Fatal("host iterator was nil")
+	}
+	actual := iter()
+	if actual != nil {
+		t.Fatalf("expected nil from iterator, but was %v", actual)
+	}
+
+	// set the hosts
+	hosts := [...]*HostInfo{
+		{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
+		{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
+		{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
+		{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
+		{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
+		{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
+		{hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
+		{hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
+		{hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
+		{hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
+		{hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
+		{hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
+	}
+	for _, host := range hosts {
+		policy.AddHost(host)
+	}
+
+	// the token ring is not set up without the partitioner, but the fallback
+	// should work
+	if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
+	}
+
+	query.RoutingKey([]byte("30"))
+	if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+
+	// advance the index in round-robin so that the next expected value does not overlap with the one selected by token.
+	policy.Pick(query)()
+	policy.Pick(query)()
+
+	policy.SetPartitioner("OrderedPartitioner")
+
+	policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
+		if keyspaceName != "myKeyspace" {
+			return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
+		}
+		return &KeyspaceMetadata{
+			Name: "myKeyspace",
+			StrategyClass: "NetworkTopologyStrategy",
+			StrategyOptions: map[string]interface{}{
+				"class": "NetworkTopologyStrategy",
+				"local": 2,
+				"remote1": 2,
+				"remote2": 2,
+			},
+		}, nil
+	}
+	policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
+
+	// The NetworkTopologyStrategy above should generate the following replicas.
+	// They are handy to have as a reference here.
+	assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
+		"myKeyspace": {
+			orderedToken("05"): {hosts[0], hosts[1], hosts[2], hosts[3], hosts[4], hosts[5]},
+			orderedToken("10"): {hosts[1], hosts[2], hosts[3], hosts[4], hosts[5], hosts[6]},
+			orderedToken("15"): {hosts[2], hosts[3], hosts[4], hosts[5], hosts[6], hosts[7]},
+			orderedToken("20"): {hosts[3], hosts[4], hosts[5], hosts[6], hosts[7], hosts[8]},
+			orderedToken("25"): {hosts[4], hosts[5], hosts[6], hosts[7], hosts[8], hosts[9]},
+			orderedToken("30"): {hosts[5], hosts[6], hosts[7], hosts[8], hosts[9], hosts[10]},
+			orderedToken("35"): {hosts[6], hosts[7], hosts[8], hosts[9], hosts[10], hosts[11]},
+			orderedToken("40"): {hosts[7], hosts[8], hosts[9], hosts[10], hosts[11], hosts[0]},
+			orderedToken("45"): {hosts[8], hosts[9], hosts[10], hosts[11], hosts[0], hosts[1]},
+			orderedToken("50"): {hosts[9], hosts[10], hosts[11], hosts[0], hosts[1], hosts[2]},
+			orderedToken("55"): {hosts[10], hosts[11], hosts[0], hosts[1], hosts[2], hosts[3]},
+			orderedToken("60"): {hosts[11], hosts[0], hosts[1], hosts[2], hosts[3], hosts[4]},
+		},
+	}, policyInternal.getMetadataReadOnly().replicas)
+
+	// now the token ring is configured
+	query.RoutingKey([]byte("23"))
+	iter = policy.Pick(query)
+	// first should be the two replicas with matching token from the local DC
+	if actual := iter(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	// the rest follow DCAwareRR over the local DC only, starting with host 1 since the fallback already cycled through all local hosts above
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	// and it starts to repeat now, without hosts 4 and 7...
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+}
+
+// Tests of the token-aware host selection policy implementation with a
+// DC aware round-robin host selection policy fallback with NonLocalReplicasFallback option enabled.
+func TestHostPolicy_TokenAware_DCAwareRR_NonLocalFallback(t *testing.T) {
+	policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())
+	policyInternal := policy.(*tokenAwareHostPolicy)
+	policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
+	policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
+		return nil, errors.New("not initialized")
+	}
+
+	query := &Query{}
+	query.getKeyspace = func() string { return "myKeyspace" }
+
+	iter := policy.Pick(nil)
+	if iter == nil {
+		t.Fatal("host iterator was nil")
+	}
+	actual := iter()
+	if actual != nil {
+		t.Fatalf("expected nil from iterator, but was %v", actual)
+	}
+
+	// set the hosts
+	hosts := [...]*HostInfo{
+		{hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
+		{hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
+		{hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
+		{hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
+		{hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
+		{hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
+		{hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
+		{hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
+		{hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
+		{hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
+		{hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
+		{hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
+	}
+	for _, host := range hosts {
+		policy.AddHost(host)
+	}
+
+	// the token ring is not set up without the partitioner, but the fallback
+	// should work
+	if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
+	}
+
+	query.RoutingKey([]byte("30"))
+	if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+
+	policy.SetPartitioner("OrderedPartitioner")
+
+	policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
+		if keyspaceName != "myKeyspace" {
+			return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
+		}
+		return &KeyspaceMetadata{
+			Name: "myKeyspace",
+			StrategyClass: "NetworkTopologyStrategy",
+			StrategyOptions: map[string]interface{}{
+				"class": "NetworkTopologyStrategy",
+				"local": 1,
+				"remote1": 1,
+				"remote2": 1,
+			},
+		}, nil
+	}
+	policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
+
+	// The NetworkTopologyStrategy above should generate the following replicas.
+	// They are handy to have as a reference here.
+	assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
+		"myKeyspace": {
+			orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
+			orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
+			orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
+			orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
+			orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
+			orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
+			orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
+			orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
+			orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
+			orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
+			orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
+			orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
+		},
+	}, policyInternal.getMetadataReadOnly().replicas)
+
+	// now the token ring is configured
+	query.RoutingKey([]byte("18"))
+	iter = policy.Pick(query)
+	// first should be the host with matching token from the local DC
+	if actual := iter(); actual.Info().HostID() != "4" {
+		t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
+	}
+	// next should be the replicas with matching token from remote DCs
+	if actual := iter(); actual.Info().HostID() != "3" {
+		t.Errorf("Expected peer 3 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "5" {
+		t.Errorf("Expected peer 5 but was %s", actual.Info().HostID())
+	}
+	// the rest follow DCAwareRR over the local DC only, starting with host 7 since the fallback was already used twice above
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+	// and it starts to repeat now without host 4...
+	if actual := iter(); actual.Info().HostID() != "7" {
+		t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "10" {
+		t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
+	}
+	if actual := iter(); actual.Info().HostID() != "1" {
+		t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
+	}
+}

+ 6 - 0
session.go

@@ -767,6 +767,9 @@ type Query struct {
 	metrics               *queryMetrics
 
 	disableAutoPage bool
+
+	// getKeyspace is a field so that it can be overridden in tests
+	getKeyspace func() string
 }
 
 func (q *Query) defaultsFromSession() {
@@ -952,6 +955,9 @@ func (q *Query) retryPolicy() RetryPolicy {
 
 // Keyspace returns the keyspace the query will be executed against.
 func (q *Query) Keyspace() string {
+	if q.getKeyspace != nil {
+		return q.getKeyspace()
+	}
 	if q.session == nil {
 		return ""
 	}
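
The order of the checks matters: the test hook takes precedence over the
session lookup, which is how the policy tests above inject a keyspace
without a session:

	query := &Query{}
	query.getKeyspace = func() string { return "myKeyspace" } // bypasses the session lookup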