host_source.go 17 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3
  1. package gocql
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. )
  // nodeState is the up/down state recorded for a host (see HostInfo.state).
  type nodeState int32

  // String implements fmt.Stringer: "UP" for NodeUp, "DOWN" for NodeDown and
  // "UNKNOWN_<n>" for any other value.
  func (n nodeState) String() string {
  	switch n {
  	case NodeUp:
  		return "UP"
  	case NodeDown:
  		return "DOWN"
  	default:
  		return fmt.Sprintf("UNKNOWN_%d", int32(n))
  	}
  }

  // Known node states. NodeUp is the zero value of nodeState.
  const (
  	NodeUp nodeState = iota
  	NodeDown
  )
  // cassVersion is a parsed Cassandra release version, major.minor.patch.
  type cassVersion struct {
  	Major int
  	Minor int
  	Patch int
  }
  27. func (c *cassVersion) Set(v string) error {
  28. if v == "" {
  29. return nil
  30. }
  31. return c.UnmarshalCQL(nil, []byte(v))
  32. }
  33. func (c *cassVersion) UnmarshalCQL(info TypeInfo, data []byte) error {
  34. return c.unmarshal(data)
  35. }
  36. func (c *cassVersion) unmarshal(data []byte) error {
  37. version := strings.TrimSuffix(string(data), "-SNAPSHOT")
  38. version = strings.TrimPrefix(version, "v")
  39. v := strings.Split(version, ".")
  40. if len(v) < 2 {
  41. return fmt.Errorf("invalid version string: %s", data)
  42. }
  43. var err error
  44. c.Major, err = strconv.Atoi(v[0])
  45. if err != nil {
  46. return fmt.Errorf("invalid major version %v: %v", v[0], err)
  47. }
  48. c.Minor, err = strconv.Atoi(v[1])
  49. if err != nil {
  50. return fmt.Errorf("invalid minor version %v: %v", v[1], err)
  51. }
  52. if len(v) > 2 {
  53. c.Patch, err = strconv.Atoi(v[2])
  54. if err != nil {
  55. return fmt.Errorf("invalid patch version %v: %v", v[2], err)
  56. }
  57. }
  58. return nil
  59. }
  60. func (c cassVersion) Before(major, minor, patch int) bool {
  61. if c.Major > major {
  62. return true
  63. } else if c.Minor > minor {
  64. return true
  65. } else if c.Patch > patch {
  66. return true
  67. }
  68. return false
  69. }
  70. func (c cassVersion) String() string {
  71. return fmt.Sprintf("v%d.%d.%d", c.Major, c.Minor, c.Patch)
  72. }
  73. func (c cassVersion) nodeUpDelay() time.Duration {
  74. if c.Major >= 2 && c.Minor >= 2 {
  75. // CASSANDRA-8236
  76. return 0
  77. }
  78. return 10 * time.Second
  79. }
  80. type HostInfo struct {
  81. // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
  82. // that we are thread safe use a mutex to access all fields.
  83. mu sync.RWMutex
  84. peer net.IP
  85. broadcastAddress net.IP
  86. listenAddress net.IP
  87. rpcAddress net.IP
  88. preferredIP net.IP
  89. connectAddress net.IP
  90. port int
  91. dataCenter string
  92. rack string
  93. hostId string
  94. workload string
  95. graph bool
  96. dseVersion string
  97. partitioner string
  98. clusterName string
  99. version cassVersion
  100. state nodeState
  101. tokens []string
  102. }
  103. func (h *HostInfo) Equal(host *HostInfo) bool {
  104. h.mu.RLock()
  105. defer h.mu.RUnlock()
  106. //If both hosts pointers are same then lock is required only once because of below reasons:
  107. //Reason 1: There is no point taking lock twice on same mutex variable.
  108. //Reason 2: It may lead to deadlock e.g. if WLock is requested by other routine in between 1st & 2nd RLock
  109. //So WLock will be blocked on 1st RLock and 2nd RLock will be blocked on requested WLock.
  110. if h != host {
  111. host.mu.RLock()
  112. defer host.mu.RUnlock()
  113. }
  114. return h.ConnectAddress().Equal(host.ConnectAddress())
  115. }
  116. func (h *HostInfo) Peer() net.IP {
  117. h.mu.RLock()
  118. defer h.mu.RUnlock()
  119. return h.peer
  120. }
  121. func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
  122. h.mu.Lock()
  123. defer h.mu.Unlock()
  124. h.peer = peer
  125. return h
  126. }
  127. func (h *HostInfo) invalidConnectAddr() bool {
  128. h.mu.RLock()
  129. defer h.mu.RUnlock()
  130. addr, _ := h.connectAddressLocked()
  131. return !validIpAddr(addr)
  132. }
  // validIpAddr reports whether addr is non-nil and not an unspecified
  // address (0.0.0.0 or ::).
  func validIpAddr(addr net.IP) bool {
  	if addr == nil {
  		return false
  	}
  	return !addr.IsUnspecified()
  }
  136. func (h *HostInfo) connectAddressLocked() (net.IP, string) {
  137. if validIpAddr(h.connectAddress) {
  138. return h.connectAddress, "connect_address"
  139. } else if validIpAddr(h.rpcAddress) {
  140. return h.rpcAddress, "rpc_adress"
  141. } else if validIpAddr(h.preferredIP) {
  142. // where does perferred_ip get set?
  143. return h.preferredIP, "preferred_ip"
  144. } else if validIpAddr(h.broadcastAddress) {
  145. return h.broadcastAddress, "broadcast_address"
  146. } else if validIpAddr(h.peer) {
  147. return h.peer, "peer"
  148. }
  149. return net.IPv4zero, "invalid"
  150. }
  151. // Returns the address that should be used to connect to the host.
  152. // If you wish to override this, use an AddressTranslator or
  153. // use a HostFilter to SetConnectAddress()
  154. func (h *HostInfo) ConnectAddress() net.IP {
  155. h.mu.RLock()
  156. defer h.mu.RUnlock()
  157. if addr, _ := h.connectAddressLocked(); validIpAddr(addr) {
  158. return addr
  159. }
  160. panic(fmt.Sprintf("no valid connect address for host: %v. Is your cluster configured correctly?", h))
  161. }
  162. func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
  163. h.mu.Lock()
  164. defer h.mu.Unlock()
  165. h.connectAddress = address
  166. return h
  167. }
  168. func (h *HostInfo) BroadcastAddress() net.IP {
  169. h.mu.RLock()
  170. defer h.mu.RUnlock()
  171. return h.broadcastAddress
  172. }
  173. func (h *HostInfo) ListenAddress() net.IP {
  174. h.mu.RLock()
  175. defer h.mu.RUnlock()
  176. return h.listenAddress
  177. }
  178. func (h *HostInfo) RPCAddress() net.IP {
  179. h.mu.RLock()
  180. defer h.mu.RUnlock()
  181. return h.rpcAddress
  182. }
  183. func (h *HostInfo) PreferredIP() net.IP {
  184. h.mu.RLock()
  185. defer h.mu.RUnlock()
  186. return h.preferredIP
  187. }
  188. func (h *HostInfo) DataCenter() string {
  189. h.mu.RLock()
  190. defer h.mu.RUnlock()
  191. return h.dataCenter
  192. }
  193. func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo {
  194. h.mu.Lock()
  195. defer h.mu.Unlock()
  196. h.dataCenter = dataCenter
  197. return h
  198. }
  199. func (h *HostInfo) Rack() string {
  200. h.mu.RLock()
  201. defer h.mu.RUnlock()
  202. return h.rack
  203. }
  204. func (h *HostInfo) setRack(rack string) *HostInfo {
  205. h.mu.Lock()
  206. defer h.mu.Unlock()
  207. h.rack = rack
  208. return h
  209. }
  210. func (h *HostInfo) HostID() string {
  211. h.mu.RLock()
  212. defer h.mu.RUnlock()
  213. return h.hostId
  214. }
  215. func (h *HostInfo) setHostID(hostID string) *HostInfo {
  216. h.mu.Lock()
  217. defer h.mu.Unlock()
  218. h.hostId = hostID
  219. return h
  220. }
  221. func (h *HostInfo) WorkLoad() string {
  222. h.mu.RLock()
  223. defer h.mu.RUnlock()
  224. return h.workload
  225. }
  226. func (h *HostInfo) Graph() bool {
  227. h.mu.RLock()
  228. defer h.mu.RUnlock()
  229. return h.graph
  230. }
  231. func (h *HostInfo) DSEVersion() string {
  232. h.mu.RLock()
  233. defer h.mu.RUnlock()
  234. return h.dseVersion
  235. }
  236. func (h *HostInfo) Partitioner() string {
  237. h.mu.RLock()
  238. defer h.mu.RUnlock()
  239. return h.partitioner
  240. }
  241. func (h *HostInfo) ClusterName() string {
  242. h.mu.RLock()
  243. defer h.mu.RUnlock()
  244. return h.clusterName
  245. }
  246. func (h *HostInfo) Version() cassVersion {
  247. h.mu.RLock()
  248. defer h.mu.RUnlock()
  249. return h.version
  250. }
  251. func (h *HostInfo) setVersion(major, minor, patch int) *HostInfo {
  252. h.mu.Lock()
  253. defer h.mu.Unlock()
  254. h.version = cassVersion{major, minor, patch}
  255. return h
  256. }
  257. func (h *HostInfo) State() nodeState {
  258. h.mu.RLock()
  259. defer h.mu.RUnlock()
  260. return h.state
  261. }
  262. func (h *HostInfo) setState(state nodeState) *HostInfo {
  263. h.mu.Lock()
  264. defer h.mu.Unlock()
  265. h.state = state
  266. return h
  267. }
  268. func (h *HostInfo) Tokens() []string {
  269. h.mu.RLock()
  270. defer h.mu.RUnlock()
  271. return h.tokens
  272. }
  273. func (h *HostInfo) setTokens(tokens []string) *HostInfo {
  274. h.mu.Lock()
  275. defer h.mu.Unlock()
  276. h.tokens = tokens
  277. return h
  278. }
  279. func (h *HostInfo) Port() int {
  280. h.mu.RLock()
  281. defer h.mu.RUnlock()
  282. return h.port
  283. }
  284. func (h *HostInfo) setPort(port int) *HostInfo {
  285. h.mu.Lock()
  286. defer h.mu.Unlock()
  287. h.port = port
  288. return h
  289. }
  290. func (h *HostInfo) update(from *HostInfo) {
  291. h.mu.Lock()
  292. defer h.mu.Unlock()
  293. h.tokens = from.tokens
  294. h.version = from.version
  295. h.hostId = from.hostId
  296. h.dataCenter = from.dataCenter
  297. }
  298. func (h *HostInfo) IsUp() bool {
  299. return h != nil && h.State() == NodeUp
  300. }
  301. func (h *HostInfo) String() string {
  302. h.mu.RLock()
  303. defer h.mu.RUnlock()
  304. connectAddr, source := h.connectAddressLocked()
  305. return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
  306. "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+
  307. "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
  308. h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP,
  309. connectAddr, source,
  310. h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
  311. }
  312. // Polls system.peers at a specific interval to find new hosts
  313. type ringDescriber struct {
  314. session *Session
  315. mu sync.Mutex
  316. prevHosts []*HostInfo
  317. localHost *HostInfo
  318. prevPartitioner string
  319. }
  320. // Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
  321. func checkSystemSchema(control *controlConn) (bool, error) {
  322. iter := control.query("SELECT * FROM system_schema.keyspaces")
  323. if err := iter.err; err != nil {
  324. if errf, ok := err.(*errorFrame); ok {
  325. if errf.code == errSyntax {
  326. return false, nil
  327. }
  328. }
  329. return false, err
  330. }
  331. return true, nil
  332. }
  333. // Given a map that represents a row from either system.local or system.peers
  334. // return as much information as we can in *HostInfo
  335. func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (*HostInfo, error) {
  336. const assertErrorMsg = "Assertion failed for %s"
  337. var ok bool
  338. // Default to our connected port if the cluster doesn't have port information
  339. host := HostInfo{
  340. port: r.session.cfg.Port,
  341. }
  342. for key, value := range row {
  343. switch key {
  344. case "data_center":
  345. host.dataCenter, ok = value.(string)
  346. if !ok {
  347. return nil, fmt.Errorf(assertErrorMsg, "data_center")
  348. }
  349. case "rack":
  350. host.rack, ok = value.(string)
  351. if !ok {
  352. return nil, fmt.Errorf(assertErrorMsg, "rack")
  353. }
  354. case "host_id":
  355. hostId, ok := value.(UUID)
  356. if !ok {
  357. return nil, fmt.Errorf(assertErrorMsg, "host_id")
  358. }
  359. host.hostId = hostId.String()
  360. case "release_version":
  361. version, ok := value.(string)
  362. if !ok {
  363. return nil, fmt.Errorf(assertErrorMsg, "release_version")
  364. }
  365. host.version.Set(version)
  366. case "peer":
  367. ip, ok := value.(string)
  368. if !ok {
  369. return nil, fmt.Errorf(assertErrorMsg, "peer")
  370. }
  371. host.peer = net.ParseIP(ip)
  372. case "cluster_name":
  373. host.clusterName, ok = value.(string)
  374. if !ok {
  375. return nil, fmt.Errorf(assertErrorMsg, "cluster_name")
  376. }
  377. case "partitioner":
  378. host.partitioner, ok = value.(string)
  379. if !ok {
  380. return nil, fmt.Errorf(assertErrorMsg, "partitioner")
  381. }
  382. case "broadcast_address":
  383. ip, ok := value.(string)
  384. if !ok {
  385. return nil, fmt.Errorf(assertErrorMsg, "broadcast_address")
  386. }
  387. host.broadcastAddress = net.ParseIP(ip)
  388. case "preferred_ip":
  389. ip, ok := value.(string)
  390. if !ok {
  391. return nil, fmt.Errorf(assertErrorMsg, "preferred_ip")
  392. }
  393. host.preferredIP = net.ParseIP(ip)
  394. case "rpc_address":
  395. ip, ok := value.(string)
  396. if !ok {
  397. return nil, fmt.Errorf(assertErrorMsg, "rpc_address")
  398. }
  399. host.rpcAddress = net.ParseIP(ip)
  400. case "listen_address":
  401. ip, ok := value.(string)
  402. if !ok {
  403. return nil, fmt.Errorf(assertErrorMsg, "listen_address")
  404. }
  405. host.listenAddress = net.ParseIP(ip)
  406. case "workload":
  407. host.workload, ok = value.(string)
  408. if !ok {
  409. return nil, fmt.Errorf(assertErrorMsg, "workload")
  410. }
  411. case "graph":
  412. host.graph, ok = value.(bool)
  413. if !ok {
  414. return nil, fmt.Errorf(assertErrorMsg, "graph")
  415. }
  416. case "tokens":
  417. host.tokens, ok = value.([]string)
  418. if !ok {
  419. return nil, fmt.Errorf(assertErrorMsg, "tokens")
  420. }
  421. case "dse_version":
  422. host.dseVersion, ok = value.(string)
  423. if !ok {
  424. return nil, fmt.Errorf(assertErrorMsg, "dse_version")
  425. }
  426. }
  427. // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
  428. // Not sure what the port field will be called until the JIRA issue is complete
  429. }
  430. return &host, nil
  431. }
  432. // Ask the control node for it's local host information
  433. func (r *ringDescriber) GetLocalHostInfo() (*HostInfo, error) {
  434. it := r.session.control.query("SELECT * FROM system.local WHERE key='local'")
  435. if it == nil {
  436. return nil, errors.New("Attempted to query 'system.local' on a closed control connection")
  437. }
  438. host, err := r.extractHostInfo(it)
  439. if err != nil {
  440. return nil, err
  441. }
  442. if host.invalidConnectAddr() {
  443. host.SetConnectAddress(r.session.control.GetHostInfo().ConnectAddress())
  444. }
  445. return host, nil
  446. }
  447. // Given an ip address and port, return a peer that matched the ip address
  448. func (r *ringDescriber) GetPeerHostInfo(ip net.IP, port int) (*HostInfo, error) {
  449. it := r.session.control.query("SELECT * FROM system.peers WHERE peer=?", ip)
  450. if it == nil {
  451. return nil, errors.New("Attempted to query 'system.peers' on a closed control connection")
  452. }
  453. return r.extractHostInfo(it)
  454. }
  455. func (r *ringDescriber) extractHostInfo(it *Iter) (*HostInfo, error) {
  456. row := make(map[string]interface{})
  457. // expect only 1 row
  458. it.MapScan(row)
  459. if err := it.Close(); err != nil {
  460. return nil, err
  461. }
  462. // extract all available info about the host
  463. return r.hostInfoFromMap(row)
  464. }
  465. // Ask the control node for host info on all it's known peers
  466. func (r *ringDescriber) GetClusterPeerInfo() ([]*HostInfo, error) {
  467. var hosts []*HostInfo
  468. // Ask the node for a list of it's peers
  469. it := r.session.control.query("SELECT * FROM system.peers")
  470. if it == nil {
  471. return nil, errors.New("Attempted to query 'system.peers' on a closed connection")
  472. }
  473. for {
  474. row := make(map[string]interface{})
  475. if !it.MapScan(row) {
  476. break
  477. }
  478. // extract all available info about the peer
  479. host, err := r.hostInfoFromMap(row)
  480. if err != nil {
  481. return nil, err
  482. }
  483. // If it's not a valid peer
  484. if !r.IsValidPeer(host) {
  485. Logger.Printf("Found invalid peer '%+v' "+
  486. "Likely due to a gossip or snitch issue, this host will be ignored", host)
  487. continue
  488. }
  489. hosts = append(hosts, host)
  490. }
  491. if it.err != nil {
  492. return nil, fmt.Errorf("while scanning 'system.peers' table: %s", it.err)
  493. }
  494. return hosts, nil
  495. }
  496. // Return true if the host is a valid peer
  497. func (r *ringDescriber) IsValidPeer(host *HostInfo) bool {
  498. return !(len(host.RPCAddress()) == 0 ||
  499. host.hostId == "" ||
  500. host.dataCenter == "" ||
  501. host.rack == "" ||
  502. len(host.tokens) == 0)
  503. }
  504. // Return a list of hosts the cluster knows about
  505. func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
  506. r.mu.Lock()
  507. defer r.mu.Unlock()
  508. // Update the localHost info with data from the connected host
  509. localHost, err := r.GetLocalHostInfo()
  510. if err != nil {
  511. return r.prevHosts, r.prevPartitioner, err
  512. } else if localHost.invalidConnectAddr() {
  513. panic(fmt.Sprintf("unable to get localhost connect address: %v", localHost))
  514. }
  515. // Update our list of hosts by querying the cluster
  516. hosts, err := r.GetClusterPeerInfo()
  517. if err != nil {
  518. return r.prevHosts, r.prevPartitioner, err
  519. }
  520. hosts = append(hosts, localHost)
  521. // Filter the hosts if filter is provided
  522. filteredHosts := hosts
  523. if r.session.cfg.HostFilter != nil {
  524. filteredHosts = filteredHosts[:0]
  525. for _, host := range hosts {
  526. if r.session.cfg.HostFilter.Accept(host) {
  527. filteredHosts = append(filteredHosts, host)
  528. }
  529. }
  530. }
  531. r.prevHosts = filteredHosts
  532. r.prevPartitioner = localHost.partitioner
  533. r.localHost = localHost
  534. return filteredHosts, localHost.partitioner, nil
  535. }
  536. // Given an ip/port return HostInfo for the specified ip/port
  537. func (r *ringDescriber) GetHostInfo(ip net.IP, port int) (*HostInfo, error) {
  538. // TODO(thrawn01): Is IgnorePeerAddr still useful now that we have DisableInitialHostLookup?
  539. // TODO(thrawn01): should we also check for DisableInitialHostLookup and return if true?
  540. // Ignore the port and connect address and use the address/port we already have
  541. if r.session.control == nil || r.session.cfg.IgnorePeerAddr {
  542. return &HostInfo{connectAddress: ip, port: port}, nil
  543. }
  544. // Attempt to get the host info for our control connection
  545. controlHost := r.session.control.GetHostInfo()
  546. if controlHost == nil {
  547. return nil, errors.New("invalid control connection")
  548. }
  549. var (
  550. host *HostInfo
  551. err error
  552. )
  553. // If we are asking about the same node our control connection has a connection too
  554. if controlHost.ConnectAddress().Equal(ip) {
  555. host, err = r.GetLocalHostInfo()
  556. } else {
  557. host, err = r.GetPeerHostInfo(ip, port)
  558. }
  559. // No host was found matching this ip/port
  560. if err != nil {
  561. return nil, err
  562. }
  563. if controlHost.ConnectAddress().Equal(ip) {
  564. // Always respect the provided control node address and disregard the ip address
  565. // the cassandra node provides. We do this as we are already connected and have a
  566. // known valid ip address. This insulates gocql from client connection issues stemming
  567. // from node misconfiguration. For instance when a node is run from a container, by
  568. // default the node will report its ip address as 127.0.0.1 which is typically invalid.
  569. host.SetConnectAddress(ip)
  570. }
  571. if host.invalidConnectAddr() {
  572. return nil, fmt.Errorf("host ConnectAddress invalid: %v", host)
  573. }
  574. return host, nil
  575. }
  576. func (r *ringDescriber) refreshRing() error {
  577. // if we have 0 hosts this will return the previous list of hosts to
  578. // attempt to reconnect to the cluster otherwise we would never find
  579. // downed hosts again, could possibly have an optimisation to only
  580. // try to add new hosts if GetHosts didnt error and the hosts didnt change.
  581. hosts, partitioner, err := r.GetHosts()
  582. if err != nil {
  583. return err
  584. }
  585. prevHosts := r.session.ring.currentHosts()
  586. // TODO: move this to session
  587. for _, h := range hosts {
  588. if host, ok := r.session.ring.addHostIfMissing(h); !ok {
  589. r.session.pool.addHost(h)
  590. r.session.policy.AddHost(h)
  591. } else {
  592. host.update(h)
  593. }
  594. delete(prevHosts, h.ConnectAddress().String())
  595. }
  596. // TODO(zariel): it may be worth having a mutex covering the overall ring state
  597. // in a session so that everything sees a consistent state. Becuase as is today
  598. // events can come in and due to ordering an UP host could be removed from the cluster
  599. for _, host := range prevHosts {
  600. r.session.removeHost(host)
  601. }
  602. r.session.metadata.setPartitioner(partitioner)
  603. r.session.policy.SetPartitioner(partitioner)
  604. return nil
  605. }