key.go 4.7 KB


  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package v3rpc implements etcd v3 RPC system based on gRPC.
  15. package v3rpc
  16. import (
  17. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
  18. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  19. "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc"
  20. "github.com/coreos/etcd/Godeps/_workspace/src/google.golang.org/grpc/codes"
  21. "github.com/coreos/etcd/etcdserver"
  22. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  23. "github.com/coreos/etcd/storage"
  24. )
  25. var (
  26. plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver/api", "v3rpc")
  27. )
  28. type kvServer struct {
  29. clusterID int64
  30. memberID int64
  31. raftTimer etcdserver.RaftTimer
  32. kv etcdserver.RaftKV
  33. }
  34. func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
  35. return &kvServer{
  36. clusterID: int64(s.Cluster().ID()),
  37. memberID: int64(s.ID()),
  38. raftTimer: s,
  39. kv: s,
  40. }
  41. }
  42. func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
  43. if err := checkRangeRequest(r); err != nil {
  44. return nil, err
  45. }
  46. resp, err := s.kv.Range(ctx, r)
  47. if err != nil {
  48. return nil, togRPCError(err)
  49. }
  50. if resp.Header == nil {
  51. plog.Panic("unexpected nil resp.Header")
  52. }
  53. s.fillInHeader(resp.Header)
  54. return resp, err
  55. }
  56. func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
  57. if err := checkPutRequest(r); err != nil {
  58. return nil, err
  59. }
  60. resp, err := s.kv.Put(ctx, r)
  61. if err != nil {
  62. return nil, togRPCError(err)
  63. }
  64. if resp.Header == nil {
  65. plog.Panic("unexpected nil resp.Header")
  66. }
  67. s.fillInHeader(resp.Header)
  68. return resp, err
  69. }
  70. func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
  71. if err := checkDeleteRequest(r); err != nil {
  72. return nil, err
  73. }
  74. resp, err := s.kv.DeleteRange(ctx, r)
  75. if err != nil {
  76. return nil, togRPCError(err)
  77. }
  78. if resp.Header == nil {
  79. plog.Panic("unexpected nil resp.Header")
  80. }
  81. s.fillInHeader(resp.Header)
  82. return resp, err
  83. }
  84. func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
  85. if err := checkTxnRequest(r); err != nil {
  86. return nil, err
  87. }
  88. resp, err := s.kv.Txn(ctx, r)
  89. if err != nil {
  90. return nil, togRPCError(err)
  91. }
  92. if resp.Header == nil {
  93. plog.Panic("unexpected nil resp.Header")
  94. }
  95. s.fillInHeader(resp.Header)
  96. return resp, err
  97. }
  98. func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
  99. resp, err := s.kv.Compact(ctx, r)
  100. if err != nil {
  101. return nil, togRPCError(err)
  102. }
  103. if resp.Header == nil {
  104. plog.Panic("unexpected nil resp.Header")
  105. }
  106. s.fillInHeader(resp.Header)
  107. return resp, nil
  108. }
  109. // fillInHeader populates pb.ResponseHeader from kvServer, except Revision.
  110. func (s *kvServer) fillInHeader(h *pb.ResponseHeader) {
  111. h.ClusterId = uint64(s.clusterID)
  112. h.MemberId = uint64(s.memberID)
  113. h.RaftTerm = s.raftTimer.Term()
  114. }
  115. func checkRangeRequest(r *pb.RangeRequest) error {
  116. if len(r.Key) == 0 {
  117. return ErrEmptyKey
  118. }
  119. return nil
  120. }
  121. func checkPutRequest(r *pb.PutRequest) error {
  122. if len(r.Key) == 0 {
  123. return ErrEmptyKey
  124. }
  125. return nil
  126. }
  127. func checkDeleteRequest(r *pb.DeleteRangeRequest) error {
  128. if len(r.Key) == 0 {
  129. return ErrEmptyKey
  130. }
  131. return nil
  132. }
  133. func checkTxnRequest(r *pb.TxnRequest) error {
  134. for _, c := range r.Compare {
  135. if len(c.Key) == 0 {
  136. return ErrEmptyKey
  137. }
  138. }
  139. for _, u := range r.Success {
  140. if err := checkRequestUnion(u); err != nil {
  141. return err
  142. }
  143. }
  144. for _, u := range r.Failure {
  145. if err := checkRequestUnion(u); err != nil {
  146. return err
  147. }
  148. }
  149. return nil
  150. }
  151. func checkRequestUnion(u *pb.RequestUnion) error {
  152. // TODO: ensure only one of the field is set.
  153. switch {
  154. case u.RequestRange != nil:
  155. return checkRangeRequest(u.RequestRange)
  156. case u.RequestPut != nil:
  157. return checkPutRequest(u.RequestPut)
  158. case u.RequestDeleteRange != nil:
  159. return checkDeleteRequest(u.RequestDeleteRange)
  160. default:
  161. // empty union
  162. return nil
  163. }
  164. }
  165. func togRPCError(err error) error {
  166. switch err {
  167. case storage.ErrCompacted:
  168. return ErrCompacted
  169. case storage.ErrFutureRev:
  170. return ErrFutureRev
  171. // TODO: handle error from raft and timeout
  172. default:
  173. return grpc.Errorf(codes.Unknown, err.Error())
  174. }
  175. }