policies_test.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. "fmt"
  8. "net"
  9. "testing"
  10. "time"
  11. "github.com/hailocab/go-hostpool"
  12. )
  13. // Tests of the round-robin host selection policy implementation
  14. func TestRoundRobbin(t *testing.T) {
  15. policy := RoundRobinHostPolicy()
  16. hosts := [...]*HostInfo{
  17. {hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1)},
  18. {hostId: "1", connectAddress: net.IPv4(0, 0, 0, 2)},
  19. }
  20. for _, host := range hosts {
  21. policy.AddHost(host)
  22. }
  23. got := make(map[string]bool)
  24. it := policy.Pick(nil)
  25. for h := it(); h != nil; h = it() {
  26. id := h.Info().hostId
  27. if got[id] {
  28. t.Fatalf("got duplicate host: %v", id)
  29. }
  30. got[id] = true
  31. }
  32. if len(got) != len(hosts) {
  33. t.Fatalf("expected %d hosts got %d", len(hosts), len(got))
  34. }
  35. }
  36. // Tests of the token-aware host selection policy implementation with a
  37. // round-robin host selection policy fallback.
  38. func TestHostPolicy_TokenAware_SimpleStrategy(t *testing.T) {
  39. const keyspace = "myKeyspace"
  40. policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
  41. policyInternal := policy.(*tokenAwareHostPolicy)
  42. policyInternal.getKeyspaceName = func() string { return keyspace }
  43. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  44. return nil, errors.New("not initalized")
  45. }
  46. query := &Query{}
  47. query.getKeyspace = func() string { return keyspace }
  48. iter := policy.Pick(nil)
  49. if iter == nil {
  50. t.Fatal("host iterator was nil")
  51. }
  52. actual := iter()
  53. if actual != nil {
  54. t.Fatalf("expected nil from iterator, but was %v", actual)
  55. }
  56. // set the hosts
  57. hosts := [...]*HostInfo{
  58. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
  59. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
  60. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
  61. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
  62. }
  63. for _, host := range &hosts {
  64. policy.AddHost(host)
  65. }
  66. policy.SetPartitioner("OrderedPartitioner")
  67. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  68. if keyspaceName != keyspace {
  69. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  70. }
  71. return &KeyspaceMetadata{
  72. Name: keyspace,
  73. StrategyClass: "SimpleStrategy",
  74. StrategyOptions: map[string]interface{}{
  75. "class": "SimpleStrategy",
  76. "replication_factor": 2,
  77. },
  78. }, nil
  79. }
  80. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace})
  81. // The SimpleStrategy above should generate the following replicas.
  82. // It's handy to have as reference here.
  83. assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
  84. "myKeyspace": {
  85. {orderedToken("00"), []*HostInfo{hosts[0], hosts[1]}},
  86. {orderedToken("25"), []*HostInfo{hosts[1], hosts[2]}},
  87. {orderedToken("50"), []*HostInfo{hosts[2], hosts[3]}},
  88. {orderedToken("75"), []*HostInfo{hosts[3], hosts[0]}},
  89. },
  90. }, policyInternal.getMetadataReadOnly().replicas)
  91. // now the token ring is configured
  92. query.RoutingKey([]byte("20"))
  93. iter = policy.Pick(query)
  94. iterCheck(t, iter, "0")
  95. iterCheck(t, iter, "1")
  96. }
  97. // Tests of the host pool host selection policy implementation
  98. func TestHostPolicy_HostPool(t *testing.T) {
  99. policy := HostPoolHostPolicy(hostpool.New(nil))
  100. hosts := []*HostInfo{
  101. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
  102. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
  103. }
  104. // Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
  105. // which will result in an unpredictable ordering
  106. policy.(*hostPoolHostPolicy).SetHosts(hosts)
  107. // the first host selected is actually at [1], but this is ok for RR
  108. // interleaved iteration should always increment the host
  109. iter := policy.Pick(nil)
  110. actualA := iter()
  111. if actualA.Info().HostID() != "0" {
  112. t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID())
  113. }
  114. actualA.Mark(nil)
  115. actualB := iter()
  116. if actualB.Info().HostID() != "1" {
  117. t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID())
  118. }
  119. actualB.Mark(fmt.Errorf("error"))
  120. actualC := iter()
  121. if actualC.Info().HostID() != "0" {
  122. t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID())
  123. }
  124. actualC.Mark(nil)
  125. actualD := iter()
  126. if actualD.Info().HostID() != "0" {
  127. t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID())
  128. }
  129. actualD.Mark(nil)
  130. }
  131. func TestHostPolicy_RoundRobin_NilHostInfo(t *testing.T) {
  132. policy := RoundRobinHostPolicy()
  133. host := &HostInfo{hostId: "host-1"}
  134. policy.AddHost(host)
  135. iter := policy.Pick(nil)
  136. next := iter()
  137. if next == nil {
  138. t.Fatal("got nil host")
  139. } else if v := next.Info(); v == nil {
  140. t.Fatal("got nil HostInfo")
  141. } else if v.HostID() != host.HostID() {
  142. t.Fatalf("expected host %v got %v", host, v)
  143. }
  144. next = iter()
  145. if next != nil {
  146. t.Errorf("expected to get nil host got %+v", next)
  147. if next.Info() == nil {
  148. t.Fatalf("HostInfo is nil")
  149. }
  150. }
  151. }
  152. func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
  153. policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
  154. policyInternal := policy.(*tokenAwareHostPolicy)
  155. policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
  156. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  157. return nil, errors.New("not initialized")
  158. }
  159. hosts := [...]*HostInfo{
  160. {connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
  161. {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
  162. {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
  163. {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
  164. }
  165. for _, host := range hosts {
  166. policy.AddHost(host)
  167. }
  168. policy.SetPartitioner("OrderedPartitioner")
  169. query := &Query{}
  170. query.getKeyspace = func() string { return "myKeyspace" }
  171. query.RoutingKey([]byte("20"))
  172. iter := policy.Pick(query)
  173. next := iter()
  174. if next == nil {
  175. t.Fatal("got nil host")
  176. } else if v := next.Info(); v == nil {
  177. t.Fatal("got nil HostInfo")
  178. } else if !v.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
  179. t.Fatalf("expected peer 1 got %v", v.ConnectAddress())
  180. }
  181. // Empty the hosts to trigger the panic when using the fallback.
  182. for _, host := range hosts {
  183. policy.RemoveHost(host)
  184. }
  185. next = iter()
  186. if next != nil {
  187. t.Errorf("expected to get nil host got %+v", next)
  188. if next.Info() == nil {
  189. t.Fatalf("HostInfo is nil")
  190. }
  191. }
  192. }
  193. func TestCOWList_Add(t *testing.T) {
  194. var cow cowHostList
  195. toAdd := [...]net.IP{net.IPv4(10, 0, 0, 1), net.IPv4(10, 0, 0, 2), net.IPv4(10, 0, 0, 3)}
  196. for _, addr := range toAdd {
  197. if !cow.add(&HostInfo{connectAddress: addr}) {
  198. t.Fatal("did not add peer which was not in the set")
  199. }
  200. }
  201. hosts := cow.get()
  202. if len(hosts) != len(toAdd) {
  203. t.Fatalf("expected to have %d hosts got %d", len(toAdd), len(hosts))
  204. }
  205. set := make(map[string]bool)
  206. for _, host := range hosts {
  207. set[string(host.ConnectAddress())] = true
  208. }
  209. for _, addr := range toAdd {
  210. if !set[string(addr)] {
  211. t.Errorf("addr was not in the host list: %q", addr)
  212. }
  213. }
  214. }
  215. // TestSimpleRetryPolicy makes sure that we only allow 1 + numRetries attempts
  216. func TestSimpleRetryPolicy(t *testing.T) {
  217. q := &Query{}
  218. // this should allow a total of 3 tries.
  219. rt := &SimpleRetryPolicy{NumRetries: 2}
  220. cases := []struct {
  221. attempts int
  222. allow bool
  223. }{
  224. {0, true},
  225. {1, true},
  226. {2, true},
  227. {3, false},
  228. {4, false},
  229. {5, false},
  230. }
  231. for _, c := range cases {
  232. q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
  233. if c.allow && !rt.Attempt(q) {
  234. t.Fatalf("should allow retry after %d attempts", c.attempts)
  235. }
  236. if !c.allow && rt.Attempt(q) {
  237. t.Fatalf("should not allow retry after %d attempts", c.attempts)
  238. }
  239. }
  240. }
  241. func TestExponentialBackoffPolicy(t *testing.T) {
  242. // test with defaults
  243. sut := &ExponentialBackoffRetryPolicy{NumRetries: 2}
  244. cases := []struct {
  245. attempts int
  246. delay time.Duration
  247. }{
  248. {1, 100 * time.Millisecond},
  249. {2, (2) * 100 * time.Millisecond},
  250. {3, (2 * 2) * 100 * time.Millisecond},
  251. {4, (2 * 2 * 2) * 100 * time.Millisecond},
  252. }
  253. for _, c := range cases {
  254. // test 100 times for each case
  255. for i := 0; i < 100; i++ {
  256. d := sut.napTime(c.attempts)
  257. if d < c.delay-(100*time.Millisecond)/2 {
  258. t.Fatalf("Delay %d less than jitter min of %d", d, c.delay-100*time.Millisecond/2)
  259. }
  260. if d > c.delay+(100*time.Millisecond)/2 {
  261. t.Fatalf("Delay %d greater than jitter max of %d", d, c.delay+100*time.Millisecond/2)
  262. }
  263. }
  264. }
  265. }
  266. func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
  267. q := &Query{cons: LocalQuorum}
  268. rewt0 := &RequestErrWriteTimeout{
  269. Received: 0,
  270. WriteType: "SIMPLE",
  271. }
  272. rewt1 := &RequestErrWriteTimeout{
  273. Received: 1,
  274. WriteType: "BATCH",
  275. }
  276. rewt2 := &RequestErrWriteTimeout{
  277. WriteType: "UNLOGGED_BATCH",
  278. }
  279. rert := &RequestErrReadTimeout{}
  280. reu0 := &RequestErrUnavailable{
  281. Alive: 0,
  282. }
  283. reu1 := &RequestErrUnavailable{
  284. Alive: 1,
  285. }
  286. // this should allow a total of 3 tries.
  287. consistencyLevels := []Consistency{Three, Two, One}
  288. rt := &DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: consistencyLevels}
  289. cases := []struct {
  290. attempts int
  291. allow bool
  292. err error
  293. retryType RetryType
  294. }{
  295. {0, true, rewt0, Rethrow},
  296. {3, true, rewt1, Ignore},
  297. {1, true, rewt2, Retry},
  298. {2, true, rert, Retry},
  299. {4, false, reu0, Rethrow},
  300. {16, false, reu1, Retry},
  301. }
  302. for _, c := range cases {
  303. q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
  304. if c.retryType != rt.GetRetryType(c.err) {
  305. t.Fatalf("retry type should be %v", c.retryType)
  306. }
  307. if c.allow && !rt.Attempt(q) {
  308. t.Fatalf("should allow retry after %d attempts", c.attempts)
  309. }
  310. if !c.allow && rt.Attempt(q) {
  311. t.Fatalf("should not allow retry after %d attempts", c.attempts)
  312. }
  313. }
  314. }
  315. func iterCheck(t *testing.T, iter NextHost, hostID string) {
  316. t.Helper()
  317. host := iter()
  318. if host == nil || host.Info() == nil {
  319. t.Fatalf("expected hostID %s got nil", hostID)
  320. }
  321. if host.Info().HostID() != hostID {
  322. t.Fatalf("Expected peer %s but was %s", hostID, host.Info().HostID())
  323. }
  324. }
  325. func TestHostPolicy_DCAwareRR(t *testing.T) {
  326. p := DCAwareRoundRobinPolicy("local")
  327. hosts := [...]*HostInfo{
  328. {hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"},
  329. {hostId: "1", connectAddress: net.ParseIP("10.0.0.2"), dataCenter: "local"},
  330. {hostId: "2", connectAddress: net.ParseIP("10.0.0.3"), dataCenter: "remote"},
  331. {hostId: "3", connectAddress: net.ParseIP("10.0.0.4"), dataCenter: "remote"},
  332. }
  333. for _, host := range hosts {
  334. p.AddHost(host)
  335. }
  336. got := make(map[string]bool, len(hosts))
  337. var dcs []string
  338. it := p.Pick(nil)
  339. for h := it(); h != nil; h = it() {
  340. id := h.Info().hostId
  341. dc := h.Info().dataCenter
  342. if got[id] {
  343. t.Fatalf("got duplicate host %s", id)
  344. }
  345. got[id] = true
  346. dcs = append(dcs, dc)
  347. }
  348. if len(got) != len(hosts) {
  349. t.Fatalf("expected %d hosts got %d", len(hosts), len(got))
  350. }
  351. var remote bool
  352. for _, dc := range dcs {
  353. if dc == "local" {
  354. if remote {
  355. t.Fatalf("got local dc after remote: %v", dcs)
  356. }
  357. } else {
  358. remote = true
  359. }
  360. }
  361. }
  362. // Tests of the token-aware host selection policy implementation with a
  363. // DC aware round-robin host selection policy fallback
  364. // with {"class": "NetworkTopologyStrategy", "a": 1, "b": 1, "c": 1} replication.
  365. func TestHostPolicy_TokenAware(t *testing.T) {
  366. const keyspace = "myKeyspace"
  367. policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
  368. policyInternal := policy.(*tokenAwareHostPolicy)
  369. policyInternal.getKeyspaceName = func() string { return keyspace }
  370. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  371. return nil, errors.New("not initialized")
  372. }
  373. query := &Query{}
  374. query.getKeyspace = func() string { return keyspace }
  375. iter := policy.Pick(nil)
  376. if iter == nil {
  377. t.Fatal("host iterator was nil")
  378. }
  379. actual := iter()
  380. if actual != nil {
  381. t.Fatalf("expected nil from iterator, but was %v", actual)
  382. }
  383. // set the hosts
  384. hosts := [...]*HostInfo{
  385. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
  386. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
  387. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
  388. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
  389. {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
  390. {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
  391. {hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
  392. {hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
  393. {hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
  394. {hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
  395. {hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
  396. {hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
  397. }
  398. for _, host := range hosts {
  399. policy.AddHost(host)
  400. }
  401. // the token ring is not setup without the partitioner, but the fallback
  402. // should work
  403. if actual := policy.Pick(nil)(); actual == nil {
  404. t.Fatal("expected to get host from fallback got nil")
  405. }
  406. query.RoutingKey([]byte("30"))
  407. if actual := policy.Pick(query)(); actual == nil {
  408. t.Fatal("expected to get host from fallback got nil")
  409. }
  410. policy.SetPartitioner("OrderedPartitioner")
  411. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  412. if keyspaceName != keyspace {
  413. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  414. }
  415. return &KeyspaceMetadata{
  416. Name: keyspace,
  417. StrategyClass: "NetworkTopologyStrategy",
  418. StrategyOptions: map[string]interface{}{
  419. "class": "NetworkTopologyStrategy",
  420. "local": 1,
  421. "remote1": 1,
  422. "remote2": 1,
  423. },
  424. }, nil
  425. }
  426. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
  427. // The NetworkTopologyStrategy above should generate the following replicas.
  428. // It's handy to have as reference here.
  429. assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
  430. "myKeyspace": {
  431. {orderedToken("05"), []*HostInfo{hosts[0], hosts[1], hosts[2]}},
  432. {orderedToken("10"), []*HostInfo{hosts[1], hosts[2], hosts[3]}},
  433. {orderedToken("15"), []*HostInfo{hosts[2], hosts[3], hosts[4]}},
  434. {orderedToken("20"), []*HostInfo{hosts[3], hosts[4], hosts[5]}},
  435. {orderedToken("25"), []*HostInfo{hosts[4], hosts[5], hosts[6]}},
  436. {orderedToken("30"), []*HostInfo{hosts[5], hosts[6], hosts[7]}},
  437. {orderedToken("35"), []*HostInfo{hosts[6], hosts[7], hosts[8]}},
  438. {orderedToken("40"), []*HostInfo{hosts[7], hosts[8], hosts[9]}},
  439. {orderedToken("45"), []*HostInfo{hosts[8], hosts[9], hosts[10]}},
  440. {orderedToken("50"), []*HostInfo{hosts[9], hosts[10], hosts[11]}},
  441. {orderedToken("55"), []*HostInfo{hosts[10], hosts[11], hosts[0]}},
  442. {orderedToken("60"), []*HostInfo{hosts[11], hosts[0], hosts[1]}},
  443. },
  444. }, policyInternal.getMetadataReadOnly().replicas)
  445. // now the token ring is configured
  446. query.RoutingKey([]byte("23"))
  447. iter = policy.Pick(query)
  448. // first should be host with matching token from the local DC
  449. iterCheck(t, iter, "4")
  450. // next are in non deterministic order
  451. }
  452. // Tests of the token-aware host selection policy implementation with a
  453. // DC aware round-robin host selection policy fallback
  454. // with {"class": "NetworkTopologyStrategy", "a": 2, "b": 2, "c": 2} replication.
  455. func TestHostPolicy_TokenAware_NetworkStrategy(t *testing.T) {
  456. const keyspace = "myKeyspace"
  457. policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())
  458. policyInternal := policy.(*tokenAwareHostPolicy)
  459. policyInternal.getKeyspaceName = func() string { return keyspace }
  460. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  461. return nil, errors.New("not initialized")
  462. }
  463. query := &Query{}
  464. query.getKeyspace = func() string { return keyspace }
  465. iter := policy.Pick(nil)
  466. if iter == nil {
  467. t.Fatal("host iterator was nil")
  468. }
  469. actual := iter()
  470. if actual != nil {
  471. t.Fatalf("expected nil from iterator, but was %v", actual)
  472. }
  473. // set the hosts
  474. hosts := [...]*HostInfo{
  475. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
  476. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
  477. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
  478. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"}, // 1
  479. {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"}, // 2
  480. {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"}, // 3
  481. {hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"}, // 4
  482. {hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"}, // 5
  483. {hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"}, // 6
  484. {hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
  485. {hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
  486. {hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
  487. }
  488. for _, host := range hosts {
  489. policy.AddHost(host)
  490. }
  491. policy.SetPartitioner("OrderedPartitioner")
  492. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  493. if keyspaceName != keyspace {
  494. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  495. }
  496. return &KeyspaceMetadata{
  497. Name: keyspace,
  498. StrategyClass: "NetworkTopologyStrategy",
  499. StrategyOptions: map[string]interface{}{
  500. "class": "NetworkTopologyStrategy",
  501. "local": 2,
  502. "remote1": 2,
  503. "remote2": 2,
  504. },
  505. }, nil
  506. }
  507. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace})
  508. // The NetworkTopologyStrategy above should generate the following replicas.
  509. // It's handy to have as reference here.
  510. assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
  511. keyspace: {
  512. {orderedToken("05"), []*HostInfo{hosts[0], hosts[1], hosts[2], hosts[3], hosts[4], hosts[5]}},
  513. {orderedToken("10"), []*HostInfo{hosts[1], hosts[2], hosts[3], hosts[4], hosts[5], hosts[6]}},
  514. {orderedToken("15"), []*HostInfo{hosts[2], hosts[3], hosts[4], hosts[5], hosts[6], hosts[7]}},
  515. {orderedToken("20"), []*HostInfo{hosts[3], hosts[4], hosts[5], hosts[6], hosts[7], hosts[8]}},
  516. {orderedToken("25"), []*HostInfo{hosts[4], hosts[5], hosts[6], hosts[7], hosts[8], hosts[9]}},
  517. {orderedToken("30"), []*HostInfo{hosts[5], hosts[6], hosts[7], hosts[8], hosts[9], hosts[10]}},
  518. {orderedToken("35"), []*HostInfo{hosts[6], hosts[7], hosts[8], hosts[9], hosts[10], hosts[11]}},
  519. {orderedToken("40"), []*HostInfo{hosts[7], hosts[8], hosts[9], hosts[10], hosts[11], hosts[0]}},
  520. {orderedToken("45"), []*HostInfo{hosts[8], hosts[9], hosts[10], hosts[11], hosts[0], hosts[1]}},
  521. {orderedToken("50"), []*HostInfo{hosts[9], hosts[10], hosts[11], hosts[0], hosts[1], hosts[2]}},
  522. {orderedToken("55"), []*HostInfo{hosts[10], hosts[11], hosts[0], hosts[1], hosts[2], hosts[3]}},
  523. {orderedToken("60"), []*HostInfo{hosts[11], hosts[0], hosts[1], hosts[2], hosts[3], hosts[4]}},
  524. },
  525. }, policyInternal.getMetadataReadOnly().replicas)
  526. // now the token ring is configured
  527. query.RoutingKey([]byte("23"))
  528. iter = policy.Pick(query)
  529. // first should be hosts with matching token from the local DC
  530. iterCheck(t, iter, "4")
  531. iterCheck(t, iter, "7")
  532. // rest should be hosts with matching token from remote DCs
  533. iterCheck(t, iter, "3")
  534. iterCheck(t, iter, "5")
  535. iterCheck(t, iter, "6")
  536. iterCheck(t, iter, "8")
  537. }