statewatcher.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. //go:generate mockgen -package internal -destination statewatcher_mock.go -source statewatcher.go etcdConn
  2. package internal
  3. import (
  4. "context"
  5. "sync"
  6. "google.golang.org/grpc/connectivity"
  7. )
  8. type (
  9. etcdConn interface {
  10. GetState() connectivity.State
  11. WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool
  12. }
  13. stateWatcher struct {
  14. disconnected bool
  15. currentState connectivity.State
  16. listeners []func()
  17. // lock only guards listeners, because only listens can be accessed by other goroutines.
  18. lock sync.Mutex
  19. }
  20. )
  21. func newStateWatcher() *stateWatcher {
  22. return new(stateWatcher)
  23. }
  24. func (sw *stateWatcher) addListener(l func()) {
  25. sw.lock.Lock()
  26. sw.listeners = append(sw.listeners, l)
  27. sw.lock.Unlock()
  28. }
  29. func (sw *stateWatcher) notifyListeners() {
  30. sw.lock.Lock()
  31. defer sw.lock.Unlock()
  32. for _, l := range sw.listeners {
  33. l()
  34. }
  35. }
  36. func (sw *stateWatcher) updateState(conn etcdConn) {
  37. sw.currentState = conn.GetState()
  38. switch sw.currentState {
  39. case connectivity.TransientFailure, connectivity.Shutdown:
  40. sw.disconnected = true
  41. case connectivity.Ready:
  42. if sw.disconnected {
  43. sw.disconnected = false
  44. sw.notifyListeners()
  45. }
  46. }
  47. }
  48. func (sw *stateWatcher) watch(conn etcdConn) {
  49. sw.currentState = conn.GetState()
  50. for {
  51. if conn.WaitForStateChange(context.Background(), sw.currentState) {
  52. sw.updateState(conn)
  53. }
  54. }
  55. }