host_source.go

package gocql

import (
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"
)
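
// nodeState indicates whether the driver currently considers a node up or down.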
type nodeState int32

func (n nodeState) String() string {
	if n == NodeUp {
		return "UP"
	} else if n == NodeDown {
		return "DOWN"
	}
	return fmt.Sprintf("UNKNOWN_%d", n)
}

const (
	NodeUp nodeState = iota
	NodeDown
)
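
// cassVersion holds a parsed Cassandra release_version, e.g. "2.1.6".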
type cassVersion struct {
	Major, Minor, Patch int
}

func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
	version := strings.TrimSuffix(string(data), "-SNAPSHOT")
	v := strings.Split(version, ".")
	if len(v) < 3 {
		return fmt.Errorf("invalid schema_version: %v", string(data))
	}

	var err error
	c.Major, err = strconv.Atoi(v[0])
	if err != nil {
		return fmt.Errorf("invalid major version %v: %v", v[0], err)
	}

	c.Minor, err = strconv.Atoi(v[1])
	if err != nil {
		return fmt.Errorf("invalid minor version %v: %v", v[1], err)
	}

	c.Patch, err = strconv.Atoi(v[2])
	if err != nil {
		return fmt.Errorf("invalid patch version %v: %v", v[2], err)
	}

	return nil
}

func (c cassVersion) String() string {
	return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
}
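
// nodeUpDelay reports how long to wait before using a node after it comes up;
// 2.2 and later need no delay (CASSANDRA-8236).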
func (c cassVersion) nodeUpDelay() time.Duration {
	// CASSANDRA-8236. Note: any major version above 2 must also count as new
	// enough, not just 2.x with minor >= 2.
	if c.Major > 2 || (c.Major == 2 && c.Minor >= 2) {
		return 0
	}

	return 10 * time.Second
}
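
// HostInfo describes a single node in the cluster as discovered from the
// system tables.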
type HostInfo struct {
	// TODO(zariel): maybe reduce locking; not all values will change, but to
	// ensure that we are thread safe, use a mutex to access all fields.
	mu         sync.RWMutex
	peer       string
	port       int
	dataCenter string
	rack       string
	hostId     string
	version    cassVersion
	state      nodeState
	tokens     []string
}
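
// Equal reports whether both HostInfos refer to the same node, comparing the
// peer address and host ID.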
func (h *HostInfo) Equal(host *HostInfo) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()
	host.mu.RLock()
	defer host.mu.RUnlock()

	return h.peer == host.peer && h.hostId == host.hostId
}

func (h *HostInfo) Peer() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.peer
}
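
// The accessors below guard every field with the mutex; setters return the
// receiver so updates can be chained.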

func (h *HostInfo) setPeer(peer string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.peer = peer
	return h
}

func (h *HostInfo) DataCenter() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.dataCenter
}

func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.dataCenter = dataCenter
	return h
}

func (h *HostInfo) Rack() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.rack
}

func (h *HostInfo) setRack(rack string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.rack = rack
	return h
}

func (h *HostInfo) HostID() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.hostId
}

func (h *HostInfo) setHostID(hostID string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.hostId = hostID
	return h
}

func (h *HostInfo) Version() cassVersion {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.version
}

func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.version = cassVersion{major, minor, patch}
	return h
}

func (h *HostInfo) State() nodeState {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.state
}

func (h *HostInfo) setState(state nodeState) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.state = state
	return h
}

func (h *HostInfo) Tokens() []string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.tokens
}

func (h *HostInfo) setTokens(tokens []string) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.tokens = tokens
	return h
}

func (h *HostInfo) Port() int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return h.port
}

func (h *HostInfo) setPort(port int) *HostInfo {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.port = port
	return h
}
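
// update copies the fields that may change between ring refreshes from another
// HostInfo.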
func (h *HostInfo) update(from *HostInfo) {
	h.mu.Lock()
	defer h.mu.Unlock()

	h.tokens = from.tokens
	h.version = from.version
	h.hostId = from.hostId
	h.dataCenter = from.dataCenter
}

func (h *HostInfo) IsUp() bool {
	return h.State() == NodeUp
}

func (h *HostInfo) String() string {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
		h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}

// ringDescriber polls system.peers at an interval to discover new hosts.
type ringDescriber struct {
	dcFilter   string
	rackFilter string
	session    *Session
	closeChan  chan bool
	// localHasRpcAddr indicates that system.local can provide the control
	// connection's remote address.
	localHasRpcAddr bool

	mu              sync.Mutex
	prevHosts       []*HostInfo
	prevPartitioner string
}
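
// checkSystemLocal probes system.local for the broadcast_address column. A
// syntax error from the server is taken to mean the column is not available
// rather than being treated as a failure.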
func checkSystemLocal(control *controlConn) (bool, error) {
	iter := control.query("SELECT broadcast_address FROM system.local")
	if err := iter.err; err != nil {
		if errf, ok := err.(*errorFrame); ok {
			if errf.code == errSyntax {
				return false, nil
			}
		}

		return false, err
	}

	return true, nil
}
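
// GetHosts queries system.local and system.peers over the control connection
// and returns every discovered host along with the cluster's partitioner.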
func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Both queries must run on the same connection: system.peers and
	// system.local only describe the whole cluster when read from the
	// same node.
	const (
		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
		// broadcast_address is only supported in 2.2.0, 2.1.6, 2.0.16
		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
	)

	localHost := &HostInfo{}
	if r.localHasRpcAddr {
		iter := r.session.control.query(localQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}
	} else {
		iter := r.session.control.query(legacyLocalQuery)
		if iter == nil {
			return r.prevHosts, r.prevPartitioner, nil
		}

		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)

		if err = iter.Close(); err != nil {
			return nil, "", err
		}

		addr, _, err := net.SplitHostPort(r.session.control.addr())
		if err != nil {
			// This should never happen, as this is the address that was dialed
			// by conn; a panic makes sense here. Please report a bug if it
			// occurs.
			panic(err)
		}

		localHost.peer = addr
	}

	localHost.port = r.session.cfg.Port

	hosts = []*HostInfo{localHost}

	iter := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers")
	if iter == nil {
		return r.prevHosts, r.prevPartitioner, nil
	}

	host := &HostInfo{port: r.session.cfg.Port}
	for iter.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) {
		if r.matchFilter(host) {
			hosts = append(hosts, host)
		}
		host = &HostInfo{
			port: r.session.cfg.Port,
		}
	}

	if err = iter.Close(); err != nil {
		return nil, "", err
	}

	r.prevHosts = hosts
	r.prevPartitioner = partitioner

	return hosts, partitioner, nil
}
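
// matchFilter reports whether host passes the configured data-center and rack
// filters.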
func (r *ringDescriber) matchFilter(host *HostInfo) bool {
	if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
		return false
	}

	if r.rackFilter != "" && r.rackFilter != host.Rack() {
		return false
	}

	return true
}
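
// refreshRing reconciles the session's view of the ring with the hosts
// returned by GetHosts.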
func (r *ringDescriber) refreshRing() error {
	// If we have 0 hosts this will return the previous list of hosts to
	// attempt to reconnect to the cluster, otherwise we would never find
	// downed hosts again. A possible optimisation is to only try to add
	// new hosts if GetHosts didn't error and the hosts didn't change.
	hosts, partitioner, err := r.GetHosts()
	if err != nil {
		return err
	}

	// TODO: move this to session
	// TODO: handle removing hosts here
	for _, h := range hosts {
		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
			r.session.pool.addHost(h)
		} else {
			host.update(h)
		}
	}

	r.session.pool.SetPartitioner(partitioner)
	return nil
}