cassandra_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package gocql
  5. import (
  6. "bytes"
  7. "flag"
  8. "reflect"
  9. "sort"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. )
  15. var (
  16. flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
  17. flagProto = flag.Int("proto", 2, "protcol version")
  18. flagCQL = flag.String("cql", "3.0.0", "CQL version")
  19. )
  20. var initOnce sync.Once
  21. func createSession(t *testing.T) *Session {
  22. cluster := NewCluster(strings.Split(*flagCluster, ",")...)
  23. cluster.ProtoVersion = *flagProto
  24. cluster.CQLVersion = *flagCQL
  25. cluster.Authenticator = PasswordAuthenticator{
  26. Username: "cassandra",
  27. Password: "cassandra",
  28. }
  29. session, err := cluster.CreateSession()
  30. if err != nil {
  31. t.Fatal("createSession:", err)
  32. }
  33. initOnce.Do(func() {
  34. // Drop and re-create the keyspace once. Different tests should use their own
  35. // individual tables, but can assume that the table does not exist before.
  36. if err := session.Query(`DROP KEYSPACE gocql_test`).Exec(); err != nil {
  37. t.Log("drop keyspace:", err)
  38. }
  39. if err := session.Query(`CREATE KEYSPACE gocql_test
  40. WITH replication = {
  41. 'class' : 'SimpleStrategy',
  42. 'replication_factor' : 1
  43. }`).Exec(); err != nil {
  44. t.Fatal("create keyspace:", err)
  45. }
  46. })
  47. if err := session.Query(`USE gocql_test`).Exec(); err != nil {
  48. t.Fatal("createSession:", err)
  49. }
  50. return session
  51. }
  52. func TestEmptyHosts(t *testing.T) {
  53. cluster := NewCluster()
  54. if session, err := cluster.CreateSession(); err == nil {
  55. session.Close()
  56. t.Error("expected err, got nil")
  57. }
  58. }
  59. func TestCRUD(t *testing.T) {
  60. session := createSession(t)
  61. defer session.Close()
  62. if err := session.Query(`CREATE TABLE page (
  63. title varchar,
  64. revid timeuuid,
  65. body varchar,
  66. views bigint,
  67. protected boolean,
  68. modified timestamp,
  69. tags set<varchar>,
  70. attachments map<varchar, text>,
  71. PRIMARY KEY (title, revid)
  72. )`).Exec(); err != nil {
  73. t.Fatal("create table:", err)
  74. }
  75. for _, page := range pageTestData {
  76. if err := session.Query(`INSERT INTO page
  77. (title, revid, body, views, protected, modified, tags, attachments)
  78. VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
  79. page.Title, page.RevId, page.Body, page.Views, page.Protected,
  80. page.Modified, page.Tags, page.Attachments).Exec(); err != nil {
  81. t.Fatal("insert:", err)
  82. }
  83. }
  84. var count int
  85. if err := session.Query("SELECT COUNT(*) FROM page").Scan(&count); err != nil {
  86. t.Error("select count:", err)
  87. }
  88. if count != len(pageTestData) {
  89. t.Errorf("count: expected %d, got %d\n", len(pageTestData), count)
  90. }
  91. for _, original := range pageTestData {
  92. page := new(Page)
  93. err := session.Query(`SELECT title, revid, body, views, protected, modified,
  94. tags, attachments
  95. FROM page WHERE title = ? AND revid = ? LIMIT 1`,
  96. original.Title, original.RevId).Scan(&page.Title, &page.RevId,
  97. &page.Body, &page.Views, &page.Protected, &page.Modified, &page.Tags,
  98. &page.Attachments)
  99. if err != nil {
  100. t.Error("select page:", err)
  101. continue
  102. }
  103. sort.Sort(sort.StringSlice(page.Tags))
  104. sort.Sort(sort.StringSlice(original.Tags))
  105. if !reflect.DeepEqual(page, original) {
  106. t.Errorf("page: expected %#v, got %#v\n", original, page)
  107. }
  108. }
  109. }
  110. func TestTracing(t *testing.T) {
  111. session := createSession(t)
  112. defer session.Close()
  113. if err := session.Query(`CREATE TABLE trace (id int primary key)`).Exec(); err != nil {
  114. t.Fatal("create:", err)
  115. }
  116. buf := &bytes.Buffer{}
  117. trace := NewTraceWriter(session, buf)
  118. if err := session.Query(`INSERT INTO trace (id) VALUES (?)`, 42).Trace(trace).Exec(); err != nil {
  119. t.Error("insert:", err)
  120. } else if buf.Len() == 0 {
  121. t.Error("insert: failed to obtain any tracing")
  122. }
  123. buf.Reset()
  124. var value int
  125. if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Trace(trace).Scan(&value); err != nil {
  126. t.Error("select:", err)
  127. } else if value != 42 {
  128. t.Errorf("value: expected %d, got %d", 42, value)
  129. } else if buf.Len() == 0 {
  130. t.Error("select: failed to obtain any tracing")
  131. }
  132. }
  133. func TestPaging(t *testing.T) {
  134. t.Skip("Skip until https://github.com/gocql/gocql/issues/110 is resolved")
  135. if *flagProto == 1 {
  136. t.Skip("Paging not supported. Please use Cassandra >= 2.0")
  137. }
  138. session := createSession(t)
  139. defer session.Close()
  140. if err := session.Query("CREATE TABLE large (id int primary key)").Exec(); err != nil {
  141. t.Fatal("create table:", err)
  142. }
  143. for i := 0; i < 100; i++ {
  144. if err := session.Query("INSERT INTO large (id) VALUES (?)", i).Exec(); err != nil {
  145. t.Fatal("insert:", err)
  146. }
  147. }
  148. iter := session.Query("SELECT id FROM large").PageSize(10).Iter()
  149. var id int
  150. count := 0
  151. for iter.Scan(&id) {
  152. count++
  153. }
  154. if err := iter.Close(); err != nil {
  155. t.Fatal("close:", err)
  156. }
  157. if count != 100 {
  158. t.Fatalf("expected %d, got %d", 100, count)
  159. }
  160. }
  161. func TestCAS(t *testing.T) {
  162. if *flagProto == 1 {
  163. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  164. }
  165. session := createSession(t)
  166. defer session.Close()
  167. if err := session.Query(`CREATE TABLE cas_table (
  168. title varchar,
  169. revid timeuuid,
  170. PRIMARY KEY (title, revid)
  171. )`).Exec(); err != nil {
  172. t.Fatal("create:", err)
  173. }
  174. title, revid := "baz", TimeUUID()
  175. var titleCAS string
  176. var revidCAS UUID
  177. if applied, err := session.Query(`INSERT INTO cas_table (title, revid)
  178. VALUES (?, ?) IF NOT EXISTS`,
  179. title, revid).ScanCAS(&titleCAS, &revidCAS); err != nil {
  180. t.Fatal("insert:", err)
  181. } else if !applied {
  182. t.Fatal("insert should have been applied")
  183. }
  184. if applied, err := session.Query(`INSERT INTO cas_table (title, revid)
  185. VALUES (?, ?) IF NOT EXISTS`,
  186. title, revid).ScanCAS(&titleCAS, &revidCAS); err != nil {
  187. t.Fatal("insert:", err)
  188. } else if applied {
  189. t.Fatal("insert should not have been applied")
  190. } else if title != titleCAS || revid != revidCAS {
  191. t.Fatalf("expected %s/%v but got %s/%v", title, revid, titleCAS, revidCAS)
  192. }
  193. }
  194. func TestBatch(t *testing.T) {
  195. if *flagProto == 1 {
  196. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  197. }
  198. session := createSession(t)
  199. defer session.Close()
  200. if err := session.Query(`CREATE TABLE batch_table (id int primary key)`).Exec(); err != nil {
  201. t.Fatal("create table:", err)
  202. }
  203. batch := NewBatch(LoggedBatch)
  204. for i := 0; i < 100; i++ {
  205. batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
  206. }
  207. if err := session.ExecuteBatch(batch); err != nil {
  208. t.Fatal("execute batch:", err)
  209. }
  210. count := 0
  211. if err := session.Query(`SELECT COUNT(*) FROM batch_table`).Scan(&count); err != nil {
  212. t.Fatal("select count:", err)
  213. } else if count != 100 {
  214. t.Fatalf("count: expected %d, got %d\n", 100, count)
  215. }
  216. }
  217. // TestBatchLimit tests gocql to make sure batch operations larger than the maximum
  218. // statement limit are not submitted to a cassandra node.
  219. func TestBatchLimit(t *testing.T) {
  220. if *flagProto == 1 {
  221. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  222. }
  223. session := createSession(t)
  224. defer session.Close()
  225. if err := session.Query(`CREATE TABLE batch_table2 (id int primary key)`).Exec(); err != nil {
  226. t.Fatal("create table:", err)
  227. }
  228. batch := NewBatch(LoggedBatch)
  229. for i := 0; i < 65537; i++ {
  230. batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
  231. }
  232. if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
  233. t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
  234. }
  235. }
  236. // TestCreateSessionTimeout tests to make sure the CreateSession function timeouts out correctly
  237. // and prevents an infinite loop of connection retries.
  238. func TestCreateSessionTimeout(t *testing.T) {
  239. go func() {
  240. <-time.After(2 * time.Second)
  241. t.Fatal("no startup timeout")
  242. }()
  243. c := NewCluster("127.0.0.1:1")
  244. c.StartupTimeout = 1 * time.Second
  245. _, err := c.CreateSession()
  246. if err == nil {
  247. t.Fatal("expected ErrNoConncetions, but no error was returned.")
  248. }
  249. if err != ErrNoConnections {
  250. t.Fatal("expected ErrNoConnections, but received %v", err)
  251. }
  252. }
  253. type Page struct {
  254. Title string
  255. RevId UUID
  256. Body string
  257. Views int64
  258. Protected bool
  259. Modified time.Time
  260. Tags []string
  261. Attachments map[string]Attachment
  262. }
  263. type Attachment []byte
  264. var pageTestData = []*Page{
  265. &Page{
  266. Title: "Frontpage",
  267. RevId: TimeUUID(),
  268. Body: "Welcome to this wiki page!",
  269. Modified: time.Date(2013, time.August, 13, 9, 52, 3, 0, time.UTC),
  270. Tags: []string{"start", "important", "test"},
  271. Attachments: map[string]Attachment{
  272. "logo": Attachment("\x00company logo\x00"),
  273. "favicon": Attachment("favicon.ico"),
  274. },
  275. },
  276. &Page{
  277. Title: "Foobar",
  278. RevId: TimeUUID(),
  279. Body: "foo::Foo f = new foo::Foo(foo::Foo::INIT);",
  280. Modified: time.Date(2013, time.August, 13, 9, 52, 3, 0, time.UTC),
  281. },
  282. }
  283. func TestSliceMap(t *testing.T) {
  284. session := createSession(t)
  285. defer session.Close()
  286. if err := session.Query(`CREATE TABLE slice_map_table (
  287. testuuid timeuuid PRIMARY KEY,
  288. testtimestamp timestamp,
  289. testvarchar varchar,
  290. testbigint bigint,
  291. testblob blob,
  292. testbool boolean,
  293. testfloat float,
  294. testdouble double,
  295. testint int,
  296. testset set<int>,
  297. testmap map<varchar, varchar>
  298. )`).Exec(); err != nil {
  299. t.Fatal("create table:", err)
  300. }
  301. m := make(map[string]interface{})
  302. m["testuuid"] = TimeUUID()
  303. m["testvarchar"] = "Test VarChar"
  304. m["testbigint"] = time.Now().Unix()
  305. m["testtimestamp"] = time.Now().Truncate(time.Millisecond).UTC()
  306. m["testblob"] = []byte("test blob")
  307. m["testbool"] = true
  308. m["testfloat"] = float32(4.564)
  309. m["testdouble"] = float64(4.815162342)
  310. m["testint"] = 2343
  311. m["testset"] = []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
  312. m["testmap"] = map[string]string{"field1": "val1", "field2": "val2", "field3": "val3"}
  313. sliceMap := []map[string]interface{}{m}
  314. if err := session.Query(`INSERT INTO slice_map_table (testuuid, testtimestamp, testvarchar, testbigint, testblob, testbool, testfloat, testdouble, testint, testset, testmap) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
  315. m["testuuid"], m["testtimestamp"], m["testvarchar"], m["testbigint"], m["testblob"], m["testbool"], m["testfloat"], m["testdouble"], m["testint"], m["testset"], m["testmap"]).Exec(); err != nil {
  316. t.Fatal("insert:", err)
  317. }
  318. if returned, retErr := session.Query(`SELECT * FROM slice_map_table`).Iter().SliceMap(); retErr != nil {
  319. t.Fatal("select:", retErr)
  320. } else {
  321. if sliceMap[0]["testuuid"] != returned[0]["testuuid"] {
  322. t.Fatal("returned testuuid did not match")
  323. }
  324. if sliceMap[0]["testtimestamp"] != returned[0]["testtimestamp"] {
  325. t.Fatalf("returned testtimestamp did not match: %v %v", sliceMap[0]["testtimestamp"], returned[0]["testtimestamp"])
  326. }
  327. if sliceMap[0]["testvarchar"] != returned[0]["testvarchar"] {
  328. t.Fatal("returned testvarchar did not match")
  329. }
  330. if sliceMap[0]["testbigint"] != returned[0]["testbigint"] {
  331. t.Fatal("returned testbigint did not match")
  332. }
  333. if !reflect.DeepEqual(sliceMap[0]["testblob"], returned[0]["testblob"]) {
  334. t.Fatal("returned testblob did not match")
  335. }
  336. if sliceMap[0]["testbool"] != returned[0]["testbool"] {
  337. t.Fatal("returned testbool did not match")
  338. }
  339. if sliceMap[0]["testfloat"] != returned[0]["testfloat"] {
  340. t.Fatal("returned testfloat did not match")
  341. }
  342. if sliceMap[0]["testdouble"] != returned[0]["testdouble"] {
  343. t.Fatal("returned testdouble did not match")
  344. }
  345. if sliceMap[0]["testint"] != returned[0]["testint"] {
  346. t.Fatal("returned testint did not match")
  347. }
  348. if !reflect.DeepEqual(sliceMap[0]["testset"], returned[0]["testset"]) {
  349. t.Fatal("returned testset did not match")
  350. }
  351. if !reflect.DeepEqual(sliceMap[0]["testmap"], returned[0]["testmap"]) {
  352. t.Fatal("returned testmap did not match")
  353. }
  354. }
  355. // Test for MapScan()
  356. testMap := make(map[string]interface{})
  357. if !session.Query(`SELECT * FROM slice_map_table`).Iter().MapScan(testMap) {
  358. t.Fatal("MapScan failed to work with one row")
  359. }
  360. if sliceMap[0]["testuuid"] != testMap["testuuid"] {
  361. t.Fatal("returned testuuid did not match")
  362. }
  363. if sliceMap[0]["testtimestamp"] != testMap["testtimestamp"] {
  364. t.Fatal("returned testtimestamp did not match")
  365. }
  366. if sliceMap[0]["testvarchar"] != testMap["testvarchar"] {
  367. t.Fatal("returned testvarchar did not match")
  368. }
  369. if sliceMap[0]["testbigint"] != testMap["testbigint"] {
  370. t.Fatal("returned testbigint did not match")
  371. }
  372. if !reflect.DeepEqual(sliceMap[0]["testblob"], testMap["testblob"]) {
  373. t.Fatal("returned testblob did not match")
  374. }
  375. if sliceMap[0]["testbool"] != testMap["testbool"] {
  376. t.Fatal("returned testbool did not match")
  377. }
  378. if sliceMap[0]["testfloat"] != testMap["testfloat"] {
  379. t.Fatal("returned testfloat did not match")
  380. }
  381. if sliceMap[0]["testdouble"] != testMap["testdouble"] {
  382. t.Fatal("returned testdouble did not match")
  383. }
  384. if sliceMap[0]["testint"] != testMap["testint"] {
  385. t.Fatal("returned testint did not match")
  386. }
  387. if !reflect.DeepEqual(sliceMap[0]["testset"], testMap["testset"]) {
  388. t.Fatal("returned testset did not match")
  389. }
  390. if !reflect.DeepEqual(sliceMap[0]["testmap"], testMap["testmap"]) {
  391. t.Fatal("returned testmap did not match")
  392. }
  393. }
  394. func TestScanWithNilArguments(t *testing.T) {
  395. session := createSession(t)
  396. defer session.Close()
  397. if err := session.Query(`CREATE TABLE scan_with_nil_arguments (
  398. foo varchar,
  399. bar int,
  400. PRIMARY KEY (foo, bar)
  401. )`).Exec(); err != nil {
  402. t.Fatal("create:", err)
  403. }
  404. for i := 1; i <= 20; i++ {
  405. if err := session.Query("INSERT INTO scan_with_nil_arguments (foo, bar) VALUES (?, ?)",
  406. "squares", i*i).Exec(); err != nil {
  407. t.Fatal("insert:", err)
  408. }
  409. }
  410. iter := session.Query("SELECT * FROM scan_with_nil_arguments WHERE foo = ?", "squares").Iter()
  411. var n int
  412. count := 0
  413. for iter.Scan(nil, &n) {
  414. count += n
  415. }
  416. if err := iter.Close(); err != nil {
  417. t.Fatal("close:", err)
  418. }
  419. if count != 2870 {
  420. t.Fatalf("expected %d, got %d", 2870, count)
  421. }
  422. }
  423. func TestScanCASWithNilArguments(t *testing.T) {
  424. if *flagProto == 1 {
  425. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  426. }
  427. session := createSession(t)
  428. defer session.Close()
  429. if err := session.Query(`CREATE TABLE scan_cas_with_nil_arguments (
  430. foo varchar,
  431. bar varchar,
  432. PRIMARY KEY (foo, bar)
  433. )`).Exec(); err != nil {
  434. t.Fatal("create:", err)
  435. }
  436. foo := "baz"
  437. var cas string
  438. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  439. VALUES (?, ?) IF NOT EXISTS`,
  440. foo, foo).ScanCAS(nil, nil); err != nil {
  441. t.Fatal("insert:", err)
  442. } else if !applied {
  443. t.Fatal("insert should have been applied")
  444. }
  445. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  446. VALUES (?, ?) IF NOT EXISTS`,
  447. foo, foo).ScanCAS(&cas, nil); err != nil {
  448. t.Fatal("insert:", err)
  449. } else if applied {
  450. t.Fatal("insert should not have been applied")
  451. } else if foo != cas {
  452. t.Fatalf("expected %v but got %v", foo, cas)
  453. }
  454. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  455. VALUES (?, ?) IF NOT EXISTS`,
  456. foo, foo).ScanCAS(nil, &cas); err != nil {
  457. t.Fatal("insert:", err)
  458. } else if applied {
  459. t.Fatal("insert should not have been applied")
  460. } else if foo != cas {
  461. t.Fatalf("expected %v but got %v", foo, cas)
  462. }
  463. }