cachedsql_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. package sqlc
  2. import (
  3. "database/sql"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "os"
  10. "runtime"
  11. "sync"
  12. "sync/atomic"
  13. "testing"
  14. "time"
  15. "github.com/alicebob/miniredis/v2"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/tal-tech/go-zero/core/fx"
  18. "github.com/tal-tech/go-zero/core/logx"
  19. "github.com/tal-tech/go-zero/core/stat"
  20. "github.com/tal-tech/go-zero/core/stores/cache"
  21. "github.com/tal-tech/go-zero/core/stores/redis"
  22. "github.com/tal-tech/go-zero/core/stores/redis/redistest"
  23. "github.com/tal-tech/go-zero/core/stores/sqlx"
  24. )
  25. func init() {
  26. logx.Disable()
  27. stat.SetReporter(nil)
  28. }
  29. func TestCachedConn_GetCache(t *testing.T) {
  30. resetStats()
  31. r, clean, err := redistest.CreateRedis()
  32. assert.Nil(t, err)
  33. defer clean()
  34. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  35. var value string
  36. err = c.GetCache("any", &value)
  37. assert.Equal(t, ErrNotFound, err)
  38. r.Set("any", `"value"`)
  39. err = c.GetCache("any", &value)
  40. assert.Nil(t, err)
  41. assert.Equal(t, "value", value)
  42. }
  43. func TestStat(t *testing.T) {
  44. resetStats()
  45. r, clean, err := redistest.CreateRedis()
  46. assert.Nil(t, err)
  47. defer clean()
  48. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  49. for i := 0; i < 10; i++ {
  50. var str string
  51. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  52. *v.(*string) = "zero"
  53. return nil
  54. })
  55. if err != nil {
  56. t.Error(err)
  57. }
  58. }
  59. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  60. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  61. }
  62. func TestCachedConn_QueryRowIndex_NoCache(t *testing.T) {
  63. resetStats()
  64. r, clean, err := redistest.CreateRedis()
  65. assert.Nil(t, err)
  66. defer clean()
  67. c := NewConn(dummySqlConn{}, cache.CacheConf{
  68. {
  69. RedisConf: redis.RedisConf{
  70. Host: r.Addr,
  71. Type: redis.NodeType,
  72. },
  73. Weight: 100,
  74. },
  75. }, cache.WithExpiry(time.Second*10))
  76. var str string
  77. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  78. return fmt.Sprintf("%s/1234", s)
  79. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  80. *v.(*string) = "zero"
  81. return "primary", errors.New("foo")
  82. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  83. assert.Equal(t, "primary", pri)
  84. *v.(*string) = "xin"
  85. return nil
  86. })
  87. assert.NotNil(t, err)
  88. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  89. return fmt.Sprintf("%s/1234", s)
  90. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  91. *v.(*string) = "zero"
  92. return "primary", nil
  93. }, func(conn sqlx.SqlConn, v, pri interface{}) error {
  94. assert.Equal(t, "primary", pri)
  95. *v.(*string) = "xin"
  96. return nil
  97. })
  98. assert.Nil(t, err)
  99. assert.Equal(t, "zero", str)
  100. val, err := r.Get("index")
  101. assert.Nil(t, err)
  102. assert.Equal(t, `"primary"`, val)
  103. val, err = r.Get("primary/1234")
  104. assert.Nil(t, err)
  105. assert.Equal(t, `"zero"`, val)
  106. }
  107. func TestCachedConn_QueryRowIndex_HasCache(t *testing.T) {
  108. resetStats()
  109. r, clean, err := redistest.CreateRedis()
  110. assert.Nil(t, err)
  111. defer clean()
  112. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  113. cache.WithNotFoundExpiry(time.Second))
  114. var str string
  115. r.Set("index", `"primary"`)
  116. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  117. return fmt.Sprintf("%s/1234", s)
  118. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  119. assert.Fail(t, "should not go here")
  120. return "primary", nil
  121. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  122. *v.(*string) = "xin"
  123. assert.Equal(t, "primary", primary)
  124. return nil
  125. })
  126. assert.Nil(t, err)
  127. assert.Equal(t, "xin", str)
  128. val, err := r.Get("index")
  129. assert.Nil(t, err)
  130. assert.Equal(t, `"primary"`, val)
  131. val, err = r.Get("primary/1234")
  132. assert.Nil(t, err)
  133. assert.Equal(t, `"xin"`, val)
  134. }
  135. func TestCachedConn_QueryRowIndex_HasCache_IntPrimary(t *testing.T) {
  136. const (
  137. primaryInt8 int8 = 100
  138. primaryInt16 int16 = 10000
  139. primaryInt32 int32 = 10000000
  140. primaryInt64 int64 = 10000000
  141. primaryUint8 uint8 = 100
  142. primaryUint16 uint16 = 10000
  143. primaryUint32 uint32 = 10000000
  144. primaryUint64 uint64 = 10000000
  145. )
  146. tests := []struct {
  147. name string
  148. primary interface{}
  149. primaryCache string
  150. }{
  151. {
  152. name: "int8 primary",
  153. primary: primaryInt8,
  154. primaryCache: fmt.Sprint(primaryInt8),
  155. },
  156. {
  157. name: "int16 primary",
  158. primary: primaryInt16,
  159. primaryCache: fmt.Sprint(primaryInt16),
  160. },
  161. {
  162. name: "int32 primary",
  163. primary: primaryInt32,
  164. primaryCache: fmt.Sprint(primaryInt32),
  165. },
  166. {
  167. name: "int64 primary",
  168. primary: primaryInt64,
  169. primaryCache: fmt.Sprint(primaryInt64),
  170. },
  171. {
  172. name: "uint8 primary",
  173. primary: primaryUint8,
  174. primaryCache: fmt.Sprint(primaryUint8),
  175. },
  176. {
  177. name: "uint16 primary",
  178. primary: primaryUint16,
  179. primaryCache: fmt.Sprint(primaryUint16),
  180. },
  181. {
  182. name: "uint32 primary",
  183. primary: primaryUint32,
  184. primaryCache: fmt.Sprint(primaryUint32),
  185. },
  186. {
  187. name: "uint64 primary",
  188. primary: primaryUint64,
  189. primaryCache: fmt.Sprint(primaryUint64),
  190. },
  191. }
  192. for _, test := range tests {
  193. t.Run(test.name, func(t *testing.T) {
  194. resetStats()
  195. r, clean, err := redistest.CreateRedis()
  196. assert.Nil(t, err)
  197. defer clean()
  198. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  199. cache.WithNotFoundExpiry(time.Second))
  200. var str string
  201. r.Set("index", test.primaryCache)
  202. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  203. return fmt.Sprintf("%v/1234", s)
  204. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  205. assert.Fail(t, "should not go here")
  206. return test.primary, nil
  207. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  208. *v.(*string) = "xin"
  209. assert.Equal(t, primary, primary)
  210. return nil
  211. })
  212. assert.Nil(t, err)
  213. assert.Equal(t, "xin", str)
  214. val, err := r.Get("index")
  215. assert.Nil(t, err)
  216. assert.Equal(t, test.primaryCache, val)
  217. val, err = r.Get(test.primaryCache + "/1234")
  218. assert.Nil(t, err)
  219. assert.Equal(t, `"xin"`, val)
  220. })
  221. }
  222. }
  223. func TestCachedConn_QueryRowIndex_HasWrongCache(t *testing.T) {
  224. caches := map[string]string{
  225. "index": "primary",
  226. "primary/1234": "xin",
  227. }
  228. for k, v := range caches {
  229. t.Run(k+"/"+v, func(t *testing.T) {
  230. resetStats()
  231. r, clean, err := redistest.CreateRedis()
  232. assert.Nil(t, err)
  233. defer clean()
  234. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10),
  235. cache.WithNotFoundExpiry(time.Second))
  236. var str string
  237. r.Set(k, v)
  238. err = c.QueryRowIndex(&str, "index", func(s interface{}) string {
  239. return fmt.Sprintf("%s/1234", s)
  240. }, func(conn sqlx.SqlConn, v interface{}) (interface{}, error) {
  241. *v.(*string) = "xin"
  242. return "primary", nil
  243. }, func(conn sqlx.SqlConn, v, primary interface{}) error {
  244. *v.(*string) = "xin"
  245. assert.Equal(t, "primary", primary)
  246. return nil
  247. })
  248. assert.Nil(t, err)
  249. assert.Equal(t, "xin", str)
  250. val, err := r.Get("index")
  251. assert.Nil(t, err)
  252. assert.Equal(t, `"primary"`, val)
  253. val, err = r.Get("primary/1234")
  254. assert.Nil(t, err)
  255. assert.Equal(t, `"xin"`, val)
  256. })
  257. }
  258. }
  259. func TestStatCacheFails(t *testing.T) {
  260. resetStats()
  261. log.SetOutput(ioutil.Discard)
  262. defer log.SetOutput(os.Stdout)
  263. r := redis.NewRedis("localhost:59999", redis.NodeType)
  264. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  265. for i := 0; i < 20; i++ {
  266. var str string
  267. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  268. return errors.New("db failed")
  269. })
  270. assert.NotNil(t, err)
  271. }
  272. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  273. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  274. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Miss))
  275. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.DbFails))
  276. }
  277. func TestStatDbFails(t *testing.T) {
  278. resetStats()
  279. r, clean, err := redistest.CreateRedis()
  280. assert.Nil(t, err)
  281. defer clean()
  282. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  283. for i := 0; i < 20; i++ {
  284. var str string
  285. err = c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  286. return errors.New("db failed")
  287. })
  288. assert.NotNil(t, err)
  289. }
  290. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.Total))
  291. assert.Equal(t, uint64(0), atomic.LoadUint64(&stats.Hit))
  292. assert.Equal(t, uint64(20), atomic.LoadUint64(&stats.DbFails))
  293. }
  294. func TestStatFromMemory(t *testing.T) {
  295. resetStats()
  296. r, clean, err := redistest.CreateRedis()
  297. assert.Nil(t, err)
  298. defer clean()
  299. c := NewNodeConn(dummySqlConn{}, r, cache.WithExpiry(time.Second*10))
  300. var all sync.WaitGroup
  301. var wait sync.WaitGroup
  302. all.Add(10)
  303. wait.Add(4)
  304. go func() {
  305. var str string
  306. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  307. *v.(*string) = "zero"
  308. return nil
  309. })
  310. if err != nil {
  311. t.Error(err)
  312. }
  313. wait.Wait()
  314. runtime.Gosched()
  315. all.Done()
  316. }()
  317. for i := 0; i < 4; i++ {
  318. go func() {
  319. var str string
  320. wait.Done()
  321. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  322. *v.(*string) = "zero"
  323. return nil
  324. })
  325. if err != nil {
  326. t.Error(err)
  327. }
  328. all.Done()
  329. }()
  330. }
  331. for i := 0; i < 5; i++ {
  332. go func() {
  333. var str string
  334. err := c.QueryRow(&str, "name", func(conn sqlx.SqlConn, v interface{}) error {
  335. *v.(*string) = "zero"
  336. return nil
  337. })
  338. if err != nil {
  339. t.Error(err)
  340. }
  341. all.Done()
  342. }()
  343. }
  344. all.Wait()
  345. assert.Equal(t, uint64(10), atomic.LoadUint64(&stats.Total))
  346. assert.Equal(t, uint64(9), atomic.LoadUint64(&stats.Hit))
  347. }
  348. func TestCachedConnQueryRow(t *testing.T) {
  349. r, clean, err := redistest.CreateRedis()
  350. assert.Nil(t, err)
  351. defer clean()
  352. const (
  353. key = "user"
  354. value = "any"
  355. )
  356. var conn trackedConn
  357. var user string
  358. var ran bool
  359. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  360. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  361. ran = true
  362. user = value
  363. return nil
  364. })
  365. assert.Nil(t, err)
  366. actualValue, err := r.Get(key)
  367. assert.Nil(t, err)
  368. var actual string
  369. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  370. assert.Equal(t, value, actual)
  371. assert.Equal(t, value, user)
  372. assert.True(t, ran)
  373. }
  374. func TestCachedConnQueryRowFromCache(t *testing.T) {
  375. r, clean, err := redistest.CreateRedis()
  376. assert.Nil(t, err)
  377. defer clean()
  378. const (
  379. key = "user"
  380. value = "any"
  381. )
  382. var conn trackedConn
  383. var user string
  384. var ran bool
  385. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  386. assert.Nil(t, c.SetCache(key, value))
  387. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  388. ran = true
  389. user = value
  390. return nil
  391. })
  392. assert.Nil(t, err)
  393. actualValue, err := r.Get(key)
  394. assert.Nil(t, err)
  395. var actual string
  396. assert.Nil(t, json.Unmarshal([]byte(actualValue), &actual))
  397. assert.Equal(t, value, actual)
  398. assert.Equal(t, value, user)
  399. assert.False(t, ran)
  400. }
  401. func TestQueryRowNotFound(t *testing.T) {
  402. r, clean, err := redistest.CreateRedis()
  403. assert.Nil(t, err)
  404. defer clean()
  405. const key = "user"
  406. var conn trackedConn
  407. var user string
  408. var ran int
  409. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  410. for i := 0; i < 20; i++ {
  411. err = c.QueryRow(&user, key, func(conn sqlx.SqlConn, v interface{}) error {
  412. ran++
  413. return sql.ErrNoRows
  414. })
  415. assert.Exactly(t, sqlx.ErrNotFound, err)
  416. }
  417. assert.Equal(t, 1, ran)
  418. }
  419. func TestCachedConnExec(t *testing.T) {
  420. r, clean, err := redistest.CreateRedis()
  421. assert.Nil(t, err)
  422. defer clean()
  423. var conn trackedConn
  424. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  425. _, err = c.ExecNoCache("delete from user_table where id='kevin'")
  426. assert.Nil(t, err)
  427. assert.True(t, conn.execValue)
  428. }
  429. func TestCachedConnExecDropCache(t *testing.T) {
  430. r, err := miniredis.Run()
  431. assert.Nil(t, err)
  432. defer fx.DoWithTimeout(func() error {
  433. r.Close()
  434. return nil
  435. }, time.Second)
  436. const (
  437. key = "user"
  438. value = "any"
  439. )
  440. var conn trackedConn
  441. c := NewNodeConn(&conn, redis.NewRedis(r.Addr(), redis.NodeType), cache.WithExpiry(time.Second*30))
  442. assert.Nil(t, c.SetCache(key, value))
  443. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  444. return conn.Exec("delete from user_table where id='kevin'")
  445. }, key)
  446. assert.Nil(t, err)
  447. assert.True(t, conn.execValue)
  448. _, err = r.Get(key)
  449. assert.Exactly(t, miniredis.ErrKeyNotFound, err)
  450. _, err = c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  451. return nil, errors.New("foo")
  452. }, key)
  453. assert.NotNil(t, err)
  454. }
  455. func TestCachedConnExecDropCacheFailed(t *testing.T) {
  456. const key = "user"
  457. var conn trackedConn
  458. r := redis.NewRedis("anyredis:8888", redis.NodeType)
  459. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  460. _, err := c.Exec(func(conn sqlx.SqlConn) (result sql.Result, e error) {
  461. return conn.Exec("delete from user_table where id='kevin'")
  462. }, key)
  463. // async background clean, retry logic
  464. assert.Nil(t, err)
  465. }
  466. func TestCachedConnQueryRows(t *testing.T) {
  467. r, clean, err := redistest.CreateRedis()
  468. assert.Nil(t, err)
  469. defer clean()
  470. var conn trackedConn
  471. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  472. var users []string
  473. err = c.QueryRowsNoCache(&users, "select user from user_table where id='kevin'")
  474. assert.Nil(t, err)
  475. assert.True(t, conn.queryRowsValue)
  476. }
  477. func TestCachedConnTransact(t *testing.T) {
  478. r, clean, err := redistest.CreateRedis()
  479. assert.Nil(t, err)
  480. defer clean()
  481. var conn trackedConn
  482. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*10))
  483. err = c.Transact(func(session sqlx.Session) error {
  484. return nil
  485. })
  486. assert.Nil(t, err)
  487. assert.True(t, conn.transactValue)
  488. }
  489. func TestQueryRowNoCache(t *testing.T) {
  490. r, clean, err := redistest.CreateRedis()
  491. assert.Nil(t, err)
  492. defer clean()
  493. const (
  494. key = "user"
  495. value = "any"
  496. )
  497. var user string
  498. var ran bool
  499. conn := dummySqlConn{queryRow: func(v interface{}, q string, args ...interface{}) error {
  500. user = value
  501. ran = true
  502. return nil
  503. }}
  504. c := NewNodeConn(&conn, r, cache.WithExpiry(time.Second*30))
  505. err = c.QueryRowNoCache(&user, key)
  506. assert.Nil(t, err)
  507. assert.Equal(t, value, user)
  508. assert.True(t, ran)
  509. }
  510. func resetStats() {
  511. atomic.StoreUint64(&stats.Total, 0)
  512. atomic.StoreUint64(&stats.Hit, 0)
  513. atomic.StoreUint64(&stats.Miss, 0)
  514. atomic.StoreUint64(&stats.DbFails, 0)
  515. }
  516. type dummySqlConn struct {
  517. queryRow func(interface{}, string, ...interface{}) error
  518. }
  519. func (d dummySqlConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  520. return nil, nil
  521. }
  522. func (d dummySqlConn) Prepare(query string) (sqlx.StmtSession, error) {
  523. return nil, nil
  524. }
  525. func (d dummySqlConn) QueryRow(v interface{}, query string, args ...interface{}) error {
  526. if d.queryRow != nil {
  527. return d.queryRow(v, query, args...)
  528. }
  529. return nil
  530. }
  531. func (d dummySqlConn) QueryRowPartial(v interface{}, query string, args ...interface{}) error {
  532. return nil
  533. }
  534. func (d dummySqlConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  535. return nil
  536. }
  537. func (d dummySqlConn) QueryRowsPartial(v interface{}, query string, args ...interface{}) error {
  538. return nil
  539. }
  540. func (d dummySqlConn) Transact(func(session sqlx.Session) error) error {
  541. return nil
  542. }
  543. type trackedConn struct {
  544. dummySqlConn
  545. execValue bool
  546. queryRowsValue bool
  547. transactValue bool
  548. }
  549. func (c *trackedConn) Exec(query string, args ...interface{}) (sql.Result, error) {
  550. c.execValue = true
  551. return c.dummySqlConn.Exec(query, args...)
  552. }
  553. func (c *trackedConn) QueryRows(v interface{}, query string, args ...interface{}) error {
  554. c.queryRowsValue = true
  555. return c.dummySqlConn.QueryRows(v, query, args...)
  556. }
  557. func (c *trackedConn) Transact(fn func(session sqlx.Session) error) error {
  558. c.transactValue = true
  559. return c.dummySqlConn.Transact(fn)
  560. }