policies_test.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818
  1. // Copyright (c) 2015 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "errors"
  7. "fmt"
  8. "net"
  9. "testing"
  10. "time"
  11. "github.com/hailocab/go-hostpool"
  12. )
  13. // Tests of the round-robin host selection policy implementation
  14. func TestHostPolicy_RoundRobin(t *testing.T) {
  15. policy := RoundRobinHostPolicy()
  16. hosts := [...]*HostInfo{
  17. {hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1)},
  18. {hostId: "1", connectAddress: net.IPv4(0, 0, 0, 2)},
  19. }
  20. for _, host := range hosts {
  21. policy.AddHost(host)
  22. }
  23. // interleaved iteration should always increment the host
  24. iterA := policy.Pick(nil)
  25. if actual := iterA(); actual.Info() != hosts[0] {
  26. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  27. }
  28. iterB := policy.Pick(nil)
  29. if actual := iterB(); actual.Info() != hosts[1] {
  30. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  31. }
  32. if actual := iterB(); actual.Info() != hosts[0] {
  33. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  34. }
  35. if actual := iterA(); actual.Info() != hosts[1] {
  36. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  37. }
  38. iterC := policy.Pick(nil)
  39. if actual := iterC(); actual.Info() != hosts[0] {
  40. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  41. }
  42. if actual := iterC(); actual.Info() != hosts[1] {
  43. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  44. }
  45. }
  46. // Tests of the token-aware host selection policy implementation with a
  47. // round-robin host selection policy fallback.
  48. func TestHostPolicy_TokenAware(t *testing.T) {
  49. policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
  50. policyInternal := policy.(*tokenAwareHostPolicy)
  51. policyInternal.getKeyspaceName = func() string {return "myKeyspace"}
  52. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  53. return nil, errors.New("not initalized")
  54. }
  55. query := &Query{}
  56. query.getKeyspace = func() string{return "myKeyspace"}
  57. iter := policy.Pick(nil)
  58. if iter == nil {
  59. t.Fatal("host iterator was nil")
  60. }
  61. actual := iter()
  62. if actual != nil {
  63. t.Fatalf("expected nil from iterator, but was %v", actual)
  64. }
  65. // set the hosts
  66. hosts := [...]*HostInfo{
  67. {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
  68. {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
  69. {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
  70. {connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
  71. }
  72. for _, host := range hosts {
  73. policy.AddHost(host)
  74. }
  75. // the token ring is not setup without the partitioner, but the fallback
  76. // should work
  77. if actual := policy.Pick(nil)(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
  78. t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
  79. }
  80. query.RoutingKey([]byte("30"))
  81. if actual := policy.Pick(query)(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
  82. t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
  83. }
  84. policy.SetPartitioner("OrderedPartitioner")
  85. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  86. if keyspaceName != "myKeyspace" {
  87. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  88. }
  89. return &KeyspaceMetadata{
  90. Name: "myKeyspace",
  91. StrategyClass: "SimpleStrategy",
  92. StrategyOptions: map[string]interface{} {
  93. "class": "SimpleStrategy",
  94. "replication_factor": 2,
  95. },
  96. }, nil
  97. }
  98. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
  99. // The SimpleStrategy above should generate the following replicas.
  100. // It's handy to have as reference here.
  101. assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
  102. "myKeyspace": {
  103. orderedToken("00"): {hosts[0], hosts[1]},
  104. orderedToken("25"): {hosts[1], hosts[2]},
  105. orderedToken("50"): {hosts[2], hosts[3]},
  106. orderedToken("75"): {hosts[3], hosts[0]},
  107. },
  108. }, policyInternal.getMetadataReadOnly().replicas)
  109. // now the token ring is configured
  110. query.RoutingKey([]byte("20"))
  111. iter = policy.Pick(query)
  112. if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
  113. t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
  114. }
  115. // rest are round robin
  116. if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[2].ConnectAddress()) {
  117. t.Errorf("Expected peer 2 but was %s", actual.Info().ConnectAddress())
  118. }
  119. if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[3].ConnectAddress()) {
  120. t.Errorf("Expected peer 3 but was %s", actual.Info().ConnectAddress())
  121. }
  122. if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
  123. t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
  124. }
  125. }
  126. // Tests of the host pool host selection policy implementation
  127. func TestHostPolicy_HostPool(t *testing.T) {
  128. policy := HostPoolHostPolicy(hostpool.New(nil))
  129. hosts := []*HostInfo{
  130. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
  131. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
  132. }
  133. // Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
  134. // which will result in an unpredictable ordering
  135. policy.(*hostPoolHostPolicy).SetHosts(hosts)
  136. // the first host selected is actually at [1], but this is ok for RR
  137. // interleaved iteration should always increment the host
  138. iter := policy.Pick(nil)
  139. actualA := iter()
  140. if actualA.Info().HostID() != "0" {
  141. t.Errorf("Expected hosts[0] but was hosts[%s]", actualA.Info().HostID())
  142. }
  143. actualA.Mark(nil)
  144. actualB := iter()
  145. if actualB.Info().HostID() != "1" {
  146. t.Errorf("Expected hosts[1] but was hosts[%s]", actualB.Info().HostID())
  147. }
  148. actualB.Mark(fmt.Errorf("error"))
  149. actualC := iter()
  150. if actualC.Info().HostID() != "0" {
  151. t.Errorf("Expected hosts[0] but was hosts[%s]", actualC.Info().HostID())
  152. }
  153. actualC.Mark(nil)
  154. actualD := iter()
  155. if actualD.Info().HostID() != "0" {
  156. t.Errorf("Expected hosts[0] but was hosts[%s]", actualD.Info().HostID())
  157. }
  158. actualD.Mark(nil)
  159. }
  160. func TestHostPolicy_RoundRobin_NilHostInfo(t *testing.T) {
  161. policy := RoundRobinHostPolicy()
  162. host := &HostInfo{hostId: "host-1"}
  163. policy.AddHost(host)
  164. iter := policy.Pick(nil)
  165. next := iter()
  166. if next == nil {
  167. t.Fatal("got nil host")
  168. } else if v := next.Info(); v == nil {
  169. t.Fatal("got nil HostInfo")
  170. } else if v.HostID() != host.HostID() {
  171. t.Fatalf("expected host %v got %v", host, v)
  172. }
  173. next = iter()
  174. if next != nil {
  175. t.Errorf("expected to get nil host got %+v", next)
  176. if next.Info() == nil {
  177. t.Fatalf("HostInfo is nil")
  178. }
  179. }
  180. }
  181. func TestHostPolicy_TokenAware_NilHostInfo(t *testing.T) {
  182. policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
  183. policyInternal := policy.(*tokenAwareHostPolicy)
  184. policyInternal.getKeyspaceName = func() string {return "myKeyspace"}
  185. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  186. return nil, errors.New("not initialized")
  187. }
  188. hosts := [...]*HostInfo{
  189. {connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
  190. {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
  191. {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
  192. {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
  193. }
  194. for _, host := range hosts {
  195. policy.AddHost(host)
  196. }
  197. policy.SetPartitioner("OrderedPartitioner")
  198. query := &Query{}
  199. query.getKeyspace = func() string {return "myKeyspace"}
  200. query.RoutingKey([]byte("20"))
  201. iter := policy.Pick(query)
  202. next := iter()
  203. if next == nil {
  204. t.Fatal("got nil host")
  205. } else if v := next.Info(); v == nil {
  206. t.Fatal("got nil HostInfo")
  207. } else if !v.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
  208. t.Fatalf("expected peer 1 got %v", v.ConnectAddress())
  209. }
  210. // Empty the hosts to trigger the panic when using the fallback.
  211. for _, host := range hosts {
  212. policy.RemoveHost(host)
  213. }
  214. next = iter()
  215. if next != nil {
  216. t.Errorf("expected to get nil host got %+v", next)
  217. if next.Info() == nil {
  218. t.Fatalf("HostInfo is nil")
  219. }
  220. }
  221. }
  222. func TestCOWList_Add(t *testing.T) {
  223. var cow cowHostList
  224. toAdd := [...]net.IP{net.IPv4(10, 0, 0, 1), net.IPv4(10, 0, 0, 2), net.IPv4(10, 0, 0, 3)}
  225. for _, addr := range toAdd {
  226. if !cow.add(&HostInfo{connectAddress: addr}) {
  227. t.Fatal("did not add peer which was not in the set")
  228. }
  229. }
  230. hosts := cow.get()
  231. if len(hosts) != len(toAdd) {
  232. t.Fatalf("expected to have %d hosts got %d", len(toAdd), len(hosts))
  233. }
  234. set := make(map[string]bool)
  235. for _, host := range hosts {
  236. set[string(host.ConnectAddress())] = true
  237. }
  238. for _, addr := range toAdd {
  239. if !set[string(addr)] {
  240. t.Errorf("addr was not in the host list: %q", addr)
  241. }
  242. }
  243. }
  244. // TestSimpleRetryPolicy makes sure that we only allow 1 + numRetries attempts
  245. func TestSimpleRetryPolicy(t *testing.T) {
  246. q := &Query{}
  247. // this should allow a total of 3 tries.
  248. rt := &SimpleRetryPolicy{NumRetries: 2}
  249. cases := []struct {
  250. attempts int
  251. allow bool
  252. }{
  253. {0, true},
  254. {1, true},
  255. {2, true},
  256. {3, false},
  257. {4, false},
  258. {5, false},
  259. }
  260. for _, c := range cases {
  261. q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
  262. if c.allow && !rt.Attempt(q) {
  263. t.Fatalf("should allow retry after %d attempts", c.attempts)
  264. }
  265. if !c.allow && rt.Attempt(q) {
  266. t.Fatalf("should not allow retry after %d attempts", c.attempts)
  267. }
  268. }
  269. }
  270. func TestExponentialBackoffPolicy(t *testing.T) {
  271. // test with defaults
  272. sut := &ExponentialBackoffRetryPolicy{NumRetries: 2}
  273. cases := []struct {
  274. attempts int
  275. delay time.Duration
  276. }{
  277. {1, 100 * time.Millisecond},
  278. {2, (2) * 100 * time.Millisecond},
  279. {3, (2 * 2) * 100 * time.Millisecond},
  280. {4, (2 * 2 * 2) * 100 * time.Millisecond},
  281. }
  282. for _, c := range cases {
  283. // test 100 times for each case
  284. for i := 0; i < 100; i++ {
  285. d := sut.napTime(c.attempts)
  286. if d < c.delay-(100*time.Millisecond)/2 {
  287. t.Fatalf("Delay %d less than jitter min of %d", d, c.delay-100*time.Millisecond/2)
  288. }
  289. if d > c.delay+(100*time.Millisecond)/2 {
  290. t.Fatalf("Delay %d greater than jitter max of %d", d, c.delay+100*time.Millisecond/2)
  291. }
  292. }
  293. }
  294. }
  295. func TestDowngradingConsistencyRetryPolicy(t *testing.T) {
  296. q := &Query{cons: LocalQuorum}
  297. rewt0 := &RequestErrWriteTimeout{
  298. Received: 0,
  299. WriteType: "SIMPLE",
  300. }
  301. rewt1 := &RequestErrWriteTimeout{
  302. Received: 1,
  303. WriteType: "BATCH",
  304. }
  305. rewt2 := &RequestErrWriteTimeout{
  306. WriteType: "UNLOGGED_BATCH",
  307. }
  308. rert := &RequestErrReadTimeout{}
  309. reu0 := &RequestErrUnavailable{
  310. Alive: 0,
  311. }
  312. reu1 := &RequestErrUnavailable{
  313. Alive: 1,
  314. }
  315. // this should allow a total of 3 tries.
  316. consistencyLevels := []Consistency{Three, Two, One}
  317. rt := &DowngradingConsistencyRetryPolicy{ConsistencyLevelsToTry: consistencyLevels}
  318. cases := []struct {
  319. attempts int
  320. allow bool
  321. err error
  322. retryType RetryType
  323. }{
  324. {0, true, rewt0, Rethrow},
  325. {3, true, rewt1, Ignore},
  326. {1, true, rewt2, Retry},
  327. {2, true, rert, Retry},
  328. {4, false, reu0, Rethrow},
  329. {16, false, reu1, Retry},
  330. }
  331. for _, c := range cases {
  332. q.metrics = preFilledQueryMetrics(map[string]*hostMetrics{"127.0.0.1": {Attempts: c.attempts}})
  333. if c.retryType != rt.GetRetryType(c.err) {
  334. t.Fatalf("retry type should be %v", c.retryType)
  335. }
  336. if c.allow && !rt.Attempt(q) {
  337. t.Fatalf("should allow retry after %d attempts", c.attempts)
  338. }
  339. if !c.allow && rt.Attempt(q) {
  340. t.Fatalf("should not allow retry after %d attempts", c.attempts)
  341. }
  342. }
  343. }
  344. func TestHostPolicy_DCAwareRR(t *testing.T) {
  345. p := DCAwareRoundRobinPolicy("local")
  346. hosts := [...]*HostInfo{
  347. {hostId: "0", connectAddress: net.ParseIP("10.0.0.1"), dataCenter: "local"},
  348. {hostId: "1", connectAddress: net.ParseIP("10.0.0.2"), dataCenter: "local"},
  349. {hostId: "2", connectAddress: net.ParseIP("10.0.0.3"), dataCenter: "remote"},
  350. {hostId: "3", connectAddress: net.ParseIP("10.0.0.4"), dataCenter: "remote"},
  351. }
  352. for _, host := range hosts {
  353. p.AddHost(host)
  354. }
  355. // interleaved iteration should always increment the host
  356. iterA := p.Pick(nil)
  357. if actual := iterA(); actual.Info() != hosts[0] {
  358. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  359. }
  360. iterB := p.Pick(nil)
  361. if actual := iterB(); actual.Info() != hosts[1] {
  362. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  363. }
  364. if actual := iterB(); actual.Info() != hosts[0] {
  365. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  366. }
  367. if actual := iterA(); actual.Info() != hosts[1] {
  368. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  369. }
  370. iterC := p.Pick(nil)
  371. if actual := iterC(); actual.Info() != hosts[0] {
  372. t.Errorf("Expected hosts[0] but was hosts[%s]", actual.Info().HostID())
  373. }
  374. p.RemoveHost(hosts[0])
  375. if actual := iterC(); actual.Info() != hosts[1] {
  376. t.Errorf("Expected hosts[1] but was hosts[%s]", actual.Info().HostID())
  377. }
  378. p.RemoveHost(hosts[1])
  379. iterD := p.Pick(nil)
  380. if actual := iterD(); actual.Info() != hosts[2] {
  381. t.Errorf("Expected hosts[2] but was hosts[%s]", actual.Info().HostID())
  382. }
  383. if actual := iterD(); actual.Info() != hosts[3] {
  384. t.Errorf("Expected hosts[3] but was hosts[%s]", actual.Info().HostID())
  385. }
  386. }
  387. // Tests of the token-aware host selection policy implementation with a
  388. // DC aware round-robin host selection policy fallback
  389. // with {"class": "NetworkTopologyStrategy", "a": 1, "b": 1, "c": 1} replication.
  390. func TestHostPolicy_TokenAware_DCAwareRR(t *testing.T) {
  391. policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
  392. policyInternal := policy.(*tokenAwareHostPolicy)
  393. policyInternal.getKeyspaceName = func() string {return "myKeyspace"}
  394. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  395. return nil, errors.New("not initialized")
  396. }
  397. query := &Query{}
  398. query.getKeyspace = func() string {return "myKeyspace"}
  399. iter := policy.Pick(nil)
  400. if iter == nil {
  401. t.Fatal("host iterator was nil")
  402. }
  403. actual := iter()
  404. if actual != nil {
  405. t.Fatalf("expected nil from iterator, but was %v", actual)
  406. }
  407. // set the hosts
  408. hosts := [...]*HostInfo{
  409. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
  410. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
  411. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
  412. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
  413. {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
  414. {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
  415. {hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
  416. {hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
  417. {hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
  418. {hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
  419. {hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
  420. {hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
  421. }
  422. for _, host := range hosts {
  423. policy.AddHost(host)
  424. }
  425. // the token ring is not setup without the partitioner, but the fallback
  426. // should work
  427. if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
  428. t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
  429. }
  430. query.RoutingKey([]byte("30"))
  431. if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
  432. t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
  433. }
  434. policy.SetPartitioner("OrderedPartitioner")
  435. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  436. if keyspaceName != "myKeyspace" {
  437. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  438. }
  439. return &KeyspaceMetadata{
  440. Name: "myKeyspace",
  441. StrategyClass: "NetworkTopologyStrategy",
  442. StrategyOptions: map[string]interface{} {
  443. "class": "NetworkTopologyStrategy",
  444. "local": 1,
  445. "remote1": 1,
  446. "remote2": 1,
  447. },
  448. }, nil
  449. }
  450. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
  451. // The NetworkTopologyStrategy above should generate the following replicas.
  452. // It's handy to have as reference here.
  453. assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
  454. "myKeyspace": {
  455. orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
  456. orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
  457. orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
  458. orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
  459. orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
  460. orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
  461. orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
  462. orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
  463. orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
  464. orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
  465. orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
  466. orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
  467. },
  468. }, policyInternal.getMetadataReadOnly().replicas)
  469. // now the token ring is configured
  470. query.RoutingKey([]byte("23"))
  471. iter = policy.Pick(query)
  472. // first should be host with matching token from the local DC
  473. if actual := iter(); actual.Info().HostID() != "4" {
  474. t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
  475. }
  476. // rest are according DCAwareRR from local DC only, starting with 7 as the fallback was used twice above
  477. if actual := iter(); actual.Info().HostID() != "7" {
  478. t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
  479. }
  480. if actual := iter(); actual.Info().HostID() != "10" {
  481. t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
  482. }
  483. if actual := iter(); actual.Info().HostID() != "1" {
  484. t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
  485. }
  486. // and it starts to repeat now without host 4...
  487. if actual := iter(); actual.Info().HostID() != "7" {
  488. t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
  489. }
  490. if actual := iter(); actual.Info().HostID() != "10" {
  491. t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
  492. }
  493. if actual := iter(); actual.Info().HostID() != "1" {
  494. t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
  495. }
  496. }
  497. // Tests of the token-aware host selection policy implementation with a
  498. // DC aware round-robin host selection policy fallback
  499. // with {"class": "NetworkTopologyStrategy", "a": 2, "b": 2, "c": 2} replication.
  500. func TestHostPolicy_TokenAware_DCAwareRR2(t *testing.T) {
  501. policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"))
  502. policyInternal := policy.(*tokenAwareHostPolicy)
  503. policyInternal.getKeyspaceName = func() string {return "myKeyspace"}
  504. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  505. return nil, errors.New("not initialized")
  506. }
  507. query := &Query{}
  508. query.getKeyspace = func() string {return "myKeyspace"}
  509. iter := policy.Pick(nil)
  510. if iter == nil {
  511. t.Fatal("host iterator was nil")
  512. }
  513. actual := iter()
  514. if actual != nil {
  515. t.Fatalf("expected nil from iterator, but was %v", actual)
  516. }
  517. // set the hosts
  518. hosts := [...]*HostInfo{
  519. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
  520. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
  521. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
  522. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
  523. {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
  524. {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
  525. {hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
  526. {hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
  527. {hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
  528. {hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
  529. {hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
  530. {hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
  531. }
  532. for _, host := range hosts {
  533. policy.AddHost(host)
  534. }
  535. // the token ring is not setup without the partitioner, but the fallback
  536. // should work
  537. if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
  538. t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
  539. }
  540. query.RoutingKey([]byte("30"))
  541. if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
  542. t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
  543. }
  544. // advance the index in round-robin so that the next expected value does not overlap with the one selected by token.
  545. policy.Pick(query)()
  546. policy.Pick(query)()
  547. policy.SetPartitioner("OrderedPartitioner")
  548. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  549. if keyspaceName != "myKeyspace" {
  550. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  551. }
  552. return &KeyspaceMetadata{
  553. Name: "myKeyspace",
  554. StrategyClass: "NetworkTopologyStrategy",
  555. StrategyOptions: map[string]interface{} {
  556. "class": "NetworkTopologyStrategy",
  557. "local": 2,
  558. "remote1": 2,
  559. "remote2": 2,
  560. },
  561. }, nil
  562. }
  563. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
  564. // The NetworkTopologyStrategy above should generate the following replicas.
  565. // It's handy to have as reference here.
  566. assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
  567. "myKeyspace": {
  568. orderedToken("05"): {hosts[0], hosts[1], hosts[2], hosts[3], hosts[4], hosts[5]},
  569. orderedToken("10"): {hosts[1], hosts[2], hosts[3], hosts[4], hosts[5], hosts[6]},
  570. orderedToken("15"): {hosts[2], hosts[3], hosts[4], hosts[5], hosts[6], hosts[7]},
  571. orderedToken("20"): {hosts[3], hosts[4], hosts[5], hosts[6], hosts[7], hosts[8]},
  572. orderedToken("25"): {hosts[4], hosts[5], hosts[6], hosts[7], hosts[8], hosts[9]},
  573. orderedToken("30"): {hosts[5], hosts[6], hosts[7], hosts[8], hosts[9], hosts[10]},
  574. orderedToken("35"): {hosts[6], hosts[7], hosts[8], hosts[9], hosts[10], hosts[11]},
  575. orderedToken("40"): {hosts[7], hosts[8], hosts[9], hosts[10], hosts[11], hosts[0]},
  576. orderedToken("45"): {hosts[8], hosts[9], hosts[10], hosts[11], hosts[0], hosts[1]},
  577. orderedToken("50"): {hosts[9], hosts[10], hosts[11], hosts[0], hosts[1], hosts[2]},
  578. orderedToken("55"): {hosts[10], hosts[11], hosts[0], hosts[1], hosts[2], hosts[3]},
  579. orderedToken("60"): {hosts[11], hosts[0], hosts[1], hosts[2], hosts[3], hosts[4]},
  580. },
  581. }, policyInternal.getMetadataReadOnly().replicas)
  582. // now the token ring is configured
  583. query.RoutingKey([]byte("23"))
  584. iter = policy.Pick(query)
  585. // first should be hosts with matching token from the local DC
  586. if actual := iter(); actual.Info().HostID() != "4" {
  587. t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
  588. }
  589. if actual := iter(); actual.Info().HostID() != "7" {
  590. t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
  591. }
  592. // rest are according DCAwareRR from local DC only, starting with 7 as the fallback was used twice above
  593. if actual := iter(); actual.Info().HostID() != "1" {
  594. t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
  595. }
  596. if actual := iter(); actual.Info().HostID() != "10" {
  597. t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
  598. }
  599. // and it starts to repeat now without host 4 and 7...
  600. if actual := iter(); actual.Info().HostID() != "1" {
  601. t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
  602. }
  603. if actual := iter(); actual.Info().HostID() != "10" {
  604. t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
  605. }
  606. }
  607. // Tests of the token-aware host selection policy implementation with a
  608. // DC aware round-robin host selection policy fallback with NonLocalReplicasFallback option enabled.
  609. func TestHostPolicy_TokenAware_DCAwareRR_NonLocalFallback(t *testing.T) {
  610. policy := TokenAwareHostPolicy(DCAwareRoundRobinPolicy("local"), NonLocalReplicasFallback())
  611. policyInternal := policy.(*tokenAwareHostPolicy)
  612. policyInternal.getKeyspaceName = func() string {return "myKeyspace"}
  613. policyInternal.getKeyspaceMetadata = func(ks string) (*KeyspaceMetadata, error) {
  614. return nil, errors.New("not initialized")
  615. }
  616. query := &Query{}
  617. query.getKeyspace = func() string {return "myKeyspace"}
  618. iter := policy.Pick(nil)
  619. if iter == nil {
  620. t.Fatal("host iterator was nil")
  621. }
  622. actual := iter()
  623. if actual != nil {
  624. t.Fatalf("expected nil from iterator, but was %v", actual)
  625. }
  626. // set the hosts
  627. hosts := [...]*HostInfo{
  628. {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"05"}, dataCenter: "remote1"},
  629. {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"10"}, dataCenter: "local"},
  630. {hostId: "2", connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"15"}, dataCenter: "remote2"},
  631. {hostId: "3", connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"20"}, dataCenter: "remote1"},
  632. {hostId: "4", connectAddress: net.IPv4(10, 0, 0, 5), tokens: []string{"25"}, dataCenter: "local"},
  633. {hostId: "5", connectAddress: net.IPv4(10, 0, 0, 6), tokens: []string{"30"}, dataCenter: "remote2"},
  634. {hostId: "6", connectAddress: net.IPv4(10, 0, 0, 7), tokens: []string{"35"}, dataCenter: "remote1"},
  635. {hostId: "7", connectAddress: net.IPv4(10, 0, 0, 8), tokens: []string{"40"}, dataCenter: "local"},
  636. {hostId: "8", connectAddress: net.IPv4(10, 0, 0, 9), tokens: []string{"45"}, dataCenter: "remote2"},
  637. {hostId: "9", connectAddress: net.IPv4(10, 0, 0, 10), tokens: []string{"50"}, dataCenter: "remote1"},
  638. {hostId: "10", connectAddress: net.IPv4(10, 0, 0, 11), tokens: []string{"55"}, dataCenter: "local"},
  639. {hostId: "11", connectAddress: net.IPv4(10, 0, 0, 12), tokens: []string{"60"}, dataCenter: "remote2"},
  640. }
  641. for _, host := range hosts {
  642. policy.AddHost(host)
  643. }
  644. // the token ring is not setup without the partitioner, but the fallback
  645. // should work
  646. if actual := policy.Pick(nil)(); actual.Info().HostID() != "1" {
  647. t.Errorf("Expected host 1 but was %s", actual.Info().HostID())
  648. }
  649. query.RoutingKey([]byte("30"))
  650. if actual := policy.Pick(query)(); actual.Info().HostID() != "4" {
  651. t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
  652. }
  653. policy.SetPartitioner("OrderedPartitioner")
  654. policyInternal.getKeyspaceMetadata = func(keyspaceName string) (*KeyspaceMetadata, error) {
  655. if keyspaceName != "myKeyspace" {
  656. return nil, fmt.Errorf("unknown keyspace: %s", keyspaceName)
  657. }
  658. return &KeyspaceMetadata{
  659. Name: "myKeyspace",
  660. StrategyClass: "NetworkTopologyStrategy",
  661. StrategyOptions: map[string]interface{} {
  662. "class": "NetworkTopologyStrategy",
  663. "local": 1,
  664. "remote1": 1,
  665. "remote2": 1,
  666. },
  667. }, nil
  668. }
  669. policy.KeyspaceChanged(KeyspaceUpdateEvent{Keyspace: "myKeyspace"})
  670. // The NetworkTopologyStrategy above should generate the following replicas.
  671. // It's handy to have as reference here.
  672. assertDeepEqual(t, "replicas", map[string]map[token][]*HostInfo{
  673. "myKeyspace": {
  674. orderedToken("05"): {hosts[0], hosts[1], hosts[2]},
  675. orderedToken("10"): {hosts[1], hosts[2], hosts[3]},
  676. orderedToken("15"): {hosts[2], hosts[3], hosts[4]},
  677. orderedToken("20"): {hosts[3], hosts[4], hosts[5]},
  678. orderedToken("25"): {hosts[4], hosts[5], hosts[6]},
  679. orderedToken("30"): {hosts[5], hosts[6], hosts[7]},
  680. orderedToken("35"): {hosts[6], hosts[7], hosts[8]},
  681. orderedToken("40"): {hosts[7], hosts[8], hosts[9]},
  682. orderedToken("45"): {hosts[8], hosts[9], hosts[10]},
  683. orderedToken("50"): {hosts[9], hosts[10], hosts[11]},
  684. orderedToken("55"): {hosts[10], hosts[11], hosts[0]},
  685. orderedToken("60"): {hosts[11], hosts[0], hosts[1]},
  686. },
  687. }, policyInternal.getMetadataReadOnly().replicas)
  688. // now the token ring is configured
  689. query.RoutingKey([]byte("18"))
  690. iter = policy.Pick(query)
  691. // first should be host with matching token from the local DC
  692. if actual := iter(); actual.Info().HostID() != "4" {
  693. t.Errorf("Expected peer 4 but was %s", actual.Info().HostID())
  694. }
  695. // rest should be hosts with matching token from remote DCs
  696. if actual := iter(); actual.Info().HostID() != "3" {
  697. t.Errorf("Expected peer 3 but was %s", actual.Info().HostID())
  698. }
  699. if actual := iter(); actual.Info().HostID() != "5" {
  700. t.Errorf("Expected peer 5 but was %s", actual.Info().HostID())
  701. }
  702. // rest are according DCAwareRR from local DC only, starting with 7 as the fallback was used twice above
  703. if actual := iter(); actual.Info().HostID() != "7" {
  704. t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
  705. }
  706. if actual := iter(); actual.Info().HostID() != "10" {
  707. t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
  708. }
  709. if actual := iter(); actual.Info().HostID() != "1" {
  710. t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
  711. }
  712. // and it starts to repeat now without host 4...
  713. if actual := iter(); actual.Info().HostID() != "7" {
  714. t.Errorf("Expected peer 7 but was %s", actual.Info().HostID())
  715. }
  716. if actual := iter(); actual.Info().HostID() != "10" {
  717. t.Errorf("Expected peer 10 but was %s", actual.Info().HostID())
  718. }
  719. if actual := iter(); actual.Info().HostID() != "1" {
  720. t.Errorf("Expected peer 1 but was %s", actual.Info().HostID())
  721. }
  722. }