// v2_server.go (3.7 KB)

  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "time"
  17. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  18. "golang.org/x/net/context"
  19. )
  20. type v2API interface {
  21. Post(ctx context.Context, r *pb.Request) (Response, error)
  22. Put(ctx context.Context, r *pb.Request) (Response, error)
  23. Delete(ctx context.Context, r *pb.Request) (Response, error)
  24. QGet(ctx context.Context, r *pb.Request) (Response, error)
  25. Get(ctx context.Context, r *pb.Request) (Response, error)
  26. Head(ctx context.Context, r *pb.Request) (Response, error)
  27. }
  28. type v2apiStore struct{ s *EtcdServer }
  29. func (a *v2apiStore) Post(ctx context.Context, r *pb.Request) (Response, error) {
  30. return a.processRaftRequest(ctx, r)
  31. }
  32. func (a *v2apiStore) Put(ctx context.Context, r *pb.Request) (Response, error) {
  33. return a.processRaftRequest(ctx, r)
  34. }
  35. func (a *v2apiStore) Delete(ctx context.Context, r *pb.Request) (Response, error) {
  36. return a.processRaftRequest(ctx, r)
  37. }
  38. func (a *v2apiStore) QGet(ctx context.Context, r *pb.Request) (Response, error) {
  39. return a.processRaftRequest(ctx, r)
  40. }
  41. func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) {
  42. data, err := r.Marshal()
  43. if err != nil {
  44. return Response{}, err
  45. }
  46. ch := a.s.w.Register(r.ID)
  47. // TODO: benchmark the cost of time.Now()
  48. // might be sampling?
  49. start := time.Now()
  50. a.s.r.Propose(ctx, data)
  51. proposePending.Inc()
  52. defer proposePending.Dec()
  53. select {
  54. case x := <-ch:
  55. proposeDurations.Observe(float64(time.Since(start)) / float64(time.Second))
  56. resp := x.(Response)
  57. return resp, resp.err
  58. case <-ctx.Done():
  59. proposeFailed.Inc()
  60. a.s.w.Trigger(r.ID, nil) // GC wait
  61. return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
  62. case <-a.s.done:
  63. }
  64. return Response{}, ErrStopped
  65. }
  66. func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) {
  67. if r.Wait {
  68. wc, err := a.s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
  69. if err != nil {
  70. return Response{}, err
  71. }
  72. return Response{Watcher: wc}, nil
  73. }
  74. ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted)
  75. if err != nil {
  76. return Response{}, err
  77. }
  78. return Response{Event: ev}, nil
  79. }
  80. func (a *v2apiStore) Head(ctx context.Context, r *pb.Request) (Response, error) {
  81. ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted)
  82. if err != nil {
  83. return Response{}, err
  84. }
  85. return Response{Event: ev}, nil
  86. }
  87. // Do interprets r and performs an operation on s.store according to r.Method
  88. // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
  89. // Quorum == true, r will be sent through consensus before performing its
  90. // respective operation. Do will block until an action is performed or there is
  91. // an error.
  92. func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
  93. r.ID = s.reqIDGen.Next()
  94. if r.Method == "GET" && r.Quorum {
  95. r.Method = "QGET"
  96. }
  97. v2api := (v2API)(&v2apiStore{s})
  98. switch r.Method {
  99. case "POST":
  100. return v2api.Post(ctx, &r)
  101. case "PUT":
  102. return v2api.Put(ctx, &r)
  103. case "DELETE":
  104. return v2api.Delete(ctx, &r)
  105. case "QGET":
  106. return v2api.QGet(ctx, &r)
  107. case "GET":
  108. return v2api.Get(ctx, &r)
  109. case "HEAD":
  110. return v2api.Head(ctx, &r)
  111. }
  112. return Response{}, ErrUnknownMethod
  113. }