topology.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. package gocql
  2. import (
  3. "fmt"
  4. "sort"
  5. "strconv"
  6. "strings"
  7. )
  8. type hostTokens struct {
  9. token token
  10. hosts []*HostInfo
  11. }
  12. type tokenRingReplicas []hostTokens
  13. func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].token) }
  14. func (h tokenRingReplicas) Len() int { return len(h) }
  15. func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  16. func (h tokenRingReplicas) replicasFor(t token) *hostTokens {
  17. if len(h) == 0 {
  18. return nil
  19. }
  20. p := sort.Search(len(h), func(i int) bool {
  21. return !h[i].token.Less(t)
  22. })
  23. // TODO: simplify this
  24. if p < len(h) && h[p].token == t {
  25. return &h[p]
  26. }
  27. p--
  28. if p >= len(h) {
  29. // rollover
  30. p = 0
  31. } else if p < 0 {
  32. // rollunder
  33. p = len(h) - 1
  34. }
  35. return &h[p]
  36. }
  37. type placementStrategy interface {
  38. replicaMap(tokenRing *tokenRing) tokenRingReplicas
  39. replicationFactor(dc string) int
  40. }
  41. func getReplicationFactorFromOpts(keyspace string, val interface{}) int {
  42. // TODO: dont really want to panic here, but is better
  43. // than spamming
  44. switch v := val.(type) {
  45. case int:
  46. if v <= 0 {
  47. panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", v, keyspace))
  48. }
  49. return v
  50. case string:
  51. n, err := strconv.Atoi(v)
  52. if err != nil {
  53. panic(fmt.Sprintf("invalid replication_factor. Is the %q keyspace configured correctly? %v", keyspace, err))
  54. } else if n <= 0 {
  55. panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", n, keyspace))
  56. }
  57. return n
  58. default:
  59. panic(fmt.Sprintf("unkown replication_factor type %T", v))
  60. }
  61. }
  62. func getStrategy(ks *KeyspaceMetadata) placementStrategy {
  63. switch {
  64. case strings.Contains(ks.StrategyClass, "SimpleStrategy"):
  65. return &simpleStrategy{rf: getReplicationFactorFromOpts(ks.Name, ks.StrategyOptions["replication_factor"])}
  66. case strings.Contains(ks.StrategyClass, "NetworkTopologyStrategy"):
  67. dcs := make(map[string]int)
  68. for dc, rf := range ks.StrategyOptions {
  69. if dc == "class" {
  70. continue
  71. }
  72. dcs[dc] = getReplicationFactorFromOpts(ks.Name+":dc="+dc, rf)
  73. }
  74. return &networkTopology{dcs: dcs}
  75. case strings.Contains(ks.StrategyClass, "LocalStrategy"):
  76. return nil
  77. default:
  78. // TODO: handle unknown replicas and just return the primary host for a token
  79. panic(fmt.Sprintf("unsupported strategy class: %v", ks.StrategyClass))
  80. }
  81. }
  82. type simpleStrategy struct {
  83. rf int
  84. }
  85. func (s *simpleStrategy) replicationFactor(dc string) int {
  86. return s.rf
  87. }
  88. func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  89. tokens := tokenRing.tokens
  90. ring := make(tokenRingReplicas, len(tokens))
  91. for i, th := range tokens {
  92. replicas := make([]*HostInfo, 0, s.rf)
  93. seen := make(map[*HostInfo]bool)
  94. for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ {
  95. h := tokens[(i+j)%len(tokens)]
  96. if !seen[h.host] {
  97. replicas = append(replicas, h.host)
  98. seen[h.host] = true
  99. }
  100. }
  101. ring[i] = hostTokens{th.token, replicas}
  102. }
  103. sort.Sort(ring)
  104. return ring
  105. }
  // networkTopology places replicas for NetworkTopologyStrategy keyspaces.
// dcs maps a data center name to the number of replicas wanted in it.
type networkTopology struct {
	dcs map[string]int
}

