grpc.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package naming
  15. import (
  16. "encoding/json"
  17. etcd "github.com/coreos/etcd/clientv3"
  18. "golang.org/x/net/context"
  19. "google.golang.org/grpc"
  20. "google.golang.org/grpc/codes"
  21. "google.golang.org/grpc/naming"
  22. )
  23. // GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
  24. type GRPCResolver struct {
  25. // Client is an initialized etcd client.
  26. Client *etcd.Client
  27. }
  28. func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
  29. switch nm.Op {
  30. case naming.Add:
  31. var v []byte
  32. if v, err = json.Marshal(nm); err != nil {
  33. return grpc.Errorf(codes.InvalidArgument, err.Error())
  34. }
  35. _, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
  36. case naming.Delete:
  37. _, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
  38. default:
  39. return grpc.Errorf(codes.InvalidArgument, "naming: bad naming op")
  40. }
  41. return err
  42. }
  43. func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
  44. ctx, cancel := context.WithCancel(context.Background())
  45. w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
  46. return w, nil
  47. }
  48. type gRPCWatcher struct {
  49. c *etcd.Client
  50. target string
  51. ctx context.Context
  52. cancel context.CancelFunc
  53. wch etcd.WatchChan
  54. err error
  55. }
  56. // Next gets the next set of updates from the etcd resolver.
  57. // Calls to Next should be serialized; concurrent calls are not safe since
  58. // there is no way to reconcile the update ordering.
  59. func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
  60. if gw.wch == nil {
  61. // first Next() returns all addresses
  62. return gw.firstNext()
  63. }
  64. if gw.err != nil {
  65. return nil, gw.err
  66. }
  67. // process new events on target/*
  68. wr, ok := <-gw.wch
  69. if !ok {
  70. gw.err = grpc.Errorf(codes.Unavailable, "naming: watch closed")
  71. return nil, gw.err
  72. }
  73. if gw.err = wr.Err(); gw.err != nil {
  74. return nil, gw.err
  75. }
  76. updates := make([]*naming.Update, 0, len(wr.Events))
  77. for _, e := range wr.Events {
  78. var jupdate naming.Update
  79. var err error
  80. switch e.Type {
  81. case etcd.EventTypePut:
  82. err = json.Unmarshal(e.Kv.Value, &jupdate)
  83. jupdate.Op = naming.Add
  84. case etcd.EventTypeDelete:
  85. err = json.Unmarshal(e.PrevKv.Value, &jupdate)
  86. jupdate.Op = naming.Delete
  87. }
  88. if err == nil {
  89. updates = append(updates, &jupdate)
  90. }
  91. }
  92. return updates, nil
  93. }
  94. func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
  95. // Use serialized request so resolution still works if the target etcd
  96. // server is partitioned away from the quorum.
  97. resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
  98. if gw.err = err; err != nil {
  99. return nil, err
  100. }
  101. updates := make([]*naming.Update, 0, len(resp.Kvs))
  102. for _, kv := range resp.Kvs {
  103. var jupdate naming.Update
  104. if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
  105. continue
  106. }
  107. updates = append(updates, &jupdate)
  108. }
  109. opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
  110. gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
  111. return updates, nil
  112. }
  113. func (gw *gRPCWatcher) Close() { gw.cancel() }