events.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package gocql
  2. import (
  3. "log"
  4. "net"
  5. )
  6. func (s *Session) handleEvent(framer *framer) {
  7. // TODO(zariel): need to debounce events frames, and possible also events
  8. defer framerPool.Put(framer)
  9. frame, err := framer.parseFrame()
  10. if err != nil {
  11. // TODO: logger
  12. log.Printf("gocql: unable to parse event frame: %v\n", err)
  13. return
  14. }
  15. // TODO: handle medatadata events
  16. switch f := frame.(type) {
  17. case *schemaChangeKeyspace:
  18. case *schemaChangeFunction:
  19. case *schemaChangeTable:
  20. case *topologyChangeEventFrame:
  21. switch f.change {
  22. case "NEW_NODE":
  23. s.handleNewNode(f.host, f.port)
  24. case "REMOVED_NODE":
  25. s.handleRemovedNode(f.host, f.port)
  26. case "MOVED_NODE":
  27. // java-driver handles this, not mentioned in the spec
  28. // TODO(zariel): refresh token map
  29. }
  30. case *statusChangeEventFrame:
  31. // TODO(zariel): is it worth having 2 methods for these?
  32. switch f.change {
  33. case "UP":
  34. s.handleNodeUp(f.host, f.port)
  35. case "DOWN":
  36. s.handleNodeDown(f.host, f.port)
  37. }
  38. default:
  39. log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
  40. }
  41. }
  42. func (s *Session) handleNewNode(host net.IP, port int) {
  43. // TODO(zariel): need to be able to filter discovered nodes
  44. if s.control == nil {
  45. return
  46. }
  47. hostInfo, err := s.control.fetchHostInfo(host, port)
  48. if err != nil {
  49. log.Printf("gocql: unable to fetch host info for %v: %v\n", host, err)
  50. return
  51. }
  52. // should this handle token moving?
  53. if !s.ring.addHostIfMissing(hostInfo) {
  54. s.handleNodeUp(host, port)
  55. return
  56. }
  57. s.pool.addHost(hostInfo)
  58. }
  59. func (s *Session) handleRemovedNode(ip net.IP, port int) {
  60. // we remove all nodes but only add ones which pass the filter
  61. addr := ip.String()
  62. s.pool.removeHost(addr)
  63. s.ring.removeHost(addr)
  64. }
  65. func (s *Session) handleNodeUp(ip net.IP, port int) {
  66. addr := ip.String()
  67. host := s.ring.getHost(addr)
  68. if host != nil {
  69. host.setState(NodeUp)
  70. s.pool.hostUp(host)
  71. return
  72. }
  73. // TODO: this could infinite loop
  74. s.handleNewNode(ip, port)
  75. }
  76. func (s *Session) handleNodeDown(ip net.IP, port int) {
  77. addr := ip.String()
  78. host := s.ring.getHost(addr)
  79. if host != nil {
  80. host.setState(NodeDown)
  81. }
  82. s.pool.hostDown(addr)
  83. }