key.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. // Copyright 2015 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // Package v3rpc implements etcd v3 RPC system based on gRPC.
  15. package v3rpc
  16. import (
  17. "context"
  18. "github.com/coreos/etcd/etcdserver"
  19. "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
  20. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  21. "github.com/coreos/etcd/pkg/adt"
  22. "github.com/coreos/pkg/capnslog"
  23. )
  24. var (
  25. plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver/api/v3rpc")
  26. )
  27. type kvServer struct {
  28. hdr header
  29. kv etcdserver.RaftKV
  30. // maxTxnOps is the max operations per txn.
  31. // e.g suppose maxTxnOps = 128.
  32. // Txn.Success can have at most 128 operations,
  33. // and Txn.Failure can have at most 128 operations.
  34. maxTxnOps uint
  35. }
  36. func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
  37. return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
  38. }
  39. func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
  40. if err := checkRangeRequest(r); err != nil {
  41. return nil, err
  42. }
  43. resp, err := s.kv.Range(ctx, r)
  44. if err != nil {
  45. return nil, togRPCError(err)
  46. }
  47. s.hdr.fill(resp.Header)
  48. return resp, nil
  49. }
  50. func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
  51. if err := checkPutRequest(r); err != nil {
  52. return nil, err
  53. }
  54. resp, err := s.kv.Put(ctx, r)
  55. if err != nil {
  56. return nil, togRPCError(err)
  57. }
  58. s.hdr.fill(resp.Header)
  59. return resp, nil
  60. }
  61. func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
  62. if err := checkDeleteRequest(r); err != nil {
  63. return nil, err
  64. }
  65. resp, err := s.kv.DeleteRange(ctx, r)
  66. if err != nil {
  67. return nil, togRPCError(err)
  68. }
  69. s.hdr.fill(resp.Header)
  70. return resp, nil
  71. }
  72. func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
  73. if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil {
  74. return nil, err
  75. }
  76. // check for forbidden put/del overlaps after checking request to avoid quadratic blowup
  77. if _, _, err := checkIntervals(r.Success); err != nil {
  78. return nil, err
  79. }
  80. if _, _, err := checkIntervals(r.Failure); err != nil {
  81. return nil, err
  82. }
  83. resp, err := s.kv.Txn(ctx, r)
  84. if err != nil {
  85. return nil, togRPCError(err)
  86. }
  87. s.hdr.fill(resp.Header)
  88. return resp, nil
  89. }
  90. func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
  91. resp, err := s.kv.Compact(ctx, r)
  92. if err != nil {
  93. return nil, togRPCError(err)
  94. }
  95. s.hdr.fill(resp.Header)
  96. return resp, nil
  97. }
  98. func checkRangeRequest(r *pb.RangeRequest) error {
  99. if len(r.Key) == 0 {
  100. return rpctypes.ErrGRPCEmptyKey
  101. }
  102. return nil
  103. }
  104. func checkPutRequest(r *pb.PutRequest) error {
  105. if len(r.Key) == 0 {
  106. return rpctypes.ErrGRPCEmptyKey
  107. }
  108. if r.IgnoreValue && len(r.Value) != 0 {
  109. return rpctypes.ErrGRPCValueProvided
  110. }
  111. if r.IgnoreLease && r.Lease != 0 {
  112. return rpctypes.ErrGRPCLeaseProvided
  113. }
  114. return nil
  115. }
  116. func checkDeleteRequest(r *pb.DeleteRangeRequest) error {
  117. if len(r.Key) == 0 {
  118. return rpctypes.ErrGRPCEmptyKey
  119. }
  120. return nil
  121. }
  122. func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
  123. opc := len(r.Compare)
  124. if opc < len(r.Success) {
  125. opc = len(r.Success)
  126. }
  127. if opc < len(r.Failure) {
  128. opc = len(r.Failure)
  129. }
  130. if opc > maxTxnOps {
  131. return rpctypes.ErrGRPCTooManyOps
  132. }
  133. for _, c := range r.Compare {
  134. if len(c.Key) == 0 {
  135. return rpctypes.ErrGRPCEmptyKey
  136. }
  137. }
  138. for _, u := range r.Success {
  139. if err := checkRequestOp(u, maxTxnOps-opc); err != nil {
  140. return err
  141. }
  142. }
  143. for _, u := range r.Failure {
  144. if err := checkRequestOp(u, maxTxnOps-opc); err != nil {
  145. return err
  146. }
  147. }
  148. return nil
  149. }
  150. // checkIntervals tests whether puts and deletes overlap for a list of ops. If
  151. // there is an overlap, returns an error. If no overlap, return put and delete
  152. // sets for recursive evaluation.
  153. func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree, error) {
  154. dels := adt.NewIntervalTree()
  155. // collect deletes from this level; build first to check lower level overlapped puts
  156. for _, req := range reqs {
  157. tv, ok := req.Request.(*pb.RequestOp_RequestDeleteRange)
  158. if !ok {
  159. continue
  160. }
  161. dreq := tv.RequestDeleteRange
  162. if dreq == nil {
  163. continue
  164. }
  165. var iv adt.Interval
  166. if len(dreq.RangeEnd) != 0 {
  167. iv = adt.NewStringAffineInterval(string(dreq.Key), string(dreq.RangeEnd))
  168. } else {
  169. iv = adt.NewStringAffinePoint(string(dreq.Key))
  170. }
  171. dels.Insert(iv, struct{}{})
  172. }
  173. // collect children puts/deletes
  174. puts := make(map[string]struct{})
  175. for _, req := range reqs {
  176. tv, ok := req.Request.(*pb.RequestOp_RequestTxn)
  177. if !ok {
  178. continue
  179. }
  180. putsThen, delsThen, err := checkIntervals(tv.RequestTxn.Success)
  181. if err != nil {
  182. return nil, dels, err
  183. }
  184. putsElse, delsElse, err := checkIntervals(tv.RequestTxn.Failure)
  185. if err != nil {
  186. return nil, dels, err
  187. }
  188. for k := range putsThen {
  189. if _, ok := puts[k]; ok {
  190. return nil, dels, rpctypes.ErrGRPCDuplicateKey
  191. }
  192. if dels.Intersects(adt.NewStringAffinePoint(k)) {
  193. return nil, dels, rpctypes.ErrGRPCDuplicateKey
  194. }
  195. puts[k] = struct{}{}
  196. }
  197. for k := range putsElse {
  198. if _, ok := puts[k]; ok {
  199. // if key is from putsThen, overlap is OK since
  200. // either then/else are mutually exclusive
  201. if _, isSafe := putsThen[k]; !isSafe {
  202. return nil, dels, rpctypes.ErrGRPCDuplicateKey
  203. }
  204. }
  205. if dels.Intersects(adt.NewStringAffinePoint(k)) {
  206. return nil, dels, rpctypes.ErrGRPCDuplicateKey
  207. }
  208. puts[k] = struct{}{}
  209. }
  210. dels.Union(delsThen, adt.NewStringAffineInterval("\x00", ""))
  211. dels.Union(delsElse, adt.NewStringAffineInterval("\x00", ""))
  212. }
  213. // collect and check this level's puts
  214. for _, req := range reqs {
  215. tv, ok := req.Request.(*pb.RequestOp_RequestPut)
  216. if !ok || tv.RequestPut == nil {
  217. continue
  218. }
  219. k := string(tv.RequestPut.Key)
  220. if _, ok := puts[k]; ok {
  221. return nil, dels, rpctypes.ErrGRPCDuplicateKey
  222. }
  223. if dels.Intersects(adt.NewStringAffinePoint(k)) {
  224. return nil, dels, rpctypes.ErrGRPCDuplicateKey
  225. }
  226. puts[k] = struct{}{}
  227. }
  228. return puts, dels, nil
  229. }
  230. func checkRequestOp(u *pb.RequestOp, maxTxnOps int) error {
  231. // TODO: ensure only one of the field is set.
  232. switch uv := u.Request.(type) {
  233. case *pb.RequestOp_RequestRange:
  234. return checkRangeRequest(uv.RequestRange)
  235. case *pb.RequestOp_RequestPut:
  236. return checkPutRequest(uv.RequestPut)
  237. case *pb.RequestOp_RequestDeleteRange:
  238. return checkDeleteRequest(uv.RequestDeleteRange)
  239. case *pb.RequestOp_RequestTxn:
  240. return checkTxnRequest(uv.RequestTxn, maxTxnOps)
  241. default:
  242. // empty op / nil entry
  243. return rpctypes.ErrGRPCKeyNotFound
  244. }
  245. }