statewatcher.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. //go:generate mockgen -package internal -destination statewatcher_mock.go -source statewatcher.go etcdConn
  2. package internal
  3. import (
  4. "context"
  5. "sync"
  6. "google.golang.org/grpc/connectivity"
  7. )
  8. type (
  9. etcdConn interface {
  10. GetState() connectivity.State
  11. WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool
  12. }
  13. stateWatcher struct {
  14. disconnected bool
  15. currentState connectivity.State
  16. listeners []func()
  17. lock sync.Mutex
  18. }
  19. )
  20. func newStateWatcher() *stateWatcher {
  21. return new(stateWatcher)
  22. }
  23. func (sw *stateWatcher) addListener(l func()) {
  24. sw.lock.Lock()
  25. sw.listeners = append(sw.listeners, l)
  26. sw.lock.Unlock()
  27. }
  28. func (sw *stateWatcher) watch(conn etcdConn) {
  29. sw.currentState = conn.GetState()
  30. for {
  31. if conn.WaitForStateChange(context.Background(), sw.currentState) {
  32. newState := conn.GetState()
  33. sw.lock.Lock()
  34. sw.currentState = newState
  35. switch newState {
  36. case connectivity.TransientFailure, connectivity.Shutdown:
  37. sw.disconnected = true
  38. case connectivity.Ready:
  39. if sw.disconnected {
  40. sw.disconnected = false
  41. for _, l := range sw.listeners {
  42. l()
  43. }
  44. }
  45. }
  46. sw.lock.Unlock()
  47. }
  48. }
  49. }