cassandra_test.go 45 KB


  1. // +build all integration
  2. package gocql
  3. import (
  4. "bytes"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "math"
  9. "math/big"
  10. "net"
  11. "reflect"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "testing"
  16. "time"
  17. "unicode"
  18. "speter.net/go/exp/math/dec/inf"
  19. )
  20. var (
  21. flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
  22. flagProto = flag.Int("proto", 2, "protcol version")
  23. flagCQL = flag.String("cql", "3.0.0", "CQL version")
  24. flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
  25. clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
  26. flagRetry = flag.Int("retries", 5, "number of times to retry queries")
  27. flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
  28. flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
  29. clusterHosts []string
  30. )
  31. func init() {
  32. flag.Parse()
  33. clusterHosts = strings.Split(*flagCluster, ",")
  34. log.SetFlags(log.Lshortfile | log.LstdFlags)
  35. }
  36. func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
  37. if *flagRunSslTest {
  38. cluster.SslOpts = &SslOptions{
  39. CertPath: "testdata/pki/gocql.crt",
  40. KeyPath: "testdata/pki/gocql.key",
  41. CaPath: "testdata/pki/ca.crt",
  42. EnableHostVerification: false,
  43. }
  44. }
  45. return cluster
  46. }
// initOnce guards the one-time drop-and-recreate of the test keyspace that
// createSession performs on its first call.
var initOnce sync.Once
  48. func createTable(s *Session, table string) error {
  49. err := s.Query(table).Consistency(All).Exec()
  50. if *clusterSize > 1 {
  51. // wait for table definition to propogate
  52. time.Sleep(250 * time.Millisecond)
  53. }
  54. return err
  55. }
  56. func createCluster() *ClusterConfig {
  57. cluster := NewCluster(clusterHosts...)
  58. cluster.ProtoVersion = *flagProto
  59. cluster.CQLVersion = *flagCQL
  60. cluster.Timeout = 5 * time.Second
  61. cluster.Consistency = Quorum
  62. if *flagRetry > 0 {
  63. cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
  64. }
  65. cluster = addSslOptions(cluster)
  66. return cluster
  67. }
  68. func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
  69. session, err := cluster.CreateSession()
  70. if err != nil {
  71. tb.Fatal("createSession:", err)
  72. }
  73. if err = session.Query(`DROP KEYSPACE ` + keyspace).Exec(); err != nil {
  74. tb.Log("drop keyspace:", err)
  75. }
  76. if err := session.Query(fmt.Sprintf(`CREATE KEYSPACE %s
  77. WITH replication = {
  78. 'class' : 'SimpleStrategy',
  79. 'replication_factor' : %d
  80. }`, keyspace, *flagRF)).Consistency(All).Exec(); err != nil {
  81. tb.Fatalf("error creating keyspace %s: %v", keyspace, err)
  82. }
  83. tb.Logf("Created keyspace %s", keyspace)
  84. session.Close()
  85. }
  86. func createSession(tb testing.TB) *Session {
  87. cluster := createCluster()
  88. // Drop and re-create the keyspace once. Different tests should use their own
  89. // individual tables, but can assume that the table does not exist before.
  90. initOnce.Do(func() {
  91. createKeyspace(tb, cluster, "gocql_test")
  92. })
  93. cluster.Keyspace = "gocql_test"
  94. session, err := cluster.CreateSession()
  95. if err != nil {
  96. tb.Fatal("createSession:", err)
  97. }
  98. return session
  99. }
  100. //TestRingDiscovery makes sure that you can autodiscover other cluster members when you seed a cluster config with just one node
  101. func TestRingDiscovery(t *testing.T) {
  102. cluster := NewCluster(clusterHosts[0])
  103. cluster.ProtoVersion = *flagProto
  104. cluster.CQLVersion = *flagCQL
  105. cluster.Timeout = 5 * time.Second
  106. cluster.Consistency = Quorum
  107. if *flagRetry > 0 {
  108. cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
  109. }
  110. cluster.DiscoverHosts = true
  111. cluster = addSslOptions(cluster)
  112. session, err := cluster.CreateSession()
  113. if err != nil {
  114. t.Errorf("got error connecting to the cluster %v", err)
  115. }
  116. if *clusterSize > 1 {
  117. // wait for autodiscovery to update the pool with the list of known hosts
  118. time.Sleep(*flagAutoWait)
  119. }
  120. size := len(session.Pool.(*SimplePool).connPool)
  121. if *clusterSize != size {
  122. t.Fatalf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
  123. }
  124. session.Close()
  125. }
  126. func TestEmptyHosts(t *testing.T) {
  127. cluster := NewCluster()
  128. cluster = addSslOptions(cluster)
  129. if session, err := cluster.CreateSession(); err == nil {
  130. session.Close()
  131. t.Error("expected err, got nil")
  132. }
  133. }
  134. //TestUseStatementError checks to make sure the correct error is returned when the user tries to execute a use statement.
  135. func TestUseStatementError(t *testing.T) {
  136. session := createSession(t)
  137. defer session.Close()
  138. if err := session.Query("USE gocql_test").Exec(); err != nil {
  139. if err != ErrUseStmt {
  140. t.Error("expected ErrUseStmt, got " + err.Error())
  141. }
  142. } else {
  143. t.Error("expected err, got nil.")
  144. }
  145. }
  146. //TestInvalidKeyspace checks that an invalid keyspace will return promptly and without a flood of connections
  147. func TestInvalidKeyspace(t *testing.T) {
  148. cluster := NewCluster(clusterHosts...)
  149. cluster.ProtoVersion = *flagProto
  150. cluster.CQLVersion = *flagCQL
  151. cluster.Keyspace = "invalidKeyspace"
  152. cluster = addSslOptions(cluster)
  153. session, err := cluster.CreateSession()
  154. if err != nil {
  155. if err != ErrNoConnectionsStarted {
  156. t.Errorf("Expected ErrNoConnections but got %v", err)
  157. }
  158. } else {
  159. session.Close() //Clean up the session
  160. t.Error("expected err, got nil.")
  161. }
  162. }
  163. func TestTracing(t *testing.T) {
  164. session := createSession(t)
  165. defer session.Close()
  166. if err := createTable(session, `CREATE TABLE trace (id int primary key)`); err != nil {
  167. t.Fatal("create:", err)
  168. }
  169. buf := &bytes.Buffer{}
  170. trace := NewTraceWriter(session, buf)
  171. if err := session.Query(`INSERT INTO trace (id) VALUES (?)`, 42).Trace(trace).Exec(); err != nil {
  172. t.Error("insert:", err)
  173. } else if buf.Len() == 0 {
  174. t.Error("insert: failed to obtain any tracing")
  175. }
  176. buf.Reset()
  177. var value int
  178. if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Trace(trace).Scan(&value); err != nil {
  179. t.Error("select:", err)
  180. } else if value != 42 {
  181. t.Errorf("value: expected %d, got %d", 42, value)
  182. } else if buf.Len() == 0 {
  183. t.Error("select: failed to obtain any tracing")
  184. }
  185. }
  186. func TestPaging(t *testing.T) {
  187. if *flagProto == 1 {
  188. t.Skip("Paging not supported. Please use Cassandra >= 2.0")
  189. }
  190. session := createSession(t)
  191. defer session.Close()
  192. if err := createTable(session, "CREATE TABLE paging (id int primary key)"); err != nil {
  193. t.Fatal("create table:", err)
  194. }
  195. for i := 0; i < 100; i++ {
  196. if err := session.Query("INSERT INTO paging (id) VALUES (?)", i).Exec(); err != nil {
  197. t.Fatal("insert:", err)
  198. }
  199. }
  200. iter := session.Query("SELECT id FROM paging").PageSize(10).Iter()
  201. var id int
  202. count := 0
  203. for iter.Scan(&id) {
  204. count++
  205. }
  206. if err := iter.Close(); err != nil {
  207. t.Fatal("close:", err)
  208. }
  209. if count != 100 {
  210. t.Fatalf("expected %d, got %d", 100, count)
  211. }
  212. }
  213. func TestCAS(t *testing.T) {
  214. if *flagProto == 1 {
  215. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  216. }
  217. session := createSession(t)
  218. defer session.Close()
  219. if err := createTable(session, `CREATE TABLE cas_table (
  220. title varchar,
  221. revid timeuuid,
  222. last_modified timestamp,
  223. PRIMARY KEY (title, revid)
  224. )`); err != nil {
  225. t.Fatal("create:", err)
  226. }
  227. title, revid, modified := "baz", TimeUUID(), time.Now()
  228. var titleCAS string
  229. var revidCAS UUID
  230. var modifiedCAS time.Time
  231. if applied, err := session.Query(`INSERT INTO cas_table (title, revid, last_modified)
  232. VALUES (?, ?, ?) IF NOT EXISTS`,
  233. title, revid, modified).ScanCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
  234. t.Fatal("insert:", err)
  235. } else if !applied {
  236. t.Fatal("insert should have been applied")
  237. }
  238. if applied, err := session.Query(`INSERT INTO cas_table (title, revid, last_modified)
  239. VALUES (?, ?, ?) IF NOT EXISTS`,
  240. title, revid, modified).ScanCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
  241. t.Fatal("insert:", err)
  242. } else if applied {
  243. t.Fatal("insert should not have been applied")
  244. } else if title != titleCAS || revid != revidCAS {
  245. t.Fatalf("expected %s/%v/%v but got %s/%v/%v", title, revid, modified, titleCAS, revidCAS, modifiedCAS)
  246. }
  247. tenSecondsLater := modified.Add(10 * time.Second)
  248. if applied, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  249. title, revid, tenSecondsLater).ScanCAS(&modifiedCAS); err != nil {
  250. t.Fatal("delete:", err)
  251. } else if applied {
  252. t.Fatal("delete should have not been applied")
  253. }
  254. if modifiedCAS.Unix() != tenSecondsLater.Add(-10*time.Second).Unix() {
  255. t.Fatalf("Was expecting modified CAS to be %v; but was one second later", modifiedCAS.UTC())
  256. }
  257. if _, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  258. title, revid, tenSecondsLater).ScanCAS(); err.Error() != "count mismatch" {
  259. t.Fatalf("delete: was expecting count mismatch error but got %s", err)
  260. }
  261. if applied, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  262. title, revid, modified).ScanCAS(&modifiedCAS); err != nil {
  263. t.Fatal("delete:", err)
  264. } else if !applied {
  265. t.Fatal("delete should have been applied")
  266. }
  267. }
  268. func TestMapScanCAS(t *testing.T) {
  269. if *flagProto == 1 {
  270. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  271. }
  272. session := createSession(t)
  273. defer session.Close()
  274. if err := createTable(session, `CREATE TABLE cas_table2 (
  275. title varchar,
  276. revid timeuuid,
  277. last_modified timestamp,
  278. deleted boolean,
  279. PRIMARY KEY (title, revid)
  280. )`); err != nil {
  281. t.Fatal("create:", err)
  282. }
  283. title, revid, modified, deleted := "baz", TimeUUID(), time.Now(), false
  284. mapCAS := map[string]interface{}{}
  285. if applied, err := session.Query(`INSERT INTO cas_table2 (title, revid, last_modified, deleted)
  286. VALUES (?, ?, ?, ?) IF NOT EXISTS`,
  287. title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
  288. t.Fatal("insert:", err)
  289. } else if !applied {
  290. t.Fatal("insert should have been applied")
  291. }
  292. mapCAS = map[string]interface{}{}
  293. if applied, err := session.Query(`INSERT INTO cas_table2 (title, revid, last_modified, deleted)
  294. VALUES (?, ?, ?, ?) IF NOT EXISTS`,
  295. title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
  296. t.Fatal("insert:", err)
  297. } else if applied {
  298. t.Fatal("insert should not have been applied")
  299. } else if title != mapCAS["title"] || revid != mapCAS["revid"] || deleted != mapCAS["deleted"] {
  300. t.Fatalf("expected %s/%v/%v/%v but got %s/%v/%v%v", title, revid, modified, false, mapCAS["title"], mapCAS["revid"], mapCAS["last_modified"], mapCAS["deleted"])
  301. }
  302. }
  303. func TestBatch(t *testing.T) {
  304. if *flagProto == 1 {
  305. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  306. }
  307. session := createSession(t)
  308. defer session.Close()
  309. if err := createTable(session, `CREATE TABLE batch_table (id int primary key)`); err != nil {
  310. t.Fatal("create table:", err)
  311. }
  312. batch := NewBatch(LoggedBatch)
  313. for i := 0; i < 100; i++ {
  314. batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
  315. }
  316. if err := session.ExecuteBatch(batch); err != nil {
  317. t.Fatal("execute batch:", err)
  318. }
  319. count := 0
  320. if err := session.Query(`SELECT COUNT(*) FROM batch_table`).Scan(&count); err != nil {
  321. t.Fatal("select count:", err)
  322. } else if count != 100 {
  323. t.Fatalf("count: expected %d, got %d\n", 100, count)
  324. }
  325. }
  326. // TestBatchLimit tests gocql to make sure batch operations larger than the maximum
  327. // statement limit are not submitted to a cassandra node.
  328. func TestBatchLimit(t *testing.T) {
  329. if *flagProto == 1 {
  330. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  331. }
  332. session := createSession(t)
  333. defer session.Close()
  334. if err := createTable(session, `CREATE TABLE batch_table2 (id int primary key)`); err != nil {
  335. t.Fatal("create table:", err)
  336. }
  337. batch := NewBatch(LoggedBatch)
  338. for i := 0; i < 65537; i++ {
  339. batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
  340. }
  341. if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
  342. t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
  343. }
  344. }
  345. // TestTooManyQueryArgs tests to make sure the library correctly handles the application level bug
  346. // whereby too many query arguments are passed to a query
  347. func TestTooManyQueryArgs(t *testing.T) {
  348. if *flagProto == 1 {
  349. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  350. }
  351. session := createSession(t)
  352. defer session.Close()
  353. if err := createTable(session, `CREATE TABLE too_many_query_args (id int primary key, value int)`); err != nil {
  354. t.Fatal("create table:", err)
  355. }
  356. _, err := session.Query(`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2).Iter().SliceMap()
  357. if err == nil {
  358. t.Fatal("'`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2' should return an ErrQueryArgLength")
  359. }
  360. if err != ErrQueryArgLength {
  361. t.Fatalf("'`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2' should return an ErrQueryArgLength, but returned: %s", err)
  362. }
  363. batch := session.NewBatch(UnloggedBatch)
  364. batch.Query("INSERT INTO too_many_query_args (id, value) VALUES (?, ?)", 1, 2, 3)
  365. err = session.ExecuteBatch(batch)
  366. if err == nil {
  367. t.Fatal("'`INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an ErrQueryArgLength")
  368. }
  369. if err != ErrQueryArgLength {
  370. t.Fatalf("'INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an ErrQueryArgLength, but returned: %s", err)
  371. }
  372. }
  373. // TestNotEnoughQueryArgs tests to make sure the library correctly handles the application level bug
  374. // whereby not enough query arguments are passed to a query
  375. func TestNotEnoughQueryArgs(t *testing.T) {
  376. if *flagProto == 1 {
  377. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  378. }
  379. session := createSession(t)
  380. defer session.Close()
  381. if err := createTable(session, `CREATE TABLE not_enough_query_args (id int, cluster int, value int, primary key (id, cluster))`); err != nil {
  382. t.Fatal("create table:", err)
  383. }
  384. _, err := session.Query(`SELECT * FROM not_enough_query_args WHERE id = ? and cluster = ?`, 1).Iter().SliceMap()
  385. if err == nil {
  386. t.Fatal("'`SELECT * FROM not_enough_query_args WHERE id = ? and cluster = ?`, 1' should return an ErrQueryArgLength")
  387. }
  388. if err != ErrQueryArgLength {
  389. t.Fatalf("'`SELECT * FROM too_few_query_args WHERE id = ? and cluster = ?`, 1' should return an ErrQueryArgLength, but returned: %s", err)
  390. }
  391. batch := session.NewBatch(UnloggedBatch)
  392. batch.Query("INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)", 1, 2)
  393. err = session.ExecuteBatch(batch)
  394. if err == nil {
  395. t.Fatal("'`INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an ErrQueryArgLength")
  396. }
  397. if err != ErrQueryArgLength {
  398. t.Fatalf("'INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an ErrQueryArgLength, but returned: %s", err)
  399. }
  400. }
  401. // TestCreateSessionTimeout tests to make sure the CreateSession function timeouts out correctly
  402. // and prevents an infinite loop of connection retries.
  403. func TestCreateSessionTimeout(t *testing.T) {
  404. go func() {
  405. <-time.After(2 * time.Second)
  406. t.Fatal("no startup timeout")
  407. }()
  408. c := NewCluster("127.0.0.1:1")
  409. c = addSslOptions(c)
  410. _, err := c.CreateSession()
  411. if err == nil {
  412. t.Fatal("expected ErrNoConnectionsStarted, but no error was returned.")
  413. }
  414. if err != ErrNoConnectionsStarted {
  415. t.Fatalf("expected ErrNoConnectionsStarted, but received %v", err)
  416. }
  417. }
  418. type FullName struct {
  419. FirstName string
  420. LastName string
  421. }
  422. func (n FullName) MarshalCQL(info *TypeInfo) ([]byte, error) {
  423. return []byte(n.FirstName + " " + n.LastName), nil
  424. }
  425. func (n *FullName) UnmarshalCQL(info *TypeInfo, data []byte) error {
  426. t := strings.SplitN(string(data), " ", 2)
  427. n.FirstName, n.LastName = t[0], t[1]
  428. return nil
  429. }
  430. func TestMapScanWithRefMap(t *testing.T) {
  431. session := createSession(t)
  432. defer session.Close()
  433. if err := createTable(session, `CREATE TABLE scan_map_ref_table (
  434. testtext text PRIMARY KEY,
  435. testfullname text,
  436. testint int,
  437. )`); err != nil {
  438. t.Fatal("create table:", err)
  439. }
  440. m := make(map[string]interface{})
  441. m["testtext"] = "testtext"
  442. m["testfullname"] = FullName{"John", "Doe"}
  443. m["testint"] = 100
  444. if err := session.Query(`INSERT INTO scan_map_ref_table (testtext, testfullname, testint) values (?,?,?)`, m["testtext"], m["testfullname"], m["testint"]).Exec(); err != nil {
  445. t.Fatal("insert:", err)
  446. }
  447. var testText string
  448. var testFullName FullName
  449. ret := map[string]interface{}{
  450. "testtext": &testText,
  451. "testfullname": &testFullName,
  452. // testint is not set here.
  453. }
  454. iter := session.Query(`SELECT * FROM scan_map_ref_table`).Iter()
  455. if ok := iter.MapScan(ret); !ok {
  456. t.Fatal("select:", iter.Close())
  457. } else {
  458. if ret["testtext"] != "testtext" {
  459. t.Fatal("returned testtext did not match")
  460. }
  461. f := ret["testfullname"].(FullName)
  462. if f.FirstName != "John" || f.LastName != "Doe" {
  463. t.Fatal("returned testfullname did not match")
  464. }
  465. if ret["testint"] != 100 {
  466. t.Fatal("returned testinit did not match")
  467. }
  468. }
  469. }
  470. func TestSliceMap(t *testing.T) {
  471. session := createSession(t)
  472. defer session.Close()
  473. if err := createTable(session, `CREATE TABLE slice_map_table (
  474. testuuid timeuuid PRIMARY KEY,
  475. testtimestamp timestamp,
  476. testvarchar varchar,
  477. testbigint bigint,
  478. testblob blob,
  479. testbool boolean,
  480. testfloat float,
  481. testdouble double,
  482. testint int,
  483. testdecimal decimal,
  484. testlist list<text>,
  485. testset set<int>,
  486. testmap map<varchar, varchar>,
  487. testvarint varint,
  488. testinet inet
  489. )`); err != nil {
  490. t.Fatal("create table:", err)
  491. }
  492. m := make(map[string]interface{})
  493. bigInt := new(big.Int)
  494. if _, ok := bigInt.SetString("830169365738487321165427203929228", 10); !ok {
  495. t.Fatal("Failed setting bigint by string")
  496. }
  497. m["testuuid"] = TimeUUID()
  498. m["testvarchar"] = "Test VarChar"
  499. m["testbigint"] = time.Now().Unix()
  500. m["testtimestamp"] = time.Now().Truncate(time.Millisecond).UTC()
  501. m["testblob"] = []byte("test blob")
  502. m["testbool"] = true
  503. m["testfloat"] = float32(4.564)
  504. m["testdouble"] = float64(4.815162342)
  505. m["testint"] = 2343
  506. m["testdecimal"] = inf.NewDec(100, 0)
  507. m["testlist"] = []string{"quux", "foo", "bar", "baz", "quux"}
  508. m["testset"] = []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
  509. m["testmap"] = map[string]string{"field1": "val1", "field2": "val2", "field3": "val3"}
  510. m["testvarint"] = bigInt
  511. m["testinet"] = "213.212.2.19"
  512. sliceMap := []map[string]interface{}{m}
  513. if err := session.Query(`INSERT INTO slice_map_table (testuuid, testtimestamp, testvarchar, testbigint, testblob, testbool, testfloat, testdouble, testint, testdecimal, testlist, testset, testmap, testvarint, testinet) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
  514. m["testuuid"], m["testtimestamp"], m["testvarchar"], m["testbigint"], m["testblob"], m["testbool"], m["testfloat"], m["testdouble"], m["testint"], m["testdecimal"], m["testlist"], m["testset"], m["testmap"], m["testvarint"], m["testinet"]).Exec(); err != nil {
  515. t.Fatal("insert:", err)
  516. }
  517. if returned, retErr := session.Query(`SELECT * FROM slice_map_table`).Iter().SliceMap(); retErr != nil {
  518. t.Fatal("select:", retErr)
  519. } else {
  520. if sliceMap[0]["testuuid"] != returned[0]["testuuid"] {
  521. t.Fatal("returned testuuid did not match")
  522. }
  523. if sliceMap[0]["testtimestamp"] != returned[0]["testtimestamp"] {
  524. t.Fatalf("returned testtimestamp did not match: %v %v", sliceMap[0]["testtimestamp"], returned[0]["testtimestamp"])
  525. }
  526. if sliceMap[0]["testvarchar"] != returned[0]["testvarchar"] {
  527. t.Fatal("returned testvarchar did not match")
  528. }
  529. if sliceMap[0]["testbigint"] != returned[0]["testbigint"] {
  530. t.Fatal("returned testbigint did not match")
  531. }
  532. if !reflect.DeepEqual(sliceMap[0]["testblob"], returned[0]["testblob"]) {
  533. t.Fatal("returned testblob did not match")
  534. }
  535. if sliceMap[0]["testbool"] != returned[0]["testbool"] {
  536. t.Fatal("returned testbool did not match")
  537. }
  538. if sliceMap[0]["testfloat"] != returned[0]["testfloat"] {
  539. t.Fatal("returned testfloat did not match")
  540. }
  541. if sliceMap[0]["testdouble"] != returned[0]["testdouble"] {
  542. t.Fatal("returned testdouble did not match")
  543. }
  544. if sliceMap[0]["testint"] != returned[0]["testint"] {
  545. t.Fatal("returned testint did not match")
  546. }
  547. expectedDecimal := sliceMap[0]["testdecimal"].(*inf.Dec)
  548. returnedDecimal := returned[0]["testdecimal"].(*inf.Dec)
  549. if expectedDecimal.Cmp(returnedDecimal) != 0 {
  550. t.Fatal("returned testdecimal did not match")
  551. }
  552. if !reflect.DeepEqual(sliceMap[0]["testlist"], returned[0]["testlist"]) {
  553. t.Fatal("returned testlist did not match")
  554. }
  555. if !reflect.DeepEqual(sliceMap[0]["testset"], returned[0]["testset"]) {
  556. t.Fatal("returned testset did not match")
  557. }
  558. if !reflect.DeepEqual(sliceMap[0]["testmap"], returned[0]["testmap"]) {
  559. t.Fatal("returned testmap did not match")
  560. }
  561. expectedBigInt := sliceMap[0]["testvarint"].(*big.Int)
  562. returnedBigInt := returned[0]["testvarint"].(*big.Int)
  563. if expectedBigInt.Cmp(returnedBigInt) != 0 {
  564. t.Fatal("returned testvarint did not match")
  565. }
  566. if sliceMap[0]["testinet"] != returned[0]["testinet"] {
  567. t.Fatal("returned testinet did not match")
  568. }
  569. }
  570. // Test for MapScan()
  571. testMap := make(map[string]interface{})
  572. if !session.Query(`SELECT * FROM slice_map_table`).Iter().MapScan(testMap) {
  573. t.Fatal("MapScan failed to work with one row")
  574. }
  575. if sliceMap[0]["testuuid"] != testMap["testuuid"] {
  576. t.Fatal("returned testuuid did not match")
  577. }
  578. if sliceMap[0]["testtimestamp"] != testMap["testtimestamp"] {
  579. t.Fatal("returned testtimestamp did not match")
  580. }
  581. if sliceMap[0]["testvarchar"] != testMap["testvarchar"] {
  582. t.Fatal("returned testvarchar did not match")
  583. }
  584. if sliceMap[0]["testbigint"] != testMap["testbigint"] {
  585. t.Fatal("returned testbigint did not match")
  586. }
  587. if !reflect.DeepEqual(sliceMap[0]["testblob"], testMap["testblob"]) {
  588. t.Fatal("returned testblob did not match")
  589. }
  590. if sliceMap[0]["testbool"] != testMap["testbool"] {
  591. t.Fatal("returned testbool did not match")
  592. }
  593. if sliceMap[0]["testfloat"] != testMap["testfloat"] {
  594. t.Fatal("returned testfloat did not match")
  595. }
  596. if sliceMap[0]["testdouble"] != testMap["testdouble"] {
  597. t.Fatal("returned testdouble did not match")
  598. }
  599. if sliceMap[0]["testinet"] != testMap["testinet"] {
  600. t.Fatal("returned testinet did not match")
  601. }
  602. expectedDecimal := sliceMap[0]["testdecimal"].(*inf.Dec)
  603. returnedDecimal := testMap["testdecimal"].(*inf.Dec)
  604. if expectedDecimal.Cmp(returnedDecimal) != 0 {
  605. t.Fatal("returned testdecimal did not match")
  606. }
  607. if !reflect.DeepEqual(sliceMap[0]["testlist"], testMap["testlist"]) {
  608. t.Fatal("returned testlist did not match")
  609. }
  610. if !reflect.DeepEqual(sliceMap[0]["testset"], testMap["testset"]) {
  611. t.Fatal("returned testset did not match")
  612. }
  613. if !reflect.DeepEqual(sliceMap[0]["testmap"], testMap["testmap"]) {
  614. t.Fatal("returned testmap did not match")
  615. }
  616. if sliceMap[0]["testint"] != testMap["testint"] {
  617. t.Fatal("returned testint did not match")
  618. }
  619. }
  620. func TestScanWithNilArguments(t *testing.T) {
  621. session := createSession(t)
  622. defer session.Close()
  623. if err := createTable(session, `CREATE TABLE scan_with_nil_arguments (
  624. foo varchar,
  625. bar int,
  626. PRIMARY KEY (foo, bar)
  627. )`); err != nil {
  628. t.Fatal("create:", err)
  629. }
  630. for i := 1; i <= 20; i++ {
  631. if err := session.Query("INSERT INTO scan_with_nil_arguments (foo, bar) VALUES (?, ?)",
  632. "squares", i*i).Exec(); err != nil {
  633. t.Fatal("insert:", err)
  634. }
  635. }
  636. iter := session.Query("SELECT * FROM scan_with_nil_arguments WHERE foo = ?", "squares").Iter()
  637. var n int
  638. count := 0
  639. for iter.Scan(nil, &n) {
  640. count += n
  641. }
  642. if err := iter.Close(); err != nil {
  643. t.Fatal("close:", err)
  644. }
  645. if count != 2870 {
  646. t.Fatalf("expected %d, got %d", 2870, count)
  647. }
  648. }
  649. func TestScanCASWithNilArguments(t *testing.T) {
  650. if *flagProto == 1 {
  651. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  652. }
  653. session := createSession(t)
  654. defer session.Close()
  655. if err := createTable(session, `CREATE TABLE scan_cas_with_nil_arguments (
  656. foo varchar,
  657. bar varchar,
  658. PRIMARY KEY (foo, bar)
  659. )`); err != nil {
  660. t.Fatal("create:", err)
  661. }
  662. foo := "baz"
  663. var cas string
  664. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  665. VALUES (?, ?) IF NOT EXISTS`,
  666. foo, foo).ScanCAS(nil, nil); err != nil {
  667. t.Fatal("insert:", err)
  668. } else if !applied {
  669. t.Fatal("insert should have been applied")
  670. }
  671. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  672. VALUES (?, ?) IF NOT EXISTS`,
  673. foo, foo).ScanCAS(&cas, nil); err != nil {
  674. t.Fatal("insert:", err)
  675. } else if applied {
  676. t.Fatal("insert should not have been applied")
  677. } else if foo != cas {
  678. t.Fatalf("expected %v but got %v", foo, cas)
  679. }
  680. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  681. VALUES (?, ?) IF NOT EXISTS`,
  682. foo, foo).ScanCAS(nil, &cas); err != nil {
  683. t.Fatal("insert:", err)
  684. } else if applied {
  685. t.Fatal("insert should not have been applied")
  686. } else if foo != cas {
  687. t.Fatalf("expected %v but got %v", foo, cas)
  688. }
  689. }
  690. func TestRebindQueryInfo(t *testing.T) {
  691. session := createSession(t)
  692. defer session.Close()
  693. if err := createTable(session, "CREATE TABLE rebind_query (id int, value text, PRIMARY KEY (id))"); err != nil {
  694. t.Fatalf("failed to create table with error '%v'", err)
  695. }
  696. if err := session.Query("INSERT INTO rebind_query (id, value) VALUES (?, ?)", 23, "quux").Exec(); err != nil {
  697. t.Fatalf("insert into rebind_query failed, err '%v'", err)
  698. }
  699. if err := session.Query("INSERT INTO rebind_query (id, value) VALUES (?, ?)", 24, "w00t").Exec(); err != nil {
  700. t.Fatalf("insert into rebind_query failed, err '%v'", err)
  701. }
  702. q := session.Query("SELECT value FROM rebind_query WHERE ID = ?")
  703. q.Bind(23)
  704. iter := q.Iter()
  705. var value string
  706. for iter.Scan(&value) {
  707. }
  708. if value != "quux" {
  709. t.Fatalf("expected %v but got %v", "quux", value)
  710. }
  711. q.Bind(24)
  712. iter = q.Iter()
  713. for iter.Scan(&value) {
  714. }
  715. if value != "w00t" {
  716. t.Fatalf("expected %v but got %v", "quux", value)
  717. }
  718. }
  719. //TestStaticQueryInfo makes sure that the application can manually bind query parameters using the simplest possible static binding strategy
  720. func TestStaticQueryInfo(t *testing.T) {
  721. session := createSession(t)
  722. defer session.Close()
  723. if err := createTable(session, "CREATE TABLE static_query_info (id int, value text, PRIMARY KEY (id))"); err != nil {
  724. t.Fatalf("failed to create table with error '%v'", err)
  725. }
  726. if err := session.Query("INSERT INTO static_query_info (id, value) VALUES (?, ?)", 113, "foo").Exec(); err != nil {
  727. t.Fatalf("insert into static_query_info failed, err '%v'", err)
  728. }
  729. autobinder := func(q *QueryInfo) ([]interface{}, error) {
  730. values := make([]interface{}, 1)
  731. values[0] = 113
  732. return values, nil
  733. }
  734. qry := session.Bind("SELECT id, value FROM static_query_info WHERE id = ?", autobinder)
  735. if err := qry.Exec(); err != nil {
  736. t.Fatalf("expose query info failed, error '%v'", err)
  737. }
  738. iter := qry.Iter()
  739. var id int
  740. var value string
  741. iter.Scan(&id, &value)
  742. if err := iter.Close(); err != nil {
  743. t.Fatalf("query with exposed info failed, err '%v'", err)
  744. }
  745. if value != "foo" {
  746. t.Fatalf("Expected value %s, but got %s", "foo", value)
  747. }
  748. }
  749. type ClusteredKeyValue struct {
  750. Id int
  751. Cluster int
  752. Value string
  753. }
  754. func (kv *ClusteredKeyValue) Bind(q *QueryInfo) ([]interface{}, error) {
  755. values := make([]interface{}, len(q.Args))
  756. for i, info := range q.Args {
  757. fieldName := upcaseInitial(info.Name)
  758. value := reflect.ValueOf(kv)
  759. field := reflect.Indirect(value).FieldByName(fieldName)
  760. values[i] = field.Addr().Interface()
  761. }
  762. return values, nil
  763. }
  764. func upcaseInitial(str string) string {
  765. for i, v := range str {
  766. return string(unicode.ToUpper(v)) + str[i+1:]
  767. }
  768. return ""
  769. }
  770. //TestBoundQueryInfo makes sure that the application can manually bind query parameters using the query meta data supplied at runtime
  771. func TestBoundQueryInfo(t *testing.T) {
  772. session := createSession(t)
  773. defer session.Close()
  774. if err := createTable(session, "CREATE TABLE clustered_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
  775. t.Fatalf("failed to create table with error '%v'", err)
  776. }
  777. write := &ClusteredKeyValue{Id: 200, Cluster: 300, Value: "baz"}
  778. insert := session.Bind("INSERT INTO clustered_query_info (id, cluster, value) VALUES (?, ?,?)", write.Bind)
  779. if err := insert.Exec(); err != nil {
  780. t.Fatalf("insert into clustered_query_info failed, err '%v'", err)
  781. }
  782. read := &ClusteredKeyValue{Id: 200, Cluster: 300}
  783. qry := session.Bind("SELECT id, cluster, value FROM clustered_query_info WHERE id = ? and cluster = ?", read.Bind)
  784. iter := qry.Iter()
  785. var id, cluster int
  786. var value string
  787. iter.Scan(&id, &cluster, &value)
  788. if err := iter.Close(); err != nil {
  789. t.Fatalf("query with clustered_query_info info failed, err '%v'", err)
  790. }
  791. if value != "baz" {
  792. t.Fatalf("Expected value %s, but got %s", "baz", value)
  793. }
  794. }
  795. //TestBatchQueryInfo makes sure that the application can manually bind query parameters when executing in a batch
  796. func TestBatchQueryInfo(t *testing.T) {
  797. if *flagProto == 1 {
  798. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  799. }
  800. session := createSession(t)
  801. defer session.Close()
  802. if err := createTable(session, "CREATE TABLE batch_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
  803. t.Fatalf("failed to create table with error '%v'", err)
  804. }
  805. write := func(q *QueryInfo) ([]interface{}, error) {
  806. values := make([]interface{}, 3)
  807. values[0] = 4000
  808. values[1] = 5000
  809. values[2] = "bar"
  810. return values, nil
  811. }
  812. batch := session.NewBatch(LoggedBatch)
  813. batch.Bind("INSERT INTO batch_query_info (id, cluster, value) VALUES (?, ?,?)", write)
  814. if err := session.ExecuteBatch(batch); err != nil {
  815. t.Fatalf("batch insert into batch_query_info failed, err '%v'", err)
  816. }
  817. read := func(q *QueryInfo) ([]interface{}, error) {
  818. values := make([]interface{}, 2)
  819. values[0] = 4000
  820. values[1] = 5000
  821. return values, nil
  822. }
  823. qry := session.Bind("SELECT id, cluster, value FROM batch_query_info WHERE id = ? and cluster = ?", read)
  824. iter := qry.Iter()
  825. var id, cluster int
  826. var value string
  827. iter.Scan(&id, &cluster, &value)
  828. if err := iter.Close(); err != nil {
  829. t.Fatalf("query with batch_query_info info failed, err '%v'", err)
  830. }
  831. if value != "bar" {
  832. t.Fatalf("Expected value %s, but got %s", "bar", value)
  833. }
  834. }
  835. func injectInvalidPreparedStatement(t *testing.T, session *Session, table string) (string, *Conn) {
  836. if err := createTable(session, `CREATE TABLE `+table+` (
  837. foo varchar,
  838. bar int,
  839. PRIMARY KEY (foo, bar)
  840. )`); err != nil {
  841. t.Fatal("create:", err)
  842. }
  843. stmt := "INSERT INTO " + table + " (foo, bar) VALUES (?, 7)"
  844. conn := session.Pool.Pick(nil)
  845. flight := new(inflightPrepare)
  846. stmtsLRU.mu.Lock()
  847. stmtsLRU.lru.Add(conn.addr+stmt, flight)
  848. stmtsLRU.mu.Unlock()
  849. flight.info = &QueryInfo{
  850. Id: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
  851. Args: []ColumnInfo{ColumnInfo{
  852. Keyspace: "gocql_test",
  853. Table: table,
  854. Name: "foo",
  855. TypeInfo: &TypeInfo{
  856. Type: TypeVarchar,
  857. },
  858. }},
  859. }
  860. return stmt, conn
  861. }
  862. func TestMissingSchemaPrepare(t *testing.T) {
  863. s := createSession(t)
  864. conn := s.Pool.Pick(nil)
  865. defer s.Close()
  866. insertQry := &Query{stmt: "INSERT INTO invalidschemaprep (val) VALUES (?)", values: []interface{}{5}, cons: s.cons,
  867. session: s, pageSize: s.pageSize, trace: s.trace,
  868. prefetch: s.prefetch, rt: s.cfg.RetryPolicy}
  869. if err := conn.executeQuery(insertQry).err; err == nil {
  870. t.Fatal("expected error, but got nil.")
  871. }
  872. if err := createTable(s, "CREATE TABLE invalidschemaprep (val int, PRIMARY KEY (val))"); err != nil {
  873. t.Fatal("create table:", err)
  874. }
  875. if err := conn.executeQuery(insertQry).err; err != nil {
  876. t.Fatal(err) // unconfigured columnfamily
  877. }
  878. }
  879. func TestReprepareStatement(t *testing.T) {
  880. session := createSession(t)
  881. defer session.Close()
  882. stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement")
  883. query := session.Query(stmt, "bar")
  884. if err := conn.executeQuery(query).Close(); err != nil {
  885. t.Fatalf("Failed to execute query for reprepare statement: %v", err)
  886. }
  887. }
  888. func TestReprepareBatch(t *testing.T) {
  889. if *flagProto == 1 {
  890. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  891. }
  892. session := createSession(t)
  893. defer session.Close()
  894. stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement_batch")
  895. batch := session.NewBatch(UnloggedBatch)
  896. batch.Query(stmt, "bar")
  897. if err := conn.executeBatch(batch); err != nil {
  898. t.Fatalf("Failed to execute query for reprepare statement: %v", err)
  899. }
  900. }
  901. func TestQueryInfo(t *testing.T) {
  902. session := createSession(t)
  903. defer session.Close()
  904. conn := session.Pool.Pick(nil)
  905. info, err := conn.prepareStatement("SELECT release_version, host_id FROM system.local WHERE key = ?", nil)
  906. if err != nil {
  907. t.Fatalf("Failed to execute query for preparing statement: %v", err)
  908. }
  909. if len(info.Args) != 1 {
  910. t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, len(info.Args))
  911. }
  912. if *flagProto > 1 {
  913. if len(info.Rval) != 2 {
  914. t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, len(info.Rval))
  915. }
  916. }
  917. }
  918. //TestPreparedCacheEviction will make sure that the cache size is maintained
  919. func TestPreparedCacheEviction(t *testing.T) {
  920. session := createSession(t)
  921. defer session.Close()
  922. stmtsLRU.mu.Lock()
  923. stmtsLRU.Max(4)
  924. stmtsLRU.mu.Unlock()
  925. if err := createTable(session, "CREATE TABLE prepcachetest (id int,mod int,PRIMARY KEY (id))"); err != nil {
  926. t.Fatalf("failed to create table with error '%v'", err)
  927. }
  928. //Fill the table
  929. for i := 0; i < 2; i++ {
  930. if err := session.Query("INSERT INTO prepcachetest (id,mod) VALUES (?, ?)", i, 10000%(i+1)).Exec(); err != nil {
  931. t.Fatalf("insert into prepcachetest failed, err '%v'", err)
  932. }
  933. }
  934. //Populate the prepared statement cache with select statements
  935. var id, mod int
  936. for i := 0; i < 2; i++ {
  937. err := session.Query("SELECT id,mod FROM prepcachetest WHERE id = "+strconv.FormatInt(int64(i), 10)).Scan(&id, &mod)
  938. if err != nil {
  939. t.Fatalf("select from prepcachetest failed, error '%v'", err)
  940. }
  941. }
  942. //generate an update statement to test they are prepared
  943. err := session.Query("UPDATE prepcachetest SET mod = ? WHERE id = ?", 1, 11).Exec()
  944. if err != nil {
  945. t.Fatalf("update prepcachetest failed, error '%v'", err)
  946. }
  947. //generate a delete statement to test they are prepared
  948. err = session.Query("DELETE FROM prepcachetest WHERE id = ?", 1).Exec()
  949. if err != nil {
  950. t.Fatalf("delete from prepcachetest failed, error '%v'", err)
  951. }
  952. //generate an insert statement to test they are prepared
  953. err = session.Query("INSERT INTO prepcachetest (id,mod) VALUES (?, ?)", 3, 11).Exec()
  954. if err != nil {
  955. t.Fatalf("insert into prepcachetest failed, error '%v'", err)
  956. }
  957. //Make sure the cache size is maintained
  958. if stmtsLRU.lru.Len() != stmtsLRU.lru.MaxEntries {
  959. t.Fatalf("expected cache size of %v, got %v", stmtsLRU.lru.MaxEntries, stmtsLRU.lru.Len())
  960. }
  961. //Walk through all the configured hosts and test cache retention and eviction
  962. var selFound, insFound, updFound, delFound, selEvict bool
  963. for i := range session.cfg.Hosts {
  964. _, ok := stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testSELECT id,mod FROM prepcachetest WHERE id = 1")
  965. selFound = selFound || ok
  966. _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testINSERT INTO prepcachetest (id,mod) VALUES (?, ?)")
  967. insFound = insFound || ok
  968. _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testUPDATE prepcachetest SET mod = ? WHERE id = ?")
  969. updFound = updFound || ok
  970. _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testDELETE FROM prepcachetest WHERE id = ?")
  971. delFound = delFound || ok
  972. _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testSELECT id,mod FROM prepcachetest WHERE id = 0")
  973. selEvict = selEvict || !ok
  974. }
  975. if !selEvict {
  976. t.Fatalf("expected first select statement to be purged, but statement was found in the cache.")
  977. }
  978. if !selFound {
  979. t.Fatalf("expected second select statement to be cached, but statement was purged or not prepared.")
  980. }
  981. if !insFound {
  982. t.Fatalf("expected insert statement to be cached, but statement was purged or not prepared.")
  983. }
  984. if !updFound {
  985. t.Fatalf("expected update statement to be cached, but statement was purged or not prepared.")
  986. }
  987. if !delFound {
  988. t.Error("expected delete statement to be cached, but statement was purged or not prepared.")
  989. }
  990. }
  991. func TestPreparedCacheKey(t *testing.T) {
  992. session := createSession(t)
  993. defer session.Close()
  994. // create a second keyspace
  995. cluster2 := createCluster()
  996. createKeyspace(t, cluster2, "gocql_test2")
  997. cluster2.Keyspace = "gocql_test2"
  998. session2, err := cluster2.CreateSession()
  999. if err != nil {
  1000. t.Fatal("create session:", err)
  1001. }
  1002. defer session2.Close()
  1003. // both keyspaces have a table named "test_stmt_cache_key"
  1004. if err := createTable(session, "CREATE TABLE test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
  1005. t.Fatal("create table:", err)
  1006. }
  1007. if err := createTable(session2, "CREATE TABLE test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
  1008. t.Fatal("create table:", err)
  1009. }
  1010. // both tables have a single row with the same partition key but different column value
  1011. if err = session.Query(`INSERT INTO test_stmt_cache_key (id, field) VALUES (?, ?)`, "key", "one").Exec(); err != nil {
  1012. t.Fatal("insert:", err)
  1013. }
  1014. if err = session2.Query(`INSERT INTO test_stmt_cache_key (id, field) VALUES (?, ?)`, "key", "two").Exec(); err != nil {
  1015. t.Fatal("insert:", err)
  1016. }
  1017. // should be able to see different values in each keyspace
  1018. var value string
  1019. if err = session.Query("SELECT field FROM test_stmt_cache_key WHERE id = ?", "key").Scan(&value); err != nil {
  1020. t.Fatal("select:", err)
  1021. }
  1022. if value != "one" {
  1023. t.Errorf("Expected one, got %s", value)
  1024. }
  1025. if err = session2.Query("SELECT field FROM test_stmt_cache_key WHERE id = ?", "key").Scan(&value); err != nil {
  1026. t.Fatal("select:", err)
  1027. }
  1028. if value != "two" {
  1029. t.Errorf("Expected two, got %s", value)
  1030. }
  1031. }
  1032. //TestMarshalFloat64Ptr tests to see that a pointer to a float64 is marshalled correctly.
  1033. func TestMarshalFloat64Ptr(t *testing.T) {
  1034. session := createSession(t)
  1035. defer session.Close()
  1036. if err := createTable(session, "CREATE TABLE float_test (id double, test double, primary key (id))"); err != nil {
  1037. t.Fatal("create table:", err)
  1038. }
  1039. testNum := float64(7500)
  1040. if err := session.Query(`INSERT INTO float_test (id,test) VALUES (?,?)`, float64(7500.00), &testNum).Exec(); err != nil {
  1041. t.Fatal("insert float64:", err)
  1042. }
  1043. }
  1044. //TestMarshalInet tests to see that a pointer to a float64 is marshalled correctly.
  1045. func TestMarshalInet(t *testing.T) {
  1046. session := createSession(t)
  1047. defer session.Close()
  1048. if err := createTable(session, "CREATE TABLE inet_test (ip inet, name text, primary key (ip))"); err != nil {
  1049. t.Fatal("create table:", err)
  1050. }
  1051. stringIp := "123.34.45.56"
  1052. if err := session.Query(`INSERT INTO inet_test (ip,name) VALUES (?,?)`, stringIp, "Test IP 1").Exec(); err != nil {
  1053. t.Fatal("insert string inet:", err)
  1054. }
  1055. var stringResult string
  1056. if err := session.Query("SELECT ip FROM inet_test").Scan(&stringResult); err != nil {
  1057. t.Fatalf("select for string from inet_test 1 failed: %v", err)
  1058. }
  1059. if stringResult != stringIp {
  1060. t.Errorf("Expected %s, was %s", stringIp, stringResult)
  1061. }
  1062. var ipResult net.IP
  1063. if err := session.Query("SELECT ip FROM inet_test").Scan(&ipResult); err != nil {
  1064. t.Fatalf("select for net.IP from inet_test 1 failed: %v", err)
  1065. }
  1066. if ipResult.String() != stringIp {
  1067. t.Errorf("Expected %s, was %s", stringIp, ipResult.String())
  1068. }
  1069. if err := session.Query(`DELETE FROM inet_test WHERE ip = ?`, stringIp).Exec(); err != nil {
  1070. t.Fatal("delete inet table:", err)
  1071. }
  1072. netIp := net.ParseIP("222.43.54.65")
  1073. if err := session.Query(`INSERT INTO inet_test (ip,name) VALUES (?,?)`, netIp, "Test IP 2").Exec(); err != nil {
  1074. t.Fatal("insert netIp inet:", err)
  1075. }
  1076. if err := session.Query("SELECT ip FROM inet_test").Scan(&stringResult); err != nil {
  1077. t.Fatalf("select for string from inet_test 2 failed: %v", err)
  1078. }
  1079. if stringResult != netIp.String() {
  1080. t.Errorf("Expected %s, was %s", netIp.String(), stringResult)
  1081. }
  1082. if err := session.Query("SELECT ip FROM inet_test").Scan(&ipResult); err != nil {
  1083. t.Fatalf("select for net.IP from inet_test 2 failed: %v", err)
  1084. }
  1085. if ipResult.String() != netIp.String() {
  1086. t.Errorf("Expected %s, was %s", netIp.String(), ipResult.String())
  1087. }
  1088. }
  1089. func TestVarint(t *testing.T) {
  1090. session := createSession(t)
  1091. defer session.Close()
  1092. if err := createTable(session, "CREATE TABLE varint_test (id varchar, test varint, test2 varint, primary key (id))"); err != nil {
  1093. t.Fatalf("failed to create table with error '%v'", err)
  1094. }
  1095. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", 0).Exec(); err != nil {
  1096. t.Fatalf("insert varint: %v", err)
  1097. }
  1098. var result int
  1099. if err := session.Query("SELECT test FROM varint_test").Scan(&result); err != nil {
  1100. t.Fatalf("select from varint_test failed: %v", err)
  1101. }
  1102. if result != 0 {
  1103. t.Errorf("Expected 0, was %d", result)
  1104. }
  1105. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", -1).Exec(); err != nil {
  1106. t.Fatalf("insert varint: %v", err)
  1107. }
  1108. if err := session.Query("SELECT test FROM varint_test").Scan(&result); err != nil {
  1109. t.Fatalf("select from varint_test failed: %v", err)
  1110. }
  1111. if result != -1 {
  1112. t.Errorf("Expected -1, was %d", result)
  1113. }
  1114. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", int64(math.MaxInt32)+1).Exec(); err != nil {
  1115. t.Fatalf("insert varint: %v", err)
  1116. }
  1117. var result64 int64
  1118. if err := session.Query("SELECT test FROM varint_test").Scan(&result64); err != nil {
  1119. t.Fatalf("select from varint_test failed: %v", err)
  1120. }
  1121. if result64 != int64(math.MaxInt32)+1 {
  1122. t.Errorf("Expected %d, was %d", int64(math.MaxInt32)+1, result64)
  1123. }
  1124. biggie := new(big.Int)
  1125. biggie.SetString("36893488147419103232", 10) // > 2**64
  1126. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", biggie).Exec(); err != nil {
  1127. t.Fatalf("insert varint: %v", err)
  1128. }
  1129. resultBig := new(big.Int)
  1130. if err := session.Query("SELECT test FROM varint_test").Scan(resultBig); err != nil {
  1131. t.Fatalf("select from varint_test failed: %v", err)
  1132. }
  1133. if resultBig.String() != biggie.String() {
  1134. t.Errorf("Expected %s, was %s", biggie.String(), resultBig.String())
  1135. }
  1136. err := session.Query("SELECT test FROM varint_test").Scan(&result64)
  1137. if err == nil || strings.Index(err.Error(), "out of range") == -1 {
  1138. t.Errorf("expected out of range error since value is too big for int64")
  1139. }
  1140. // value not set in cassandra, leave bind variable empty
  1141. resultBig = new(big.Int)
  1142. if err := session.Query("SELECT test2 FROM varint_test").Scan(resultBig); err != nil {
  1143. t.Fatalf("select from varint_test failed: %v", err)
  1144. }
  1145. if resultBig.Int64() != 0 {
  1146. t.Errorf("Expected %s, was %s", biggie.String(), resultBig.String())
  1147. }
  1148. // can use double pointer to explicitly detect value is not set in cassandra
  1149. if err := session.Query("SELECT test2 FROM varint_test").Scan(&resultBig); err != nil {
  1150. t.Fatalf("select from varint_test failed: %v", err)
  1151. }
  1152. if resultBig != nil {
  1153. t.Errorf("Expected %v, was %v", nil, *resultBig)
  1154. }
  1155. }
  1156. //TestQueryStats confirms that the stats are returning valid data. Accuracy may be questionable.
  1157. func TestQueryStats(t *testing.T) {
  1158. session := createSession(t)
  1159. defer session.Close()
  1160. qry := session.Query("SELECT * FROM system.peers")
  1161. if err := qry.Exec(); err != nil {
  1162. t.Fatalf("query failed. %v", err)
  1163. } else {
  1164. if qry.Attempts() < 1 {
  1165. t.Fatal("expected at least 1 attempt, but got 0")
  1166. }
  1167. if qry.Latency() <= 0 {
  1168. t.Fatalf("expected latency to be greater than 0, but got %v instead.", qry.Latency())
  1169. }
  1170. }
  1171. }
  1172. //TestBatchStats confirms that the stats are returning valid data. Accuracy may be questionable.
  1173. func TestBatchStats(t *testing.T) {
  1174. if *flagProto == 1 {
  1175. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  1176. }
  1177. session := createSession(t)
  1178. defer session.Close()
  1179. if err := createTable(session, "CREATE TABLE batchStats (id int, PRIMARY KEY (id))"); err != nil {
  1180. t.Fatalf("failed to create table with error '%v'", err)
  1181. }
  1182. b := session.NewBatch(LoggedBatch)
  1183. b.Query("INSERT INTO batchStats (id) VALUES (?)", 1)
  1184. b.Query("INSERT INTO batchStats (id) VALUES (?)", 2)
  1185. if err := session.ExecuteBatch(b); err != nil {
  1186. t.Fatalf("query failed. %v", err)
  1187. } else {
  1188. if b.Attempts() < 1 {
  1189. t.Fatal("expected at least 1 attempt, but got 0")
  1190. }
  1191. if b.Latency() <= 0 {
  1192. t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
  1193. }
  1194. }
  1195. }
  1196. //TestNilInQuery tests to see that a nil value passed to a query is handled by Cassandra
  1197. //TODO validate the nil value by reading back the nil. Need to fix Unmarshalling.
  1198. func TestNilInQuery(t *testing.T) {
  1199. session := createSession(t)
  1200. defer session.Close()
  1201. if err := createTable(session, "CREATE TABLE testNilInsert (id int, count int, PRIMARY KEY (id))"); err != nil {
  1202. t.Fatalf("failed to create table with error '%v'", err)
  1203. }
  1204. if err := session.Query("INSERT INTO testNilInsert (id,count) VALUES (?,?)", 1, nil).Exec(); err != nil {
  1205. t.Fatalf("failed to insert with err: %v", err)
  1206. }
  1207. var id int
  1208. if err := session.Query("SELECT id FROM testNilInsert").Scan(&id); err != nil {
  1209. t.Fatalf("failed to select with err: %v", err)
  1210. } else if id != 1 {
  1211. t.Fatalf("expected id to be 1, got %v", id)
  1212. }
  1213. }
  1214. // Don't initialize time.Time bind variable if cassandra timestamp column is empty
  1215. func TestEmptyTimestamp(t *testing.T) {
  1216. session := createSession(t)
  1217. defer session.Close()
  1218. if err := createTable(session, "CREATE TABLE test_empty_timestamp (id int, time timestamp, num int, PRIMARY KEY (id))"); err != nil {
  1219. t.Fatalf("failed to create table with error '%v'", err)
  1220. }
  1221. if err := session.Query("INSERT INTO test_empty_timestamp (id, num) VALUES (?,?)", 1, 561).Exec(); err != nil {
  1222. t.Fatalf("failed to insert with err: %v", err)
  1223. }
  1224. var timeVal time.Time
  1225. if err := session.Query("SELECT time FROM test_empty_timestamp where id = ?", 1).Scan(&timeVal); err != nil {
  1226. t.Fatalf("failed to select with err: %v", err)
  1227. }
  1228. if !timeVal.IsZero() {
  1229. t.Errorf("time.Time bind variable should still be empty (was %s)", timeVal)
  1230. }
  1231. }