ring.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722
  1. package redis
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math/rand"
  7. "strconv"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/go-redis/redis/internal"
  12. "github.com/go-redis/redis/internal/consistenthash"
  13. "github.com/go-redis/redis/internal/hashtag"
  14. "github.com/go-redis/redis/internal/pool"
  15. )
  16. // Hash is type of hash function used in consistent hash.
  17. type Hash consistenthash.Hash
  18. var errRingShardsDown = errors.New("redis: all ring shards are down")
  19. // RingOptions are used to configure a ring client and should be
  20. // passed to NewRing.
  21. type RingOptions struct {
  22. // Map of name => host:port addresses of ring shards.
  23. Addrs map[string]string
  24. // Map of name => password of ring shards, to allow different shards to have
  25. // different passwords. It will be ignored if the Password field is set.
  26. Passwords map[string]string
  27. // Frequency of PING commands sent to check shards availability.
  28. // Shard is considered down after 3 subsequent failed checks.
  29. HeartbeatFrequency time.Duration
  30. // Hash function used in consistent hash.
  31. // Default is crc32.ChecksumIEEE.
  32. Hash Hash
  33. // Number of replicas in consistent hash.
  34. // Default is 100 replicas.
  35. //
  36. // Higher number of replicas will provide less deviation, that is keys will be
  37. // distributed to nodes more evenly.
  38. //
  39. // Following is deviation for common nreplicas:
  40. // --------------------------------------------------------
  41. // | nreplicas | standard error | 99% confidence interval |
  42. // | 10 | 0.3152 | (0.37, 1.98) |
  43. // | 100 | 0.0997 | (0.76, 1.28) |
  44. // | 1000 | 0.0316 | (0.92, 1.09) |
  45. // --------------------------------------------------------
  46. //
  47. // See https://arxiv.org/abs/1406.2294 for reference
  48. HashReplicas int
  49. // Optional hook that is called when a new shard is created.
  50. OnNewShard func(*Client)
  51. // Following options are copied from Options struct.
  52. OnConnect func(*Conn) error
  53. DB int
  54. Password string
  55. MaxRetries int
  56. MinRetryBackoff time.Duration
  57. MaxRetryBackoff time.Duration
  58. DialTimeout time.Duration
  59. ReadTimeout time.Duration
  60. WriteTimeout time.Duration
  61. PoolSize int
  62. MinIdleConns int
  63. MaxConnAge time.Duration
  64. PoolTimeout time.Duration
  65. IdleTimeout time.Duration
  66. IdleCheckFrequency time.Duration
  67. }
  68. func (opt *RingOptions) init() {
  69. if opt.HeartbeatFrequency == 0 {
  70. opt.HeartbeatFrequency = 500 * time.Millisecond
  71. }
  72. if opt.HashReplicas == 0 {
  73. opt.HashReplicas = 100
  74. }
  75. switch opt.MinRetryBackoff {
  76. case -1:
  77. opt.MinRetryBackoff = 0
  78. case 0:
  79. opt.MinRetryBackoff = 8 * time.Millisecond
  80. }
  81. switch opt.MaxRetryBackoff {
  82. case -1:
  83. opt.MaxRetryBackoff = 0
  84. case 0:
  85. opt.MaxRetryBackoff = 512 * time.Millisecond
  86. }
  87. }
  88. func (opt *RingOptions) clientOptions(shard string) *Options {
  89. return &Options{
  90. OnConnect: opt.OnConnect,
  91. DB: opt.DB,
  92. Password: opt.getPassword(shard),
  93. DialTimeout: opt.DialTimeout,
  94. ReadTimeout: opt.ReadTimeout,
  95. WriteTimeout: opt.WriteTimeout,
  96. PoolSize: opt.PoolSize,
  97. MinIdleConns: opt.MinIdleConns,
  98. MaxConnAge: opt.MaxConnAge,
  99. PoolTimeout: opt.PoolTimeout,
  100. IdleTimeout: opt.IdleTimeout,
  101. IdleCheckFrequency: opt.IdleCheckFrequency,
  102. }
  103. }
  104. func (opt *RingOptions) getPassword(shard string) string {
  105. if opt.Password == "" {
  106. return opt.Passwords[shard]
  107. }
  108. return opt.Password
  109. }
  110. //------------------------------------------------------------------------------
  111. type ringShard struct {
  112. Client *Client
  113. down int32
  114. }
  115. func (shard *ringShard) String() string {
  116. var state string
  117. if shard.IsUp() {
  118. state = "up"
  119. } else {
  120. state = "down"
  121. }
  122. return fmt.Sprintf("%s is %s", shard.Client, state)
  123. }
  124. func (shard *ringShard) IsDown() bool {
  125. const threshold = 3
  126. return atomic.LoadInt32(&shard.down) >= threshold
  127. }
  128. func (shard *ringShard) IsUp() bool {
  129. return !shard.IsDown()
  130. }
  131. // Vote votes to set shard state and returns true if state was changed.
  132. func (shard *ringShard) Vote(up bool) bool {
  133. if up {
  134. changed := shard.IsDown()
  135. atomic.StoreInt32(&shard.down, 0)
  136. return changed
  137. }
  138. if shard.IsDown() {
  139. return false
  140. }
  141. atomic.AddInt32(&shard.down, 1)
  142. return shard.IsDown()
  143. }
  144. //------------------------------------------------------------------------------
  145. type ringShards struct {
  146. opt *RingOptions
  147. mu sync.RWMutex
  148. hash *consistenthash.Map
  149. shards map[string]*ringShard // read only
  150. list []*ringShard // read only
  151. len int
  152. closed bool
  153. }
  154. func newRingShards(opt *RingOptions) *ringShards {
  155. return &ringShards{
  156. opt: opt,
  157. hash: newConsistentHash(opt),
  158. shards: make(map[string]*ringShard),
  159. }
  160. }
  161. func (c *ringShards) Add(name string, cl *Client) {
  162. shard := &ringShard{Client: cl}
  163. c.hash.Add(name)
  164. c.shards[name] = shard
  165. c.list = append(c.list, shard)
  166. }
  167. func (c *ringShards) List() []*ringShard {
  168. c.mu.RLock()
  169. list := c.list
  170. c.mu.RUnlock()
  171. return list
  172. }
  173. func (c *ringShards) Hash(key string) string {
  174. c.mu.RLock()
  175. hash := c.hash.Get(key)
  176. c.mu.RUnlock()
  177. return hash
  178. }
  179. func (c *ringShards) GetByKey(key string) (*ringShard, error) {
  180. key = hashtag.Key(key)
  181. c.mu.RLock()
  182. if c.closed {
  183. c.mu.RUnlock()
  184. return nil, pool.ErrClosed
  185. }
  186. hash := c.hash.Get(key)
  187. if hash == "" {
  188. c.mu.RUnlock()
  189. return nil, errRingShardsDown
  190. }
  191. shard := c.shards[hash]
  192. c.mu.RUnlock()
  193. return shard, nil
  194. }
  195. func (c *ringShards) GetByHash(name string) (*ringShard, error) {
  196. if name == "" {
  197. return c.Random()
  198. }
  199. c.mu.RLock()
  200. shard := c.shards[name]
  201. c.mu.RUnlock()
  202. return shard, nil
  203. }
  204. func (c *ringShards) Random() (*ringShard, error) {
  205. return c.GetByKey(strconv.Itoa(rand.Int()))
  206. }
  207. // heartbeat monitors state of each shard in the ring.
  208. func (c *ringShards) Heartbeat(frequency time.Duration) {
  209. ticker := time.NewTicker(frequency)
  210. defer ticker.Stop()
  211. for range ticker.C {
  212. var rebalance bool
  213. c.mu.RLock()
  214. if c.closed {
  215. c.mu.RUnlock()
  216. break
  217. }
  218. shards := c.list
  219. c.mu.RUnlock()
  220. for _, shard := range shards {
  221. err := shard.Client.Ping().Err()
  222. if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
  223. internal.Logger.Printf("ring shard state changed: %s", shard)
  224. rebalance = true
  225. }
  226. }
  227. if rebalance {
  228. c.rebalance()
  229. }
  230. }
  231. }
  232. // rebalance removes dead shards from the Ring.
  233. func (c *ringShards) rebalance() {
  234. c.mu.RLock()
  235. shards := c.shards
  236. c.mu.RUnlock()
  237. hash := newConsistentHash(c.opt)
  238. var shardsNum int
  239. for name, shard := range shards {
  240. if shard.IsUp() {
  241. hash.Add(name)
  242. shardsNum++
  243. }
  244. }
  245. c.mu.Lock()
  246. c.hash = hash
  247. c.len = shardsNum
  248. c.mu.Unlock()
  249. }
  250. func (c *ringShards) Len() int {
  251. c.mu.RLock()
  252. l := c.len
  253. c.mu.RUnlock()
  254. return l
  255. }
  256. func (c *ringShards) Close() error {
  257. c.mu.Lock()
  258. defer c.mu.Unlock()
  259. if c.closed {
  260. return nil
  261. }
  262. c.closed = true
  263. var firstErr error
  264. for _, shard := range c.shards {
  265. if err := shard.Client.Close(); err != nil && firstErr == nil {
  266. firstErr = err
  267. }
  268. }
  269. c.hash = nil
  270. c.shards = nil
  271. c.list = nil
  272. return firstErr
  273. }
  274. //------------------------------------------------------------------------------
  275. type ring struct {
  276. opt *RingOptions
  277. shards *ringShards
  278. cmdsInfoCache *cmdsInfoCache //nolint:structcheck
  279. }
  280. // Ring is a Redis client that uses consistent hashing to distribute
  281. // keys across multiple Redis servers (shards). It's safe for
  282. // concurrent use by multiple goroutines.
  283. //
  284. // Ring monitors the state of each shard and removes dead shards from
  285. // the ring. When a shard comes online it is added back to the ring. This
  286. // gives you maximum availability and partition tolerance, but no
  287. // consistency between different shards or even clients. Each client
  288. // uses shards that are available to the client and does not do any
  289. // coordination when shard state is changed.
  290. //
  291. // Ring should be used when you need multiple Redis servers for caching
  292. // and can tolerate losing data when one of the servers dies.
  293. // Otherwise you should use Redis Cluster.
  294. type Ring struct {
  295. *ring
  296. cmdable
  297. hooks
  298. ctx context.Context
  299. }
  300. func NewRing(opt *RingOptions) *Ring {
  301. opt.init()
  302. ring := Ring{
  303. ring: &ring{
  304. opt: opt,
  305. shards: newRingShards(opt),
  306. },
  307. ctx: context.Background(),
  308. }
  309. ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
  310. ring.cmdable = ring.Process
  311. for name, addr := range opt.Addrs {
  312. shard := newRingShard(opt, name, addr)
  313. ring.shards.Add(name, shard)
  314. }
  315. go ring.shards.Heartbeat(opt.HeartbeatFrequency)
  316. return &ring
  317. }
  318. func newRingShard(opt *RingOptions, name, addr string) *Client {
  319. clopt := opt.clientOptions(name)
  320. clopt.Addr = addr
  321. shard := NewClient(clopt)
  322. if opt.OnNewShard != nil {
  323. opt.OnNewShard(shard)
  324. }
  325. return shard
  326. }
  327. func (c *Ring) Context() context.Context {
  328. return c.ctx
  329. }
  330. func (c *Ring) WithContext(ctx context.Context) *Ring {
  331. if ctx == nil {
  332. panic("nil context")
  333. }
  334. clone := *c
  335. clone.cmdable = clone.Process
  336. clone.hooks.Lock()
  337. clone.ctx = ctx
  338. return &clone
  339. }
  340. // Do creates a Cmd from the args and processes the cmd.
  341. func (c *Ring) Do(args ...interface{}) *Cmd {
  342. return c.DoContext(c.ctx, args...)
  343. }
  344. func (c *Ring) DoContext(ctx context.Context, args ...interface{}) *Cmd {
  345. cmd := NewCmd(args...)
  346. _ = c.ProcessContext(ctx, cmd)
  347. return cmd
  348. }
  349. func (c *Ring) Process(cmd Cmder) error {
  350. return c.ProcessContext(c.ctx, cmd)
  351. }
  352. func (c *Ring) ProcessContext(ctx context.Context, cmd Cmder) error {
  353. return c.hooks.process(ctx, cmd, c.process)
  354. }
  355. // Options returns read-only Options that were used to create the client.
  356. func (c *Ring) Options() *RingOptions {
  357. return c.opt
  358. }
  359. func (c *Ring) retryBackoff(attempt int) time.Duration {
  360. return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
  361. }
  362. // PoolStats returns accumulated connection pool stats.
  363. func (c *Ring) PoolStats() *PoolStats {
  364. shards := c.shards.List()
  365. var acc PoolStats
  366. for _, shard := range shards {
  367. s := shard.Client.connPool.Stats()
  368. acc.Hits += s.Hits
  369. acc.Misses += s.Misses
  370. acc.Timeouts += s.Timeouts
  371. acc.TotalConns += s.TotalConns
  372. acc.IdleConns += s.IdleConns
  373. }
  374. return &acc
  375. }
  376. // Len returns the current number of shards in the ring.
  377. func (c *Ring) Len() int {
  378. return c.shards.Len()
  379. }
  380. // Subscribe subscribes the client to the specified channels.
  381. func (c *Ring) Subscribe(channels ...string) *PubSub {
  382. if len(channels) == 0 {
  383. panic("at least one channel is required")
  384. }
  385. shard, err := c.shards.GetByKey(channels[0])
  386. if err != nil {
  387. //TODO: return PubSub with sticky error
  388. panic(err)
  389. }
  390. return shard.Client.Subscribe(channels...)
  391. }
  392. // PSubscribe subscribes the client to the given patterns.
  393. func (c *Ring) PSubscribe(channels ...string) *PubSub {
  394. if len(channels) == 0 {
  395. panic("at least one channel is required")
  396. }
  397. shard, err := c.shards.GetByKey(channels[0])
  398. if err != nil {
  399. //TODO: return PubSub with sticky error
  400. panic(err)
  401. }
  402. return shard.Client.PSubscribe(channels...)
  403. }
  404. // ForEachShard concurrently calls the fn on each live shard in the ring.
  405. // It returns the first error if any.
  406. func (c *Ring) ForEachShard(fn func(client *Client) error) error {
  407. shards := c.shards.List()
  408. var wg sync.WaitGroup
  409. errCh := make(chan error, 1)
  410. for _, shard := range shards {
  411. if shard.IsDown() {
  412. continue
  413. }
  414. wg.Add(1)
  415. go func(shard *ringShard) {
  416. defer wg.Done()
  417. err := fn(shard.Client)
  418. if err != nil {
  419. select {
  420. case errCh <- err:
  421. default:
  422. }
  423. }
  424. }(shard)
  425. }
  426. wg.Wait()
  427. select {
  428. case err := <-errCh:
  429. return err
  430. default:
  431. return nil
  432. }
  433. }
  434. func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) {
  435. shards := c.shards.List()
  436. firstErr := errRingShardsDown
  437. for _, shard := range shards {
  438. cmdsInfo, err := shard.Client.Command().Result()
  439. if err == nil {
  440. return cmdsInfo, nil
  441. }
  442. if firstErr == nil {
  443. firstErr = err
  444. }
  445. }
  446. return nil, firstErr
  447. }
  448. func (c *Ring) cmdInfo(name string) *CommandInfo {
  449. cmdsInfo, err := c.cmdsInfoCache.Get()
  450. if err != nil {
  451. return nil
  452. }
  453. info := cmdsInfo[name]
  454. if info == nil {
  455. internal.Logger.Printf("info for cmd=%s not found", name)
  456. }
  457. return info
  458. }
  459. func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
  460. cmdInfo := c.cmdInfo(cmd.Name())
  461. pos := cmdFirstKeyPos(cmd, cmdInfo)
  462. if pos == 0 {
  463. return c.shards.Random()
  464. }
  465. firstKey := cmd.stringArg(pos)
  466. return c.shards.GetByKey(firstKey)
  467. }
  468. func (c *Ring) process(ctx context.Context, cmd Cmder) error {
  469. err := c._process(ctx, cmd)
  470. if err != nil {
  471. cmd.SetErr(err)
  472. return err
  473. }
  474. return nil
  475. }
  476. func (c *Ring) _process(ctx context.Context, cmd Cmder) error {
  477. var lastErr error
  478. for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
  479. if attempt > 0 {
  480. if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
  481. return err
  482. }
  483. }
  484. shard, err := c.cmdShard(cmd)
  485. if err != nil {
  486. return err
  487. }
  488. lastErr = shard.Client._process(ctx, cmd)
  489. if lastErr == nil || !isRetryableError(lastErr, cmd.readTimeout() == nil) {
  490. return lastErr
  491. }
  492. }
  493. return lastErr
  494. }
  495. func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  496. return c.Pipeline().Pipelined(fn)
  497. }
  498. func (c *Ring) Pipeline() Pipeliner {
  499. pipe := Pipeline{
  500. ctx: c.ctx,
  501. exec: c.processPipeline,
  502. }
  503. pipe.init()
  504. return &pipe
  505. }
  506. func (c *Ring) processPipeline(ctx context.Context, cmds []Cmder) error {
  507. return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
  508. return c.generalProcessPipeline(ctx, cmds, false)
  509. })
  510. }
  511. func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  512. return c.TxPipeline().Pipelined(fn)
  513. }
  514. func (c *Ring) TxPipeline() Pipeliner {
  515. pipe := Pipeline{
  516. ctx: c.ctx,
  517. exec: c.processTxPipeline,
  518. }
  519. pipe.init()
  520. return &pipe
  521. }
  522. func (c *Ring) processTxPipeline(ctx context.Context, cmds []Cmder) error {
  523. return c.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
  524. return c.generalProcessPipeline(ctx, cmds, true)
  525. })
  526. }
  527. func (c *Ring) generalProcessPipeline(
  528. ctx context.Context, cmds []Cmder, tx bool,
  529. ) error {
  530. cmdsMap := make(map[string][]Cmder)
  531. for _, cmd := range cmds {
  532. cmdInfo := c.cmdInfo(cmd.Name())
  533. hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
  534. if hash != "" {
  535. hash = c.shards.Hash(hashtag.Key(hash))
  536. }
  537. cmdsMap[hash] = append(cmdsMap[hash], cmd)
  538. }
  539. var wg sync.WaitGroup
  540. for hash, cmds := range cmdsMap {
  541. wg.Add(1)
  542. go func(hash string, cmds []Cmder) {
  543. defer wg.Done()
  544. err := c.processShardPipeline(ctx, hash, cmds, tx)
  545. if err != nil {
  546. setCmdsErr(cmds, err)
  547. }
  548. }(hash, cmds)
  549. }
  550. wg.Wait()
  551. return cmdsFirstErr(cmds)
  552. }
  553. func (c *Ring) processShardPipeline(
  554. ctx context.Context, hash string, cmds []Cmder, tx bool,
  555. ) error {
  556. //TODO: retry?
  557. shard, err := c.shards.GetByHash(hash)
  558. if err != nil {
  559. return err
  560. }
  561. if tx {
  562. err = shard.Client._generalProcessPipeline(
  563. ctx, cmds, shard.Client.txPipelineProcessCmds)
  564. } else {
  565. err = shard.Client._generalProcessPipeline(
  566. ctx, cmds, shard.Client.pipelineProcessCmds)
  567. }
  568. return err
  569. }
  570. // Close closes the ring client, releasing any open resources.
  571. //
  572. // It is rare to Close a Ring, as the Ring is meant to be long-lived
  573. // and shared between many goroutines.
  574. func (c *Ring) Close() error {
  575. return c.shards.Close()
  576. }
  577. func (c *Ring) Watch(fn func(*Tx) error, keys ...string) error {
  578. if len(keys) == 0 {
  579. return fmt.Errorf("redis: Watch requires at least one key")
  580. }
  581. var shards []*ringShard
  582. for _, key := range keys {
  583. if key != "" {
  584. shard, err := c.shards.GetByKey(hashtag.Key(key))
  585. if err != nil {
  586. return err
  587. }
  588. shards = append(shards, shard)
  589. }
  590. }
  591. if len(shards) == 0 {
  592. return fmt.Errorf("redis: Watch requires at least one shard")
  593. }
  594. if len(shards) > 1 {
  595. for _, shard := range shards[1:] {
  596. if shard.Client != shards[0].Client {
  597. err := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
  598. return err
  599. }
  600. }
  601. }
  602. return shards[0].Client.Watch(fn, keys...)
  603. }
  604. func newConsistentHash(opt *RingOptions) *consistenthash.Map {
  605. return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
  606. }