v2_server.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdserver
  15. import (
  16. "time"
  17. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  18. "golang.org/x/net/context"
  19. )
  20. type v2API interface {
  21. Post(ctx context.Context, r *pb.Request) (Response, error)
  22. Put(ctx context.Context, r *pb.Request) (Response, error)
  23. Delete(ctx context.Context, r *pb.Request) (Response, error)
  24. QGet(ctx context.Context, r *pb.Request) (Response, error)
  25. Get(ctx context.Context, r *pb.Request) (Response, error)
  26. Head(ctx context.Context, r *pb.Request) (Response, error)
  27. }
  28. type v2apiStore struct{ s *EtcdServer }
  29. func (a *v2apiStore) Post(ctx context.Context, r *pb.Request) (Response, error) {
  30. return a.processRaftRequest(ctx, r)
  31. }
  32. func (a *v2apiStore) Put(ctx context.Context, r *pb.Request) (Response, error) {
  33. return a.processRaftRequest(ctx, r)
  34. }
  35. func (a *v2apiStore) Delete(ctx context.Context, r *pb.Request) (Response, error) {
  36. return a.processRaftRequest(ctx, r)
  37. }
  38. func (a *v2apiStore) QGet(ctx context.Context, r *pb.Request) (Response, error) {
  39. return a.processRaftRequest(ctx, r)
  40. }
  41. func (a *v2apiStore) processRaftRequest(ctx context.Context, r *pb.Request) (Response, error) {
  42. data, err := r.Marshal()
  43. if err != nil {
  44. return Response{}, err
  45. }
  46. ch := a.s.w.Register(r.ID)
  47. start := time.Now()
  48. a.s.r.Propose(ctx, data)
  49. proposalsPending.Inc()
  50. defer proposalsPending.Dec()
  51. select {
  52. case x := <-ch:
  53. resp := x.(Response)
  54. return resp, resp.err
  55. case <-ctx.Done():
  56. proposalsFailed.Inc()
  57. a.s.w.Trigger(r.ID, nil) // GC wait
  58. return Response{}, a.s.parseProposeCtxErr(ctx.Err(), start)
  59. case <-a.s.done:
  60. }
  61. return Response{}, ErrStopped
  62. }
  63. func (a *v2apiStore) Get(ctx context.Context, r *pb.Request) (Response, error) {
  64. if r.Wait {
  65. wc, err := a.s.store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
  66. if err != nil {
  67. return Response{}, err
  68. }
  69. return Response{Watcher: wc}, nil
  70. }
  71. ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted)
  72. if err != nil {
  73. return Response{}, err
  74. }
  75. return Response{Event: ev}, nil
  76. }
  77. func (a *v2apiStore) Head(ctx context.Context, r *pb.Request) (Response, error) {
  78. ev, err := a.s.store.Get(r.Path, r.Recursive, r.Sorted)
  79. if err != nil {
  80. return Response{}, err
  81. }
  82. return Response{Event: ev}, nil
  83. }
  84. // Do interprets r and performs an operation on s.store according to r.Method
  85. // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
  86. // Quorum == true, r will be sent through consensus before performing its
  87. // respective operation. Do will block until an action is performed or there is
  88. // an error.
  89. func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
  90. r.ID = s.reqIDGen.Next()
  91. if r.Method == "GET" && r.Quorum {
  92. r.Method = "QGET"
  93. }
  94. v2api := (v2API)(&v2apiStore{s})
  95. switch r.Method {
  96. case "POST":
  97. return v2api.Post(ctx, &r)
  98. case "PUT":
  99. return v2api.Put(ctx, &r)
  100. case "DELETE":
  101. return v2api.Delete(ctx, &r)
  102. case "QGET":
  103. return v2api.QGet(ctx, &r)
  104. case "GET":
  105. return v2api.Get(ctx, &r)
  106. case "HEAD":
  107. return v2api.Head(ctx, &r)
  108. }
  109. return Response{}, ErrUnknownMethod
  110. }