stm.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package concurrency
  15. import (
  16. v3 "github.com/coreos/etcd/clientv3"
  17. "golang.org/x/net/context"
  18. )
  19. // STM is an interface for software transactional memory.
  20. type STM interface {
  21. // Get returns the value for a key and inserts the key in the txn's read set.
  22. // If Get fails, it aborts the transaction with an error, never returning.
  23. Get(key string) string
  24. // Put adds a value for a key to the write set.
  25. Put(key, val string, opts ...v3.OpOption)
  26. // Rev returns the revision of a key in the read set.
  27. Rev(key string) int64
  28. // Del deletes a key.
  29. Del(key string)
  30. // commit attempts to apply the txn's changes to the server.
  31. commit() *v3.TxnResponse
  32. reset()
  33. }
  34. // Isolation is an enumeration of transactional isolation levels which
  35. // describes how transactions should interfere and conflict.
  36. type Isolation int
  37. const (
  38. // Snapshot is serializable but also checks writes for conflicts.
  39. Snapshot Isolation = iota
  40. // Serializable reads within the same transactiona attempt return data
  41. // from the at the revision of the first read.
  42. Serializable
  43. // RepeatableReads reads within the same transaction attempt always
  44. // return the same data.
  45. RepeatableReads
  46. // ReadCommitted reads keys from any committed revision.
  47. ReadCommitted
  48. )
  49. // stmError safely passes STM errors through panic to the STM error channel.
  50. type stmError struct{ err error }
  51. type stmOptions struct {
  52. iso Isolation
  53. ctx context.Context
  54. }
  55. type stmOption func(*stmOptions)
  56. // WithIsolation specifies the transaction isolation level.
  57. func WithIsolation(lvl Isolation) stmOption {
  58. return func(so *stmOptions) { so.iso = lvl }
  59. }
  60. // WithAbortContext specifies the context for permanently aborting the transaction.
  61. func WithAbortContext(ctx context.Context) stmOption {
  62. return func(so *stmOptions) { so.ctx = ctx }
  63. }
  64. // NewSTM initiates a new STM instance.
  65. func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error) {
  66. opts := &stmOptions{ctx: c.Ctx()}
  67. for _, f := range so {
  68. f(opts)
  69. }
  70. var s STM
  71. switch opts.iso {
  72. case Serializable:
  73. s = &stmSerializable{
  74. stm: stm{client: c, ctx: opts.ctx},
  75. prefetch: make(map[string]*v3.GetResponse),
  76. }
  77. case RepeatableReads:
  78. s = &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
  79. case ReadCommitted:
  80. ss := stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
  81. s = &stmReadCommitted{ss}
  82. default:
  83. panic("unsupported")
  84. }
  85. return runSTM(s, apply)
  86. }
  87. type stmResponse struct {
  88. resp *v3.TxnResponse
  89. err error
  90. }
  91. func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) {
  92. outc := make(chan stmResponse, 1)
  93. go func() {
  94. defer func() {
  95. if r := recover(); r != nil {
  96. e, ok := r.(stmError)
  97. if !ok {
  98. // client apply panicked
  99. panic(r)
  100. }
  101. outc <- stmResponse{nil, e.err}
  102. }
  103. }()
  104. var out stmResponse
  105. for {
  106. s.reset()
  107. if out.err = apply(s); out.err != nil {
  108. break
  109. }
  110. if out.resp = s.commit(); out.resp != nil {
  111. break
  112. }
  113. }
  114. outc <- out
  115. }()
  116. r := <-outc
  117. return r.resp, r.err
  118. }
  119. // stm implements repeatable-read software transactional memory over etcd
  120. type stm struct {
  121. client *v3.Client
  122. ctx context.Context
  123. // rset holds read key values and revisions
  124. rset map[string]*v3.GetResponse
  125. // wset holds overwritten keys and their values
  126. wset map[string]stmPut
  127. // getOpts are the opts used for gets
  128. getOpts []v3.OpOption
  129. }
  130. type stmPut struct {
  131. val string
  132. op v3.Op
  133. }
  134. func (s *stm) Get(key string) string {
  135. if wv, ok := s.wset[key]; ok {
  136. return wv.val
  137. }
  138. return respToValue(s.fetch(key))
  139. }
  140. func (s *stm) Put(key, val string, opts ...v3.OpOption) {
  141. s.wset[key] = stmPut{val, v3.OpPut(key, val, opts...)}
  142. }
  143. func (s *stm) Del(key string) { s.wset[key] = stmPut{"", v3.OpDelete(key)} }
  144. func (s *stm) Rev(key string) int64 {
  145. if resp := s.fetch(key); resp != nil && len(resp.Kvs) != 0 {
  146. return resp.Kvs[0].ModRevision
  147. }
  148. return 0
  149. }
  150. func (s *stm) commit() *v3.TxnResponse {
  151. txnresp, err := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.puts()...).Commit()
  152. if err != nil {
  153. panic(stmError{err})
  154. }
  155. if txnresp.Succeeded {
  156. return txnresp
  157. }
  158. return nil
  159. }
  160. // cmps guards the txn from updates to read set
  161. func (s *stm) cmps() []v3.Cmp {
  162. cmps := make([]v3.Cmp, 0, len(s.rset))
  163. for k, rk := range s.rset {
  164. cmps = append(cmps, isKeyCurrent(k, rk))
  165. }
  166. return cmps
  167. }
  168. func (s *stm) fetch(key string) *v3.GetResponse {
  169. if resp, ok := s.rset[key]; ok {
  170. return resp
  171. }
  172. resp, err := s.client.Get(s.ctx, key, s.getOpts...)
  173. if err != nil {
  174. panic(stmError{err})
  175. }
  176. s.rset[key] = resp
  177. return resp
  178. }
  179. // puts is the list of ops for all pending writes
  180. func (s *stm) puts() []v3.Op {
  181. puts := make([]v3.Op, 0, len(s.wset))
  182. for _, v := range s.wset {
  183. puts = append(puts, v.op)
  184. }
  185. return puts
  186. }
  187. func (s *stm) reset() {
  188. s.rset = make(map[string]*v3.GetResponse)
  189. s.wset = make(map[string]stmPut)
  190. }
  191. type stmSerializable struct {
  192. stm
  193. prefetch map[string]*v3.GetResponse
  194. }
  195. func (s *stmSerializable) Get(key string) string {
  196. if wv, ok := s.wset[key]; ok {
  197. return wv.val
  198. }
  199. firstRead := len(s.rset) == 0
  200. if resp, ok := s.prefetch[key]; ok {
  201. delete(s.prefetch, key)
  202. s.rset[key] = resp
  203. }
  204. resp := s.stm.fetch(key)
  205. if firstRead {
  206. // txn's base revision is defined by the first read
  207. s.getOpts = []v3.OpOption{
  208. v3.WithRev(resp.Header.Revision),
  209. v3.WithSerializable(),
  210. }
  211. }
  212. return respToValue(resp)
  213. }
  214. func (s *stmSerializable) Rev(key string) int64 {
  215. s.Get(key)
  216. return s.stm.Rev(key)
  217. }
  218. func (s *stmSerializable) gets() ([]string, []v3.Op) {
  219. keys := make([]string, 0, len(s.rset))
  220. ops := make([]v3.Op, 0, len(s.rset))
  221. for k := range s.rset {
  222. keys = append(keys, k)
  223. ops = append(ops, v3.OpGet(k))
  224. }
  225. return keys, ops
  226. }
  227. func (s *stmSerializable) commit() *v3.TxnResponse {
  228. keys, getops := s.gets()
  229. txn := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.puts()...)
  230. // use Else to prefetch keys in case of conflict to save a round trip
  231. txnresp, err := txn.Else(getops...).Commit()
  232. if err != nil {
  233. panic(stmError{err})
  234. }
  235. if txnresp.Succeeded {
  236. return txnresp
  237. }
  238. // load prefetch with Else data
  239. for i := range keys {
  240. resp := txnresp.Responses[i].GetResponseRange()
  241. s.rset[keys[i]] = (*v3.GetResponse)(resp)
  242. }
  243. s.prefetch = s.rset
  244. s.getOpts = nil
  245. return nil
  246. }
  247. type stmReadCommitted struct{ stm }
  248. // commit always goes through when read committed
  249. func (s *stmReadCommitted) commit() *v3.TxnResponse {
  250. s.rset = nil
  251. return s.stm.commit()
  252. }
  253. func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
  254. if len(r.Kvs) != 0 {
  255. return v3.Compare(v3.ModRevision(k), "=", r.Kvs[0].ModRevision)
  256. }
  257. return v3.Compare(v3.ModRevision(k), "=", 0)
  258. }
  259. func respToValue(resp *v3.GetResponse) string {
  260. if len(resp.Kvs) == 0 {
  261. return ""
  262. }
  263. return string(resp.Kvs[0].Value)
  264. }