package redis

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"math/rand"
	"net"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/internal"
	"github.com/go-redis/redis/internal/hashtag"
	"github.com/go-redis/redis/internal/pool"
	"github.com/go-redis/redis/internal/proto"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 8 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to a random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between master and slave nodes.
	// It can use a service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	ClusterSlots func() ([]ClusterSlot, error)

	// Optional hook that is called when a new node is created.
	OnNewNode func(*Client)

	// Following options are copied from Options struct.

	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)

	OnConnect func(*Conn) error

	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}
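
// Example (added commentary, not part of the original file): an illustrative
// ClusterOptions value; the addresses are placeholders. Note the sentinel
// convention used by init below: -1 disables a setting that would otherwise
// get a non-zero default, e.g. ReadTimeout: -1 means "no timeout" while 0
// means "use the 3s default".
//
//	opt := &ClusterOptions{
//		Addrs:          []string{":7000", ":7001", ":7002"},
//		RouteByLatency: true, // implies ReadOnly
//		ReadTimeout:    -1,   // disable instead of defaulting to 3s
//	}
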
func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 8
	}

	if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.NumCPU()
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}
}

func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		Dialer:    opt.Dialer,
		OnConnect: opt.OnConnect,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,
		Password:        opt.Password,
		readOnly:        opt.ReadOnly,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		// Idle checking is handled by the cluster-level reaper instead of
		// the per-node client.
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	failing    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	if clOpt.OnNewNode != nil {
		clOpt.OnNewNode(node.Client)
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

func (n *clusterNode) updateLatency() {
	const probes = 10

	var latency uint32
	for i := 0; i < probes; i++ {
		start := time.Now()
		n.Client.Ping()
		probe := uint32(time.Since(start) / time.Microsecond)
		latency = (latency + probe) / 2
	}
	atomic.StoreUint32(&n.latency, latency)
}
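
// Note (added commentary, not part of the original file): updateLatency
// smooths the ping probes with a running average that weights recent probes
// more heavily, i.e. latency_i = (latency_{i-1} + probe_i) / 2, rather than
// computing a strict arithmetic mean of all ten probes.
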
func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsFailing() {
	atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
}

func (n *clusterNode) Failing() bool {
	const timeout = 15 // 15 seconds

	failing := atomic.LoadUint32(&n.failing)
	if failing == 0 {
		return false
	}
	if time.Now().Unix()-int64(failing) < timeout {
		return true
	}
	atomic.StoreUint32(&n.failing, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}
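
// Note (added commentary, not part of the original file): SetGeneration only
// ever moves the generation forward. The CAS loop retries until it either
// observes a generation newer than gen (and gives up) or successfully
// publishes gen, so concurrent callers cannot roll the value back.
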
//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu           sync.RWMutex
	allAddrs     []string
	allNodes     map[string]*clusterNode
	clusterAddrs []string
	closed       bool

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		allAddrs: opt.Addrs,
		allNodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.allNodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.allNodes = nil
	c.clusterAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string
	c.mu.RLock()
	closed := c.closed
	if !closed {
		if len(c.clusterAddrs) > 0 {
			addrs = c.clusterAddrs
		} else {
			addrs = c.allAddrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	//nolint:prealloc
	var collected []*clusterNode
	c.mu.Lock()
	for addr, node := range c.allNodes {
		if node.Generation() >= generation {
			continue
		}

		c.clusterAddrs = remove(c.clusterAddrs, addr)
		delete(c.allNodes, addr)
		collected = append(collected, node)
	}
	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	node, err := c.get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.allNodes[addr]
	if ok {
		return node, err
	}

	node = newClusterNode(c.opt, addr)

	c.allAddrs = appendIfNotExists(c.allAddrs, addr)
	c.clusterAddrs = append(c.clusterAddrs, addr)
	c.allNodes[addr] = node

	return node, err
}

func (c *clusterNodes) get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.allNodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.allNodes))
	for _, node := range c.allNodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.Get(addrs[n])
}

//------------------------------------------------------------------------------

type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.Get(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use origin host which is not loopback and node port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Failing() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Failing() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	const threshold = time.Millisecond

	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Failing() {
			continue
		}
		if node == nil || node.Latency()-n.Latency() > threshold {
			node = n
		}
	}
	return node, nil
}

func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}
	n := rand.Intn(len(nodes))
	return nodes[n], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}
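
// Note (added commentary, not part of the original file): slotNodes relies on
// c.slots being sorted by start slot (see sort.Sort in newClusterState).
// sort.Search binary-searches for the first range whose end covers the slot,
// then verifies the slot actually falls inside that range, so a gap in slot
// coverage returns nil rather than a wrong node.
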
//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func() (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload() (*clusterState, error) {
	state, err := c.load()
	if err != nil {
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload()
		if err != nil {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}()
}
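
// Note (added commentary, not part of the original file): the CAS on
// c.reloading makes LazyReload a cheap no-op while a reload is already in
// flight, and the trailing 100ms sleep keeps the flag set briefly after a
// successful reload, debouncing bursts of MOVED/ASK-triggered reloads.
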
func (c *clusterStateHolder) Get() (*clusterState, error) {
	v := c.state.Load()
	if v != nil {
		state := v.(*clusterState)
		if time.Since(state.createdAt) > time.Minute {
			c.LazyReload()
		}
		return state, nil
	}
	return c.Reload()
}

func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
	state, err := c.Reload()
	if err == nil {
		return state, nil
	}
	return c.Get()
}

//------------------------------------------------------------------------------

type clusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder //nolint:structcheck
	cmdsInfoCache *cmdsInfoCache      //nolint:structcheck
}

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	*clusterClient
	cmdable
	hooks
	ctx context.Context
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		clusterClient: &clusterClient{
			opt:   opt,
			nodes: newClusterNodes(opt),
		},
		ctx: context.Background(),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}
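
// Example (added commentary, not part of the original file): typical
// construction and use. The addresses are placeholders for a locally
// running cluster.
//
//	rdb := NewClusterClient(&ClusterOptions{
//		Addrs: []string{":7000", ":7001", ":7002"},
//	})
//	defer rdb.Close()
//
//	if err := rdb.Set("key", "value", 0).Err(); err != nil {
//		// handle error
//	}
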
func (c *ClusterClient) Context() context.Context {
	return c.ctx
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	clone := *c
	clone.cmdable = clone.Process
	clone.hooks.Lock()
	clone.ctx = ctx
	return &clone
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState() error {
	_, err := c.state.Reload()
	return err
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(args ...interface{}) *Cmd {
	return c.DoContext(c.ctx, args...)
}

func (c *ClusterClient) DoContext(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(args...)
	_ = c.ProcessContext(ctx, cmd)
	return cmd
}
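
// Example (added commentary, not part of the original file): Do is handy for
// commands without a typed helper. A minimal sketch:
//
//	val, err := rdb.Do("GET", "key").Result()
//	if err == Nil {
//		// key does not exist
//	}
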
func (c *ClusterClient) Process(cmd Cmder) error {
	return c.ProcessContext(c.ctx, cmd)
}

func (c *ClusterClient) ProcessContext(ctx context.Context, cmd Cmder) error {
	return c.hooks.process(ctx, cmd, c.process)
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	err := c._process(ctx, cmd)
	if err != nil {
		cmd.SetErr(err)
		return err
	}
	return nil
}

func (c *ClusterClient) _process(ctx context.Context, cmd Cmder) error {
	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	var node *clusterNode
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(cmdInfo, slot)
			if err != nil {
				return err
			}
		}

		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(NewCmd("ASKING"))
			_ = pipe.Process(cmd)
			_, lastErr = pipe.ExecContext(ctx)
			_ = pipe.Close()
			ask = false
		} else {
			lastErr = node.Client._process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if lastErr != Nil {
			c.state.LazyReload()
		}
		if lastErr == pool.ErrClosed || isReadOnlyError(lastErr) {
			node = nil
			continue
		}

		// If the slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			var err error
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if isRetryableError(lastErr, cmd.readTimeout() == nil) {
			// First, retry the same node.
			if attempt == 0 {
				continue
			}

			// Then try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}
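
// Note (added commentary, not part of the original file): the retry loop
// above handles three distinct cases: MOVED (slot ownership changed
// permanently, so the state is lazily reloaded and the command re-sent to
// the new owner), ASK (a one-off redirect that must be prefixed with an
// ASKING command, hence the small pipeline), and transient errors, where
// the same node is retried once before it is marked failing and another
// node is picked.
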
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
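
// Example (added commentary, not part of the original file): ForEachMaster
// is commonly used for cluster-wide maintenance, e.g. flushing every master:
//
//	err := rdb.ForEachMaster(func(master *Client) error {
//		return master.FlushDB().Err()
//	})
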
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get()
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

func (c *ClusterClient) loadState() (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots()
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error
	for _, addr := range addrs {
		node, err := c.nodes.Get(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots().Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}

	return nil, firstErr
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logger.Printf("ReapStaleConns failed: %s", err)
			}
		}
	}
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}
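
// Example (added commentary, not part of the original file): pipelined
// commands are grouped per node by hash slot before being written, so a
// single Pipelined call may fan out to several cluster nodes:
//
//	cmds, err := rdb.Pipelined(func(pipe Pipeliner) error {
//		pipe.Incr("counter")
//		pipe.Expire("counter", time.Hour)
//		return nil
//	})
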
func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
}

func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(cmdsMap, cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				setCmdsErr(cmds, err)
				return err
			}
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()

				err := node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
					err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
						return writeCmd(wr, cmds...)
					})
					if err != nil {
						return err
					}

					return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
						return c.pipelineReadCmds(node, rd, cmds, failedCmds)
					})
				})
				if err == nil {
					return
				}
				if attempt < c.opt.MaxRedirects {
					if err := c.mapCmdsByNode(failedCmds, cmds); err != nil {
						setCmdsErr(cmds, err)
					}
				} else {
					setCmdsErr(cmds, err)
				}
			}(node, cmds)
		}

		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(cmdsMap *cmdsMap, cmds []Cmder) error {
	state, err := c.state.Get()
	if err != nil {
		return err
	}

	if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
		for _, cmd := range cmds {
			slot := c.cmdSlot(cmd)
			node, err := c.slotReadOnlyNode(state, slot)
			if err != nil {
				return err
			}
			cmdsMap.Add(node, cmd)
		}
		return nil
	}

	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		node, err := state.slotMasterNode(slot)
		if err != nil {
			return err
		}
		cmdsMap.Add(node, cmd)
	}
	return nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) pipelineReadCmds(
	node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		if err == nil {
			continue
		}
		if c.checkMovedErr(cmd, err, failedCmds) {
			continue
		}

		if c.opt.ReadOnly && isLoadingError(err) {
			node.MarkAsFailing()
			return err
		}
		if isRedisError(err) {
			continue
		}
		return err
	}
	return nil
}

func (c *ClusterClient) checkMovedErr(
	cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
	moved, ask, addr := isMovedError(err)
	if !moved && !ask {
		return false
	}

	node, err := c.nodes.Get(addr)
	if err != nil {
		return false
	}

	if moved {
		c.state.LazyReload()
		failedCmds.Add(node, cmd)
		return true
	}

	if ask {
		failedCmds.Add(node, NewCmd("ASKING"), cmd)
		return true
	}

	panic("not reached")
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
}
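
// Note (added commentary, not part of the original file): because MULTI/EXEC
// is per connection, a cluster TxPipeline is split by hash slot (see
// mapCmdsBySlot below) and each slot's commands run as a separate
// transaction on that slot's master. Keys in different slots therefore do
// not share atomicity; use hash tags like {user1}.a and {user1}.b to force
// related keys into one slot.
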
func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processTxPipeline)
}

func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
	state, err := c.state.Get()
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
					setCmdsErr(cmds, err)
					return err
				}
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					err := node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
						err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
							return txPipelineWriteMulti(wr, cmds)
						})
						if err != nil {
							return err
						}

						return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
							err := c.txPipelineReadQueued(rd, cmds, failedCmds)
							if err != nil {
								moved, ask, addr := isMovedError(err)
								if moved || ask {
									return c.cmdsMoved(cmds, moved, ask, addr, failedCmds)
								}
								return err
							}
							return pipelineReadCmds(rd, cmds)
						})
					})
					if err == nil {
						return
					}
					if attempt < c.opt.MaxRedirects {
						if err := c.mapCmdsByNode(failedCmds, cmds); err != nil {
							setCmdsErr(cmds, err)
						}
					} else {
						setCmdsErr(cmds, err)
					}
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) txPipelineReadQueued(
	rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
) error {
	// Parse queued replies.
	var statusCmd StatusCmd
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil || c.checkMovedErr(cmd, err, failedCmds) || isRedisError(err) {
			continue
		}
		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}

	return nil
}

func (c *ClusterClient) cmdsMoved(
	cmds []Cmder, moved, ask bool, addr string, failedCmds *cmdsMap,
) error {
	node, err := c.nodes.Get(addr)
	if err != nil {
		return err
	}

	if moved {
		c.state.LazyReload()
		for _, cmd := range cmds {
			failedCmds.Add(node, cmd)
		}
		return nil
	}

	if ask {
		for _, cmd := range cmds {
			failedCmds.Add(node, NewCmd("ASKING"), cmd)
		}
		return nil
	}

	return nil
}

func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
	return c.WatchContext(c.ctx, fn, keys...)
}

func (c *ClusterClient) WatchContext(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		err = node.Client.WatchContext(ctx, fn, keys...)
		if err == nil {
			break
		}
		if err != Nil {
			c.state.LazyReload()
		}

		moved, ask, addr := isMovedError(err)
		if moved || ask {
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if err == pool.ErrClosed || isReadOnlyError(err) {
			node, err = c.slotMasterNode(slot)
			if err != nil {
				return err
			}
			continue
		}

		if isRetryableError(err, true) {
			continue
		}

		return err
	}

	return err
}
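
// Example (added commentary, not part of the original file): an optimistic
// check-and-set with Watch. All keys must hash to the same slot, which a
// hash tag guarantees here:
//
//	err := rdb.Watch(func(tx *Tx) error {
//		n, err := tx.Get("{acct}balance").Int64()
//		if err != nil && err != Nil {
//			return err
//		}
//		_, err = tx.TxPipelined(func(pipe Pipeliner) error {
//			pipe.Set("{acct}balance", n+1, 0)
//			return nil
//		})
//		return err
//	}, "{acct}balance")
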
func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(channels []string) (*pool.Conn, error) {
			if node != nil {
				panic("node != nil")
			}

			var err error
			if len(channels) > 0 {
				slot := hashtag.Slot(channels[0])
				node, err = c.slotMasterNode(slot)
			} else {
				node, err = c.nodes.Random()
			}
			if err != nil {
				return nil, err
			}

			cn, err := node.Client.newConn(context.TODO())
			if err != nil {
				node = nil
				return nil, err
			}

			return cn, nil
		},
		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
			node = nil
			return err
		},
	}
	pubsub.init()

	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(channels...)
	}
	return pubsub
}
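
// Example (added commentary, not part of the original file): receiving
// messages from a cluster subscription:
//
//	pubsub := rdb.Subscribe("mychannel")
//	defer pubsub.Close()
//
//	for msg := range pubsub.Channel() {
//		fmt.Println(msg.Channel, msg.Payload)
//	}
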
func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error
	for _, addr := range addrs {
		node, err := c.nodes.Get(addr)
		if err != nil {
			return nil, err
		}
		if node == nil {
			continue
		}

		info, err := node.Client.Command().Result()
		if err == nil {
			return info, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get()
	if err != nil {
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logger.Printf("info for cmd=%s not found", name)
	}
	return info
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	args := cmd.Args()
	if args[0] == "cluster" && args[1] == "getkeysinslot" {
		return args[2].(int)
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}
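
// Note (added commentary, not part of the original file): hashtag.Slot
// implements the standard Redis Cluster mapping, CRC16(key) mod 16384,
// where only the substring between the first "{" and the following "}" is
// hashed when such a hash tag is present. For example,
// "{user1000}.following" and "{user1000}.followers" hash to the same slot.
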
func (c *ClusterClient) cmdNode(cmdInfo *CommandInfo, slot int) (*clusterNode, error) {
	state, err := c.state.Get()
	if err != nil {
		return nil, err
	}

	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
		return c.slotReadOnlyNode(state, slot)
	}
	return state.slotMasterNode(slot)
}

func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
	if c.opt.RouteByLatency {
		return state.slotClosestNode(slot)
	}
	if c.opt.RouteRandomly {
		return state.slotRandomNode(slot)
	}
	return state.slotSlaveNode(slot)
}

func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
	state, err := c.state.Get()
	if err != nil {
		return nil, err
	}
	return state.slotMasterNode(slot)
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

func remove(ss []string, es ...string) []string {
	if len(es) == 0 {
		return ss[:0]
	}
	for _, e := range es {
		for i, s := range ss {
			if s == e {
				ss = append(ss[:i], ss[i+1:]...)
				break
			}
		}
	}
	return ss
}

//------------------------------------------------------------------------------

type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
	return &cmdsMap{
		m: make(map[*clusterNode][]Cmder),
	}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
	m.mu.Lock()
	m.m[node] = append(m.m[node], cmds...)
	m.mu.Unlock()
}