// replicationFactor returns the replica count configured for dc, or 0 when dc
// is not listed.
func (n *networkTopology) replicationFactor(dc string) int {
	rf, ok := n.dcs[dc]
	if !ok {
		return 0
	}
	return rf
}
  112. func (n *networkTopology) haveRF(replicaCounts map[string]int) bool {
  113. if len(replicaCounts) != len(n.dcs) {
  114. return false
  115. }
  116. for dc, rf := range n.dcs {
  117. if rf != replicaCounts[dc] {
  118. return false
  119. }
  120. }
  121. return true
  122. }
  123. func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  124. dcRacks := make(map[string]map[string]struct{}, len(n.dcs))
  125. for _, h := range tokenRing.hosts {
  126. dc := h.DataCenter()
  127. rack := h.Rack()
  128. racks, ok := dcRacks[dc]
  129. if !ok {
  130. racks = make(map[string]struct{})
  131. dcRacks[dc] = racks
  132. }
  133. racks[rack] = struct{}{}
  134. }
  135. tokens := tokenRing.tokens
  136. replicaRing := make(tokenRingReplicas, len(tokens))
  137. var totalRF int
  138. for _, rf := range n.dcs {
  139. totalRF += rf
  140. }
  141. for i, th := range tokenRing.tokens {
  142. // number of replicas per dc
  143. // TODO: recycle these
  144. replicasInDC := make(map[string]int, len(n.dcs))
  145. // dc -> racks
  146. seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
  147. // skipped hosts in a dc
  148. skipped := make(map[string][]*HostInfo, len(n.dcs))
  149. replicas := make([]*HostInfo, 0, totalRF)
  150. for j := 0; j < len(tokens) && !n.haveRF(replicasInDC); j++ {
  151. // TODO: ensure we dont add the same host twice
  152. h := tokens[(i+j)%len(tokens)].host
  153. dc := h.DataCenter()
  154. rack := h.Rack()
  155. rf, ok := n.dcs[dc]
  156. if !ok {
  157. // skip this DC, dont know about it
  158. continue
  159. } else if replicasInDC[dc] >= rf {
  160. if replicasInDC[dc] > rf {
  161. panic(fmt.Sprintf("replica overflow. rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc))
  162. }
  163. // have enough replicas in this DC
  164. continue
  165. } else if _, ok := dcRacks[dc][rack]; !ok {
  166. // dont know about this rack
  167. continue
  168. } else if len(replicas) >= totalRF {
  169. if replicasInDC[dc] > rf {
  170. panic(fmt.Sprintf("replica overflow. total rf=%d have=%d", totalRF, len(replicas)))
  171. }
  172. // we now have enough replicas
  173. break
  174. }
  175. racks := seenDCRacks[dc]
  176. if _, ok := racks[rack]; ok && len(racks) == len(dcRacks[dc]) {
  177. // we have been through all the racks and dont have RF yet, add this
  178. replicas = append(replicas, h)
  179. replicasInDC[dc]++
  180. } else if !ok {
  181. if racks == nil {
  182. racks = make(map[string]struct{}, 1)
  183. seenDCRacks[dc] = racks
  184. }
  185. // new rack
  186. racks[rack] = struct{}{}
  187. replicas = append(replicas, h)
  188. replicasInDC[dc]++
  189. if len(racks) == len(dcRacks[dc]) {
  190. // if we have been through all the racks, drain the rest of the skipped
  191. // hosts until we have RF. The next iteration will skip in the block
  192. // above
  193. skippedHosts := skipped[dc]
  194. var k int
  195. for ; k < len(skippedHosts) && replicasInDC[dc] < rf; k++ {
  196. sh := skippedHosts[k]
  197. replicas = append(replicas, sh)
  198. replicasInDC[dc]++
  199. }
  200. skipped[dc] = skippedHosts[k:]
  201. }
  202. } else {
  203. // already seen this rack, keep hold of this host incase
  204. // we dont get enough for rf
  205. skipped[dc] = append(skipped[dc], h)
  206. }
  207. }
  208. if len(replicas) == 0 {
  209. panic(fmt.Sprintf("no replicas for token: %v", th.token))
  210. } else if !replicas[0].Equal(th.host) {
  211. panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress()))
  212. }
  213. replicaRing[i] = hostTokens{th.token, replicas}
  214. }
  215. if len(replicaRing) != len(tokens) {
  216. panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens)))
  217. }
  218. sort.Sort(replicaRing)
  219. return replicaRing
  220. }