host_source.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. package gocql
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  11. type nodeState int32
  12. func (n nodeState) String() string {
  13. if n == NodeUp {
  14. return "UP"
  15. } else if n == NodeDown {
  16. return "DOWN"
  17. }
  18. return fmt.Sprintf("UNKNOWN_%d", n)
  19. }
  20. const (
  21. NodeUp nodeState = iota
  22. NodeDown
  23. )
  24. type cassVersion struct {
  25. Major, Minor, Patch int
  26. }
  27. func (c *cassVersion) Set(v string) error {
  28. if v == "" {
  29. return nil
  30. }
  31. return c.UnmarshalCQL(nil, []byte(v))
  32. }
  33. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  34. return c.unmarshal(data)
  35. }
  36. func (c *cassVersion) unmarshal(data []byte) error {
  37. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  38. version = strings.TrimPrefix(version, "v")
  39. v := strings.Split(version, ".")
  40. if len(v) < 2 {
  41. return fmt.Errorf("invalid version string: %s", data)
  42. }
  43. var err error
  44. c.Major, err = strconv.Atoi(v[0])
  45. if err != nil {
  46. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  47. }
  48. c.Minor, err = strconv.Atoi(v[1])
  49. if err != nil {
  50. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  51. }
  52. if len(v) > 2 {
  53. c.Patch, err = strconv.Atoi(v[2])
  54. if err != nil {
  55. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  56. }
  57. }
  58. return nil
  59. }
  60. func (c cassVersion) Before(major, minor, patch int) bool {
  61. if c.Major > major {
  62. return true
  63. } else if c.Minor > minor {
  64. return true
  65. } else if c.Patch > patch {
  66. return true
  67. }
  68. return false
  69. }
  70. func (c cassVersion) String() string {
  71. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  72. }
  73. func (c cassVersion) nodeUpDelay() time.Duration {
  74. if c.Major >= 2 && c.Minor >= 2 {
  75. // CASSANDRA-8236
  76. return 0
  77. }
  78. return 10 * time.Second
  79. }
  80. type HostInfo struct {
  81. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  82. // that we are thread safe use a mutex to access all fields.
  83. mu sync.RWMutex
  84. peer net.IP
  85. broadcastAddress net.IP
  86. listenAddress net.IP
  87. rpcAddress net.IP
  88. preferredIP net.IP
  89. connectAddress net.IP
  90. port int
  91. dataCenter string
  92. rack string
  93. hostId string
  94. workload string
  95. graph bool
  96. dseVersion string
  97. partitioner string
  98. clusterName string
  99. version cassVersion
  100. state nodeState
  101. tokens []string
  102. }
  103. func (h *HostInfo) Equal(host *HostInfo) bool {
  104. if h == host {
  105. // prevent rlock reentry
  106. return true
  107. }
  108. return h.ConnectAddress().Equal(host.ConnectAddress())
  109. }
  110. func (h *HostInfo) Peer() net.IP {
  111. h.mu.RLock()
  112. defer h.mu.RUnlock()
  113. return h.peer
  114. }
  115. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  116. h.mu.Lock()
  117. defer h.mu.Unlock()
  118. h.peer = peer
  119. return h
  120. }
  121. func (h *HostInfo) invalidConnectAddr() bool {
  122. h.mu.RLock()
  123. defer h.mu.RUnlock()
  124. addr, _ := h.connectAddressLocked()
  125. return !validIpAddr(addr)
  126. }
  127. func validIpAddr(addr net.IP) bool {
  128. return addr != nil && !addr.IsUnspecified()
  129. }
  130. func (h *HostInfo) connectAddressLocked() (net.IP, string) {
  131. if validIpAddr(h.connectAddress) {
  132. return h.connectAddress, "connect_address"
  133. } else if validIpAddr(h.rpcAddress) {
  134. return h.rpcAddress, "rpc_adress"
  135. } else if validIpAddr(h.preferredIP) {
  136. // where does perferred_ip get set?
  137. return h.preferredIP, "preferred_ip"
  138. } else if validIpAddr(h.broadcastAddress) {
  139. return h.broadcastAddress, "broadcast_address"
  140. } else if validIpAddr(h.peer) {
  141. return h.peer, "peer"
  142. }
  143. return net.IPv4zero, "invalid"
  144. }
  145. // Returns the address that should be used to connect to the host.
  146. // If you wish to override this, use an AddressTranslator or
  147. // use a HostFilter to SetConnectAddress()
  148. func (h *HostInfo) ConnectAddress() net.IP {
  149. h.mu.RLock()
  150. defer h.mu.RUnlock()
  151. if addr, _ := h.connectAddressLocked(); validIpAddr(addr) {
  152. return addr
  153. }
  154. panic(fmt.Sprintf("no valid connect address for host: %v. Is your cluster configured correctly?", h))
  155. }
  156. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  157. // TODO(zariel): should this not be exported?
  158. h.mu.Lock()
  159. defer h.mu.Unlock()
  160. h.connectAddress = address
  161. return h
  162. }
  163. func (h *HostInfo) BroadcastAddress() net.IP {
  164. h.mu.RLock()
  165. defer h.mu.RUnlock()
  166. return h.broadcastAddress
  167. }
  168. func (h *HostInfo) ListenAddress() net.IP {
  169. h.mu.RLock()
  170. defer h.mu.RUnlock()
  171. return h.listenAddress
  172. }
  173. func (h *HostInfo) RPCAddress() net.IP {
  174. h.mu.RLock()
  175. defer h.mu.RUnlock()
  176. return h.rpcAddress
  177. }
  178. func (h *HostInfo) PreferredIP() net.IP {
  179. h.mu.RLock()
  180. defer h.mu.RUnlock()
  181. return h.preferredIP
  182. }
  183. func (h *HostInfo) DataCenter() string {
  184. h.mu.RLock()
  185. defer h.mu.RUnlock()
  186. return h.dataCenter
  187. }
  188. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  189. h.mu.Lock()
  190. defer h.mu.Unlock()
  191. h.dataCenter = dataCenter
  192. return h
  193. }
  194. func (h *HostInfo) Rack() string {
  195. h.mu.RLock()
  196. defer h.mu.RUnlock()
  197. return h.rack
  198. }
  199. func (h *HostInfo) setRack(rack string) *HostInfo {
  200. h.mu.Lock()
  201. defer h.mu.Unlock()
  202. h.rack = rack
  203. return h
  204. }
  205. func (h *HostInfo) HostID() string {
  206. h.mu.RLock()
  207. defer h.mu.RUnlock()
  208. return h.hostId
  209. }
  210. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  211. h.mu.Lock()
  212. defer h.mu.Unlock()
  213. h.hostId = hostID
  214. return h
  215. }
  216. func (h *HostInfo) WorkLoad() string {
  217. h.mu.RLock()
  218. defer h.mu.RUnlock()
  219. return h.workload
  220. }
  221. func (h *HostInfo) Graph() bool {
  222. h.mu.RLock()
  223. defer h.mu.RUnlock()
  224. return h.graph
  225. }
  226. func (h *HostInfo) DSEVersion() string {
  227. h.mu.RLock()
  228. defer h.mu.RUnlock()
  229. return h.dseVersion
  230. }
  231. func (h *HostInfo) Partitioner() string {
  232. h.mu.RLock()
  233. defer h.mu.RUnlock()
  234. return h.partitioner
  235. }
  236. func (h *HostInfo) ClusterName() string {
  237. h.mu.RLock()
  238. defer h.mu.RUnlock()
  239. return h.clusterName
  240. }
  241. func (h *HostInfo) Version() cassVersion {
  242. h.mu.RLock()
  243. defer h.mu.RUnlock()
  244. return h.version
  245. }
  246. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  247. h.mu.Lock()
  248. defer h.mu.Unlock()
  249. h.version = cassVersion{major, minor, patch}
  250. return h
  251. }
  252. func (h *HostInfo) State() nodeState {
  253. h.mu.RLock()
  254. defer h.mu.RUnlock()
  255. return h.state
  256. }
  257. func (h *HostInfo) setState(state nodeState) *HostInfo {
  258. h.mu.Lock()
  259. defer h.mu.Unlock()
  260. h.state = state
  261. return h
  262. }
  263. func (h *HostInfo) Tokens() []string {
  264. h.mu.RLock()
  265. defer h.mu.RUnlock()
  266. return h.tokens
  267. }
  268. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  269. h.mu.Lock()
  270. defer h.mu.Unlock()
  271. h.tokens = tokens
  272. return h
  273. }
  274. func (h *HostInfo) Port() int {
  275. h.mu.RLock()
  276. defer h.mu.RUnlock()
  277. return h.port
  278. }
  279. func (h *HostInfo) setPort(port int) *HostInfo {
  280. h.mu.Lock()
  281. defer h.mu.Unlock()
  282. h.port = port
  283. return h
  284. }
  285. func (h *HostInfo) update(from *HostInfo) {
  286. if h == from {
  287. return
  288. }
  289. h.mu.Lock()
  290. defer h.mu.Unlock()
  291. from.mu.RLock()
  292. defer from.mu.RUnlock()
  293. // autogenerated do not update
  294. if h.peer == nil {
  295. h.peer = from.peer
  296. }
  297. if h.broadcastAddress == nil {
  298. h.broadcastAddress = from.broadcastAddress
  299. }
  300. if h.listenAddress == nil {
  301. h.listenAddress = from.listenAddress
  302. }
  303. if h.rpcAddress == nil {
  304. h.rpcAddress = from.rpcAddress
  305. }
  306. if h.preferredIP == nil {
  307. h.preferredIP = from.preferredIP
  308. }
  309. if h.connectAddress == nil {
  310. h.connectAddress = from.connectAddress
  311. }
  312. if h.port == 0 {
  313. h.port = from.port
  314. }
  315. if h.dataCenter == "" {
  316. h.dataCenter = from.dataCenter
  317. }
  318. if h.rack == "" {
  319. h.rack = from.rack
  320. }
  321. if h.hostId == "" {
  322. h.hostId = from.hostId
  323. }
  324. if h.workload == "" {
  325. h.workload = from.workload
  326. }
  327. if h.dseVersion == "" {
  328. h.dseVersion = from.dseVersion
  329. }
  330. if h.partitioner == "" {
  331. h.partitioner = from.partitioner
  332. }
  333. if h.clusterName == "" {
  334. h.clusterName = from.clusterName
  335. }
  336. if h.version == (cassVersion{}) {
  337. h.version = from.version
  338. }
  339. if h.tokens == nil {
  340. h.tokens = from.tokens
  341. }
  342. }
  343. func (h *HostInfo) IsUp() bool {
  344. return h != nil && h.State() == NodeUp
  345. }
  346. func (h *HostInfo) String() string {
  347. h.mu.RLock()
  348. defer h.mu.RUnlock()
  349. connectAddr, source := h.connectAddressLocked()
  350. return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  351. "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
  352. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  353. h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
  354. connectAddr, source,
  355. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  356. }
