policies_test.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. "fmt"
  8. "net"
  9. "testing"
  10. "time"
  11. "github.com/hailocab/go-hostpool"
  12. )
  13. // Tests of the round-robin host selection policy implementation
  14. func TestHostPolicy_RoundRobin(t *testing.T) {
  15. policy := RoundRobinHostPolicy()
  16. hosts := [...]*HostInfo{
  17. {hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1)},
  18. {hostId: "1", connectAddress: net.IPv4(0, 0, 0, 2)},
  19. }
  20. for _, host := range hosts {
  21. policy.AddHost(host)
  22. }
  23. // interleaved iteration should always increment the host
  24. iterA := policy.Pick(nil)
  25. if actual := iterA(); actual.Info() != hosts[0] {
  26. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  27. }
  28. iterB := policy.Pick(nil)
  29. if actual := iterB(); actual.Info() != hosts[1] {
  30. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  31. }
  32. if actual := iterB(); actual.Info() != hosts[0] {
  33. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  34. }
  35. if actual := iterA(); actual.Info() != hosts[1] {
  36. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  37. }
  38. iterC := policy.Pick(nil)
  39. if actual := iterC(); actual.Info() != hosts[0] {
  40. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  41. }
  42. if actual := iterC(); actual.Info() != hosts[1] {
  43. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  44. }
  45. }
  46. // Tests of the token-aware host selection policy implementation with a
  47. // round-robin host selection policy fallback.
  48. func TestHostPolicy_TokenAware(t *testing.T) {
  49. const keyspace = "myKeyspace"
  50. policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
  51. policyInternal := policy.(*tokenAwareHostPolicy)
  52. policyInternal.getKeyspaceName = func() string { return keyspace }
  53. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  54. return nil, errors.New("not initalized")
  55. }
  56. query := &Query{}
  57. query.getKeyspace = func() string { return keyspace }
  58. iter := policy.Pick(nil)
  59. if iter == nil {
  60. t.Fatal("host iterator was nil")
  61. }
  62. actual := iter()
  63. if actual != nil {
  64. t.Fatalf("expected nil from iterator, but was %v", actual)
  65. }
  66. // set the hosts
  67. hosts := [...]*HostInfo{
  68. {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
  69. {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
  70. {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
  71. {connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
  72. }
  73. for _, host := range hosts {
  74. policy.AddHost(host)
  75. }
  76. // the token ring is not setup without the partitioner, but the fallback
  77. // should work
  78. if actual := policy.Pick(nil)(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
  79. t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
  80. }
  81. query.RoutingKey([]byte("30"))
  82. if actual := policy.Pick(query)(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
  83. t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
  84. }
  85. policy.SetPartitioner("OrderedPartitioner")
  86. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  87. if keyspaceName != keyspace {
  88. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  89. }
  90. return &KeyspaceMetadata{
  91. Name: keyspace,
  92. StrategyClass: "SimpleStrategy",
  93. StrategyOptions: map[string]interface{}{
  94. "class": "SimpleStrategy",
  95. "replication_factor": 2,
  96. },
  97. }, nil
  98. }
  99. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace})
  100. // The SimpleStrategy above should generate the following replicas.
  101. // It's handy to have as reference here.
  102. assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
  103. "myKeyspace": {
  104. {orderedToken("00"), []*HostInfo{hosts[0], hosts[1]}},
  105. {orderedToken("25"), []*HostInfo{hosts[1], hosts[2]}},
  106. {orderedToken("50"), []*HostInfo{hosts[2], hosts[3]}},
  107. {orderedToken("75"), []*HostInfo{hosts[3], hosts[0]}},
  108. },
  109. }, policyInternal.getMetadataReadOnly().replicas)
  110. // now the token ring is configured
  111. query.RoutingKey([]byte("20"))
  112. iter = policy.Pick(query)
  113. if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
  114. t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
  115. }
  116. // rest are round robin
  117. if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[2].ConnectAddress()) {
  118. t.Errorf("Expected peer 2 but was %s", actual.Info().ConnectAddress())
  119. }
  120. if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[3].ConnectAddress()) {
  121. t.Errorf("Expected peer 3 but was %s", actual.Info().ConnectAddress())
  122. }
  123. if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
  124. t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
  125. }
  126. }
  127. // Tests of the host pool host selection policy implementation
  128. func TestHostPolicy_HostPool(t *testing.T) {
  129. policy := HostPoolHostPolicy(hostpool.New(nil))
  130. hosts := []*HostInfo{
  131. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
  132. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
  133. }
  134. // Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
  135. // which will result in an unpredictable ordering
  136. policy.(*hostPoolHostPolicy).SetHosts(hosts)
  137. // the first host selected is actually at [1], but this is ok for RR
  138. // interleaved iteration should always increment the host
  139. iter := policy.Pick(nil)
  140. actualA := iter()
  141. if actualA.Info().HostID() != "0" {
  142. t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID())
  143. }
  144. actualA.Mark(nil)
  145. actualB := iter()
  146. if actualB.Info().HostID() != "1" {
  147. t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID())
  148. }
  149. actualB.Mark(fmt.Errorf("error"))
  150. actualC := iter()
  151. if actualC.Info().HostID() != "0" {
  152. t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID())
  153. }
  154. actualC.Mark(nil)
  155. actualD := iter()
  156. if actualD.Info().HostID() != "0" {
  157. t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID())
  158. }
  159. actualD.Mark(nil)
  160. }
  161. func TestHostPolicy_RoundRobin_NilHostInfo(t *testing.T) {
  162. policy := RoundRobinHostPolicy()
  163. host := &HostInfo{hostId: "host-1"}
  164. policy.AddHost(host)
  165. iter := policy.Pick(nil)
  166. next := iter()
  167. if next == nil {
  168. t.Fatal("got nil host")
  169. } else if v := next.Info(); v == nil {
  170. t.Fatal("got nil HostInfo")
  171. } else if v.HostID() != host.HostID() {
  172. t.Fatalf("expected host %v got %v", host, v)
  173. }
  174. next = iter()
  175. if next != nil {
  176. t.Errorf("expected to get nil host got %+v", next)
  177. if next.Info() == nil {
  178. t.Fatalf("HostInfo is nil")
  179. }
  180. }
  181. }
  182. func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
  183. policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
  184. policyInternal := policy.(*tokenAwareHostPolicy)
  185. policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
  186. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  187. return nil, errors.New("not initialized")
  188. }
  189. hosts := [...]*HostInfo{
  190. {connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
  191. {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
  192. {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
  193. {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
  194. }
  195. for _, host := range hosts {
  196. policy.AddHost(host)
  197. }
  198. policy.SetPartitioner("OrderedPartitioner")
  199. query := &Query{}
  200. query.getKeyspace = func() string { return "myKeyspace" }
  201. query.RoutingKey([]byte("20"))
  202. iter := policy.Pick(query)
  203. next := iter()
  204. if next == nil {
  205. t.Fatal("got nil host")
  206. } else if v := next.Info(); v == nil {
  207. t.Fatal("got nil HostInfo")
  208. } else if !v.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
  209. t.Fatalf("expected peer 1 got %v", v.ConnectAddress())
  210. }
  211. // Empty the hosts to trigger the panic when using the fallback.
  212. for _, host := range hosts {
  213. policy.RemoveHost(host)
  214. }
  215. next = iter()
  216. if next != nil {
  217. t.Errorf("expected to get nil host got %+v", next)
  218. if next.Info() == nil {
  219. t.Fatalf("HostInfo is nil")
  220. }
  221. }
  222. }
  223. func TestCOWList_Add(t *testing.T) {
  224. var cow cowHostList
  225. toAdd := [...]net.IP{net.IPv4(10, 0, 0, 1), net.IPv4(10, 0, 0, 2), net.IPv4(10, 0, 0, 3)}
  226. for _, addr := range toAdd {
  227. if !cow.add(&HostInfo{connectAddress: addr}) {
  228. t.Fatal("did not add peer which was not in the set")
  229. }
  230. }
  231. hosts := cow.get()
  232. if len(hosts) != len(toAdd) {
  233. t.Fatalf("expected to have %d hosts got %d", len(toAdd), len(hosts))
  234. }
  235. set := make(map[string]bool)
  236. for _, host := range hosts {
  237. set[string(host.ConnectAddress())] = true
  238. }
  239. for _, addr := range toAdd {
  240. if !set[string(addr)] {
  241. t.Errorf("addr was not in the host list: %q", addr)
  242. }
  243. }
  244. }
  245. // TestSimpleRetryPolicy makes sure that we only allow 1 + numRetries attempts
  246. func TestSimpleRetryPolicy(t *testing.T) {
  247. q := &Query{}
  248. // this should allow a total of 3 tries.
  249. rt := &SimpleRetryPolicy{NumRetries: 2}
  250. cases := []struct {
  251. attempts int
  252. allow bool
  253. }{
  254. {0, true},
  255. {1, true},
  256. {2, true},
  257. {3, false},
  258. {4, false},
  259. {5, false},
  260. }
  261. for _, c := range cases {
  262. q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
  263. if c.allow && !rt.Attempt(q) {
  264. t.Fatalf("should allow retry after %d attempts", c.attempts)
  265. }
  266. if !c.allow && rt.Attempt(q) {
  267. t.Fatalf("should not allow retry after %d attempts", c.attempts)
  268. }
  269. }
  270. }
  271. func TestExponentialBackoffPolicy(t *testing.T) {
  272. // test with defaults
  273. sut := &ExponentialBackoffRetryPolicy{NumRetries: 2}
  274. cases := []struct {
  275. attempts int
  276. delay time.Duration
  277. }{
  278. {1, 100 * time.Millisecond},
  279. {2, (2) * 100 * time.Millisecond},
  280. {3, (2 * 2) * 100 * time.Millisecond},
  281. {4, (2 * 2 * 2) * 100 * time.Millisecond},
  282. }
  283. for _, c := range cases {
  284. // test 100 times for each case
  285. for i := 0; i < 100; i++ {
  286. d := sut.napTime(c.attempts)
  287. if d < c.delay-(100*time.Millisecond)/2 {
  288. t.Fatalf("Delay %d less than jitter min of %d", d, c.delay-100*time.Millisecond/2)
  289. }
  290. if d > c.delay+(100*time.Millisecond)/2 {
  291. t.Fatalf("Delay %d greater than jitter max of %d", d, c.delay+100*time.Millisecond/2)
  292. }
  293. }
  294. }
  295. }
  296. func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
  297. q := &Query{cons: LocalQuorum}
  298. rewt0 := &RequestErrWriteTimeout{
  299. Received: 0,
  300. WriteType: "SIMPLE",
  301. }
  302. rewt1 := &RequestErrWriteTimeout{
  303. Received: 1,
  304. WriteType: "BATCH",
  305. }
  306. rewt2 := &RequestErrWriteTimeout{
  307. WriteType: "UNLOGGED_BATCH",
  308. }
  309. rert := &RequestErrReadTimeout{}
  310. reu0 := &RequestErrUnavailable{
  311. Alive: 0,
  312. }
  313. reu1 := &RequestErrUnavailable{
  314. Alive: 1,
  315. }
  316. // this should allow a total of 3 tries.
  317. consistencyLevels := []Consistency{Three, Two, One}
  318. rt := &DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: consistencyLevels}
  319. cases := []struct {
  320. attempts int
  321. allow bool
  322. err error
  323. retryType RetryType
  324. }{
  325. {0, true, rewt0, Rethrow},
  326. {3, true, rewt1, Ignore},
  327. {1, true, rewt2, Retry},
  328. {2, true, rert, Retry},
  329. {4, false, reu0, Rethrow},
  330. {16, false, reu1, Retry},
  331. }
  332. for _, c := range cases {
  333. q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
  334. if c.retryType != rt.GetRetryType(c.err) {
  335. t.Fatalf("retry type should be %v", c.retryType)
  336. }
  337. if c.allow && !rt.Attempt(q) {
  338. t.Fatalf("should allow retry after %d attempts", c.attempts)
  339. }
  340. if !c.allow && rt.Attempt(q) {
  341. t.Fatalf("should not allow retry after %d attempts", c.attempts)
  342. }
  343. }
  344. }
  345. func iterCheck(t *testing.T, iter NextHost, hostID string) {
  346. t.Helper()
  347. host := iter()
  348. if host == nil || host.Info() == nil {
  349. t.Fatalf("expected hostID %s got nil", hostID)
  350. }
  351. if host.Info().HostID() != hostID {
  352. t.Fatalf("Expected peer %s but was %s", hostID, host.Info().HostID())
  353. }
  354. }
  355. func TestHostPolicy_DCAwareRR(t *testing.T) {
  356. p := DCAwareRoundRobinPolicy("local")
  357. hosts := [...]*HostInfo{
  358. {hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"},
  359. {hostId: "1", connectAddress: net.ParseIP("10.0.0.2"), dataCenter: "local"},
  360. {hostId: "2", connectAddress: net.ParseIP("10.0.0.3"), dataCenter: "remote"},
  361. {hostId: "3", connectAddress: net.ParseIP("10.0.0.4"), dataCenter: "remote"},
  362. }
  363. for _, host := range hosts {
  364. p.AddHost(host)
  365. }
  366. // interleaved iteration should always increment the host
  367. iterA := p.Pick(nil)
  368. if actual := iterA(); actual.Info() != hosts[0] {
  369. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  370. }
  371. iterB := p.Pick(nil)
  372. if actual := iterB(); actual.Info() != hosts[1] {
  373. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  374. }
  375. if actual := iterB(); actual.Info() != hosts[0] {
  376. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  377. }
  378. if actual := iterA(); actual.Info() != hosts[1] {
  379. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  380. }
  381. iterC := p.Pick(nil)
  382. if actual := iterC(); actual.Info() != hosts[0] {
  383. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  384. }
  385. p.RemoveHost(hosts[0])
  386. if actual := iterC(); actual.Info() != hosts[1] {
  387. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  388. }
  389. p.RemoveHost(hosts[1])
  390. iterD := p.Pick(nil)
  391. if actual := iterD(); actual.Info() != hosts[2] {
  392. t.Errorf("Expected hosts[2] but was hosts[%s]", actual.Info().HostID())
  393. }
  394. if actual := iterD(); actual.Info() != hosts[3] {
  395. t.Errorf("Expected hosts[3] but was hosts[%s]", actual.Info().HostID())
  396. }
  397. }
  398. // Tests of the token-aware host selection policy implementation with a
  399. // DC aware round-robin host selection policy fallback
  400. // with {"class": "NetworkTopologyStrategy", "a": 1, "b": 1, "c": 1} replication.
  401. func TestHostPolicy_TokenAware_DCAwareRR(t *testing.T) {
  402. const keyspace = "myKeyspace"
  403. policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
  404. policyInternal := policy.(*tokenAwareHostPolicy)
  405. policyInternal.getKeyspaceName = func() string { return keyspace }
  406. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  407. return nil, errors.New("not initialized")
  408. }
  409. query := &Query{}
  410. query.getKeyspace = func() string { return keyspace }
  411. iter := policy.Pick(nil)
  412. if iter == nil {
  413. t.Fatal("host iterator was nil")
  414. }
  415. actual := iter()
  416. if actual != nil {
  417. t.Fatalf("expected nil from iterator, but was %v", actual)
  418. }
  419. // set the hosts
  420. hosts := [...]*HostInfo{
  421. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
  422. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
  423. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
  424. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
  425. {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
  426. {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
  427. {hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
  428. {hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
  429. {hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
  430. {hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
  431. {hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
  432. {hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
  433. }
  434. for _, host := range hosts {
  435. policy.AddHost(host)
  436. }
  437. // the token ring is not setup without the partitioner, but the fallback
  438. // should work
  439. if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
  440. t.Fatalf("Expected host 1 but was %s", actual.Info().HostID())
  441. }
  442. query.RoutingKey([]byte("30"))
  443. if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
  444. t.Fatalf("Expected peer 4 but was %s", actual.Info().HostID())
  445. }
  446. policy.SetPartitioner("OrderedPartitioner")
  447. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  448. if keyspaceName != keyspace {
  449. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  450. }
  451. return &KeyspaceMetadata{
  452. Name: keyspace,
  453. StrategyClass: "NetworkTopologyStrategy",
  454. StrategyOptions: map[string]interface{}{
  455. "class": "NetworkTopologyStrategy",
  456. "local": 1,
  457. "remote1": 1,
  458. "remote2": 1,
  459. },
  460. }, nil
  461. }
  462. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
  463. // The NetworkTopologyStrategy above should generate the following replicas.
  464. // It's handy to have as reference here.
  465. assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
  466. "myKeyspace": {
  467. {orderedToken("05"), []*HostInfo{hosts[0], hosts[1], hosts[2]}},
  468. {orderedToken("10"), []*HostInfo{hosts[1], hosts[2], hosts[3]}},
  469. {orderedToken("15"), []*HostInfo{hosts[2], hosts[3], hosts[4]}},
  470. {orderedToken("20"), []*HostInfo{hosts[3], hosts[4], hosts[5]}},
  471. {orderedToken("25"), []*HostInfo{hosts[4], hosts[5], hosts[6]}},
  472. {orderedToken("30"), []*HostInfo{hosts[5], hosts[6], hosts[7]}},
  473. {orderedToken("35"), []*HostInfo{hosts[6], hosts[7], hosts[8]}},
  474. {orderedToken("40"), []*HostInfo{hosts[7], hosts[8], hosts[9]}},
  475. {orderedToken("45"), []*HostInfo{hosts[8], hosts[9], hosts[10]}},
  476. {orderedToken("50"), []*HostInfo{hosts[9], hosts[10], hosts[11]}},
  477. {orderedToken("55"), []*HostInfo{hosts[10], hosts[11], hosts[0]}},
  478. {orderedToken("60"), []*HostInfo{hosts[11], hosts[0], hosts[1]}},
  479. },
  480. }, policyInternal.getMetadataReadOnly().replicas)
  481. // now the token ring is configured
  482. query.RoutingKey([]byte("23"))
  483. iter = policy.Pick(query)
  484. // first should be host with matching token from the local DC
  485. iterCheck(t, iter, "4")
  486. // rest are according DCAwareRR from local DC only, starting with 7 as the fallback was used twice above
  487. iterCheck(t, iter, "7")
  488. iterCheck(t, iter, "10")
  489. iterCheck(t, iter, "1")
  490. }
  491. // Tests of the token-aware host selection policy implementation with a
  492. // DC aware round-robin host selection policy fallback
  493. // with {"class": "NetworkTopologyStrategy", "a": 2, "b": 2, "c": 2} replication.
  494. func TestHostPolicy_TokenAware_DCAwareRR2(t *testing.T) {
  495. policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
  496. policyInternal := policy.(*tokenAwareHostPolicy)
  497. policyInternal.getKeyspaceName = func() string { return "myKeyspace" }
  498. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  499. return nil, errors.New("not initialized")
  500. }
  501. query := &Query{}
  502. query.getKeyspace = func() string { return "myKeyspace" }
  503. iter := policy.Pick(nil)
  504. if iter == nil {
  505. t.Fatal("host iterator was nil")
  506. }
  507. actual := iter()
  508. if actual != nil {
  509. t.Fatalf("expected nil from iterator, but was %v", actual)
  510. }
  511. // set the hosts
  512. hosts := [...]*HostInfo{
  513. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
  514. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
  515. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
  516. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
  517. {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
  518. {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
  519. {hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
  520. {hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
  521. {hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
  522. {hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
  523. {hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
  524. {hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
  525. }
  526. for _, host := range hosts {
  527. policy.AddHost(host)
  528. }
  529. // the token ring is not setup without the partitioner, but the fallback
  530. // should work
  531. if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
  532. t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
  533. }
  534. query.RoutingKey([]byte("30"))
  535. if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
  536. t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
  537. }
  538. // advance the index in round-robin so that the next expected value does not overlap with the one selected by token.
  539. policy.Pick(query)()
  540. policy.Pick(query)()
  541. policy.SetPartitioner("OrderedPartitioner")
  542. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  543. if keyspaceName != "myKeyspace" {
  544. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  545. }
  546. return &KeyspaceMetadata{
  547. Name: "myKeyspace",
  548. StrategyClass: "NetworkTopologyStrategy",
  549. StrategyOptions: map[string]interface{}{
  550. "class": "NetworkTopologyStrategy",
  551. "local": 2,
  552. "remote1": 2,
  553. "remote2": 2,
  554. },
  555. }, nil
  556. }
  557. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
  558. // The NetworkTopologyStrategy above should generate the following replicas.
  559. // It's handy to have as reference here.
  560. assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
  561. "myKeyspace": {
  562. {orderedToken("05"), []*HostInfo{hosts[0], hosts[1], hosts[2], hosts[3], hosts[4], hosts[5]}},
  563. {orderedToken("10"), []*HostInfo{hosts[1], hosts[2], hosts[3], hosts[4], hosts[5], hosts[6]}},
  564. {orderedToken("15"), []*HostInfo{hosts[2], hosts[3], hosts[4], hosts[5], hosts[6], hosts[7]}},
  565. {orderedToken("20"), []*HostInfo{hosts[3], hosts[4], hosts[5], hosts[6], hosts[7], hosts[8]}},
  566. {orderedToken("25"), []*HostInfo{hosts[4], hosts[5], hosts[6], hosts[7], hosts[8], hosts[9]}},
  567. {orderedToken("30"), []*HostInfo{hosts[5], hosts[6], hosts[7], hosts[8], hosts[9], hosts[10]}},
  568. {orderedToken("35"), []*HostInfo{hosts[6], hosts[7], hosts[8], hosts[9], hosts[10], hosts[11]}},
  569. {orderedToken("40"), []*HostInfo{hosts[7], hosts[8], hosts[9], hosts[10], hosts[11], hosts[0]}},
  570. {orderedToken("45"), []*HostInfo{hosts[8], hosts[9], hosts[10], hosts[11], hosts[0], hosts[1]}},
  571. {orderedToken("50"), []*HostInfo{hosts[9], hosts[10], hosts[11], hosts[0], hosts[1], hosts[2]}},
  572. {orderedToken("55"), []*HostInfo{hosts[10], hosts[11], hosts[0], hosts[1], hosts[2], hosts[3]}},
  573. {orderedToken("60"), []*HostInfo{hosts[11], hosts[0], hosts[1], hosts[2], hosts[3], hosts[4]}},
  574. },
  575. }, policyInternal.getMetadataReadOnly().replicas)
  576. // now the token ring is configured
  577. query.RoutingKey([]byte("23"))
  578. iter = policy.Pick(query)
  579. // first should be hosts with matching token from the local DC
  580. iterCheck(t, iter, "4")
  581. iterCheck(t, iter, "7")
  582. // rest are according DCAwareRR from local DC only, starting with 7 as the fallback was used twice above
  583. iterCheck(t, iter, "1")
  584. iterCheck(t, iter, "10")
  585. }
  586. // Tests of the token-aware host selection policy implementation with a
  587. // DC aware round-robin host selection policy fallback with NonLocalReplicasFallback option enabled.
  588. func TestHostPolicy_TokenAware_DCAwareRR_NonLocalFallback(t *testing.T) {
  589. const (
  590. keyspace = "myKeyspace"
  591. localDC = "local"
  592. )
  593. policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy(localDC), NonLocalReplicasFallback())
  594. policyInternal := policy.(*tokenAwareHostPolicy)
  595. policyInternal.getKeyspaceName = func() string { return keyspace }
  596. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  597. return nil, errors.New("not initialized")
  598. }
  599. query := &Query{}
  600. query.getKeyspace = func() string { return keyspace }
  601. iter := policy.Pick(nil)
  602. if iter == nil {
  603. t.Fatal("host iterator was nil")
  604. }
  605. actual := iter()
  606. if actual != nil {
  607. t.Fatalf("expected nil from iterator, but was %v", actual)
  608. }
  609. // set the hosts
  610. hosts := [...]*HostInfo{
  611. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
  612. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: localDC},
  613. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"}, // 1
  614. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"}, // 2
  615. {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: localDC}, // 3
  616. {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
  617. {hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
  618. {hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: localDC},
  619. {hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
  620. {hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
  621. {hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: localDC},
  622. {hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
  623. }
  624. for _, host := range hosts {
  625. policy.AddHost(host)
  626. }
  627. // the token ring is not setup without the partitioner, but the fallback
  628. // should work
  629. iterCheck(t, policy.Pick(nil), "1")
  630. query.RoutingKey([]byte("30"))
  631. iterCheck(t, policy.Pick(query), "4")
  632. policy.SetPartitioner("OrderedPartitioner")
  633. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  634. if keyspaceName != keyspace {
  635. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  636. }
  637. return &KeyspaceMetadata{
  638. Name: keyspace,
  639. StrategyClass: "NetworkTopologyStrategy",
  640. StrategyOptions: map[string]interface{}{
  641. "class": "NetworkTopologyStrategy",
  642. localDC: 1,
  643. "remote1": 1,
  644. "remote2": 1,
  645. },
  646. }, nil
  647. }
  648. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: keyspace})
  649. // The NetworkTopologyStrategy above should generate the following replicas.
  650. // It's handy to have as reference here.
  651. assertDeepEqual(t, "replicas", map[string]tokenRingReplicas{
  652. keyspace: {
  653. {orderedToken("05"), []*HostInfo{hosts[0], hosts[1], hosts[2]}},
  654. {orderedToken("10"), []*HostInfo{hosts[1], hosts[2], hosts[3]}},
  655. {orderedToken("15"), []*HostInfo{hosts[2], hosts[3], hosts[4]}},
  656. {orderedToken("20"), []*HostInfo{hosts[3], hosts[4], hosts[5]}},
  657. {orderedToken("25"), []*HostInfo{hosts[4], hosts[5], hosts[6]}},
  658. {orderedToken("30"), []*HostInfo{hosts[5], hosts[6], hosts[7]}},
  659. {orderedToken("35"), []*HostInfo{hosts[6], hosts[7], hosts[8]}},
  660. {orderedToken("40"), []*HostInfo{hosts[7], hosts[8], hosts[9]}},
  661. {orderedToken("45"), []*HostInfo{hosts[8], hosts[9], hosts[10]}},
  662. {orderedToken("50"), []*HostInfo{hosts[9], hosts[10], hosts[11]}},
  663. {orderedToken("55"), []*HostInfo{hosts[10], hosts[11], hosts[0]}},
  664. {orderedToken("60"), []*HostInfo{hosts[11], hosts[0], hosts[1]}},
  665. },
  666. }, policyInternal.getMetadataReadOnly().replicas)
  667. // now the token ring is configured
  668. query.RoutingKey([]byte("18"))
  669. iter = policy.Pick(query)
  670. // first should be host with matching token from the local DC
  671. iterCheck(t, iter, "4")
  672. // rest should be hosts with matching token from remote DCs
  673. iterCheck(t, iter, "3")
  674. iterCheck(t, iter, "5")
  675. // rest are according DCAwareRR from local DC only, starting with 7 as the fallback was used twice above
  676. iterCheck(t, iter, "7")
  677. iterCheck(t, iter, "10")
  678. iterCheck(t, iter, "1")
  679. }