register.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package grpcproxy
  15. import (
  16. "encoding/json"
  17. "os"
  18. "go.etcd.io/etcd/clientv3"
  19. "go.etcd.io/etcd/clientv3/concurrency"
  20. "go.etcd.io/etcd/clientv3/naming"
  21. "golang.org/x/time/rate"
  22. gnaming "google.golang.org/grpc/naming"
  23. )
  24. // allow maximum 1 retry per second
  25. const registerRetryRate = 1
  26. // Register registers itself as a grpc-proxy server by writing prefixed-key
  27. // with session of specified TTL (in seconds). The returned channel is closed
  28. // when the client's context is canceled.
  29. func Register(c *clientv3.Client, prefix string, addr string, ttl int) <-chan struct{} {
  30. rm := rate.NewLimiter(rate.Limit(registerRetryRate), registerRetryRate)
  31. donec := make(chan struct{})
  32. go func() {
  33. defer close(donec)
  34. for rm.Wait(c.Ctx()) == nil {
  35. ss, err := registerSession(c, prefix, addr, ttl)
  36. if err != nil {
  37. plog.Warningf("failed to create a session %v", err)
  38. continue
  39. }
  40. select {
  41. case <-c.Ctx().Done():
  42. ss.Close()
  43. return
  44. case <-ss.Done():
  45. plog.Warning("session expired; possible network partition or server restart")
  46. plog.Warning("creating a new session to rejoin")
  47. continue
  48. }
  49. }
  50. }()
  51. return donec
  52. }
  53. func registerSession(c *clientv3.Client, prefix string, addr string, ttl int) (*concurrency.Session, error) {
  54. ss, err := concurrency.NewSession(c, concurrency.WithTTL(ttl))
  55. if err != nil {
  56. return nil, err
  57. }
  58. gr := &naming.GRPCResolver{Client: c}
  59. if err = gr.Update(c.Ctx(), prefix, gnaming.Update{Op: gnaming.Add, Addr: addr, Metadata: getMeta()}, clientv3.WithLease(ss.Lease())); err != nil {
  60. return nil, err
  61. }
  62. plog.Infof("registered %q with %d-second lease", addr, ttl)
  63. return ss, nil
  64. }
  65. // meta represents metadata of proxy register.
  66. type meta struct {
  67. Name string `json:"name"`
  68. }
  69. func getMeta() string {
  70. hostname, _ := os.Hostname()
  71. bts, _ := json.Marshal(meta{Name: hostname})
  72. return string(bts)
  73. }
  74. func decodeMeta(s string) (meta, error) {
  75. m := meta{}
  76. err := json.Unmarshal([]byte(s), &m)
  77. return m, err
  78. }