topology.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package gocql
  2. import (
  3. "fmt"
  4. "sort"
  5. "strconv"
  6. "strings"
  7. )
  8. type hostTokens struct {
  9. // token is end (inclusive) of token range these hosts belong to
  10. token token
  11. hosts []*HostInfo
  12. }
  13. // tokenRingReplicas maps token ranges to list of replicas.
  14. // The elements in tokenRingReplicas are sorted by token ascending.
  15. // The range for a given item in tokenRingReplicas starts after preceding range and ends with the token specified in
  16. // token. The end token is part of the range.
  17. // The lowest (i.e. index 0) range wraps around the ring (its preceding range is the one with largest index).
  18. type tokenRingReplicas []hostTokens
  19. func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].token) }
  20. func (h tokenRingReplicas) Len() int { return len(h) }
  21. func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  22. func (h tokenRingReplicas) replicasFor(t token) *hostTokens {
  23. if len(h) == 0 {
  24. return nil
  25. }
  26. p := sort.Search(len(h), func(i int) bool {
  27. return !h[i].token.Less(t)
  28. })
  29. // TODO: simplify this
  30. if p < len(h) && h[p].token == t {
  31. return &h[p]
  32. }
  33. p--
  34. if p >= len(h) {
  35. // rollover
  36. p = 0
  37. } else if p < 0 {
  38. // rollunder
  39. p = len(h) - 1
  40. }
  41. return &h[p]
  42. }
  43. type placementStrategy interface {
  44. replicaMap(tokenRing *tokenRing) tokenRingReplicas
  45. replicationFactor(dc string) int
  46. }
  // getReplicationFactorFromOpts parses a replication factor taken from a
// keyspace's strategy options. val may be an int or a decimal string; keyspace
// is only used to give the panic messages context.
//
// Zero is accepted: Cassandra permits a replication factor of 0 (for example a
// NetworkTopologyStrategy datacenter that should hold no replicas), and
// rejecting it made such keyspaces crash the client.
//
// It panics if val is negative, is a string that does not parse as an integer,
// or has any other type (including nil, i.e. a missing option).
func getReplicationFactorFromOpts(keyspace string, val interface{}) int {
	// TODO: dont really want to panic here, but is better
	// than spamming
	switch v := val.(type) {
	case int:
		if v < 0 {
			panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", v, keyspace))
		}
		return v
	case string:
		n, err := strconv.Atoi(v)
		if err != nil {
			panic(fmt.Sprintf("invalid replication_factor. Is the %q keyspace configured correctly? %v", keyspace, err))
		} else if n < 0 {
			panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", n, keyspace))
		}
		return n
	default:
		panic(fmt.Sprintf("unknown replication_factor type %T. Is the %q keyspace configured correctly?", v, keyspace))
	}
}
  68. func getStrategy(ks *KeyspaceMetadata) placementStrategy {
  69. switch {
  70. case strings.Contains(ks.StrategyClass, "SimpleStrategy"):
  71. return &simpleStrategy{rf: getReplicationFactorFromOpts(ks.Name, ks.StrategyOptions["replication_factor"])}
  72. case strings.Contains(ks.StrategyClass, "NetworkTopologyStrategy"):
  73. dcs := make(map[string]int)
  74. for dc, rf := range ks.StrategyOptions {
  75. if dc == "class" {
  76. continue
  77. }
  78. dcs[dc] = getReplicationFactorFromOpts(ks.Name+":dc="+dc, rf)
  79. }
  80. return &networkTopology{dcs: dcs}
  81. case strings.Contains(ks.StrategyClass, "LocalStrategy"):
  82. return nil
  83. default:
  84. // TODO: handle unknown replicas and just return the primary host for a token
  85. panic(fmt.Sprintf("unsupported strategy class: %v", ks.StrategyClass))
  86. }
  87. }
  88. type simpleStrategy struct {
  89. rf int
  90. }
  91. func (s *simpleStrategy) replicationFactor(dc string) int {
  92. return s.rf
  93. }
  94. func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  95. tokens := tokenRing.tokens
  96. ring := make(tokenRingReplicas, len(tokens))
  97. for i, th := range tokens {
  98. replicas := make([]*HostInfo, 0, s.rf)
  99. seen := make(map[*HostInfo]bool)
  100. for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ {
  101. h := tokens[(i+j)%len(tokens)]
  102. if !seen[h.host] {
  103. replicas = append(replicas, h.host)
  104. seen[h.host] = true
  105. }
  106. }
  107. ring[i] = hostTokens{th.token, replicas}
  108. }
  109. sort.Sort(ring)
  110. return ring
  111. }
  // networkTopology models NetworkTopologyStrategy: dcs maps each datacenter
// named in the keyspace's strategy options to its replication factor.
type networkTopology struct {
	dcs map[string]int
}

