register.go 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package grpcproxy
  15. import (
  16. "github.com/coreos/etcd/clientv3"
  17. "github.com/coreos/etcd/clientv3/concurrency"
  18. "github.com/coreos/etcd/clientv3/naming"
  19. "golang.org/x/time/rate"
  20. gnaming "google.golang.org/grpc/naming"
  21. )
  22. // allow maximum 1 retry per second
  23. const registerRetryRate = 1
  24. // register registers itself as a grpc-proxy server by writing prefixed-key
  25. // with session of specified TTL (in seconds). The returned channel is closed
  26. // when the client's context is canceled.
  27. func register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
  28. rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate)
  29. donec := make(chan struct{})
  30. go func() {
  31. defer close(donec)
  32. for rm.Wait(c.Ctx()) == nil {
  33. ss, err := registerSession(c, prefix, addr, ttl)
  34. if err != nil {
  35. plog.Warningf("failed to create a session %v", err)
  36. continue
  37. }
  38. select {
  39. case <-c.Ctx().Done():
  40. ss.Close()
  41. return
  42. case <-ss.Done():
  43. plog.Warning("session expired; possible network partition or server restart")
  44. plog.Warning("creating a new session to rejoin")
  45. continue
  46. }
  47. }
  48. }()
  49. return donec
  50. }
  51. func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*concurrency.Session, error) {
  52. ss, err := concurrency.NewSession(c, concurrency.WithTTL(ttl))
  53. if err != nil {
  54. return nil, err
  55. }
  56. gr := &naming.GRPCResolver{Client: c}
  57. if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr}, clientv3.WithLease(ss.Lease())); err != nil {
  58. return nil, err
  59. }
  60. plog.Infof("registered %q with %d-second lease", addr, ttl)
  61. return ss, nil
  62. }