topology.go 7.0 KB


  1. package gocql
  2. import (
  3. "fmt"
  4. "sort"
  5. "strconv"
  6. "strings"
  7. )
  8. type hostTokens struct {
  9. // token is end (inclusive) of token range these hosts belong to
  10. token token
  11. hosts []*HostInfo
  12. }
  13. // tokenRingReplicas maps token ranges to list of replicas.
  14. // The elements in tokenRingReplicas are sorted by token ascending.
  15. // The range for a given item in tokenRingReplicas starts after preceding range and ends with the token specified in
  16. // token. The end token is part of the range.
  17. // The lowest (i.e. index 0) range wraps around the ring (its preceding range is the one with largest index).
  18. type tokenRingReplicas []hostTokens
  19. func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].token) }
  20. func (h tokenRingReplicas) Len() int { return len(h) }
  21. func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  22. func (h tokenRingReplicas) replicasFor(t token) *hostTokens {
  23. if len(h) == 0 {
  24. return nil
  25. }
  26. p := sort.Search(len(h), func(i int) bool {
  27. return !h[i].token.Less(t)
  28. })
  29. if p >= len(h) {
  30. // rollover
  31. p = 0
  32. }
  33. return &h[p]
  34. }
  35. type placementStrategy interface {
  36. replicaMap(tokenRing *tokenRing) tokenRingReplicas
  37. replicationFactor(dc string) int
  38. }
  39. func getReplicationFactorFromOpts(keyspace string, val interface{}) int {
  40. // TODO: dont really want to panic here, but is better
  41. // than spamming
  42. switch v := val.(type) {
  43. case int:
  44. if v <= 0 {
  45. panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", v, keyspace))
  46. }
  47. return v
  48. case string:
  49. n, err := strconv.Atoi(v)
  50. if err != nil {
  51. panic(fmt.Sprintf("invalid replication_factor. Is the %q keyspace configured correctly? %v", keyspace, err))
  52. } else if n <= 0 {
  53. panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", n, keyspace))
  54. }
  55. return n
  56. default:
  57. panic(fmt.Sprintf("unkown replication_factor type %T", v))
  58. }
  59. }
  60. func getStrategy(ks *KeyspaceMetadata) placementStrategy {
  61. switch {
  62. case strings.Contains(ks.StrategyClass, "SimpleStrategy"):
  63. return &simpleStrategy{rf: getReplicationFactorFromOpts(ks.Name, ks.StrategyOptions["replication_factor"])}
  64. case strings.Contains(ks.StrategyClass, "NetworkTopologyStrategy"):
  65. dcs := make(map[string]int)
  66. for dc, rf := range ks.StrategyOptions {
  67. if dc == "class" {
  68. continue
  69. }
  70. dcs[dc] = getReplicationFactorFromOpts(ks.Name+":dc="+dc, rf)
  71. }
  72. return &networkTopology{dcs: dcs}
  73. case strings.Contains(ks.StrategyClass, "LocalStrategy"):
  74. return nil
  75. default:
  76. // TODO: handle unknown replicas and just return the primary host for a token
  77. panic(fmt.Sprintf("unsupported strategy class: %v", ks.StrategyClass))
  78. }
  79. }
  80. type simpleStrategy struct {
  81. rf int
  82. }
  83. func (s *simpleStrategy) replicationFactor(dc string) int {
  84. return s.rf
  85. }
  86. func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  87. tokens := tokenRing.tokens
  88. ring := make(tokenRingReplicas, len(tokens))
  89. for i, th := range tokens {
  90. replicas := make([]*HostInfo, 0, s.rf)
  91. seen := make(map[*HostInfo]bool)
  92. for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ {
  93. h := tokens[(i+j)%len(tokens)]
  94. if !seen[h.host] {
  95. replicas = append(replicas, h.host)
  96. seen[h.host] = true
  97. }
  98. }
  99. ring[i] = hostTokens{th.token, replicas}
  100. }
  101. sort.Sort(ring)
  102. return ring
  103. }
  104. type networkTopology struct {
  105. dcs map[string]int
  106. }
  107. func (n *networkTopology) replicationFactor(dc string) int {
  108. return n.dcs[dc]
  109. }
  110. func (n *networkTopology) haveRF(replicaCounts map[string]int) bool {
  111. if len(replicaCounts) != len(n.dcs) {
  112. return false
  113. }
  114. for dc, rf := range n.dcs {
  115. if rf != replicaCounts[dc] {
  116. return false
  117. }
  118. }
  119. return true
  120. }
  121. func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas {
  122. dcRacks := make(map[string]map[string]struct{}, len(n.dcs))
  123. // skipped hosts in a dc
  124. skipped := make(map[string][]*HostInfo, len(n.dcs))
  125. // number of replicas per dc
  126. replicasInDC := make(map[string]int, len(n.dcs))
  127. // dc -> racks
  128. seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
  129. for _, h := range tokenRing.hosts {
  130. dc := h.DataCenter()
  131. rack := h.Rack()
  132. racks, ok := dcRacks[dc]
  133. if !ok {
  134. racks = make(map[string]struct{})
  135. dcRacks[dc] = racks
  136. }
  137. racks[rack] = struct{}{}
  138. }
  139. for dc, racks := range dcRacks {
  140. replicasInDC[dc] = 0
  141. seenDCRacks[dc] = make(map[string]struct{}, len(racks))
  142. }
  143. tokens := tokenRing.tokens
  144. replicaRing := make(tokenRingReplicas, len(tokens))
  145. var totalRF int
  146. for _, rf := range n.dcs {
  147. totalRF += rf
  148. }
  149. for i, th := range tokenRing.tokens {
  150. for k, v := range skipped {
  151. skipped[k] = v[:0]
  152. }
  153. for dc := range n.dcs {
  154. replicasInDC[dc] = 0
  155. for rack := range seenDCRacks[dc] {
  156. delete(seenDCRacks[dc], rack)
  157. }
  158. }
  159. replicas := make([]*HostInfo, 0, totalRF)
  160. for j := 0; j < len(tokens) && (len(replicas) < totalRF && !n.haveRF(replicasInDC)); j++ {
  161. // TODO: ensure we dont add the same host twice
  162. p := i + j
  163. if p >= len(tokens) {
  164. p -= len(tokens)
  165. }
  166. h := tokens[p].host
  167. dc := h.DataCenter()
  168. rack := h.Rack()
  169. rf, ok := n.dcs[dc]
  170. if !ok {
  171. // skip this DC, dont know about it
  172. continue
  173. } else if replicasInDC[dc] >= rf {
  174. if replicasInDC[dc] > rf {
  175. panic(fmt.Sprintf("replica overflow. rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc))
  176. }
  177. // have enough replicas in this DC
  178. continue
  179. } else if _, ok := dcRacks[dc][rack]; !ok {
  180. // dont know about this rack
  181. continue
  182. }
  183. racks := seenDCRacks[dc]
  184. if _, ok := racks[rack]; ok && len(racks) == len(dcRacks[dc]) {
  185. // we have been through all the racks and dont have RF yet, add this
  186. replicas = append(replicas, h)
  187. replicasInDC[dc]++
  188. } else if !ok {
  189. if racks == nil {
  190. racks = make(map[string]struct{}, 1)
  191. seenDCRacks[dc] = racks
  192. }
  193. // new rack
  194. racks[rack] = struct{}{}
  195. replicas = append(replicas, h)
  196. r := replicasInDC[dc] + 1
  197. if len(racks) == len(dcRacks[dc]) {
  198. // if we have been through all the racks, drain the rest of the skipped
  199. // hosts until we have RF. The next iteration will skip in the block
  200. // above
  201. skippedHosts := skipped[dc]
  202. var k int
  203. for ; k < len(skippedHosts) && r+k < rf; k++ {
  204. sh := skippedHosts[k]
  205. replicas = append(replicas, sh)
  206. }
  207. r += k
  208. skipped[dc] = skippedHosts[k:]
  209. }
  210. replicasInDC[dc] = r
  211. } else {
  212. // already seen this rack, keep hold of this host incase
  213. // we dont get enough for rf
  214. skipped[dc] = append(skipped[dc], h)
  215. }
  216. }
  217. if len(replicas) == 0 {
  218. panic(fmt.Sprintf("no replicas for token: %v", th.token))
  219. } else if !replicas[0].Equal(th.host) {
  220. panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress()))
  221. }
  222. replicaRing[i] = hostTokens{th.token, replicas}
  223. }
  224. if len(replicaRing) != len(tokens) {
  225. panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens)))
  226. }
  227. return replicaRing
  228. }