// ringDescriber discovers the hosts of the cluster by querying system.peers
// through the session's control connection.
type ringDescriber struct {
	// session provides the control connection, ring, pool and policy used
	// by the methods of this type.
	session *Session
	// mu is held for the duration of GetHosts.
	mu sync.Mutex
	// prevHosts and prevPartitioner are what GetHosts returns alongside the
	// error when fetching the peer list fails.
	prevHosts       []*HostInfo
	prevPartitioner string
}
  364. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  365. func checkSystemSchema(control *controlConn) (bool, error) {
  366. iter := control.query("SELECT * FROM system_schema.keyspaces")
  367. if err := iter.err; err != nil {
  368. if errf, ok := err.(*errorFrame); ok {
  369. if errf.code == errSyntax {
  370. return false, nil
  371. }
  372. }
  373. return false, err
  374. }
  375. return true, nil
  376. }
  377. // Given a map that represents a row from either system.local or system.peers
  378. // return as much information as we can in *HostInfo
  379. func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostInfo, error) {
  380. const assertErrorMsg = "Assertion failed for %s"
  381. var ok bool
  382. // Default to our connected port if the cluster doesn't have port information
  383. host := HostInfo{
  384. port: port,
  385. }
  386. for key, value := range row {
  387. switch key {
  388. case "data_center":
  389. host.dataCenter, ok = value.(string)
  390. if !ok {
  391. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  392. }
  393. case "rack":
  394. host.rack, ok = value.(string)
  395. if !ok {
  396. return nil, fmt.Errorf(assertErrorMsg, "rack")
  397. }
  398. case "host_id":
  399. hostId, ok := value.(UUID)
  400. if !ok {
  401. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  402. }
  403. host.hostId = hostId.String()
  404. case "release_version":
  405. version, ok := value.(string)
  406. if !ok {
  407. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  408. }
  409. host.version.Set(version)
  410. case "peer":
  411. ip, ok := value.(string)
  412. if !ok {
  413. return nil, fmt.Errorf(assertErrorMsg, "peer")
  414. }
  415. host.peer = net.ParseIP(ip)
  416. case "cluster_name":
  417. host.clusterName, ok = value.(string)
  418. if !ok {
  419. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  420. }
  421. case "partitioner":
  422. host.partitioner, ok = value.(string)
  423. if !ok {
  424. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  425. }
  426. case "broadcast_address":
  427. ip, ok := value.(string)
  428. if !ok {
  429. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  430. }
  431. host.broadcastAddress = net.ParseIP(ip)
  432. case "preferred_ip":
  433. ip, ok := value.(string)
  434. if !ok {
  435. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  436. }
  437. host.preferredIP = net.ParseIP(ip)
  438. case "rpc_address":
  439. ip, ok := value.(string)
  440. if !ok {
  441. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  442. }
  443. host.rpcAddress = net.ParseIP(ip)
  444. case "listen_address":
  445. ip, ok := value.(string)
  446. if !ok {
  447. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  448. }
  449. host.listenAddress = net.ParseIP(ip)
  450. case "workload":
  451. host.workload, ok = value.(string)
  452. if !ok {
  453. return nil, fmt.Errorf(assertErrorMsg, "workload")
  454. }
  455. case "graph":
  456. host.graph, ok = value.(bool)
  457. if !ok {
  458. return nil, fmt.Errorf(assertErrorMsg, "graph")
  459. }
  460. case "tokens":
  461. host.tokens, ok = value.([]string)
  462. if !ok {
  463. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  464. }
  465. case "dse_version":
  466. host.dseVersion, ok = value.(string)
  467. if !ok {
  468. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  469. }
  470. }
  471. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  472. // Not sure what the port field will be called until the JIRA issue is complete
  473. }
  474. ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port)
  475. host.connectAddress = ip
  476. host.port = port
  477. return &host, nil
  478. }
  479. // Ask the control node for host info on all it's known peers
  480. func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) {
  481. var hosts []*HostInfo
  482. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  483. hosts = append(hosts, ch.host)
  484. return ch.conn.query("SELECT * FROM system.peers")
  485. })
  486. if iter == nil {
  487. return nil, errNoControl
  488. }
  489. rows, err := iter.SliceMap()
  490. if err != nil {
  491. // TODO(zariel): make typed error
  492. return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
  493. }
  494. for _, row := range rows {
  495. // extract all available info about the peer
  496. host, err := r.session.hostInfoFromMap(row, r.session.cfg.Port)
  497. if err != nil {
  498. return nil, err
  499. } else if !isValidPeer(host) {
  500. // If it's not a valid peer
  501. Logger.Printf("Found invalid peer '%s' "+
  502. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  503. continue
  504. }
  505. hosts = append(hosts, host)
  506. }
  507. return hosts, nil
  508. }
  509. // Return true if the host is a valid peer
  510. func isValidPeer(host *HostInfo) bool {
  511. return !(len(host.RPCAddress()) == 0 ||
  512. host.hostId == "" ||
  513. host.dataCenter == "" ||
  514. host.rack == "" ||
  515. len(host.tokens) == 0)
  516. }
  517. // Return a list of hosts the cluster knows about
  518. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  519. r.mu.Lock()
  520. defer r.mu.Unlock()
  521. hosts, err := r.getClusterPeerInfo()
  522. if err != nil {
  523. return r.prevHosts, r.prevPartitioner, err
  524. }
  525. var partitioner string
  526. if len(hosts) > 0 {
  527. partitioner = hosts[0].Partitioner()
  528. }
  529. return hosts, partitioner, nil
  530. }
  531. // Given an ip/port return HostInfo for the specified ip/port
  532. func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) {
  533. var host *HostInfo
  534. iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
  535. if ch.host.ConnectAddress().Equal(ip) {
  536. host = ch.host
  537. return nil
  538. }
  539. return ch.conn.query("SELECT * FROM system.peers")
  540. })
  541. if iter != nil {
  542. rows, err := iter.SliceMap()
  543. if err != nil {
  544. return nil, err
  545. }
  546. for _, row := range rows {
  547. h, err := r.session.hostInfoFromMap(row, port)
  548. if err != nil {
  549. return nil, err
  550. }
  551. if host.ConnectAddress().Equal(ip) {
  552. host = h
  553. break
  554. }
  555. }
  556. if host == nil {
  557. return nil, errors.New("host not found in peers table")
  558. }
  559. }
  560. if host == nil {
  561. return nil, errors.New("unable to fetch host info: invalid control connection")
  562. } else if host.invalidConnectAddr() {
  563. return nil, fmt.Errorf("host ConnectAddress invalid ip=%v: %v", ip, host)
  564. }
  565. return host, nil
  566. }
  567. func (r *ringDescriber) refreshRing() error {
  568. // if we have 0 hosts this will return the previous list of hosts to
  569. // attempt to reconnect to the cluster otherwise we would never find
  570. // downed hosts again, could possibly have an optimisation to only
  571. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  572. hosts, partitioner, err := r.GetHosts()
  573. if err != nil {
  574. return err
  575. }
  576. prevHosts := r.session.ring.currentHosts()
  577. // TODO: move this to session
  578. for _, h := range hosts {
  579. if filter := r.session.cfg.HostFilter; filter != nil && !filter.Accept(h) {
  580. continue
  581. }
  582. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  583. r.session.pool.addHost(h)
  584. r.session.policy.AddHost(h)
  585. } else {
  586. host.update(h)
  587. }
  588. delete(prevHosts, h.ConnectAddress().String())
  589. }
  590. // TODO(zariel): it may be worth having a mutex covering the overall ring state
  591. // in a session so that everything sees a consistent state. Becuase as is today
  592. // events can come in and due to ordering an UP host could be removed from the cluster
  593. for _, host := range prevHosts {
  594. r.session.removeHost(host)
  595. }
  596. r.session.metadata.setPartitioner(partitioner)
  597. r.session.policy.SetPartitioner(partitioner)
  598. return nil
  599. }