123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160 |
- // Copyright 2016 CoreOS, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package clientv3
- import (
- "sync"
- "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
- pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
- )
// Txn is the interface for building and committing a transaction: a list of
// comparisons plus the operations to run when those comparisons succeed or
// fail.
//
// Typical usage, where Tx is a Txn:
//
//	Tx.If(
//		Compare(Value(k1), ">", v1),
//		Compare(Version(k1), "=", 2)
//	).Then(
//		OpPut(k2,v2), OpPut(k3,v3)
//	).Else(
//		OpPut(k4,v4), OpPut(k5,v5)
//	).Commit()
type Txn interface {
	// If takes a list of comparisons. If all comparisons passed in succeed,
	// the operations passed into Then() will be executed. Otherwise the
	// operations passed into Else() will be executed.
	If(cs ...Cmp) Txn

	// Then takes a list of operations. The Ops list will be executed if the
	// comparisons passed in If() succeed.
	Then(ops ...Op) Txn

	// Else takes a list of operations. The Ops list will be executed if the
	// comparisons passed in If() fail.
	Else(ops ...Op) Txn

	// Commit tries to commit the transaction. It returns the transaction
	// response on success, or an error.
	Commit() (*TxnResponse, error)

	// TODO: add a Do as a shortcut for a txn without any condition?
}
// txn is the Txn implementation in this file. Its If, Then, Else and Commit
// methods each lock mu before reading or writing the remaining fields.
type txn struct {
	// kv supplies the remote that Commit sends the request to (getRemote)
	// and the hook Commit calls when a request fails (switchRemote).
	kv *kv
	// ctx is passed to the Txn RPC and to isHalted by Commit.
	ctx context.Context

	// mu is held for the duration of every method on txn.
	mu sync.Mutex

	// cif, cthen and celse record that If, Then and Else respectively have
	// been called. They are used to panic on repeated or out-of-order calls.
	cif   bool
	cthen bool
	celse bool

	// isWrite becomes true once any op passed to Then or Else reports
	// isWrite(). Commit does not retry a failed request when it is set.
	isWrite bool

	// cmps holds the comparisons added by If; sent as TxnRequest.Compare.
	cmps []*pb.Compare

	// sus holds the ops added by Then; sent as TxnRequest.Success.
	sus []*pb.RequestUnion
	// fas holds the ops added by Else; sent as TxnRequest.Failure.
	fas []*pb.RequestUnion
}
- func (txn *txn) If(cs ...Cmp) Txn {
- txn.mu.Lock()
- defer txn.mu.Unlock()
- if txn.cif {
- panic("cannot call If twice!")
- }
- if txn.cthen {
- panic("cannot call If after Then!")
- }
- if txn.celse {
- panic("cannot call If after Else!")
- }
- txn.cif = true
- for i := range cs {
- txn.cmps = append(txn.cmps, (*pb.Compare)(&cs[i]))
- }
- return txn
- }
- func (txn *txn) Then(ops ...Op) Txn {
- txn.mu.Lock()
- defer txn.mu.Unlock()
- if txn.cthen {
- panic("cannot call Then twice!")
- }
- if txn.celse {
- panic("cannot call Then after Else!")
- }
- txn.cthen = true
- for _, op := range ops {
- txn.isWrite = txn.isWrite || op.isWrite()
- txn.sus = append(txn.sus, op.toRequestUnion())
- }
- return txn
- }
- func (txn *txn) Else(ops ...Op) Txn {
- txn.mu.Lock()
- defer txn.mu.Unlock()
- if txn.celse {
- panic("cannot call Else twice!")
- }
- txn.celse = true
- for _, op := range ops {
- txn.isWrite = txn.isWrite || op.isWrite()
- txn.fas = append(txn.fas, op.toRequestUnion())
- }
- return txn
- }
- func (txn *txn) Commit() (*TxnResponse, error) {
- txn.mu.Lock()
- defer txn.mu.Unlock()
- kv := txn.kv
- for {
- r := &pb.TxnRequest{Compare: txn.cmps, Success: txn.sus, Failure: txn.fas}
- resp, err := kv.getRemote().Txn(txn.ctx, r)
- if err == nil {
- return (*TxnResponse)(resp), nil
- }
- if isHalted(txn.ctx, err) {
- return nil, err
- }
- if txn.isWrite {
- go kv.switchRemote(err)
- return nil, err
- }
- if nerr := kv.switchRemote(err); nerr != nil {
- return nil, nerr
- }
- }
- }
|