tx.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package redis
  2. import (
  3. "context"
  4. "github.com/go-redis/redis/internal/pool"
  5. "github.com/go-redis/redis/internal/proto"
  6. )
  7. // TxFailedErr transaction redis failed.
  8. const TxFailedErr = proto.RedisError("redis: transaction failed")
  9. // Tx implements Redis transactions as described in
  10. // http://redis.io/topics/transactions. It's NOT safe for concurrent use
  11. // by multiple goroutines, because Exec resets list of watched keys.
  12. // If you don't need WATCH it is better to use Pipeline.
  13. type Tx struct {
  14. baseClient
  15. cmdable
  16. statefulCmdable
  17. hooks
  18. ctx context.Context
  19. }
  20. func (c *Client) newTx(ctx context.Context) *Tx {
  21. tx := Tx{
  22. baseClient: baseClient{
  23. opt: c.opt,
  24. connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
  25. },
  26. hooks: c.hooks.Clone(),
  27. ctx: ctx,
  28. }
  29. tx.init()
  30. return &tx
  31. }
  32. func (c *Tx) init() {
  33. c.cmdable = c.Process
  34. c.statefulCmdable = c.Process
  35. }
  36. func (c *Tx) Context() context.Context {
  37. return c.ctx
  38. }
  39. func (c *Tx) WithContext(ctx context.Context) *Tx {
  40. if ctx == nil {
  41. panic("nil context")
  42. }
  43. clone := *c
  44. clone.init()
  45. clone.hooks.Lock()
  46. clone.ctx = ctx
  47. return &clone
  48. }
  49. func (c *Tx) Process(cmd Cmder) error {
  50. return c.ProcessContext(c.ctx, cmd)
  51. }
  52. func (c *Tx) ProcessContext(ctx context.Context, cmd Cmder) error {
  53. return c.hooks.process(ctx, cmd, c.baseClient.process)
  54. }
  55. // Watch prepares a transaction and marks the keys to be watched
  56. // for conditional execution if there are any keys.
  57. //
  58. // The transaction is automatically closed when fn exits.
  59. func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
  60. return c.WatchContext(c.ctx, fn, keys...)
  61. }
  62. func (c *Client) WatchContext(ctx context.Context, fn func(*Tx) error, keys ...string) error {
  63. tx := c.newTx(ctx)
  64. if len(keys) > 0 {
  65. if err := tx.Watch(keys...).Err(); err != nil {
  66. _ = tx.Close()
  67. return err
  68. }
  69. }
  70. err := fn(tx)
  71. _ = tx.Close()
  72. return err
  73. }
  74. // Close closes the transaction, releasing any open resources.
  75. func (c *Tx) Close() error {
  76. _ = c.Unwatch().Err()
  77. return c.baseClient.Close()
  78. }
  79. // Watch marks the keys to be watched for conditional execution
  80. // of a transaction.
  81. func (c *Tx) Watch(keys ...string) *StatusCmd {
  82. args := make([]interface{}, 1+len(keys))
  83. args[0] = "watch"
  84. for i, key := range keys {
  85. args[1+i] = key
  86. }
  87. cmd := NewStatusCmd(args...)
  88. _ = c.Process(cmd)
  89. return cmd
  90. }
  91. // Unwatch flushes all the previously watched keys for a transaction.
  92. func (c *Tx) Unwatch(keys ...string) *StatusCmd {
  93. args := make([]interface{}, 1+len(keys))
  94. args[0] = "unwatch"
  95. for i, key := range keys {
  96. args[1+i] = key
  97. }
  98. cmd := NewStatusCmd(args...)
  99. _ = c.Process(cmd)
  100. return cmd
  101. }
  102. func (c *Tx) Pipeline() Pipeliner {
  103. pipe := Pipeline{
  104. ctx: c.ctx,
  105. exec: func(ctx context.Context, cmds []Cmder) error {
  106. return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
  107. },
  108. }
  109. pipe.init()
  110. return &pipe
  111. }
  112. func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  113. return c.Pipeline().Pipelined(fn)
  114. }
  115. // TxPipelined executes commands queued in the fn in a transaction.
  116. //
  117. // When using WATCH, EXEC will execute commands only if the watched keys
  118. // were not modified, allowing for a check-and-set mechanism.
  119. //
  120. // Exec always returns list of commands. If transaction fails
  121. // TxFailedErr is returned. Otherwise Exec returns an error of the first
  122. // failed command or nil.
  123. func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  124. return c.TxPipeline().Pipelined(fn)
  125. }
  126. // TxPipeline creates a new pipeline. Usually it is more convenient to use TxPipelined.
  127. func (c *Tx) TxPipeline() Pipeliner {
  128. pipe := Pipeline{
  129. ctx: c.ctx,
  130. exec: func(ctx context.Context, cmds []Cmder) error {
  131. return c.hooks.processPipeline(ctx, cmds, c.baseClient.processTxPipeline)
  132. },
  133. }
  134. pipe.init()
  135. return &pipe
  136. }