store.go 17 KB


  1. // Copyright 2017 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package v2v3
  15. import (
  16. "context"
  17. "fmt"
  18. "path"
  19. "sort"
  20. "strings"
  21. "time"
  22. "go.etcd.io/etcd/clientv3"
  23. "go.etcd.io/etcd/clientv3/concurrency"
  24. "go.etcd.io/etcd/etcdserver/api/v2error"
  25. "go.etcd.io/etcd/etcdserver/api/v2store"
  26. "go.etcd.io/etcd/mvcc/mvccpb"
  27. )
  28. // store implements the Store interface for V2 using
  29. // a v3 client.
  30. type v2v3Store struct {
  31. c *clientv3.Client
  32. // pfx is the v3 prefix where keys should be stored.
  33. pfx string
  34. ctx context.Context
  35. }
  36. const maxPathDepth = 63
  37. var errUnsupported = fmt.Errorf("TTLs are unsupported")
  38. func NewStore(c *clientv3.Client, pfx string) v2store.Store { return newStore(c, pfx) }
  39. func newStore(c *clientv3.Client, pfx string) *v2v3Store { return &v2v3Store{c, pfx, c.Ctx()} }
  40. func (s *v2v3Store) Index() uint64 { panic("STUB") }
  41. func (s *v2v3Store) Get(nodePath string, recursive, sorted bool) (*v2store.Event, error) {
  42. key := s.mkPath(nodePath)
  43. resp, err := s.c.Txn(s.ctx).Then(
  44. clientv3.OpGet(key+"/"),
  45. clientv3.OpGet(key),
  46. ).Commit()
  47. if err != nil {
  48. return nil, err
  49. }
  50. if kvs := resp.Responses[0].GetResponseRange().Kvs; len(kvs) != 0 || isRoot(nodePath) {
  51. nodes, err := s.getDir(nodePath, recursive, sorted, resp.Header.Revision)
  52. if err != nil {
  53. return nil, err
  54. }
  55. cidx, midx := uint64(0), uint64(0)
  56. if len(kvs) > 0 {
  57. cidx, midx = mkV2Rev(kvs[0].CreateRevision), mkV2Rev(kvs[0].ModRevision)
  58. }
  59. return &v2store.Event{
  60. Action: v2store.Get,
  61. Node: &v2store.NodeExtern{
  62. Key: nodePath,
  63. Dir: true,
  64. Nodes: nodes,
  65. CreatedIndex: cidx,
  66. ModifiedIndex: midx,
  67. },
  68. EtcdIndex: mkV2Rev(resp.Header.Revision),
  69. }, nil
  70. }
  71. kvs := resp.Responses[1].GetResponseRange().Kvs
  72. if len(kvs) == 0 {
  73. return nil, v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
  74. }
  75. return &v2store.Event{
  76. Action: v2store.Get,
  77. Node: s.mkV2Node(kvs[0]),
  78. EtcdIndex: mkV2Rev(resp.Header.Revision),
  79. }, nil
  80. }
  81. func (s *v2v3Store) getDir(nodePath string, recursive, sorted bool, rev int64) ([]*v2store.NodeExtern, error) {
  82. rootNodes, err := s.getDirDepth(nodePath, 1, rev)
  83. if err != nil || !recursive {
  84. if sorted {
  85. sort.Sort(v2store.NodeExterns(rootNodes))
  86. }
  87. return rootNodes, err
  88. }
  89. nextNodes := rootNodes
  90. nodes := make(map[string]*v2store.NodeExtern)
  91. // Breadth walk the subdirectories
  92. for i := 2; len(nextNodes) > 0; i++ {
  93. for _, n := range nextNodes {
  94. nodes[n.Key] = n
  95. if parent := nodes[path.Dir(n.Key)]; parent != nil {
  96. parent.Nodes = append(parent.Nodes, n)
  97. }
  98. }
  99. if nextNodes, err = s.getDirDepth(nodePath, i, rev); err != nil {
  100. return nil, err
  101. }
  102. }
  103. if sorted {
  104. sort.Sort(v2store.NodeExterns(rootNodes))
  105. }
  106. return rootNodes, nil
  107. }
  108. func (s *v2v3Store) getDirDepth(nodePath string, depth int, rev int64) ([]*v2store.NodeExtern, error) {
  109. pd := s.mkPathDepth(nodePath, depth)
  110. resp, err := s.c.Get(s.ctx, pd, clientv3.WithPrefix(), clientv3.WithRev(rev))
  111. if err != nil {
  112. return nil, err
  113. }
  114. nodes := make([]*v2store.NodeExtern, len(resp.Kvs))
  115. for i, kv := range resp.Kvs {
  116. nodes[i] = s.mkV2Node(kv)
  117. }
  118. return nodes, nil
  119. }
  120. func (s *v2v3Store) Set(
  121. nodePath string,
  122. dir bool,
  123. value string,
  124. expireOpts v2store.TTLOptionSet,
  125. ) (*v2store.Event, error) {
  126. if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
  127. return nil, errUnsupported
  128. }
  129. if isRoot(nodePath) {
  130. return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
  131. }
  132. ecode := 0
  133. applyf := func(stm concurrency.STM) error {
  134. // build path if any directories in path do not exist
  135. dirs := []string{}
  136. for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
  137. pp := s.mkPath(p)
  138. if stm.Rev(pp) > 0 {
  139. ecode = v2error.EcodeNotDir
  140. return nil
  141. }
  142. if stm.Rev(pp+"/") == 0 {
  143. dirs = append(dirs, pp+"/")
  144. }
  145. }
  146. for _, d := range dirs {
  147. stm.Put(d, "")
  148. }
  149. key := s.mkPath(nodePath)
  150. if dir {
  151. if stm.Rev(key) != 0 {
  152. // exists as non-dir
  153. ecode = v2error.EcodeNotDir
  154. return nil
  155. }
  156. key = key + "/"
  157. } else if stm.Rev(key+"/") != 0 {
  158. ecode = v2error.EcodeNotFile
  159. return nil
  160. }
  161. stm.Put(key, value, clientv3.WithPrevKV())
  162. stm.Put(s.mkActionKey(), v2store.Set)
  163. return nil
  164. }
  165. resp, err := s.newSTM(applyf)
  166. if err != nil {
  167. return nil, err
  168. }
  169. if ecode != 0 {
  170. return nil, v2error.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
  171. }
  172. createRev := resp.Header.Revision
  173. var pn *v2store.NodeExtern
  174. if pkv := prevKeyFromPuts(resp); pkv != nil {
  175. pn = s.mkV2Node(pkv)
  176. createRev = pkv.CreateRevision
  177. }
  178. vp := &value
  179. if dir {
  180. vp = nil
  181. }
  182. return &v2store.Event{
  183. Action: v2store.Set,
  184. Node: &v2store.NodeExtern{
  185. Key: nodePath,
  186. Value: vp,
  187. Dir: dir,
  188. ModifiedIndex: mkV2Rev(resp.Header.Revision),
  189. CreatedIndex: mkV2Rev(createRev),
  190. },
  191. PrevNode: pn,
  192. EtcdIndex: mkV2Rev(resp.Header.Revision),
  193. }, nil
  194. }
  195. func (s *v2v3Store) Update(nodePath, newValue string, expireOpts v2store.TTLOptionSet) (*v2store.Event, error) {
  196. if isRoot(nodePath) {
  197. return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
  198. }
  199. if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
  200. return nil, errUnsupported
  201. }
  202. key := s.mkPath(nodePath)
  203. ecode := 0
  204. applyf := func(stm concurrency.STM) error {
  205. if rev := stm.Rev(key + "/"); rev != 0 {
  206. ecode = v2error.EcodeNotFile
  207. return nil
  208. }
  209. if rev := stm.Rev(key); rev == 0 {
  210. ecode = v2error.EcodeKeyNotFound
  211. return nil
  212. }
  213. stm.Put(key, newValue, clientv3.WithPrevKV())
  214. stm.Put(s.mkActionKey(), v2store.Update)
  215. return nil
  216. }
  217. resp, err := s.newSTM(applyf)
  218. if err != nil {
  219. return nil, err
  220. }
  221. if ecode != 0 {
  222. return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
  223. }
  224. pkv := prevKeyFromPuts(resp)
  225. return &v2store.Event{
  226. Action: v2store.Update,
  227. Node: &v2store.NodeExtern{
  228. Key: nodePath,
  229. Value: &newValue,
  230. ModifiedIndex: mkV2Rev(resp.Header.Revision),
  231. CreatedIndex: mkV2Rev(pkv.CreateRevision),
  232. },
  233. PrevNode: s.mkV2Node(pkv),
  234. EtcdIndex: mkV2Rev(resp.Header.Revision),
  235. }, nil
  236. }
  237. func (s *v2v3Store) Create(
  238. nodePath string,
  239. dir bool,
  240. value string,
  241. unique bool,
  242. expireOpts v2store.TTLOptionSet,
  243. ) (*v2store.Event, error) {
  244. if isRoot(nodePath) {
  245. return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
  246. }
  247. if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
  248. return nil, errUnsupported
  249. }
  250. ecode := 0
  251. applyf := func(stm concurrency.STM) error {
  252. ecode = 0
  253. key := s.mkPath(nodePath)
  254. if unique {
  255. // append unique item under the node path
  256. for {
  257. key = nodePath + "/" + fmt.Sprintf("%020s", time.Now())
  258. key = path.Clean(path.Join("/", key))
  259. key = s.mkPath(key)
  260. if stm.Rev(key) == 0 {
  261. break
  262. }
  263. }
  264. }
  265. if stm.Rev(key) > 0 || stm.Rev(key+"/") > 0 {
  266. ecode = v2error.EcodeNodeExist
  267. return nil
  268. }
  269. // build path if any directories in path do not exist
  270. dirs := []string{}
  271. for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
  272. pp := s.mkPath(p)
  273. if stm.Rev(pp) > 0 {
  274. ecode = v2error.EcodeNotDir
  275. return nil
  276. }
  277. if stm.Rev(pp+"/") == 0 {
  278. dirs = append(dirs, pp+"/")
  279. }
  280. }
  281. for _, d := range dirs {
  282. stm.Put(d, "")
  283. }
  284. if dir {
  285. // directories marked with extra slash in key name
  286. key += "/"
  287. }
  288. stm.Put(key, value)
  289. stm.Put(s.mkActionKey(), v2store.Create)
  290. return nil
  291. }
  292. resp, err := s.newSTM(applyf)
  293. if err != nil {
  294. return nil, err
  295. }
  296. if ecode != 0 {
  297. return nil, v2error.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
  298. }
  299. var v *string
  300. if !dir {
  301. v = &value
  302. }
  303. return &v2store.Event{
  304. Action: v2store.Create,
  305. Node: &v2store.NodeExtern{
  306. Key: nodePath,
  307. Value: v,
  308. Dir: dir,
  309. ModifiedIndex: mkV2Rev(resp.Header.Revision),
  310. CreatedIndex: mkV2Rev(resp.Header.Revision),
  311. },
  312. EtcdIndex: mkV2Rev(resp.Header.Revision),
  313. }, nil
  314. }
  315. func (s *v2v3Store) CompareAndSwap(
  316. nodePath string,
  317. prevValue string,
  318. prevIndex uint64,
  319. value string,
  320. expireOpts v2store.TTLOptionSet,
  321. ) (*v2store.Event, error) {
  322. if isRoot(nodePath) {
  323. return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
  324. }
  325. if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
  326. return nil, errUnsupported
  327. }
  328. key := s.mkPath(nodePath)
  329. resp, err := s.c.Txn(s.ctx).If(
  330. s.mkCompare(nodePath, prevValue, prevIndex)...,
  331. ).Then(
  332. clientv3.OpPut(key, value, clientv3.WithPrevKV()),
  333. clientv3.OpPut(s.mkActionKey(), v2store.CompareAndSwap),
  334. ).Else(
  335. clientv3.OpGet(key),
  336. clientv3.OpGet(key+"/"),
  337. ).Commit()
  338. if err != nil {
  339. return nil, err
  340. }
  341. if !resp.Succeeded {
  342. return nil, compareFail(nodePath, prevValue, prevIndex, resp)
  343. }
  344. pkv := resp.Responses[0].GetResponsePut().PrevKv
  345. return &v2store.Event{
  346. Action: v2store.CompareAndSwap,
  347. Node: &v2store.NodeExtern{
  348. Key: nodePath,
  349. Value: &value,
  350. CreatedIndex: mkV2Rev(pkv.CreateRevision),
  351. ModifiedIndex: mkV2Rev(resp.Header.Revision),
  352. },
  353. PrevNode: s.mkV2Node(pkv),
  354. EtcdIndex: mkV2Rev(resp.Header.Revision),
  355. }, nil
  356. }
  357. func (s *v2v3Store) Delete(nodePath string, dir, recursive bool) (*v2store.Event, error) {
  358. if isRoot(nodePath) {
  359. return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
  360. }
  361. if !dir && !recursive {
  362. return s.deleteNode(nodePath)
  363. }
  364. if !recursive {
  365. return s.deleteEmptyDir(nodePath)
  366. }
  367. dels := make([]clientv3.Op, maxPathDepth+1)
  368. dels[0] = clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV())
  369. for i := 1; i < maxPathDepth; i++ {
  370. dels[i] = clientv3.OpDelete(s.mkPathDepth(nodePath, i), clientv3.WithPrefix())
  371. }
  372. dels[maxPathDepth] = clientv3.OpPut(s.mkActionKey(), v2store.Delete)
  373. resp, err := s.c.Txn(s.ctx).If(
  374. clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), ">", 0),
  375. clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, maxPathDepth)+"/"), "=", 0),
  376. ).Then(
  377. dels...,
  378. ).Commit()
  379. if err != nil {
  380. return nil, err
  381. }
  382. if !resp.Succeeded {
  383. return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
  384. }
  385. dresp := resp.Responses[0].GetResponseDeleteRange()
  386. return &v2store.Event{
  387. Action: v2store.Delete,
  388. PrevNode: s.mkV2Node(dresp.PrevKvs[0]),
  389. EtcdIndex: mkV2Rev(resp.Header.Revision),
  390. }, nil
  391. }
  392. func (s *v2v3Store) deleteEmptyDir(nodePath string) (*v2store.Event, error) {
  393. resp, err := s.c.Txn(s.ctx).If(
  394. clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, 1)), "=", 0).WithPrefix(),
  395. ).Then(
  396. clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV()),
  397. clientv3.OpPut(s.mkActionKey(), v2store.Delete),
  398. ).Commit()
  399. if err != nil {
  400. return nil, err
  401. }
  402. if !resp.Succeeded {
  403. return nil, v2error.NewError(v2error.EcodeDirNotEmpty, nodePath, mkV2Rev(resp.Header.Revision))
  404. }
  405. dresp := resp.Responses[0].GetResponseDeleteRange()
  406. if len(dresp.PrevKvs) == 0 {
  407. return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
  408. }
  409. return &v2store.Event{
  410. Action: v2store.Delete,
  411. PrevNode: s.mkV2Node(dresp.PrevKvs[0]),
  412. EtcdIndex: mkV2Rev(resp.Header.Revision),
  413. }, nil
  414. }
  415. func (s *v2v3Store) deleteNode(nodePath string) (*v2store.Event, error) {
  416. resp, err := s.c.Txn(s.ctx).If(
  417. clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), "=", 0),
  418. ).Then(
  419. clientv3.OpDelete(s.mkPath(nodePath), clientv3.WithPrevKV()),
  420. clientv3.OpPut(s.mkActionKey(), v2store.Delete),
  421. ).Commit()
  422. if err != nil {
  423. return nil, err
  424. }
  425. if !resp.Succeeded {
  426. return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
  427. }
  428. pkvs := resp.Responses[0].GetResponseDeleteRange().PrevKvs
  429. if len(pkvs) == 0 {
  430. return nil, v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
  431. }
  432. pkv := pkvs[0]
  433. return &v2store.Event{
  434. Action: v2store.Delete,
  435. Node: &v2store.NodeExtern{
  436. Key: nodePath,
  437. CreatedIndex: mkV2Rev(pkv.CreateRevision),
  438. ModifiedIndex: mkV2Rev(resp.Header.Revision),
  439. },
  440. PrevNode: s.mkV2Node(pkv),
  441. EtcdIndex: mkV2Rev(resp.Header.Revision),
  442. }, nil
  443. }
  444. func (s *v2v3Store) CompareAndDelete(nodePath, prevValue string, prevIndex uint64) (*v2store.Event, error) {
  445. if isRoot(nodePath) {
  446. return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
  447. }
  448. key := s.mkPath(nodePath)
  449. resp, err := s.c.Txn(s.ctx).If(
  450. s.mkCompare(nodePath, prevValue, prevIndex)...,
  451. ).Then(
  452. clientv3.OpDelete(key, clientv3.WithPrevKV()),
  453. clientv3.OpPut(s.mkActionKey(), v2store.CompareAndDelete),
  454. ).Else(
  455. clientv3.OpGet(key),
  456. clientv3.OpGet(key+"/"),
  457. ).Commit()
  458. if err != nil {
  459. return nil, err
  460. }
  461. if !resp.Succeeded {
  462. return nil, compareFail(nodePath, prevValue, prevIndex, resp)
  463. }
  464. // len(pkvs) > 1 since txn only succeeds when key exists
  465. pkv := resp.Responses[0].GetResponseDeleteRange().PrevKvs[0]
  466. return &v2store.Event{
  467. Action: v2store.CompareAndDelete,
  468. Node: &v2store.NodeExtern{
  469. Key: nodePath,
  470. CreatedIndex: mkV2Rev(pkv.CreateRevision),
  471. ModifiedIndex: mkV2Rev(resp.Header.Revision),
  472. },
  473. PrevNode: s.mkV2Node(pkv),
  474. EtcdIndex: mkV2Rev(resp.Header.Revision),
  475. }, nil
  476. }
  477. func compareFail(nodePath, prevValue string, prevIndex uint64, resp *clientv3.TxnResponse) error {
  478. if dkvs := resp.Responses[1].GetResponseRange().Kvs; len(dkvs) > 0 {
  479. return v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
  480. }
  481. kvs := resp.Responses[0].GetResponseRange().Kvs
  482. if len(kvs) == 0 {
  483. return v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
  484. }
  485. kv := kvs[0]
  486. indexMatch := prevIndex == 0 || kv.ModRevision == int64(prevIndex)
  487. valueMatch := prevValue == "" || string(kv.Value) == prevValue
  488. var cause string
  489. switch {
  490. case indexMatch && !valueMatch:
  491. cause = fmt.Sprintf("[%v != %v]", prevValue, string(kv.Value))
  492. case valueMatch && !indexMatch:
  493. cause = fmt.Sprintf("[%v != %v]", prevIndex, kv.ModRevision)
  494. default:
  495. cause = fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, string(kv.Value), prevIndex, kv.ModRevision)
  496. }
  497. return v2error.NewError(v2error.EcodeTestFailed, cause, mkV2Rev(resp.Header.Revision))
  498. }
  499. func (s *v2v3Store) mkCompare(nodePath, prevValue string, prevIndex uint64) []clientv3.Cmp {
  500. key := s.mkPath(nodePath)
  501. cmps := []clientv3.Cmp{clientv3.Compare(clientv3.Version(key), ">", 0)}
  502. if prevIndex != 0 {
  503. cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(key), "=", mkV3Rev(prevIndex)))
  504. }
  505. if prevValue != "" {
  506. cmps = append(cmps, clientv3.Compare(clientv3.Value(key), "=", prevValue))
  507. }
  508. return cmps
  509. }
  510. func (s *v2v3Store) JsonStats() []byte { panic("STUB") }
  511. func (s *v2v3Store) DeleteExpiredKeys(cutoff time.Time) { panic("STUB") }
  512. func (s *v2v3Store) Version() int { return 2 }
  513. // TODO: move this out of the Store interface?
  514. func (s *v2v3Store) Save() ([]byte, error) { panic("STUB") }
  515. func (s *v2v3Store) Recovery(state []byte) error { panic("STUB") }
  516. func (s *v2v3Store) Clone() v2store.Store { panic("STUB") }
  517. func (s *v2v3Store) SaveNoCopy() ([]byte, error) { panic("STUB") }
  518. func (s *v2v3Store) HasTTLKeys() bool { panic("STUB") }
  519. func (s *v2v3Store) mkPath(nodePath string) string { return s.mkPathDepth(nodePath, 0) }
  520. func (s *v2v3Store) mkNodePath(p string) string {
  521. return path.Clean(p[len(s.pfx)+len("/k/000/"):])
  522. }
  523. // mkPathDepth makes a path to a key that encodes its directory depth
  524. // for fast directory listing. If a depth is provided, it is added
  525. // to the computed depth.
  526. func (s *v2v3Store) mkPathDepth(nodePath string, depth int) string {
  527. normalForm := path.Clean(path.Join("/", nodePath))
  528. n := strings.Count(normalForm, "/") + depth
  529. return fmt.Sprintf("%s/%03d/k/%s", s.pfx, n, normalForm)
  530. }
  531. func (s *v2v3Store) mkActionKey() string { return s.pfx + "/act" }
  532. func isRoot(s string) bool { return len(s) == 0 || s == "/" || s == "/0" || s == "/1" }
  533. func mkV2Rev(v3Rev int64) uint64 {
  534. if v3Rev == 0 {
  535. return 0
  536. }
  537. return uint64(v3Rev - 1)
  538. }
  539. func mkV3Rev(v2Rev uint64) int64 {
  540. if v2Rev == 0 {
  541. return 0
  542. }
  543. return int64(v2Rev + 1)
  544. }
  545. // mkV2Node creates a V2 NodeExtern from a V3 KeyValue
  546. func (s *v2v3Store) mkV2Node(kv *mvccpb.KeyValue) *v2store.NodeExtern {
  547. if kv == nil {
  548. return nil
  549. }
  550. n := &v2store.NodeExtern{
  551. Key: s.mkNodePath(string(kv.Key)),
  552. Dir: kv.Key[len(kv.Key)-1] == '/',
  553. CreatedIndex: mkV2Rev(kv.CreateRevision),
  554. ModifiedIndex: mkV2Rev(kv.ModRevision),
  555. }
  556. if !n.Dir {
  557. v := string(kv.Value)
  558. n.Value = &v
  559. }
  560. return n
  561. }
  562. // prevKeyFromPuts gets the prev key that is being put; ignores
  563. // the put action response.
  564. func prevKeyFromPuts(resp *clientv3.TxnResponse) *mvccpb.KeyValue {
  565. for _, r := range resp.Responses {
  566. pkv := r.GetResponsePut().PrevKv
  567. if pkv != nil && pkv.CreateRevision > 0 {
  568. return pkv
  569. }
  570. }
  571. return nil
  572. }
  573. func (s *v2v3Store) newSTM(applyf func(concurrency.STM) error) (*clientv3.TxnResponse, error) {
  574. return concurrency.NewSTM(s.c, applyf, concurrency.WithIsolation(concurrency.Serializable))
  575. }