grpc.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package naming
  15. import (
  16. "context"
  17. "encoding/json"
  18. "fmt"
  19. etcd "go.etcd.io/etcd/clientv3"
  20. "google.golang.org/grpc/codes"
  21. "google.golang.org/grpc/naming"
  22. "google.golang.org/grpc/status"
  23. )
  // ErrWatcherClosed is the sentinel error whose message is reported (wrapped
  // in a gRPC Unavailable status) when the underlying etcd watch channel has
  // been closed.
  var ErrWatcherClosed = fmt.Errorf("naming: watch closed")
  25. // GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
  26. type GRPCResolver struct {
  27. // Client is an initialized etcd client.
  28. Client *etcd.Client
  29. }
  30. func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
  31. switch nm.Op {
  32. case naming.Add:
  33. var v []byte
  34. if v, err = json.Marshal(nm); err != nil {
  35. return status.Error(codes.InvalidArgument, err.Error())
  36. }
  37. _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
  38. case naming.Delete:
  39. _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
  40. default:
  41. return status.Error(codes.InvalidArgument, "naming: bad naming op")
  42. }
  43. return err
  44. }
  45. func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
  46. ctx, cancel := context.WithCancel(context.Background())
  47. w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
  48. return w, nil
  49. }
  50. type gRPCWatcher struct {
  51. c *etcd.Client
  52. target string
  53. ctx context.Context
  54. cancel context.CancelFunc
  55. wch etcd.WatchChan
  56. err error
  57. }
  58. // Next gets the next set of updates from the etcd resolver.
  59. // Calls to Next should be serialized; concurrent calls are not safe since
  60. // there is no way to reconcile the update ordering.
  61. func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
  62. if gw.wch == nil {
  63. // first Next() returns all addresses
  64. return gw.firstNext()
  65. }
  66. if gw.err != nil {
  67. return nil, gw.err
  68. }
  69. // process new events on target/*
  70. wr, ok := <-gw.wch
  71. if !ok {
  72. gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
  73. return nil, gw.err
  74. }
  75. if gw.err = wr.Err(); gw.err != nil {
  76. return nil, gw.err
  77. }
  78. updates := make([]*naming.Update, 0, len(wr.Events))
  79. for _, e := range wr.Events {
  80. var jupdate naming.Update
  81. var err error
  82. switch e.Type {
  83. case etcd.EventTypePut:
  84. err = json.Unmarshal(e.Kv.Value, &jupdate)
  85. jupdate.Op = naming.Add
  86. case etcd.EventTypeDelete:
  87. err = json.Unmarshal(e.PrevKv.Value, &jupdate)
  88. jupdate.Op = naming.Delete
  89. default:
  90. continue
  91. }
  92. if err == nil {
  93. updates = append(updates, &jupdate)
  94. }
  95. }
  96. return updates, nil
  97. }
  98. func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
  99. // Use serialized request so resolution still works if the target etcd
  100. // server is partitioned away from the quorum.
  101. resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
  102. if gw.err = err; err != nil {
  103. return nil, err
  104. }
  105. updates := make([]*naming.Update, 0, len(resp.Kvs))
  106. for _, kv := range resp.Kvs {
  107. var jupdate naming.Update
  108. if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
  109. continue
  110. }
  111. updates = append(updates, &jupdate)
  112. }
  113. opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
  114. gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
  115. return updates, nil
  116. }
  117. func (gw *gRPCWatcher) Close() { gw.cancel() }