topology.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package gocql
  2. import (
  3. "fmt"
  4. "sort"
  5. "strconv"
  6. "strings"
  7. )
  8. type hostTokens struct {
  9. token token
  10. hosts []*HostInfo
  11. }
  12. type tokenRingReplicas []hostTokens
  13. func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].token) }
  14. func (h tokenRingReplicas) Len() int { return len(h) }
  15. func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  16. func (h tokenRingReplicas) replicasFor(t token) *hostTokens {
  17. if len(h) == 0 {
  18. return nil
  19. }
  20. p := sort.Search(len(h), func(i int) bool {
  21. return !h[i].token.Less(t)
  22. })
  23. if p >= len(h) {
  24. // rollover
  25. p = 0
  26. }
  27. return &h[p]
  28. }
  29. type placementStrategy interface {
  30. replicaMap(tokenRing *tokenRing) tokenRingReplicas
  31. replicationFactor(dc string) int
  32. }
  33. func getReplicationFactorFromOpts(keyspace string, val interface{}) int {
  34. // TODO: dont really want to panic here, but is better
  35. // than spamming
  36. switch v := val.(type) {
  37. case int:
  38. if v <= 0 {
  39. panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", v, keyspace))
  40. }
  41. return v
  42. case string:
  43. n, err := strconv.Atoi(v)
  44. if err != nil {
  45. panic(fmt.Sprintf("invalid replication_factor. Is the %q keyspace configured correctly? %v", keyspace, err))
  46. } else if n <= 0 {
  47. panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", n, keyspace))
  48. }
  49. return n
  50. default:
  51. panic(fmt.Sprintf("unkown replication_factor type %T", v))
  52. }
  53. }
  54. func getStrategy(ks *KeyspaceMetadata) placementStrategy {
  55. switch {
  56. case strings.Contains(ks.StrategyClass, "SimpleStrategy"):
  57. return &simpleStrategy{rf: getReplicationFactorFromOpts(ks.Name, ks.StrategyOptions["replication_factor"])}
  58. case strings.Contains(ks.StrategyClass, "NetworkTopologyStrategy"):
  59. dcs := make(map[string]int)
  60. for dc, rf := range ks.StrategyOptions {
  61. if dc == "class" {
  62. continue
  63. }
  64. dcs[dc] = getReplicationFactorFromOpts(ks.Name+":dc="+dc, rf)
  65. }
  66. return &networkTopology{dcs: dcs}
  67. case strings.Contains(ks.StrategyClass, "LocalStrategy"):
  68. return nil
  69. default:
  70. // TODO: handle unknown replicas and just return the primary host for a token
  71. panic(fmt.Sprintf("unsupported strategy class: %v", ks.StrategyClass))
  72. }
  73. }
  74. type simpleStrategy struct {
  75. rf int
  76. }
  77. func (s *simpleStrategy) replicationFactor(dc string) int {
  78. return s.rf
  79. }
  80. func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  81. tokens := tokenRing.tokens
  82. ring := make(tokenRingReplicas, len(tokens))
  83. for i, th := range tokens {
  84. replicas := make([]*HostInfo, 0, s.rf)
  85. seen := make(map[*HostInfo]bool)
  86. for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ {
  87. // TODO: need to ensure we dont add the same hosts twice
  88. h := tokens[(i+j)%len(tokens)]
  89. if !seen[h.host] {
  90. replicas = append(replicas, h.host)
  91. seen[h.host] = true
  92. }
  93. }
  94. ring[i] = hostTokens{th.token, replicas}
  95. }
  96. sort.Sort(ring)
  97. return ring
  98. }
  99. type networkTopology struct {
  100. dcs map[string]int
  101. }
  102. func (n *networkTopology) replicationFactor(dc string) int {
  103. return n.dcs[dc]
  104. }
  105. func (n *networkTopology) haveRF(replicaCounts map[string]int) bool {
  106. if len(replicaCounts) != len(n.dcs) {
  107. return false
  108. }
  109. for dc, rf := range n.dcs {
  110. if rf != replicaCounts[dc] {
  111. return false
  112. }
  113. }
  114. return true
  115. }
  116. func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  117. dcRacks := make(map[string]map[string]struct{}, len(n.dcs))
  118. for _, h := range tokenRing.hosts {
  119. dc := h.DataCenter()
  120. rack := h.Rack()
  121. racks, ok := dcRacks[dc]
  122. if !ok {
  123. racks = make(map[string]struct{})
  124. dcRacks[dc] = racks
  125. }
  126. racks[rack] = struct{}{}
  127. }
  128. tokens := tokenRing.tokens
  129. replicaRing := make(tokenRingReplicas, len(tokens))
  130. var totalRF int
  131. for _, rf := range n.dcs {
  132. totalRF += rf
  133. }
  134. for i, th := range tokenRing.tokens {
  135. // number of replicas per dc
  136. // TODO: recycle these
  137. replicasInDC := make(map[string]int, len(n.dcs))
  138. // dc -> racks
  139. seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
  140. // skipped hosts in a dc
  141. skipped := make(map[string][]*HostInfo, len(n.dcs))
  142. replicas := make([]*HostInfo, 0, totalRF)
  143. for j := 0; j < len(tokens) && !n.haveRF(replicasInDC); j++ {
  144. // TODO: ensure we dont add the same host twice
  145. h := tokens[(i+j)%len(tokens)].host
  146. dc := h.DataCenter()
  147. rack := h.Rack()
  148. rf, ok := n.dcs[dc]
  149. if !ok {
  150. // skip this DC, dont know about it
  151. continue
  152. } else if replicasInDC[dc] >= rf {
  153. if replicasInDC[dc] > rf {
  154. panic(fmt.Sprintf("replica overflow. rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc))
  155. }
  156. // have enough replicas in this DC
  157. continue
  158. } else if _, ok := dcRacks[dc][rack]; !ok {
  159. // dont know about this rack
  160. continue
  161. } else if len(replicas) >= totalRF {
  162. if replicasInDC[dc] > rf {
  163. panic(fmt.Sprintf("replica overflow. total rf=%d have=%d", totalRF, len(replicas)))
  164. }
  165. // we now have enough replicas
  166. break
  167. }
  168. racks := seenDCRacks[dc]
  169. if _, ok := racks[rack]; ok && len(racks) == len(dcRacks[dc]) {
  170. // we have been through all the racks and dont have RF yet, add this
  171. replicas = append(replicas, h)
  172. replicasInDC[dc]++
  173. } else if !ok {
  174. if racks == nil {
  175. racks = make(map[string]struct{}, 1)
  176. seenDCRacks[dc] = racks
  177. }
  178. // new rack
  179. racks[rack] = struct{}{}
  180. replicas = append(replicas, h)
  181. replicasInDC[dc]++
  182. if len(racks) == len(dcRacks[dc]) {
  183. // if we have been through all the racks, drain the rest of the skipped
  184. // hosts until we have RF. The next iteration will skip in the block
  185. // above
  186. skippedHosts := skipped[dc]
  187. var k int
  188. for ; k < len(skippedHosts) && replicasInDC[dc] < rf; k++ {
  189. sh := skippedHosts[k]
  190. replicas = append(replicas, sh)
  191. replicasInDC[dc]++
  192. }
  193. skipped[dc] = skippedHosts[k:]
  194. }
  195. } else {
  196. // already seen this rack, keep hold of this host incase
  197. // we dont get enough for rf
  198. skipped[dc] = append(skipped[dc], h)
  199. }
  200. }
  201. if len(replicas) == 0 {
  202. panic(fmt.Sprintf("no replicas for token: %v", th.token))
  203. } else if !replicas[0].Equal(th.host) {
  204. panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress()))
  205. }
  206. replicaRing[i] = hostTokens{th.token, replicas}
  207. }
  208. if len(replicaRing) != len(tokens) {
  209. panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens)))
  210. }
  211. sort.Sort(replicaRing)
  212. return replicaRing
  213. }