cassandra_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bytes"
  7. "flag"
  8. "reflect"
  9. "sort"
  10. "speter.net/go/exp/math/dec/inf"
  11. "strings"
  12. "sync"
  13. "testing"
  14. "time"
  15. )
  16. var (
  17. flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
  18. flagProto = flag.Int("proto", 2, "protcol version")
  19. flagCQL = flag.String("cql", "3.0.0", "CQL version")
  20. )
  21. var initOnce sync.Once
  22. func createSession(t *testing.T) *Session {
  23. cluster := NewCluster(strings.Split(*flagCluster, ",")...)
  24. cluster.ProtoVersion = *flagProto
  25. cluster.CQLVersion = *flagCQL
  26. cluster.Authenticator = PasswordAuthenticator{
  27. Username: "cassandra",
  28. Password: "cassandra",
  29. }
  30. session, err := cluster.CreateSession()
  31. if err != nil {
  32. t.Fatal("createSession:", err)
  33. }
  34. initOnce.Do(func() {
  35. // Drop and re-create the keyspace once. Different tests should use their own
  36. // individual tables, but can assume that the table does not exist before.
  37. if err := session.Query(`DROP KEYSPACE gocql_test`).Exec(); err != nil {
  38. t.Log("drop keyspace:", err)
  39. }
  40. if err := session.Query(`CREATE KEYSPACE gocql_test
  41. WITH replication = {
  42. 'class' : 'SimpleStrategy',
  43. 'replication_factor' : 1
  44. }`).Exec(); err != nil {
  45. t.Fatal("create keyspace:", err)
  46. }
  47. })
  48. if err := session.Query(`USE gocql_test`).Exec(); err != nil {
  49. t.Fatal("createSession:", err)
  50. }
  51. return session
  52. }
  53. func TestEmptyHosts(t *testing.T) {
  54. cluster := NewCluster()
  55. if session, err := cluster.CreateSession(); err == nil {
  56. session.Close()
  57. t.Error("expected err, got nil")
  58. }
  59. }
  60. func TestCRUD(t *testing.T) {
  61. session := createSession(t)
  62. defer session.Close()
  63. if err := session.Query(`CREATE TABLE page (
  64. title varchar,
  65. revid timeuuid,
  66. body varchar,
  67. views bigint,
  68. protected boolean,
  69. modified timestamp,
  70. rating decimal,
  71. tags set<varchar>,
  72. attachments map<varchar, text>,
  73. PRIMARY KEY (title, revid)
  74. )`).Exec(); err != nil {
  75. t.Fatal("create table:", err)
  76. }
  77. for _, page := range pageTestData {
  78. if err := session.Query(`INSERT INTO page
  79. (title, revid, body, views, protected, modified, rating, tags, attachments)
  80. VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
  81. page.Title, page.RevId, page.Body, page.Views, page.Protected,
  82. page.Modified, page.Rating, page.Tags, page.Attachments).Exec(); err != nil {
  83. t.Fatal("insert:", err)
  84. }
  85. }
  86. var count int
  87. if err := session.Query("SELECT COUNT(*) FROM page").Scan(&count); err != nil {
  88. t.Error("select count:", err)
  89. }
  90. if count != len(pageTestData) {
  91. t.Errorf("count: expected %d, got %d\n", len(pageTestData), count)
  92. }
  93. for _, original := range pageTestData {
  94. page := new(Page)
  95. err := session.Query(`SELECT title, revid, body, views, protected, modified,
  96. tags, attachments, rating
  97. FROM page WHERE title = ? AND revid = ? LIMIT 1`,
  98. original.Title, original.RevId).Scan(&page.Title, &page.RevId,
  99. &page.Body, &page.Views, &page.Protected, &page.Modified, &page.Tags,
  100. &page.Attachments, &page.Rating)
  101. if err != nil {
  102. t.Error("select page:", err)
  103. continue
  104. }
  105. sort.Sort(sort.StringSlice(page.Tags))
  106. sort.Sort(sort.StringSlice(original.Tags))
  107. if !reflect.DeepEqual(page, original) {
  108. t.Errorf("page: expected %#v, got %#v\n", original, page)
  109. }
  110. }
  111. }
  112. func TestTracing(t *testing.T) {
  113. session := createSession(t)
  114. defer session.Close()
  115. if err := session.Query(`CREATE TABLE trace (id int primary key)`).Exec(); err != nil {
  116. t.Fatal("create:", err)
  117. }
  118. buf := &bytes.Buffer{}
  119. trace := NewTraceWriter(session, buf)
  120. if err := session.Query(`INSERT INTO trace (id) VALUES (?)`, 42).Trace(trace).Exec(); err != nil {
  121. t.Error("insert:", err)
  122. } else if buf.Len() == 0 {
  123. t.Error("insert: failed to obtain any tracing")
  124. }
  125. buf.Reset()
  126. var value int
  127. if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Trace(trace).Scan(&value); err != nil {
  128. t.Error("select:", err)
  129. } else if value != 42 {
  130. t.Errorf("value: expected %d, got %d", 42, value)
  131. } else if buf.Len() == 0 {
  132. t.Error("select: failed to obtain any tracing")
  133. }
  134. }
  135. func TestPaging(t *testing.T) {
  136. t.Skip("Skip until https://github.com/gocql/gocql/issues/110 is resolved")
  137. if *flagProto == 1 {
  138. t.Skip("Paging not supported. Please use Cassandra >= 2.0")
  139. }
  140. session := createSession(t)
  141. defer session.Close()
  142. if err := session.Query("CREATE TABLE large (id int primary key)").Exec(); err != nil {
  143. t.Fatal("create table:", err)
  144. }
  145. for i := 0; i < 100; i++ {
  146. if err := session.Query("INSERT INTO large (id) VALUES (?)", i).Exec(); err != nil {
  147. t.Fatal("insert:", err)
  148. }
  149. }
  150. iter := session.Query("SELECT id FROM large").PageSize(10).Iter()
  151. var id int
  152. count := 0
  153. for iter.Scan(&id) {
  154. count++
  155. }
  156. if err := iter.Close(); err != nil {
  157. t.Fatal("close:", err)
  158. }
  159. if count != 100 {
  160. t.Fatalf("expected %d, got %d", 100, count)
  161. }
  162. }
  163. func TestCAS(t *testing.T) {
  164. if *flagProto == 1 {
  165. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  166. }
  167. session := createSession(t)
  168. defer session.Close()
  169. if err := session.Query(`CREATE TABLE cas_table (
  170. title varchar,
  171. revid timeuuid,
  172. PRIMARY KEY (title, revid)
  173. )`).Exec(); err != nil {
  174. t.Fatal("create:", err)
  175. }
  176. title, revid := "baz", TimeUUID()
  177. var titleCAS string
  178. var revidCAS UUID
  179. if applied, err := session.Query(`INSERT INTO cas_table (title, revid)
  180. VALUES (?, ?) IF NOT EXISTS`,
  181. title, revid).ScanCAS(&titleCAS, &revidCAS); err != nil {
  182. t.Fatal("insert:", err)
  183. } else if !applied {
  184. t.Fatal("insert should have been applied")
  185. }
  186. if applied, err := session.Query(`INSERT INTO cas_table (title, revid)
  187. VALUES (?, ?) IF NOT EXISTS`,
  188. title, revid).ScanCAS(&titleCAS, &revidCAS); err != nil {
  189. t.Fatal("insert:", err)
  190. } else if applied {
  191. t.Fatal("insert should not have been applied")
  192. } else if title != titleCAS || revid != revidCAS {
  193. t.Fatalf("expected %s/%v but got %s/%v", title, revid, titleCAS, revidCAS)
  194. }
  195. }
  196. func TestBatch(t *testing.T) {
  197. if *flagProto == 1 {
  198. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  199. }
  200. session := createSession(t)
  201. defer session.Close()
  202. if err := session.Query(`CREATE TABLE batch_table (id int primary key)`).Exec(); err != nil {
  203. t.Fatal("create table:", err)
  204. }
  205. batch := NewBatch(LoggedBatch)
  206. for i := 0; i < 100; i++ {
  207. batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
  208. }
  209. if err := session.ExecuteBatch(batch); err != nil {
  210. t.Fatal("execute batch:", err)
  211. }
  212. count := 0
  213. if err := session.Query(`SELECT COUNT(*) FROM batch_table`).Scan(&count); err != nil {
  214. t.Fatal("select count:", err)
  215. } else if count != 100 {
  216. t.Fatalf("count: expected %d, got %d\n", 100, count)
  217. }
  218. }
  219. // TestBatchLimit tests gocql to make sure batch operations larger than the maximum
  220. // statement limit are not submitted to a cassandra node.
  221. func TestBatchLimit(t *testing.T) {
  222. if *flagProto == 1 {
  223. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  224. }
  225. session := createSession(t)
  226. defer session.Close()
  227. if err := session.Query(`CREATE TABLE batch_table2 (id int primary key)`).Exec(); err != nil {
  228. t.Fatal("create table:", err)
  229. }
  230. batch := NewBatch(LoggedBatch)
  231. for i := 0; i < 65537; i++ {
  232. batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
  233. }
  234. if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
  235. t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
  236. }
  237. }
  238. // TestCreateSessionTimeout tests to make sure the CreateSession function timeouts out correctly
  239. // and prevents an infinite loop of connection retries.
  240. func TestCreateSessionTimeout(t *testing.T) {
  241. go func() {
  242. <-time.After(2 * time.Second)
  243. t.Fatal("no startup timeout")
  244. }()
  245. c := NewCluster("127.0.0.1:1")
  246. c.StartupTimeout = 1 * time.Second
  247. _, err := c.CreateSession()
  248. if err == nil {
  249. t.Fatal("expected ErrNoConncetions, but no error was returned.")
  250. }
  251. if err != ErrNoConnections {
  252. t.Fatal("expected ErrNoConnections, but received %v", err)
  253. }
  254. }
// Page is the Go-side representation of one row of the page table created in
// TestCRUD; each field is bound to the column of the same (lower-cased) name
// in that test's INSERT and SELECT statements.
type Page struct {
	Title       string                // title column (first primary key component)
	RevId       UUID                  // revid column (second primary key component)
	Body        string                // body column
	Views       int64                 // views column
	Protected   bool                  // protected column
	Modified    time.Time             // modified column
	Rating      *inf.Dec              // rating column; left nil by fixtures that set no rating
	Tags        []string              // tags column; TestCRUD sorts it before comparing
	Attachments map[string]Attachment // attachments column, keyed by attachment name
}

// Attachment is the raw byte content stored as a value in Page.Attachments.
type Attachment []byte
  267. var rating, _ = inf.NewDec(0, 0).SetString("0.131")
  268. var pageTestData = []*Page{
  269. &Page{
  270. Title: "Frontpage",
  271. RevId: TimeUUID(),
  272. Body: "Welcome to this wiki page!",
  273. Rating: rating,
  274. Modified: time.Date(2013, time.August, 13, 9, 52, 3, 0, time.UTC),
  275. Tags: []string{"start", "important", "test"},
  276. Attachments: map[string]Attachment{
  277. "logo": Attachment("\x00company logo\x00"),
  278. "favicon": Attachment("favicon.ico"),
  279. },
  280. },
  281. &Page{
  282. Title: "Foobar",
  283. RevId: TimeUUID(),
  284. Body: "foo::Foo f = new foo::Foo(foo::Foo::INIT);",
  285. Modified: time.Date(2013, time.August, 13, 9, 52, 3, 0, time.UTC),
  286. },
  287. }
  288. func TestSliceMap(t *testing.T) {
  289. session := createSession(t)
  290. defer session.Close()
  291. if err := session.Query(`CREATE TABLE slice_map_table (
  292. testuuid timeuuid PRIMARY KEY,
  293. testvarchar varchar,
  294. testbigint bigint,
  295. testblob blob,
  296. testbool boolean,
  297. testfloat float,
  298. testdouble double,
  299. testint int,
  300. testset set<int>,
  301. testmap map<varchar, varchar>
  302. )`).Exec(); err != nil {
  303. t.Fatal("create table:", err)
  304. }
  305. m := make(map[string]interface{})
  306. m["testuuid"] = TimeUUID()
  307. m["testvarchar"] = "Test VarChar"
  308. m["testbigint"] = time.Now().Unix()
  309. m["testblob"] = []byte("test blob")
  310. m["testbool"] = true
  311. m["testfloat"] = float32(4.564)
  312. m["testdouble"] = float64(4.815162342)
  313. m["testint"] = 2343
  314. m["testset"] = []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
  315. m["testmap"] = map[string]string{"field1": "val1", "field2": "val2", "field3": "val3"}
  316. sliceMap := []map[string]interface{}{m}
  317. if err := session.Query(`INSERT INTO slice_map_table (testuuid, testvarchar, testbigint, testblob, testbool, testfloat, testdouble, testint, testset, testmap) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
  318. m["testuuid"], m["testvarchar"], m["testbigint"], m["testblob"], m["testbool"], m["testfloat"], m["testdouble"], m["testint"], m["testset"], m["testmap"]).Exec(); err != nil {
  319. t.Fatal("insert:", err)
  320. }
  321. if returned, retErr := session.Query(`SELECT * FROM slice_map_table`).Iter().SliceMap(); retErr != nil {
  322. t.Fatal("select:", retErr)
  323. } else {
  324. if sliceMap[0]["testuuid"] != returned[0]["testuuid"] {
  325. t.Fatal("returned testuuid did not match")
  326. }
  327. if sliceMap[0]["testvarchar"] != returned[0]["testvarchar"] {
  328. t.Fatal("returned testvarchar did not match")
  329. }
  330. if sliceMap[0]["testbigint"] != returned[0]["testbigint"] {
  331. t.Fatal("returned testbigint did not match")
  332. }
  333. if !reflect.DeepEqual(sliceMap[0]["testblob"], returned[0]["testblob"]) {
  334. t.Fatal("returned testblob did not match")
  335. }
  336. if sliceMap[0]["testbool"] != returned[0]["testbool"] {
  337. t.Fatal("returned testbool did not match")
  338. }
  339. if sliceMap[0]["testfloat"] != returned[0]["testfloat"] {
  340. t.Fatal("returned testfloat did not match")
  341. }
  342. if sliceMap[0]["testdouble"] != returned[0]["testdouble"] {
  343. t.Fatal("returned testdouble did not match")
  344. }
  345. if sliceMap[0]["testint"] != returned[0]["testint"] {
  346. t.Fatal("returned testint did not match")
  347. }
  348. if !reflect.DeepEqual(sliceMap[0]["testset"], returned[0]["testset"]) {
  349. t.Fatal("returned testset did not match")
  350. }
  351. if !reflect.DeepEqual(sliceMap[0]["testmap"], returned[0]["testmap"]) {
  352. t.Fatal("returned testmap did not match")
  353. }
  354. }
  355. // Test for MapScan()
  356. testMap := make(map[string]interface{})
  357. if !session.Query(`SELECT * FROM slice_map_table`).Iter().MapScan(testMap) {
  358. t.Fatal("MapScan failed to work with one row")
  359. }
  360. if sliceMap[0]["testuuid"] != testMap["testuuid"] {
  361. t.Fatal("returned testuuid did not match")
  362. }
  363. if sliceMap[0]["testvarchar"] != testMap["testvarchar"] {
  364. t.Fatal("returned testvarchar did not match")
  365. }
  366. if sliceMap[0]["testbigint"] != testMap["testbigint"] {
  367. t.Fatal("returned testbigint did not match")
  368. }
  369. if !reflect.DeepEqual(sliceMap[0]["testblob"], testMap["testblob"]) {
  370. t.Fatal("returned testblob did not match")
  371. }
  372. if sliceMap[0]["testbool"] != testMap["testbool"] {
  373. t.Fatal("returned testbool did not match")
  374. }
  375. if sliceMap[0]["testfloat"] != testMap["testfloat"] {
  376. t.Fatal("returned testfloat did not match")
  377. }
  378. if sliceMap[0]["testdouble"] != testMap["testdouble"] {
  379. t.Fatal("returned testdouble did not match")
  380. }
  381. if sliceMap[0]["testint"] != testMap["testint"] {
  382. t.Fatal("returned testint did not match")
  383. }
  384. if !reflect.DeepEqual(sliceMap[0]["testset"], testMap["testset"]) {
  385. t.Fatal("returned testset did not match")
  386. }
  387. if !reflect.DeepEqual(sliceMap[0]["testmap"], testMap["testmap"]) {
  388. t.Fatal("returned testmap did not match")
  389. }
  390. }
  391. func TestScanWithNilArguments(t *testing.T) {
  392. session := createSession(t)
  393. defer session.Close()
  394. if err := session.Query(`CREATE TABLE scan_with_nil_arguments (
  395. foo varchar,
  396. bar int,
  397. PRIMARY KEY (foo, bar)
  398. )`).Exec(); err != nil {
  399. t.Fatal("create:", err)
  400. }
  401. for i := 1; i <= 20; i++ {
  402. if err := session.Query("INSERT INTO scan_with_nil_arguments (foo, bar) VALUES (?, ?)",
  403. "squares", i*i).Exec(); err != nil {
  404. t.Fatal("insert:", err)
  405. }
  406. }
  407. iter := session.Query("SELECT * FROM scan_with_nil_arguments WHERE foo = ?", "squares").Iter()
  408. var n int
  409. count := 0
  410. for iter.Scan(nil, &n) {
  411. count += n
  412. }
  413. if err := iter.Close(); err != nil {
  414. t.Fatal("close:", err)
  415. }
  416. if count != 2870 {
  417. t.Fatalf("expected %d, got %d", 2870, count)
  418. }
  419. }
  420. func TestScanCASWithNilArguments(t *testing.T) {
  421. if *flagProto == 1 {
  422. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  423. }
  424. session := createSession(t)
  425. defer session.Close()
  426. if err := session.Query(`CREATE TABLE scan_cas_with_nil_arguments (
  427. foo varchar,
  428. bar varchar,
  429. PRIMARY KEY (foo, bar)
  430. )`).Exec(); err != nil {
  431. t.Fatal("create:", err)
  432. }
  433. foo := "baz"
  434. var cas string
  435. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  436. VALUES (?, ?) IF NOT EXISTS`,
  437. foo, foo).ScanCAS(nil, nil); err != nil {
  438. t.Fatal("insert:", err)
  439. } else if !applied {
  440. t.Fatal("insert should have been applied")
  441. }
  442. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  443. VALUES (?, ?) IF NOT EXISTS`,
  444. foo, foo).ScanCAS(&cas, nil); err != nil {
  445. t.Fatal("insert:", err)
  446. } else if applied {
  447. t.Fatal("insert should not have been applied")
  448. } else if foo != cas {
  449. t.Fatalf("expected %v but got %v", foo, cas)
  450. }
  451. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  452. VALUES (?, ?) IF NOT EXISTS`,
  453. foo, foo).ScanCAS(nil, &cas); err != nil {
  454. t.Fatal("insert:", err)
  455. } else if applied {
  456. t.Fatal("insert should not have been applied")
  457. } else if foo != cas {
  458. t.Fatalf("expected %v but got %v", foo, cas)
  459. }
  460. }