events.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package gocql
  2. import (
  3. "log"
  4. "net"
  5. )
  6. func (s *Session) handleEvent(framer *framer) {
  7. // TODO(zariel): need to debounce events frames, and possible also events
  8. defer framerPool.Put(framer)
  9. frame, err := framer.parseFrame()
  10. if err != nil {
  11. // TODO: logger
  12. log.Printf("gocql: unable to parse event frame: %v\n", err)
  13. return
  14. }
  15. // TODO: handle medatadata events
  16. switch f := frame.(type) {
  17. case *schemaChangeKeyspace:
  18. case *schemaChangeFunction:
  19. case *schemaChangeTable:
  20. case *topologyChangeEventFrame:
  21. switch f.change {
  22. case "NEW_NODE":
  23. s.handleNewNode(f.host, f.port)
  24. case "REMOVED_NODE":
  25. s.handleRemovedNode(f.host, f.port)
  26. case "MOVED_NODE":
  27. // java-driver handles this, not mentioned in the spec
  28. // TODO(zariel): refresh token map
  29. }
  30. case *statusChangeEventFrame:
  31. // TODO(zariel): is it worth having 2 methods for these?
  32. switch f.change {
  33. case "UP":
  34. s.handleNodeUp(f.host, f.port)
  35. case "DOWN":
  36. s.handleNodeDown(f.host, f.port)
  37. }
  38. default:
  39. log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
  40. }
  41. }
  42. func (s *Session) handleNewNode(host net.IP, port int) {
  43. // TODO(zariel): need to be able to filter discovered nodes
  44. if s.control == nil {
  45. return
  46. }
  47. hostInfo, err := s.control.fetchHostInfo(host, port)
  48. if err != nil {
  49. log.Printf("gocql: unable to fetch host info for %v: %v\n", host, err)
  50. return
  51. }
  52. if s.hostFilter.Accept(hostInfo) {
  53. s.pool.addHost(hostInfo)
  54. }
  55. }
  56. func (s *Session) handleRemovedNode(host net.IP, port int) {
  57. // we remove all nodes but only add ones which pass the filter
  58. s.pool.removeHost(host.String())
  59. }
  60. func (s *Session) handleNodeUp(host net.IP, port int) {
  61. // TODO(zariel): handle this case even when not discovering, just mark the
  62. // host up.
  63. // TODO: implement this properly not as newNode
  64. s.handleNewNode(host, port)
  65. }
  66. func (s *Session) handleNodeDown(host net.IP, port int) {
  67. s.pool.hostDown(host.String())
  68. }