topology.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. package gocql
  2. import (
  3. "fmt"
  4. "sort"
  5. "strconv"
  6. "strings"
  7. )
  8. type hostTokens struct {
  9. // token is end (inclusive) of token range these hosts belong to
  10. token token
  11. hosts []*HostInfo
  12. }
  13. // tokenRingReplicas maps token ranges to list of replicas.
  14. // The elements in tokenRingReplicas are sorted by token ascending.
  15. // The range for a given item in tokenRingReplicas starts after preceding range and ends with the token specified in
  16. // token. The end token is part of the range.
  17. // The lowest (i.e. index 0) range wraps around the ring (its preceding range is the one with largest index).
  18. type tokenRingReplicas []hostTokens
  19. func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].token) }
  20. func (h tokenRingReplicas) Len() int { return len(h) }
  21. func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  22. func (h tokenRingReplicas) replicasFor(t token) *hostTokens {
  23. if len(h) == 0 {
  24. return nil
  25. }
  26. p := sort.Search(len(h), func(i int) bool {
  27. return !h[i].token.Less(t)
  28. })
  29. if p >= len(h) {
  30. // rollover
  31. p = 0
  32. }
  33. return &h[p]
  34. }
  35. type placementStrategy interface {
  36. replicaMap(tokenRing *tokenRing) tokenRingReplicas
  37. replicationFactor(dc string) int
  38. }
  // getReplicationFactorFromOpts converts a replication_factor option value
  // into a positive int.
  //
  // keyspace is used only to make panic messages actionable. val may be an int
  // or a decimal string. The function panics if val is of any other type
  // (including nil, e.g. a missing option), is not a valid integer string, or
  // is not positive.
  func getReplicationFactorFromOpts(keyspace string, val interface{}) int {
  	// TODO: dont really want to panic here, but is better
  	// than spamming
  	switch v := val.(type) {
  	case int:
  		if v <= 0 {
  			panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", v, keyspace))
  		}
  		return v
  	case string:
  		n, err := strconv.Atoi(v)
  		if err != nil {
  			panic(fmt.Sprintf("invalid replication_factor. Is the %q keyspace configured correctly? %v", keyspace, err))
  		}
  		if n <= 0 {
  			panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", n, keyspace))
  		}
  		return n
  	default:
  		panic(fmt.Sprintf("unknown replication_factor type %T", v))
  	}
  }
  60. func getStrategy(ks *KeyspaceMetadata) placementStrategy {
  61. switch {
  62. case strings.Contains(ks.StrategyClass, "SimpleStrategy"):
  63. return &simpleStrategy{rf: getReplicationFactorFromOpts(ks.Name, ks.StrategyOptions["replication_factor"])}
  64. case strings.Contains(ks.StrategyClass, "NetworkTopologyStrategy"):
  65. dcs := make(map[string]int)
  66. for dc, rf := range ks.StrategyOptions {
  67. if dc == "class" {
  68. continue
  69. }
  70. dcs[dc] = getReplicationFactorFromOpts(ks.Name+":dc="+dc, rf)
  71. }
  72. return &networkTopology{dcs: dcs}
  73. case strings.Contains(ks.StrategyClass, "LocalStrategy"):
  74. return nil
  75. default:
  76. // TODO: handle unknown replicas and just return the primary host for a token
  77. panic(fmt.Sprintf("unsupported strategy class: %v", ks.StrategyClass))
  78. }
  79. }
  // simpleStrategy implements SimpleStrategy placement: one replication factor
  // shared by every data center.
  type simpleStrategy struct {
  	rf int // replication factor from the keyspace options
  }

  // replicationFactor returns the keyspace-wide replication factor; dc is not
  // consulted.
  func (s *simpleStrategy) replicationFactor(dc string) int { return s.rf }
  86. func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  87. tokens := tokenRing.tokens
  88. ring := make(tokenRingReplicas, len(tokens))
  89. for i, th := range tokens {
  90. replicas := make([]*HostInfo, 0, s.rf)
  91. seen := make(map[*HostInfo]bool)
  92. for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ {
  93. h := tokens[(i+j)%len(tokens)]
  94. if !seen[h.host] {
  95. replicas = append(replicas, h.host)
  96. seen[h.host] = true
  97. }
  98. }
  99. ring[i] = hostTokens{th.token, replicas}
  100. }
  101. sort.Sort(ring)
  102. return ring
  103. }
  // networkTopology implements NetworkTopologyStrategy placement.
  type networkTopology struct {
  	dcs map[string]int // data center name -> replication factor
  }

  // replicationFactor returns the replication factor configured for dc, or 0
  // when dc is not configured.
  func (n *networkTopology) replicationFactor(dc string) int {
  	return n.dcs[dc]
  }

  // haveRF reports whether replicaCounts names exactly as many data centers as
  // n.dcs and every configured data center's count equals its replication
  // factor.
  func (n *networkTopology) haveRF(replicaCounts map[string]int) bool {
  	same := len(replicaCounts) == len(n.dcs)
  	for dc, want := range n.dcs {
  		if !same {
  			break
  		}
  		same = replicaCounts[dc] == want
  	}
  	return same
  }
  121. func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  122. dcRacks := make(map[string]map[string]struct{}, len(n.dcs))
  123. // skipped hosts in a dc
  124. skipped := make(map[string][]*HostInfo, len(n.dcs))
  125. // number of replicas per dc
  126. replicasInDC := make(map[string]int, len(n.dcs))
  127. // dc -> racks
  128. seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
  129. for _, h := range tokenRing.hosts {
  130. dc := h.DataCenter()
  131. rack := h.Rack()
  132. racks, ok := dcRacks[dc]
  133. if !ok {
  134. racks = make(map[string]struct{})
  135. dcRacks[dc] = racks
  136. }
  137. racks[rack] = struct{}{}
  138. }
  139. for dc, racks := range dcRacks {
  140. replicasInDC[dc] = 0
  141. seenDCRacks[dc] = make(map[string]struct{}, len(racks))
  142. }
  143. tokens := tokenRing.tokens
  144. replicaRing := make(tokenRingReplicas, 0, len(tokens))
  145. var totalRF int
  146. for _, rf := range n.dcs {
  147. totalRF += rf
  148. }
  149. for i, th := range tokenRing.tokens {
  150. if rf := n.dcs[th.host.DataCenter()]; rf == 0 {
  151. // skip this token since no replica in this datacenter.
  152. continue
  153. }
  154. for k, v := range skipped {
  155. skipped[k] = v[:0]
  156. }
  157. for dc := range n.dcs {
  158. replicasInDC[dc] = 0
  159. for rack := range seenDCRacks[dc] {
  160. delete(seenDCRacks[dc], rack)
  161. }
  162. }
  163. replicas := make([]*HostInfo, 0, totalRF)
  164. for j := 0; j < len(tokens) && (len(replicas) < totalRF && !n.haveRF(replicasInDC)); j++ {
  165. // TODO: ensure we dont add the same host twice
  166. p := i + j
  167. if p >= len(tokens) {
  168. p -= len(tokens)
  169. }
  170. h := tokens[p].host
  171. dc := h.DataCenter()
  172. rack := h.Rack()
  173. rf := n.dcs[dc]
  174. if rf == 0 {
  175. // skip this DC, dont know about it or replication factor is zero
  176. continue
  177. } else if replicasInDC[dc] >= rf {
  178. if replicasInDC[dc] > rf {
  179. panic(fmt.Sprintf("replica overflow. rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc))
  180. }
  181. // have enough replicas in this DC
  182. continue
  183. } else if _, ok := dcRacks[dc][rack]; !ok {
  184. // dont know about this rack
  185. continue
  186. }
  187. racks := seenDCRacks[dc]
  188. if _, ok := racks[rack]; ok && len(racks) == len(dcRacks[dc]) {
  189. // we have been through all the racks and dont have RF yet, add this
  190. replicas = append(replicas, h)
  191. replicasInDC[dc]++
  192. } else if !ok {
  193. if racks == nil {
  194. racks = make(map[string]struct{}, 1)
  195. seenDCRacks[dc] = racks
  196. }
  197. // new rack
  198. racks[rack] = struct{}{}
  199. replicas = append(replicas, h)
  200. r := replicasInDC[dc] + 1
  201. if len(racks) == len(dcRacks[dc]) {
  202. // if we have been through all the racks, drain the rest of the skipped
  203. // hosts until we have RF. The next iteration will skip in the block
  204. // above
  205. skippedHosts := skipped[dc]
  206. var k int
  207. for ; k < len(skippedHosts) && r+k < rf; k++ {
  208. sh := skippedHosts[k]
  209. replicas = append(replicas, sh)
  210. }
  211. r += k
  212. skipped[dc] = skippedHosts[k:]
  213. }
  214. replicasInDC[dc] = r
  215. } else {
  216. // already seen this rack, keep hold of this host incase
  217. // we dont get enough for rf
  218. skipped[dc] = append(skipped[dc], h)
  219. }
  220. }
  221. if len(replicas) == 0 {
  222. panic(fmt.Sprintf("no replicas for token: %v", th.token))
  223. } else if !replicas[0].Equal(th.host) {
  224. panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress()))
  225. }
  226. replicaRing = append(replicaRing, hostTokens{th.token, replicas})
  227. }
  228. if len(n.dcs) == len(dcRacks) && len(replicaRing) != len(tokens) {
  229. panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens)))
  230. }
  231. return replicaRing
  232. }