store.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. package kv
  2. import (
  3. "errors"
  4. "log"
  5. "github.com/tal-tech/go-zero/core/errorx"
  6. "github.com/tal-tech/go-zero/core/hash"
  7. "github.com/tal-tech/go-zero/core/stores/cache"
  8. "github.com/tal-tech/go-zero/core/stores/redis"
  9. )
  10. var ErrNoRedisNode = errors.New("no redis node")
  11. type (
  12. Store interface {
  13. Del(keys ...string) (int, error)
  14. Eval(script string, key string, args ...interface{}) (interface{}, error)
  15. Exists(key string) (bool, error)
  16. Expire(key string, seconds int) error
  17. Expireat(key string, expireTime int64) error
  18. Get(key string) (string, error)
  19. Hdel(key, field string) (bool, error)
  20. Hexists(key, field string) (bool, error)
  21. Hget(key, field string) (string, error)
  22. Hgetall(key string) (map[string]string, error)
  23. Hincrby(key, field string, increment int) (int, error)
  24. Hkeys(key string) ([]string, error)
  25. Hlen(key string) (int, error)
  26. Hmget(key string, fields ...string) ([]string, error)
  27. Hset(key, field, value string) error
  28. Hsetnx(key, field, value string) (bool, error)
  29. Hmset(key string, fieldsAndValues map[string]string) error
  30. Hvals(key string) ([]string, error)
  31. Incr(key string) (int64, error)
  32. Incrby(key string, increment int64) (int64, error)
  33. Llen(key string) (int, error)
  34. Lpop(key string) (string, error)
  35. Lpush(key string, values ...interface{}) (int, error)
  36. Lrange(key string, start int, stop int) ([]string, error)
  37. Lrem(key string, count int, value string) (int, error)
  38. Persist(key string) (bool, error)
  39. Pfadd(key string, values ...interface{}) (bool, error)
  40. Pfcount(key string) (int64, error)
  41. Rpush(key string, values ...interface{}) (int, error)
  42. Sadd(key string, values ...interface{}) (int, error)
  43. Scard(key string) (int64, error)
  44. Set(key string, value string) error
  45. Setex(key, value string, seconds int) error
  46. Setnx(key, value string) (bool, error)
  47. SetnxEx(key, value string, seconds int) (bool, error)
  48. Sismember(key string, value interface{}) (bool, error)
  49. Smembers(key string) ([]string, error)
  50. Spop(key string) (string, error)
  51. Srandmember(key string, count int) ([]string, error)
  52. Srem(key string, values ...interface{}) (int, error)
  53. Sscan(key string, cursor uint64, match string, count int64) (keys []string, cur uint64, err error)
  54. Ttl(key string) (int, error)
  55. Zadd(key string, score int64, value string) (bool, error)
  56. Zadds(key string, ps ...redis.Pair) (int64, error)
  57. Zcard(key string) (int, error)
  58. Zcount(key string, start, stop int64) (int, error)
  59. Zincrby(key string, increment int64, field string) (int64, error)
  60. Zrange(key string, start, stop int64) ([]string, error)
  61. ZrangeWithScores(key string, start, stop int64) ([]redis.Pair, error)
  62. ZrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error)
  63. ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) ([]redis.Pair, error)
  64. Zrank(key, field string) (int64, error)
  65. Zrem(key string, values ...interface{}) (int, error)
  66. Zremrangebyrank(key string, start, stop int64) (int, error)
  67. Zremrangebyscore(key string, start, stop int64) (int, error)
  68. Zrevrange(key string, start, stop int64) ([]string, error)
  69. ZrevrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error)
  70. ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) ([]redis.Pair, error)
  71. Zscore(key string, value string) (int64, error)
  72. Zrevrank(key, field string) (int64, error)
  73. }
  74. clusterStore struct {
  75. dispatcher *hash.ConsistentHash
  76. }
  77. )
  78. func NewStore(c KvConf) Store {
  79. if len(c) == 0 || cache.TotalWeights(c) <= 0 {
  80. log.Fatal("no cache nodes")
  81. }
  82. // even if only one node, we chose to use consistent hash,
  83. // because Store and redis.Redis has different methods.
  84. dispatcher := hash.NewConsistentHash()
  85. for _, node := range c {
  86. cn := node.NewRedis()
  87. dispatcher.AddWithWeight(cn, node.Weight)
  88. }
  89. return clusterStore{
  90. dispatcher: dispatcher,
  91. }
  92. }
  93. func (cs clusterStore) Del(keys ...string) (int, error) {
  94. var val int
  95. var be errorx.BatchError
  96. for _, key := range keys {
  97. node, e := cs.getRedis(key)
  98. if e != nil {
  99. be.Add(e)
  100. continue
  101. }
  102. if v, e := node.Del(key); e != nil {
  103. be.Add(e)
  104. } else {
  105. val += v
  106. }
  107. }
  108. return val, be.Err()
  109. }
  110. func (cs clusterStore) Eval(script string, key string, args ...interface{}) (interface{}, error) {
  111. node, err := cs.getRedis(key)
  112. if err != nil {
  113. return nil, err
  114. }
  115. return node.Eval(script, []string{key}, args...)
  116. }
  117. func (cs clusterStore) Exists(key string) (bool, error) {
  118. node, err := cs.getRedis(key)
  119. if err != nil {
  120. return false, err
  121. }
  122. return node.Exists(key)
  123. }
  124. func (cs clusterStore) Expire(key string, seconds int) error {
  125. node, err := cs.getRedis(key)
  126. if err != nil {
  127. return err
  128. }
  129. return node.Expire(key, seconds)
  130. }
  131. func (cs clusterStore) Expireat(key string, expireTime int64) error {
  132. node, err := cs.getRedis(key)
  133. if err != nil {
  134. return err
  135. }
  136. return node.Expireat(key, expireTime)
  137. }
  138. func (cs clusterStore) Get(key string) (string, error) {
  139. node, err := cs.getRedis(key)
  140. if err != nil {
  141. return "", err
  142. }
  143. return node.Get(key)
  144. }
  145. func (cs clusterStore) Hdel(key, field string) (bool, error) {
  146. node, err := cs.getRedis(key)
  147. if err != nil {
  148. return false, err
  149. }
  150. return node.Hdel(key, field)
  151. }
  152. func (cs clusterStore) Hexists(key, field string) (bool, error) {
  153. node, err := cs.getRedis(key)
  154. if err != nil {
  155. return false, err
  156. }
  157. return node.Hexists(key, field)
  158. }
  159. func (cs clusterStore) Hget(key, field string) (string, error) {
  160. node, err := cs.getRedis(key)
  161. if err != nil {
  162. return "", err
  163. }
  164. return node.Hget(key, field)
  165. }
  166. func (cs clusterStore) Hgetall(key string) (map[string]string, error) {
  167. node, err := cs.getRedis(key)
  168. if err != nil {
  169. return nil, err
  170. }
  171. return node.Hgetall(key)
  172. }
  173. func (cs clusterStore) Hincrby(key, field string, increment int) (int, error) {
  174. node, err := cs.getRedis(key)
  175. if err != nil {
  176. return 0, err
  177. }
  178. return node.Hincrby(key, field, increment)
  179. }
  180. func (cs clusterStore) Hkeys(key string) ([]string, error) {
  181. node, err := cs.getRedis(key)
  182. if err != nil {
  183. return nil, err
  184. }
  185. return node.Hkeys(key)
  186. }
  187. func (cs clusterStore) Hlen(key string) (int, error) {
  188. node, err := cs.getRedis(key)
  189. if err != nil {
  190. return 0, err
  191. }
  192. return node.Hlen(key)
  193. }
  194. func (cs clusterStore) Hmget(key string, fields ...string) ([]string, error) {
  195. node, err := cs.getRedis(key)
  196. if err != nil {
  197. return nil, err
  198. }
  199. return node.Hmget(key, fields...)
  200. }
  201. func (cs clusterStore) Hset(key, field, value string) error {
  202. node, err := cs.getRedis(key)
  203. if err != nil {
  204. return err
  205. }
  206. return node.Hset(key, field, value)
  207. }
  208. func (cs clusterStore) Hsetnx(key, field, value string) (bool, error) {
  209. node, err := cs.getRedis(key)
  210. if err != nil {
  211. return false, err
  212. }
  213. return node.Hsetnx(key, field, value)
  214. }
  215. func (cs clusterStore) Hmset(key string, fieldsAndValues map[string]string) error {
  216. node, err := cs.getRedis(key)
  217. if err != nil {
  218. return err
  219. }
  220. return node.Hmset(key, fieldsAndValues)
  221. }
  222. func (cs clusterStore) Hvals(key string) ([]string, error) {
  223. node, err := cs.getRedis(key)
  224. if err != nil {
  225. return nil, err
  226. }
  227. return node.Hvals(key)
  228. }
  229. func (cs clusterStore) Incr(key string) (int64, error) {
  230. node, err := cs.getRedis(key)
  231. if err != nil {
  232. return 0, err
  233. }
  234. return node.Incr(key)
  235. }
  236. func (cs clusterStore) Incrby(key string, increment int64) (int64, error) {
  237. node, err := cs.getRedis(key)
  238. if err != nil {
  239. return 0, err
  240. }
  241. return node.Incrby(key, increment)
  242. }
  243. func (cs clusterStore) Llen(key string) (int, error) {
  244. node, err := cs.getRedis(key)
  245. if err != nil {
  246. return 0, err
  247. }
  248. return node.Llen(key)
  249. }
  250. func (cs clusterStore) Lpop(key string) (string, error) {
  251. node, err := cs.getRedis(key)
  252. if err != nil {
  253. return "", err
  254. }
  255. return node.Lpop(key)
  256. }
  257. func (cs clusterStore) Lpush(key string, values ...interface{}) (int, error) {
  258. node, err := cs.getRedis(key)
  259. if err != nil {
  260. return 0, err
  261. }
  262. return node.Lpush(key, values...)
  263. }
  264. func (cs clusterStore) Lrange(key string, start int, stop int) ([]string, error) {
  265. node, err := cs.getRedis(key)
  266. if err != nil {
  267. return nil, err
  268. }
  269. return node.Lrange(key, start, stop)
  270. }
  271. func (cs clusterStore) Lrem(key string, count int, value string) (int, error) {
  272. node, err := cs.getRedis(key)
  273. if err != nil {
  274. return 0, err
  275. }
  276. return node.Lrem(key, count, value)
  277. }
  278. func (cs clusterStore) Persist(key string) (bool, error) {
  279. node, err := cs.getRedis(key)
  280. if err != nil {
  281. return false, err
  282. }
  283. return node.Persist(key)
  284. }
  285. func (cs clusterStore) Pfadd(key string, values ...interface{}) (bool, error) {
  286. node, err := cs.getRedis(key)
  287. if err != nil {
  288. return false, err
  289. }
  290. return node.Pfadd(key, values...)
  291. }
  292. func (cs clusterStore) Pfcount(key string) (int64, error) {
  293. node, err := cs.getRedis(key)
  294. if err != nil {
  295. return 0, err
  296. }
  297. return node.Pfcount(key)
  298. }
  299. func (cs clusterStore) Rpush(key string, values ...interface{}) (int, error) {
  300. node, err := cs.getRedis(key)
  301. if err != nil {
  302. return 0, err
  303. }
  304. return node.Rpush(key, values...)
  305. }
  306. func (cs clusterStore) Sadd(key string, values ...interface{}) (int, error) {
  307. node, err := cs.getRedis(key)
  308. if err != nil {
  309. return 0, err
  310. }
  311. return node.Sadd(key, values...)
  312. }
  313. func (cs clusterStore) Scard(key string) (int64, error) {
  314. node, err := cs.getRedis(key)
  315. if err != nil {
  316. return 0, err
  317. }
  318. return node.Scard(key)
  319. }
  320. func (cs clusterStore) Set(key string, value string) error {
  321. node, err := cs.getRedis(key)
  322. if err != nil {
  323. return err
  324. }
  325. return node.Set(key, value)
  326. }
  327. func (cs clusterStore) Setex(key, value string, seconds int) error {
  328. node, err := cs.getRedis(key)
  329. if err != nil {
  330. return err
  331. }
  332. return node.Setex(key, value, seconds)
  333. }
  334. func (cs clusterStore) Setnx(key, value string) (bool, error) {
  335. node, err := cs.getRedis(key)
  336. if err != nil {
  337. return false, err
  338. }
  339. return node.Setnx(key, value)
  340. }
  341. func (cs clusterStore) SetnxEx(key, value string, seconds int) (bool, error) {
  342. node, err := cs.getRedis(key)
  343. if err != nil {
  344. return false, err
  345. }
  346. return node.SetnxEx(key, value, seconds)
  347. }
  348. func (cs clusterStore) Sismember(key string, value interface{}) (bool, error) {
  349. node, err := cs.getRedis(key)
  350. if err != nil {
  351. return false, err
  352. }
  353. return node.Sismember(key, value)
  354. }
  355. func (cs clusterStore) Smembers(key string) ([]string, error) {
  356. node, err := cs.getRedis(key)
  357. if err != nil {
  358. return nil, err
  359. }
  360. return node.Smembers(key)
  361. }
  362. func (cs clusterStore) Spop(key string) (string, error) {
  363. node, err := cs.getRedis(key)
  364. if err != nil {
  365. return "", err
  366. }
  367. return node.Spop(key)
  368. }
  369. func (cs clusterStore) Srandmember(key string, count int) ([]string, error) {
  370. node, err := cs.getRedis(key)
  371. if err != nil {
  372. return nil, err
  373. }
  374. return node.Srandmember(key, count)
  375. }
  376. func (cs clusterStore) Srem(key string, values ...interface{}) (int, error) {
  377. node, err := cs.getRedis(key)
  378. if err != nil {
  379. return 0, err
  380. }
  381. return node.Srem(key, values...)
  382. }
  383. func (cs clusterStore) Sscan(key string, cursor uint64, match string, count int64) (
  384. keys []string, cur uint64, err error) {
  385. node, err := cs.getRedis(key)
  386. if err != nil {
  387. return nil, 0, err
  388. }
  389. return node.Sscan(key, cursor, match, count)
  390. }
  391. func (cs clusterStore) Ttl(key string) (int, error) {
  392. node, err := cs.getRedis(key)
  393. if err != nil {
  394. return 0, err
  395. }
  396. return node.Ttl(key)
  397. }
  398. func (cs clusterStore) Zadd(key string, score int64, value string) (bool, error) {
  399. node, err := cs.getRedis(key)
  400. if err != nil {
  401. return false, err
  402. }
  403. return node.Zadd(key, score, value)
  404. }
  405. func (cs clusterStore) Zadds(key string, ps ...redis.Pair) (int64, error) {
  406. node, err := cs.getRedis(key)
  407. if err != nil {
  408. return 0, err
  409. }
  410. return node.Zadds(key, ps...)
  411. }
  412. func (cs clusterStore) Zcard(key string) (int, error) {
  413. node, err := cs.getRedis(key)
  414. if err != nil {
  415. return 0, err
  416. }
  417. return node.Zcard(key)
  418. }
  419. func (cs clusterStore) Zcount(key string, start, stop int64) (int, error) {
  420. node, err := cs.getRedis(key)
  421. if err != nil {
  422. return 0, err
  423. }
  424. return node.Zcount(key, start, stop)
  425. }
  426. func (cs clusterStore) Zincrby(key string, increment int64, field string) (int64, error) {
  427. node, err := cs.getRedis(key)
  428. if err != nil {
  429. return 0, err
  430. }
  431. return node.Zincrby(key, increment, field)
  432. }
  433. func (cs clusterStore) Zrank(key, field string) (int64, error) {
  434. node, err := cs.getRedis(key)
  435. if err != nil {
  436. return 0, err
  437. }
  438. return node.Zrank(key, field)
  439. }
  440. func (cs clusterStore) Zrange(key string, start, stop int64) ([]string, error) {
  441. node, err := cs.getRedis(key)
  442. if err != nil {
  443. return nil, err
  444. }
  445. return node.Zrange(key, start, stop)
  446. }
  447. func (cs clusterStore) ZrangeWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  448. node, err := cs.getRedis(key)
  449. if err != nil {
  450. return nil, err
  451. }
  452. return node.ZrangeWithScores(key, start, stop)
  453. }
  454. func (cs clusterStore) ZrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  455. node, err := cs.getRedis(key)
  456. if err != nil {
  457. return nil, err
  458. }
  459. return node.ZrangebyscoreWithScores(key, start, stop)
  460. }
  461. func (cs clusterStore) ZrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) (
  462. []redis.Pair, error) {
  463. node, err := cs.getRedis(key)
  464. if err != nil {
  465. return nil, err
  466. }
  467. return node.ZrangebyscoreWithScoresAndLimit(key, start, stop, page, size)
  468. }
  469. func (cs clusterStore) Zrem(key string, values ...interface{}) (int, error) {
  470. node, err := cs.getRedis(key)
  471. if err != nil {
  472. return 0, err
  473. }
  474. return node.Zrem(key, values...)
  475. }
  476. func (cs clusterStore) Zremrangebyrank(key string, start, stop int64) (int, error) {
  477. node, err := cs.getRedis(key)
  478. if err != nil {
  479. return 0, err
  480. }
  481. return node.Zremrangebyrank(key, start, stop)
  482. }
  483. func (cs clusterStore) Zremrangebyscore(key string, start, stop int64) (int, error) {
  484. node, err := cs.getRedis(key)
  485. if err != nil {
  486. return 0, err
  487. }
  488. return node.Zremrangebyscore(key, start, stop)
  489. }
  490. func (cs clusterStore) Zrevrange(key string, start, stop int64) ([]string, error) {
  491. node, err := cs.getRedis(key)
  492. if err != nil {
  493. return nil, err
  494. }
  495. return node.Zrevrange(key, start, stop)
  496. }
  497. func (cs clusterStore) ZrevrangebyscoreWithScores(key string, start, stop int64) ([]redis.Pair, error) {
  498. node, err := cs.getRedis(key)
  499. if err != nil {
  500. return nil, err
  501. }
  502. return node.ZrevrangebyscoreWithScores(key, start, stop)
  503. }
  504. func (cs clusterStore) ZrevrangebyscoreWithScoresAndLimit(key string, start, stop int64, page, size int) (
  505. []redis.Pair, error) {
  506. node, err := cs.getRedis(key)
  507. if err != nil {
  508. return nil, err
  509. }
  510. return node.ZrevrangebyscoreWithScoresAndLimit(key, start, stop, page, size)
  511. }
  512. func (cs clusterStore) Zrevrank(key, field string) (int64, error) {
  513. node, err := cs.getRedis(key)
  514. if err != nil {
  515. return 0, err
  516. }
  517. return node.Zrevrank(key, field)
  518. }
  519. func (cs clusterStore) Zscore(key string, value string) (int64, error) {
  520. node, err := cs.getRedis(key)
  521. if err != nil {
  522. return 0, err
  523. }
  524. return node.Zscore(key, value)
  525. }
  526. func (cs clusterStore) getRedis(key string) (*redis.Redis, error) {
  527. if val, ok := cs.dispatcher.Get(key); !ok {
  528. return nil, ErrNoRedisNode
  529. } else {
  530. return val.(*redis.Redis), nil
  531. }
  532. }