stm.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package concurrency
  15. import (
  16. v3 "github.com/coreos/etcd/clientv3"
  17. "golang.org/x/net/context"
  18. )
  19. // STM is an interface for software transactional memory.
  20. type STM interface {
  21. // Get returns the value for a key and inserts the key in the txn's read set.
  22. // If Get fails, it aborts the transaction with an error, never returning.
  23. Get(key ...string) string
  24. // Put adds a value for a key to the write set.
  25. Put(key, val string, opts ...v3.OpOption)
  26. // Rev returns the revision of a key in the read set.
  27. Rev(key string) int64
  28. // Del deletes a key.
  29. Del(key string)
  30. // commit attempts to apply the txn's changes to the server.
  31. commit() *v3.TxnResponse
  32. reset()
  33. }
  // Isolation is an enumeration of transactional isolation levels which
  // describes how transactions should interfere and conflict.
  type Isolation int

  const (
  	// Snapshot is serializable but also checks writes for conflicts. It is
  	// the zero value of Isolation and therefore the default level.
  	Snapshot Isolation = iota
  	// Serializable reads within the same transaction attempt return data
  	// from the revision of the first read.
  	Serializable
  	// RepeatableReads reads within the same transaction attempt always
  	// return the same data.
  	RepeatableReads
  	// ReadCommitted reads keys from any committed revision.
  	ReadCommitted
  )
  // stmError wraps an error raised inside the STM so it can travel through a
  // panic to runSTM, which recovers it and reports it as an error result.
  // Panics carrying any other value are treated as coming from user code.
  type stmError struct {
  	err error
  }
  51. type stmOptions struct {
  52. iso Isolation
  53. ctx context.Context
  54. }
  55. type stmOption func(*stmOptions)
  56. // WithIsolation specifies the transaction isolation level.
  57. func WithIsolation(lvl Isolation) stmOption {
  58. return func(so *stmOptions) { so.iso = lvl }
  59. }
  60. // WithAbortContext specifies the context for permanently aborting the transaction.
  61. func WithAbortContext(ctx context.Context) stmOption {
  62. return func(so *stmOptions) { so.ctx = ctx }
  63. }
  64. // NewSTM initiates a new STM instance.
  65. func NewSTM(c *v3.Client, apply func(STM) error, so ...stmOption) (*v3.TxnResponse, error) {
  66. opts := &stmOptions{ctx: c.Ctx()}
  67. for _, f := range so {
  68. f(opts)
  69. }
  70. var s STM
  71. switch opts.iso {
  72. case Serializable:
  73. s = &stmSerializable{
  74. stm: stm{client: c, ctx: opts.ctx},
  75. prefetch: make(map[string]*v3.GetResponse),
  76. }
  77. case RepeatableReads:
  78. s = &stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
  79. case ReadCommitted:
  80. ss := stm{client: c, ctx: opts.ctx, getOpts: []v3.OpOption{v3.WithSerializable()}}
  81. s = &stmReadCommitted{ss}
  82. default:
  83. panic("unsupported")
  84. }
  85. return runSTM(s, apply)
  86. }
  87. type stmResponse struct {
  88. resp *v3.TxnResponse
  89. err error
  90. }
  91. func runSTM(s STM, apply func(STM) error) (*v3.TxnResponse, error) {
  92. outc := make(chan stmResponse, 1)
  93. go func() {
  94. defer func() {
  95. if r := recover(); r != nil {
  96. e, ok := r.(stmError)
  97. if !ok {
  98. // client apply panicked
  99. panic(r)
  100. }
  101. outc <- stmResponse{nil, e.err}
  102. }
  103. }()
  104. var out stmResponse
  105. for {
  106. s.reset()
  107. if out.err = apply(s); out.err != nil {
  108. break
  109. }
  110. if out.resp = s.commit(); out.resp != nil {
  111. break
  112. }
  113. }
  114. outc <- out
  115. }()
  116. r := <-outc
  117. return r.resp, r.err
  118. }
  119. // stm implements repeatable-read software transactional memory over etcd
  120. type stm struct {
  121. client *v3.Client
  122. ctx context.Context
  123. // rset holds read key values and revisions
  124. rset map[string]*v3.GetResponse
  125. // wset holds overwritten keys and their values
  126. wset writeSet
  127. // getOpts are the opts used for gets
  128. getOpts []v3.OpOption
  129. }
  130. type stmPut struct {
  131. val string
  132. op v3.Op
  133. }
  134. type writeSet map[string]stmPut
  135. func (ws writeSet) get(keys ...string) *stmPut {
  136. for _, key := range keys {
  137. if wv, ok := ws[key]; ok {
  138. return &wv
  139. }
  140. }
  141. return nil
  142. }
  143. // puts is the list of ops for all pending writes
  144. func (ws writeSet) puts() []v3.Op {
  145. puts := make([]v3.Op, 0, len(ws))
  146. for _, v := range ws {
  147. puts = append(puts, v.op)
  148. }
  149. return puts
  150. }
  151. func (s *stm) Get(keys ...string) string {
  152. if wv := s.wset.get(keys...); wv != nil {
  153. return wv.val
  154. }
  155. return respToValue(s.fetch(keys...))
  156. }
  157. func (s *stm) Put(key, val string, opts ...v3.OpOption) {
  158. s.wset[key] = stmPut{val, v3.OpPut(key, val, opts...)}
  159. }
  160. func (s *stm) Del(key string) { s.wset[key] = stmPut{"", v3.OpDelete(key)} }
  161. func (s *stm) Rev(key string) int64 {
  162. if resp := s.fetch(key); resp != nil && len(resp.Kvs) != 0 {
  163. return resp.Kvs[0].ModRevision
  164. }
  165. return 0
  166. }
  167. func (s *stm) commit() *v3.TxnResponse {
  168. txnresp, err := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.wset.puts()...).Commit()
  169. if err != nil {
  170. panic(stmError{err})
  171. }
  172. if txnresp.Succeeded {
  173. return txnresp
  174. }
  175. return nil
  176. }
  177. // cmps guards the txn from updates to read set
  178. func (s *stm) cmps() []v3.Cmp {
  179. cmps := make([]v3.Cmp, 0, len(s.rset))
  180. for k, rk := range s.rset {
  181. cmps = append(cmps, isKeyCurrent(k, rk))
  182. }
  183. return cmps
  184. }
  185. func (s *stm) fetch(keys ...string) *v3.GetResponse {
  186. if len(keys) == 0 {
  187. return nil
  188. }
  189. ops := make([]v3.Op, len(keys))
  190. for i, key := range keys {
  191. if resp, ok := s.rset[key]; ok {
  192. return resp
  193. }
  194. ops[i] = v3.OpGet(key, s.getOpts...)
  195. }
  196. txnresp, err := s.client.Txn(s.ctx).Then(ops...).Commit()
  197. if err != nil {
  198. panic(stmError{err})
  199. }
  200. addTxnResp(s.rset, keys, txnresp)
  201. return (*v3.GetResponse)(txnresp.Responses[0].GetResponseRange())
  202. }
  203. func (s *stm) reset() {
  204. s.rset = make(map[string]*v3.GetResponse)
  205. s.wset = make(map[string]stmPut)
  206. }
  207. type stmSerializable struct {
  208. stm
  209. prefetch map[string]*v3.GetResponse
  210. }
  211. func (s *stmSerializable) Get(keys ...string) string {
  212. if wv := s.wset.get(keys...); wv != nil {
  213. return wv.val
  214. }
  215. firstRead := len(s.rset) == 0
  216. for _, key := range keys {
  217. if resp, ok := s.prefetch[key]; ok {
  218. delete(s.prefetch, key)
  219. s.rset[key] = resp
  220. }
  221. }
  222. resp := s.stm.fetch(keys...)
  223. if firstRead {
  224. // txn's base revision is defined by the first read
  225. s.getOpts = []v3.OpOption{
  226. v3.WithRev(resp.Header.Revision),
  227. v3.WithSerializable(),
  228. }
  229. }
  230. return respToValue(resp)
  231. }
  232. func (s *stmSerializable) Rev(key string) int64 {
  233. s.Get(key)
  234. return s.stm.Rev(key)
  235. }
  236. func (s *stmSerializable) gets() ([]string, []v3.Op) {
  237. keys := make([]string, 0, len(s.rset))
  238. ops := make([]v3.Op, 0, len(s.rset))
  239. for k := range s.rset {
  240. keys = append(keys, k)
  241. ops = append(ops, v3.OpGet(k))
  242. }
  243. return keys, ops
  244. }
  245. func (s *stmSerializable) commit() *v3.TxnResponse {
  246. keys, getops := s.gets()
  247. txn := s.client.Txn(s.ctx).If(s.cmps()...).Then(s.wset.puts()...)
  248. // use Else to prefetch keys in case of conflict to save a round trip
  249. txnresp, err := txn.Else(getops...).Commit()
  250. if err != nil {
  251. panic(stmError{err})
  252. }
  253. if txnresp.Succeeded {
  254. return txnresp
  255. }
  256. // load prefetch with Else data
  257. addTxnResp(s.rset, keys, txnresp)
  258. s.prefetch = s.rset
  259. s.getOpts = nil
  260. return nil
  261. }
  262. func addTxnResp(rset map[string]*v3.GetResponse, keys []string, txnresp *v3.TxnResponse) {
  263. for i, resp := range txnresp.Responses {
  264. rset[keys[i]] = (*v3.GetResponse)(resp.GetResponseRange())
  265. }
  266. }
  267. type stmReadCommitted struct{ stm }
  268. // commit always goes through when read committed
  269. func (s *stmReadCommitted) commit() *v3.TxnResponse {
  270. s.rset = nil
  271. return s.stm.commit()
  272. }
  273. func isKeyCurrent(k string, r *v3.GetResponse) v3.Cmp {
  274. if len(r.Kvs) != 0 {
  275. return v3.Compare(v3.ModRevision(k), "=", r.Kvs[0].ModRevision)
  276. }
  277. return v3.Compare(v3.ModRevision(k), "=", 0)
  278. }
  279. func respToValue(resp *v3.GetResponse) string {
  280. if resp == nil || len(resp.Kvs) == 0 {
  281. return ""
  282. }
  283. return string(resp.Kvs[0].Value)
  284. }