// topology.go
  1. package gocql
  2. import (
  3. "fmt"
  4. "strconv"
  5. "strings"
  6. )
// placementStrategy computes replica placement for a token ring according
// to a keyspace's configured replication strategy.
type placementStrategy interface {
	// replicaMap returns, for every token in the ring, the hosts that hold
	// replicas of that token's range.
	replicaMap(hosts []*HostInfo, tokens []hostToken) map[token][]*HostInfo
	// replicationFactor returns the replication factor configured for the
	// given datacenter.
	replicationFactor(dc string) int
}
  11. func getReplicationFactorFromOpts(keyspace string, val interface{}) int {
  12. // TODO: dont really want to panic here, but is better
  13. // than spamming
  14. switch v := val.(type) {
  15. case int:
  16. if v <= 0 {
  17. panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", v, keyspace))
  18. }
  19. return v
  20. case string:
  21. n, err := strconv.Atoi(v)
  22. if err != nil {
  23. panic(fmt.Sprintf("invalid replication_factor. Is the %q keyspace configured correctly? %v", keyspace, err))
  24. } else if n <= 0 {
  25. panic(fmt.Sprintf("invalid replication_factor %d. Is the %q keyspace configured correctly?", n, keyspace))
  26. }
  27. return n
  28. default:
  29. panic(fmt.Sprintf("unkown replication_factor type %T", v))
  30. }
  31. }
  32. func getStrategy(ks *KeyspaceMetadata) placementStrategy {
  33. switch {
  34. case strings.Contains(ks.StrategyClass, "SimpleStrategy"):
  35. return &simpleStrategy{rf: getReplicationFactorFromOpts(ks.Name, ks.StrategyOptions["replication_factor"])}
  36. case strings.Contains(ks.StrategyClass, "NetworkTopologyStrategy"):
  37. dcs := make(map[string]int)
  38. for dc, rf := range ks.StrategyOptions {
  39. if dc == "class" {
  40. continue
  41. }
  42. dcs[dc] = getReplicationFactorFromOpts(ks.Name+":dc="+dc, rf)
  43. }
  44. return &networkTopology{dcs: dcs}
  45. case strings.Contains(ks.StrategyClass, "LocalStrategy"):
  46. return nil
  47. default:
  48. // TODO: handle unknown replicas and just return the primary host for a token
  49. panic(fmt.Sprintf("unsupported strategy class: %v", ks.StrategyClass))
  50. }
  51. }
  52. type simpleStrategy struct {
  53. rf int
  54. }
  55. func (s *simpleStrategy) replicationFactor(dc string) int {
  56. return s.rf
  57. }
  58. func (s *simpleStrategy) replicaMap(_ []*HostInfo, tokens []hostToken) map[token][]*HostInfo {
  59. tokenRing := make(map[token][]*HostInfo, len(tokens))
  60. for i, th := range tokens {
  61. replicas := make([]*HostInfo, 0, s.rf)
  62. for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ {
  63. // TODO: need to ensure we dont add the same hosts twice
  64. h := tokens[(i+j)%len(tokens)]
  65. replicas = append(replicas, h.host)
  66. }
  67. tokenRing[th.token] = replicas
  68. }
  69. return tokenRing
  70. }
  71. type networkTopology struct {
  72. dcs map[string]int
  73. }
  74. func (n *networkTopology) replicationFactor(dc string) int {
  75. return n.dcs[dc]
  76. }
  77. func (n *networkTopology) haveRF(replicaCounts map[string]int) bool {
  78. if len(replicaCounts) != len(n.dcs) {
  79. return false
  80. }
  81. for dc, rf := range n.dcs {
  82. if rf != replicaCounts[dc] {
  83. return false
  84. }
  85. }
  86. return true
  87. }
  88. func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[token][]*HostInfo {
  89. dcRacks := make(map[string]map[string]struct{})
  90. for _, h := range hosts {
  91. dc := h.DataCenter()
  92. rack := h.Rack()
  93. racks, ok := dcRacks[dc]
  94. if !ok {
  95. racks = make(map[string]struct{})
  96. dcRacks[dc] = racks
  97. }
  98. racks[rack] = struct{}{}
  99. }
  100. tokenRing := make(map[token][]*HostInfo, len(tokens))
  101. var totalRF int
  102. for _, rf := range n.dcs {
  103. totalRF += rf
  104. }
  105. for i, th := range tokens {
  106. // number of replicas per dc
  107. // TODO: recycle these
  108. replicasInDC := make(map[string]int, len(n.dcs))
  109. // dc -> racks
  110. seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs))
  111. // skipped hosts in a dc
  112. skipped := make(map[string][]*HostInfo, len(n.dcs))
  113. replicas := make([]*HostInfo, 0, totalRF)
  114. for j := 0; j < len(tokens) && !n.haveRF(replicasInDC); j++ {
  115. // TODO: ensure we dont add the same host twice
  116. h := tokens[(i+j)%len(tokens)].host
  117. dc := h.DataCenter()
  118. rack := h.Rack()
  119. rf, ok := n.dcs[dc]
  120. if !ok {
  121. // skip this DC, dont know about it
  122. continue
  123. } else if replicasInDC[dc] >= rf {
  124. if replicasInDC[dc] > rf {
  125. panic(fmt.Sprintf("replica overflow. rf=%d have=%d in dc %q", rf, replicasInDC[dc], dc))
  126. }
  127. // have enough replicas in this DC
  128. continue
  129. } else if _, ok := dcRacks[dc][rack]; !ok {
  130. // dont know about this rack
  131. continue
  132. } else if len(replicas) >= totalRF {
  133. if replicasInDC[dc] > rf {
  134. panic(fmt.Sprintf("replica overflow. total rf=%d have=%d", totalRF, len(replicas)))
  135. }
  136. // we now have enough replicas
  137. break
  138. }
  139. racks := seenDCRacks[dc]
  140. if _, ok := racks[rack]; ok && len(racks) == len(dcRacks[dc]) {
  141. // we have been through all the racks and dont have RF yet, add this
  142. replicas = append(replicas, h)
  143. replicasInDC[dc]++
  144. } else if !ok {
  145. if racks == nil {
  146. racks = make(map[string]struct{}, 1)
  147. seenDCRacks[dc] = racks
  148. }
  149. // new rack
  150. racks[rack] = struct{}{}
  151. replicas = append(replicas, h)
  152. replicasInDC[dc]++
  153. if len(racks) == len(dcRacks[dc]) {
  154. // if we have been through all the racks, drain the rest of the skipped
  155. // hosts until we have RF. The next iteration will skip in the block
  156. // above
  157. skippedHosts := skipped[dc]
  158. var k int
  159. for ; k < len(skippedHosts) && replicasInDC[dc] < rf; k++ {
  160. sh := skippedHosts[k]
  161. replicas = append(replicas, sh)
  162. replicasInDC[dc]++
  163. }
  164. skipped[dc] = skippedHosts[k:]
  165. }
  166. } else {
  167. // already seen this rack, keep hold of this host incase
  168. // we dont get enough for rf
  169. skipped[dc] = append(skipped[dc], h)
  170. }
  171. }
  172. if len(replicas) == 0 || replicas[0] != th.host {
  173. panic("first replica is not the primary replica for the token")
  174. }
  175. tokenRing[th.token] = replicas
  176. }
  177. if len(tokenRing) != len(tokens) {
  178. panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(tokenRing), len(tokens)))
  179. }
  180. return tokenRing
  181. }