Browse Source

concurrency: variadic stm options

Makes txn isolation and the context variadic options.
Anthony Romano 9 years ago
parent
commit
59880a0ab8
1 changed files with 51 additions and 16 deletions
  1. 51 16
      clientv3/concurrency/stm.go

+ 51 - 16
clientv3/concurrency/stm.go

@@ -36,29 +36,64 @@ type STM interface {
 	reset()
 	reset()
 }
 }
 
 
+// Isolation is an enumeration of transactional isolation levels which
+// describes how transactions should interfere and conflict.
+type Isolation int
+
+const (
+	// Snapshot is serializable but also checks writes for conflicts.
+	Snapshot Isolation = iota
+	// Serializable reads within the same transactiona attempt return data
+	// from the at the revision of the first read.
+	Serializable
+	// RepeatableReads reads within the same transaction attempt always
+	// return the same data.
+	RepeatableReads
+	// ReadCommitted reads keys from any committed revision.
+	ReadCommitted
+)
+
 // stmError safely passes STM errors through panic to the STM error channel.
 // stmError safely passes STM errors through panic to the STM error channel.
 type stmError struct{ err error }
 type stmError struct{ err error }
 
 
-// NewSTMRepeatable initiates new repeatable read transaction; reads within
-// the same transaction attempt always return the same data.
-func NewSTMRepeatable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
-	s := &stm{client: c, ctx: ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
-	return runSTM(s, apply)
+type stmOptions struct {
+	iso Isolation
+	ctx       context.Context
 }
 }
 
 
-// NewSTMSerializable initiates a new serialized transaction; reads within the
-// same transactiona attempt return data from the revision of the first read.
-func NewSTMSerializable(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
-	s := &stmSerializable{
-		stm:      stm{client: c, ctx: ctx},
-		prefetch: make(map[string]*v3.GetResponse),
-	}
-	return runSTM(s, apply)
+type stmOption func(*stmOptions)
+
+// WithIsolation specifies the transaction isolation level.
+func WithIsolation(lvl Isolation) stmOption {
+	return func(so *stmOptions) { so.iso = lvl }
+}
+
+// WithAbortContext specifies the context for permanently aborting the transaction.
+func WithAbortContext(ctx context.Context) stmOption {
+	return func(so *stmOptions) { so.ctx = ctx }
 }
 }
 
 
-// NewSTMReadCommitted initiates a new read committed transaction.
-func NewSTMReadCommitted(ctx context.Context, c *v3.Client, apply func(STM) error) (*v3.TxnResponse, error) {
-	s := &stmReadCommitted{stm{client: c, ctx: ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}}
+// NewSTM initiates a new STM instance.
+func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error) {
+	opts := &stmOptions{ctx: c.Ctx()}
+	for _, f := range so {
+		f(opts)
+	}
+	var s STM
+	switch opts.iso {
+	case Serializable:
+		s = &stmSerializable{
+			stm:      stm{client: c, ctx: opts.ctx},
+			prefetch: make(map[string]*v3.GetResponse),
+		}
+	case RepeatableReads:
+		s = &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
+	case ReadCommitted:
+		ss := stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
+		s = &stmReadCommitted{ss}
+	default:
+		panic("unsupported")
+	}
 	return runSTM(s, apply)
 	return runSTM(s, apply)
 }
 }