// replicationFactor returns the replication factor configured for dc, or 0
// when dc is not configured.
func (n *networkTopology) replicationFactor(dc string) int {
	rf := n.dcs[dc]
	return rf
}

// haveRF reports whether replicaCounts satisfies every configured replication
// factor exactly. replicaCounts must have as many entries as n.dcs, and for
// each configured datacenter its count (0 when absent) must equal that
// datacenter's replication factor.
func (n *networkTopology) haveRF(replicaCounts map[string]int) bool {
	if len(replicaCounts) == len(n.dcs) {
		for dc, want := range n.dcs {
			if got := replicaCounts[dc]; got != want {
				return false
			}
		}
		return true
	}
	return false
}
  129. func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  130. dcRacks := make(map[string]map[string]struct{}, len(n.dcs))
  131. // skipped hosts in a dc
  132. skipped := make(map[string][]*HostInfo, len(n.dcs))
  133. // number of replicas per dc
  134. replicasInDC := make(map[string]int, len(n.dcs))
  135. // dc -> racks
  136. seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
  137. for _, h := range tokenRing.hosts {
  138. dc := h.DataCenter()
  139. rack := h.Rack()
  140. racks, ok := dcRacks[dc]
  141. if !ok {
  142. racks = make(map[string]struct{})
  143. dcRacks[dc] = racks
  144. }
  145. racks[rack] = struct{}{}
  146. }
  147. for dc, racks := range dcRacks {
  148. replicasInDC[dc] = 0
  149. seenDCRacks[dc] = make(map[string]struct{}, len(racks))
  150. }
  151. tokens := tokenRing.tokens
  152. replicaRing := make(tokenRingReplicas, len(tokens))
  153. var totalRF int
  154. for _, rf := range n.dcs {
  155. totalRF += rf
  156. }
  157. for i, th := range tokenRing.tokens {
  158. for k, v := range skipped {
  159. skipped[k] = v[:0]
  160. }
  161. for dc := range n.dcs {
  162. replicasInDC[dc] = 0
  163. for rack := range seenDCRacks[dc] {
  164. delete(seenDCRacks[dc], rack)
  165. }
  166. }
  167. replicas := make([]*HostInfo, 0, totalRF)
  168. for j := 0; j < len(tokens) && (len(replicas) < totalRF && !n.haveRF(replicasInDC)); j++ {
  169. // TODO: ensure we dont add the same host twice
  170. p := i + j
  171. if p >= len(tokens) {
  172. p -= len(tokens)
  173. }
  174. h := tokens[p].host
  175. dc := h.DataCenter()
  176. rack := h.Rack()
  177. rf, ok := n.dcs[dc]
  178. if !ok {
  179. // skip this DC, dont know about it
  180. continue
  181. } else if replicasInDC[dc] >= rf {
  182. if replicasInDC[dc] > rf {
  183. panic(fmt.Sprintf("replica overflow. rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc))
  184. }
  185. // have enough replicas in this DC
  186. continue
  187. } else if _, ok := dcRacks[dc][rack]; !ok {
  188. // dont know about this rack
  189. continue
  190. }
  191. racks := seenDCRacks[dc]
  192. if _, ok := racks[rack]; ok && len(racks) == len(dcRacks[dc]) {
  193. // we have been through all the racks and dont have RF yet, add this
  194. replicas = append(replicas, h)
  195. replicasInDC[dc]++
  196. } else if !ok {
  197. if racks == nil {
  198. racks = make(map[string]struct{}, 1)
  199. seenDCRacks[dc] = racks
  200. }
  201. // new rack
  202. racks[rack] = struct{}{}
  203. replicas = append(replicas, h)
  204. r := replicasInDC[dc] + 1
  205. if len(racks) == len(dcRacks[dc]) {
  206. // if we have been through all the racks, drain the rest of the skipped
  207. // hosts until we have RF. The next iteration will skip in the block
  208. // above
  209. skippedHosts := skipped[dc]
  210. var k int
  211. for ; k < len(skippedHosts) && r+k < rf; k++ {
  212. sh := skippedHosts[k]
  213. replicas = append(replicas, sh)
  214. }
  215. r += k
  216. skipped[dc] = skippedHosts[k:]
  217. }
  218. replicasInDC[dc] = r
  219. } else {
  220. // already seen this rack, keep hold of this host incase
  221. // we dont get enough for rf
  222. skipped[dc] = append(skipped[dc], h)
  223. }
  224. }
  225. if len(replicas) == 0 {
  226. panic(fmt.Sprintf("no replicas for token: %v", th.token))
  227. } else if !replicas[0].Equal(th.host) {
  228. panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress()))
  229. }
  230. replicaRing[i] = hostTokens{th.token, replicas}
  231. }
  232. if len(replicaRing) != len(tokens) {
  233. panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens)))
  234. }
  235. return replicaRing
  236. }