cassandra_test.go 72 KB


  1. // +build all integration
  2. package gocql
  3. import (
  4. "bytes"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "math"
  9. "math/big"
  10. "net"
  11. "reflect"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "testing"
  16. "time"
  17. "unicode"
  18. "gopkg.in/inf.v0"
  19. )
  20. var (
  21. flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
  22. flagProto = flag.Int("proto", 2, "protcol version")
  23. flagCQL = flag.String("cql", "3.0.0", "CQL version")
  24. flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
  25. clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
  26. flagRetry = flag.Int("retries", 5, "number of times to retry queries")
  27. flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts poll")
  28. flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
  29. flagRunAuthTest = flag.Bool("runauth", false, "Set to true to run authentication test")
  30. flagCompressTest = flag.String("compressor", "", "compressor to use")
  31. flagTimeout = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
  32. clusterHosts []string
  33. )
  34. func init() {
  35. flag.Parse()
  36. clusterHosts = strings.Split(*flagCluster, ",")
  37. log.SetFlags(log.Lshortfile | log.LstdFlags)
  38. }
  39. func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
  40. if *flagRunSslTest {
  41. cluster.SslOpts = &SslOptions{
  42. CertPath: "testdata/pki/gocql.crt",
  43. KeyPath: "testdata/pki/gocql.key",
  44. CaPath: "testdata/pki/ca.crt",
  45. EnableHostVerification: false,
  46. }
  47. }
  48. return cluster
  49. }
  50. var initOnce sync.Once
  51. func createTable(s *Session, table string) error {
  52. if err := s.control.query(table).Close(); err != nil {
  53. return err
  54. }
  55. return s.control.awaitSchemaAgreement()
  56. }
  57. func createCluster() *ClusterConfig {
  58. cluster := NewCluster(clusterHosts...)
  59. cluster.ProtoVersion = *flagProto
  60. cluster.CQLVersion = *flagCQL
  61. cluster.Timeout = *flagTimeout
  62. cluster.Consistency = Quorum
  63. if *flagRetry > 0 {
  64. cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
  65. }
  66. switch *flagCompressTest {
  67. case "snappy":
  68. cluster.Compressor = &SnappyCompressor{}
  69. case "":
  70. default:
  71. panic("invalid compressor: " + *flagCompressTest)
  72. }
  73. cluster = addSslOptions(cluster)
  74. return cluster
  75. }
  76. func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string) {
  77. c := *cluster
  78. c.Keyspace = "system"
  79. c.Timeout = 20 * time.Second
  80. session, err := c.CreateSession()
  81. if err != nil {
  82. tb.Fatal("createSession:", err)
  83. }
  84. err = session.control.query(`DROP KEYSPACE IF EXISTS ` + keyspace).Close()
  85. if err != nil {
  86. tb.Fatal(err)
  87. }
  88. if err = session.control.awaitSchemaAgreement(); err != nil {
  89. tb.Fatal(err)
  90. }
  91. err = session.control.query(fmt.Sprintf(`CREATE KEYSPACE %s
  92. WITH replication = {
  93. 'class' : 'SimpleStrategy',
  94. 'replication_factor' : %d
  95. }`, keyspace, *flagRF)).Close()
  96. if err != nil {
  97. tb.Fatal(err)
  98. }
  99. // the schema version might be out of data between 2 nodes, so wait for the
  100. // cluster to settle.
  101. // TODO(zariel): use events here to know when the cluster has resolved to the
  102. // new schema version
  103. if err = session.control.awaitSchemaAgreement(); err != nil {
  104. tb.Fatal(err)
  105. }
  106. }
  107. func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
  108. // Drop and re-create the keyspace once. Different tests should use their own
  109. // individual tables, but can assume that the table does not exist before.
  110. initOnce.Do(func() {
  111. createKeyspace(tb, cluster, "gocql_test")
  112. })
  113. cluster.Keyspace = "gocql_test"
  114. session, err := cluster.CreateSession()
  115. if err != nil {
  116. tb.Fatal("createSession:", err)
  117. }
  118. return session
  119. }
  120. func createSession(tb testing.TB) *Session {
  121. cluster := createCluster()
  122. return createSessionFromCluster(cluster, tb)
  123. }
  124. // TestAuthentication verifies that gocql will work with a host configured to only accept authenticated connections
  125. func TestAuthentication(t *testing.T) {
  126. if *flagProto < 2 {
  127. t.Skip("Authentication is not supported with protocol < 2")
  128. }
  129. if !*flagRunAuthTest {
  130. t.Skip("Authentication is not configured in the target cluster")
  131. }
  132. cluster := createCluster()
  133. cluster.Authenticator = PasswordAuthenticator{
  134. Username: "cassandra",
  135. Password: "cassandra",
  136. }
  137. session, err := cluster.CreateSession()
  138. if err != nil {
  139. t.Fatalf("Authentication error: %s", err)
  140. }
  141. session.Close()
  142. }
  143. //TestRingDiscovery makes sure that you can autodiscover other cluster members when you seed a cluster config with just one node
  144. func TestRingDiscovery(t *testing.T) {
  145. cluster := createCluster()
  146. cluster.Hosts = clusterHosts[:1]
  147. cluster.DiscoverHosts = true
  148. session, err := cluster.CreateSession()
  149. if err != nil {
  150. t.Fatalf("got error connecting to the cluster %v", err)
  151. }
  152. if *clusterSize > 1 {
  153. // wait for autodiscovery to update the pool with the list of known hosts
  154. time.Sleep(*flagAutoWait)
  155. }
  156. session.pool.mu.RLock()
  157. size := len(session.pool.hostConnPools)
  158. session.pool.mu.RUnlock()
  159. if *clusterSize != size {
  160. t.Logf("WARN: Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
  161. }
  162. session.Close()
  163. }
  164. func TestEmptyHosts(t *testing.T) {
  165. cluster := createCluster()
  166. cluster.Hosts = nil
  167. if session, err := cluster.CreateSession(); err == nil {
  168. session.Close()
  169. t.Error("expected err, got nil")
  170. }
  171. }
  172. //TestUseStatementError checks to make sure the correct error is returned when the user tries to execute a use statement.
  173. func TestUseStatementError(t *testing.T) {
  174. session := createSession(t)
  175. defer session.Close()
  176. if err := session.Query("USE gocql_test").Exec(); err != nil {
  177. if err != ErrUseStmt {
  178. t.Error("expected ErrUseStmt, got " + err.Error())
  179. }
  180. } else {
  181. t.Error("expected err, got nil.")
  182. }
  183. }
  184. //TestInvalidKeyspace checks that an invalid keyspace will return promptly and without a flood of connections
  185. func TestInvalidKeyspace(t *testing.T) {
  186. cluster := createCluster()
  187. cluster.Keyspace = "invalidKeyspace"
  188. session, err := cluster.CreateSession()
  189. if err != nil {
  190. if err != ErrNoConnectionsStarted {
  191. t.Errorf("Expected ErrNoConnections but got %v", err)
  192. }
  193. } else {
  194. session.Close() //Clean up the session
  195. t.Error("expected err, got nil.")
  196. }
  197. }
  198. func TestTracing(t *testing.T) {
  199. session := createSession(t)
  200. defer session.Close()
  201. if err := createTable(session, `CREATE TABLE gocql_test.trace (id int primary key)`); err != nil {
  202. t.Fatal("create:", err)
  203. }
  204. buf := &bytes.Buffer{}
  205. trace := NewTraceWriter(session, buf)
  206. if err := session.Query(`INSERT INTO trace (id) VALUES (?)`, 42).Trace(trace).Exec(); err != nil {
  207. t.Error("insert:", err)
  208. } else if buf.Len() == 0 {
  209. t.Error("insert: failed to obtain any tracing")
  210. }
  211. buf.Reset()
  212. var value int
  213. if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Trace(trace).Scan(&value); err != nil {
  214. t.Error("select:", err)
  215. } else if value != 42 {
  216. t.Errorf("value: expected %d, got %d", 42, value)
  217. } else if buf.Len() == 0 {
  218. t.Error("select: failed to obtain any tracing")
  219. }
  220. }
  221. func TestPaging(t *testing.T) {
  222. if *flagProto == 1 {
  223. t.Skip("Paging not supported. Please use Cassandra >= 2.0")
  224. }
  225. session := createSession(t)
  226. defer session.Close()
  227. if err := createTable(session, "CREATE TABLE gocql_test.paging (id int primary key)"); err != nil {
  228. t.Fatal("create table:", err)
  229. }
  230. for i := 0; i < 100; i++ {
  231. if err := session.Query("INSERT INTO paging (id) VALUES (?)", i).Exec(); err != nil {
  232. t.Fatal("insert:", err)
  233. }
  234. }
  235. iter := session.Query("SELECT id FROM paging").PageSize(10).Iter()
  236. var id int
  237. count := 0
  238. for iter.Scan(&id) {
  239. count++
  240. }
  241. if err := iter.Close(); err != nil {
  242. t.Fatal("close:", err)
  243. }
  244. if count != 100 {
  245. t.Fatalf("expected %d, got %d", 100, count)
  246. }
  247. }
  248. func TestCAS(t *testing.T) {
  249. if *flagProto == 1 {
  250. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  251. }
  252. session := createSession(t)
  253. defer session.Close()
  254. session.cfg.SerialConsistency = LocalSerial
  255. if err := createTable(session, `CREATE TABLE gocql_test.cas_table (
  256. title varchar,
  257. revid timeuuid,
  258. last_modified timestamp,
  259. PRIMARY KEY (title, revid)
  260. )`); err != nil {
  261. t.Fatal("create:", err)
  262. }
  263. title, revid, modified := "baz", TimeUUID(), time.Now()
  264. var titleCAS string
  265. var revidCAS UUID
  266. var modifiedCAS time.Time
  267. if applied, err := session.Query(`INSERT INTO cas_table (title, revid, last_modified)
  268. VALUES (?, ?, ?) IF NOT EXISTS`,
  269. title, revid, modified).ScanCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
  270. t.Fatal("insert:", err)
  271. } else if !applied {
  272. t.Fatal("insert should have been applied")
  273. }
  274. if applied, err := session.Query(`INSERT INTO cas_table (title, revid, last_modified)
  275. VALUES (?, ?, ?) IF NOT EXISTS`,
  276. title, revid, modified).ScanCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
  277. t.Fatal("insert:", err)
  278. } else if applied {
  279. t.Fatal("insert should not have been applied")
  280. } else if title != titleCAS || revid != revidCAS {
  281. t.Fatalf("expected %s/%v/%v but got %s/%v/%v", title, revid, modified, titleCAS, revidCAS, modifiedCAS)
  282. }
  283. tenSecondsLater := modified.Add(10 * time.Second)
  284. if applied, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  285. title, revid, tenSecondsLater).ScanCAS(&modifiedCAS); err != nil {
  286. t.Fatal("delete:", err)
  287. } else if applied {
  288. t.Fatal("delete should have not been applied")
  289. }
  290. if modifiedCAS.Unix() != tenSecondsLater.Add(-10*time.Second).Unix() {
  291. t.Fatalf("Was expecting modified CAS to be %v; but was one second later", modifiedCAS.UTC())
  292. }
  293. if _, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  294. title, revid, tenSecondsLater).ScanCAS(); err.Error() != "count mismatch" {
  295. t.Fatalf("delete: was expecting count mismatch error but got %s", err)
  296. }
  297. if applied, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  298. title, revid, modified).ScanCAS(&modifiedCAS); err != nil {
  299. t.Fatal("delete:", err)
  300. } else if !applied {
  301. t.Fatal("delete should have been applied")
  302. }
  303. if err := session.Query(`TRUNCATE cas_table`).Exec(); err != nil {
  304. t.Fatal("truncate:", err)
  305. }
  306. successBatch := session.NewBatch(LoggedBatch)
  307. successBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title, revid, modified)
  308. if applied, _, err := session.ExecuteBatchCAS(successBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
  309. t.Fatal("insert:", err)
  310. } else if !applied {
  311. t.Fatal("insert should have been applied")
  312. }
  313. successBatch = session.NewBatch(LoggedBatch)
  314. successBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title+"_foo", revid, modified)
  315. casMap := make(map[string]interface{})
  316. if applied, _, err := session.MapExecuteBatchCAS(successBatch, casMap); err != nil {
  317. t.Fatal("insert:", err)
  318. } else if !applied {
  319. t.Fatal("insert should have been applied")
  320. }
  321. failBatch := session.NewBatch(LoggedBatch)
  322. failBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title, revid, modified)
  323. if applied, _, err := session.ExecuteBatchCAS(successBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
  324. t.Fatal("insert:", err)
  325. } else if applied {
  326. t.Fatal("insert shouldn't have been applied")
  327. }
  328. insertBatch := session.NewBatch(LoggedBatch)
  329. insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 2c3af400-73a4-11e5-9381-29463d90c3f0, DATEOF(NOW()))")
  330. insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 3e4ad2f1-73a4-11e5-9381-29463d90c3f0, DATEOF(NOW()))")
  331. if err := session.ExecuteBatch(insertBatch); err != nil {
  332. t.Fatal("insert:", err)
  333. }
  334. failBatch = session.NewBatch(LoggedBatch)
  335. failBatch.Query("UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=2c3af400-73a4-11e5-9381-29463d90c3f0 IF last_modified=DATEOF(NOW());")
  336. failBatch.Query("UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified=DATEOF(NOW());")
  337. if applied, iter, err := session.ExecuteBatchCAS(failBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
  338. t.Fatal("insert:", err)
  339. } else if applied {
  340. t.Fatal("insert shouldn't have been applied")
  341. } else {
  342. if scan := iter.Scan(&applied, &titleCAS, &revidCAS, &modifiedCAS); scan && applied {
  343. t.Fatal("insert shouldn't have been applied")
  344. } else if !scan {
  345. t.Fatal("should have scanned another row")
  346. }
  347. if err := iter.Close(); err != nil {
  348. t.Fatal("scan:", err)
  349. }
  350. }
  351. }
  352. func TestMapScanCAS(t *testing.T) {
  353. if *flagProto == 1 {
  354. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  355. }
  356. session := createSession(t)
  357. defer session.Close()
  358. if err := createTable(session, `CREATE TABLE gocql_test.cas_table2 (
  359. title varchar,
  360. revid timeuuid,
  361. last_modified timestamp,
  362. deleted boolean,
  363. PRIMARY KEY (title, revid)
  364. )`); err != nil {
  365. t.Fatal("create:", err)
  366. }
  367. title, revid, modified, deleted := "baz", TimeUUID(), time.Now(), false
  368. mapCAS := map[string]interface{}{}
  369. if applied, err := session.Query(`INSERT INTO cas_table2 (title, revid, last_modified, deleted)
  370. VALUES (?, ?, ?, ?) IF NOT EXISTS`,
  371. title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
  372. t.Fatal("insert:", err)
  373. } else if !applied {
  374. t.Fatal("insert should have been applied")
  375. }
  376. mapCAS = map[string]interface{}{}
  377. if applied, err := session.Query(`INSERT INTO cas_table2 (title, revid, last_modified, deleted)
  378. VALUES (?, ?, ?, ?) IF NOT EXISTS`,
  379. title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
  380. t.Fatal("insert:", err)
  381. } else if applied {
  382. t.Fatal("insert should not have been applied")
  383. } else if title != mapCAS["title"] || revid != mapCAS["revid"] || deleted != mapCAS["deleted"] {
  384. t.Fatalf("expected %s/%v/%v/%v but got %s/%v/%v%v", title, revid, modified, false, mapCAS["title"], mapCAS["revid"], mapCAS["last_modified"], mapCAS["deleted"])
  385. }
  386. }
  387. func TestBatch(t *testing.T) {
  388. if *flagProto == 1 {
  389. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  390. }
  391. session := createSession(t)
  392. defer session.Close()
  393. if err := createTable(session, `CREATE TABLE gocql_test.batch_table (id int primary key)`); err != nil {
  394. t.Fatal("create table:", err)
  395. }
  396. batch := NewBatch(LoggedBatch)
  397. for i := 0; i < 100; i++ {
  398. batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
  399. }
  400. if err := session.ExecuteBatch(batch); err != nil {
  401. t.Fatal("execute batch:", err)
  402. }
  403. count := 0
  404. if err := session.Query(`SELECT COUNT(*) FROM batch_table`).Scan(&count); err != nil {
  405. t.Fatal("select count:", err)
  406. } else if count != 100 {
  407. t.Fatalf("count: expected %d, got %d\n", 100, count)
  408. }
  409. }
  410. func TestUnpreparedBatch(t *testing.T) {
  411. if *flagProto == 1 {
  412. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  413. }
  414. session := createSession(t)
  415. defer session.Close()
  416. if err := createTable(session, `CREATE TABLE gocql_test.batch_unprepared (id int primary key, c counter)`); err != nil {
  417. t.Fatal("create table:", err)
  418. }
  419. var batch *Batch
  420. if *flagProto == 2 {
  421. batch = NewBatch(CounterBatch)
  422. } else {
  423. batch = NewBatch(UnloggedBatch)
  424. }
  425. for i := 0; i < 100; i++ {
  426. batch.Query(`UPDATE batch_unprepared SET c = c + 1 WHERE id = 1`)
  427. }
  428. if err := session.ExecuteBatch(batch); err != nil {
  429. t.Fatal("execute batch:", err)
  430. }
  431. count := 0
  432. if err := session.Query(`SELECT COUNT(*) FROM batch_unprepared`).Scan(&count); err != nil {
  433. t.Fatal("select count:", err)
  434. } else if count != 1 {
  435. t.Fatalf("count: expected %d, got %d\n", 100, count)
  436. }
  437. if err := session.Query(`SELECT c FROM batch_unprepared`).Scan(&count); err != nil {
  438. t.Fatal("select count:", err)
  439. } else if count != 100 {
  440. t.Fatalf("count: expected %d, got %d\n", 100, count)
  441. }
  442. }
  443. // TestBatchLimit tests gocql to make sure batch operations larger than the maximum
  444. // statement limit are not submitted to a cassandra node.
  445. func TestBatchLimit(t *testing.T) {
  446. if *flagProto == 1 {
  447. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  448. }
  449. session := createSession(t)
  450. defer session.Close()
  451. if err := createTable(session, `CREATE TABLE gocql_test.batch_table2 (id int primary key)`); err != nil {
  452. t.Fatal("create table:", err)
  453. }
  454. batch := NewBatch(LoggedBatch)
  455. for i := 0; i < 65537; i++ {
  456. batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
  457. }
  458. if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
  459. t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
  460. }
  461. }
  462. func TestWhereIn(t *testing.T) {
  463. session := createSession(t)
  464. defer session.Close()
  465. if err := createTable(session, `CREATE TABLE gocql_test.where_in_table (id int, cluster int, primary key (id,cluster))`); err != nil {
  466. t.Fatal("create table:", err)
  467. }
  468. if err := session.Query("INSERT INTO where_in_table (id, cluster) VALUES (?,?)", 100, 200).Exec(); err != nil {
  469. t.Fatal("insert:", err)
  470. }
  471. iter := session.Query("SELECT * FROM where_in_table WHERE id = ? AND cluster IN (?)", 100, 200).Iter()
  472. var id, cluster int
  473. count := 0
  474. for iter.Scan(&id, &cluster) {
  475. count++
  476. }
  477. if id != 100 || cluster != 200 {
  478. t.Fatalf("Was expecting id and cluster to be (100,200) but were (%d,%d)", id, cluster)
  479. }
  480. }
  481. // TestTooManyQueryArgs tests to make sure the library correctly handles the application level bug
  482. // whereby too many query arguments are passed to a query
  483. func TestTooManyQueryArgs(t *testing.T) {
  484. if *flagProto == 1 {
  485. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  486. }
  487. session := createSession(t)
  488. defer session.Close()
  489. if err := createTable(session, `CREATE TABLE gocql_test.too_many_query_args (id int primary key, value int)`); err != nil {
  490. t.Fatal("create table:", err)
  491. }
  492. _, err := session.Query(`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2).Iter().SliceMap()
  493. if err == nil {
  494. t.Fatal("'`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2' should return an ErrQueryArgLength")
  495. }
  496. if err != ErrQueryArgLength {
  497. t.Fatalf("'`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2' should return an ErrQueryArgLength, but returned: %s", err)
  498. }
  499. batch := session.NewBatch(UnloggedBatch)
  500. batch.Query("INSERT INTO too_many_query_args (id, value) VALUES (?, ?)", 1, 2, 3)
  501. err = session.ExecuteBatch(batch)
  502. if err == nil {
  503. t.Fatal("'`INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an ErrQueryArgLength")
  504. }
  505. if err != ErrQueryArgLength {
  506. t.Fatalf("'INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an ErrQueryArgLength, but returned: %s", err)
  507. }
  508. }
  509. // TestNotEnoughQueryArgs tests to make sure the library correctly handles the application level bug
  510. // whereby not enough query arguments are passed to a query
  511. func TestNotEnoughQueryArgs(t *testing.T) {
  512. if *flagProto == 1 {
  513. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  514. }
  515. session := createSession(t)
  516. defer session.Close()
  517. if err := createTable(session, `CREATE TABLE gocql_test.not_enough_query_args (id int, cluster int, value int, primary key (id, cluster))`); err != nil {
  518. t.Fatal("create table:", err)
  519. }
  520. _, err := session.Query(`SELECT * FROM not_enough_query_args WHERE id = ? and cluster = ?`, 1).Iter().SliceMap()
  521. if err == nil {
  522. t.Fatal("'`SELECT * FROM not_enough_query_args WHERE id = ? and cluster = ?`, 1' should return an ErrQueryArgLength")
  523. }
  524. if err != ErrQueryArgLength {
  525. t.Fatalf("'`SELECT * FROM too_few_query_args WHERE id = ? and cluster = ?`, 1' should return an ErrQueryArgLength, but returned: %s", err)
  526. }
  527. batch := session.NewBatch(UnloggedBatch)
  528. batch.Query("INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)", 1, 2)
  529. err = session.ExecuteBatch(batch)
  530. if err == nil {
  531. t.Fatal("'`INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an ErrQueryArgLength")
  532. }
  533. if err != ErrQueryArgLength {
  534. t.Fatalf("'INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an ErrQueryArgLength, but returned: %s", err)
  535. }
  536. }
  537. // TestCreateSessionTimeout tests to make sure the CreateSession function timeouts out correctly
  538. // and prevents an infinite loop of connection retries.
  539. func TestCreateSessionTimeout(t *testing.T) {
  540. go func() {
  541. <-time.After(2 * time.Second)
  542. t.Error("no startup timeout")
  543. }()
  544. cluster := createCluster()
  545. cluster.Hosts = []string{"127.0.0.1:1"}
  546. session, err := cluster.CreateSession()
  547. if err == nil {
  548. session.Close()
  549. t.Fatal("expected ErrNoConnectionsStarted, but no error was returned.")
  550. }
  551. if err != ErrNoConnectionsStarted {
  552. t.Fatalf("expected ErrNoConnectionsStarted, but received %v", err)
  553. }
  554. }
  555. type FullName struct {
  556. FirstName string
  557. LastName string
  558. }
  559. func (n FullName) MarshalCQL(info TypeInfo) ([]byte, error) {
  560. return []byte(n.FirstName + " " + n.LastName), nil
  561. }
  562. func (n *FullName) UnmarshalCQL(info TypeInfo, data []byte) error {
  563. t := strings.SplitN(string(data), " ", 2)
  564. n.FirstName, n.LastName = t[0], t[1]
  565. return nil
  566. }
  567. func TestMapScanWithRefMap(t *testing.T) {
  568. session := createSession(t)
  569. defer session.Close()
  570. if err := createTable(session, `CREATE TABLE gocql_test.scan_map_ref_table (
  571. testtext text PRIMARY KEY,
  572. testfullname text,
  573. testint int,
  574. )`); err != nil {
  575. t.Fatal("create table:", err)
  576. }
  577. m := make(map[string]interface{})
  578. m["testtext"] = "testtext"
  579. m["testfullname"] = FullName{"John", "Doe"}
  580. m["testint"] = 100
  581. if err := session.Query(`INSERT INTO scan_map_ref_table (testtext, testfullname, testint) values (?,?,?)`, m["testtext"], m["testfullname"], m["testint"]).Exec(); err != nil {
  582. t.Fatal("insert:", err)
  583. }
  584. var testText string
  585. var testFullName FullName
  586. ret := map[string]interface{}{
  587. "testtext": &testText,
  588. "testfullname": &testFullName,
  589. // testint is not set here.
  590. }
  591. iter := session.Query(`SELECT * FROM scan_map_ref_table`).Iter()
  592. if ok := iter.MapScan(ret); !ok {
  593. t.Fatal("select:", iter.Close())
  594. } else {
  595. if ret["testtext"] != "testtext" {
  596. t.Fatal("returned testtext did not match")
  597. }
  598. f := ret["testfullname"].(FullName)
  599. if f.FirstName != "John" || f.LastName != "Doe" {
  600. t.Fatal("returned testfullname did not match")
  601. }
  602. if ret["testint"] != 100 {
  603. t.Fatal("returned testinit did not match")
  604. }
  605. }
  606. }
  607. func TestSliceMap(t *testing.T) {
  608. session := createSession(t)
  609. defer session.Close()
  610. if err := createTable(session, `CREATE TABLE gocql_test.slice_map_table (
  611. testuuid timeuuid PRIMARY KEY,
  612. testtimestamp timestamp,
  613. testvarchar varchar,
  614. testbigint bigint,
  615. testblob blob,
  616. testbool boolean,
  617. testfloat float,
  618. testdouble double,
  619. testint int,
  620. testdecimal decimal,
  621. testlist list<text>,
  622. testset set<int>,
  623. testmap map<varchar, varchar>,
  624. testvarint varint,
  625. testinet inet
  626. )`); err != nil {
  627. t.Fatal("create table:", err)
  628. }
  629. m := make(map[string]interface{})
  630. bigInt := new(big.Int)
  631. if _, ok := bigInt.SetString("830169365738487321165427203929228", 10); !ok {
  632. t.Fatal("Failed setting bigint by string")
  633. }
  634. m["testuuid"] = TimeUUID()
  635. m["testvarchar"] = "Test VarChar"
  636. m["testbigint"] = time.Now().Unix()
  637. m["testtimestamp"] = time.Now().Truncate(time.Millisecond).UTC()
  638. m["testblob"] = []byte("test blob")
  639. m["testbool"] = true
  640. m["testfloat"] = float32(4.564)
  641. m["testdouble"] = float64(4.815162342)
  642. m["testint"] = 2343
  643. m["testdecimal"] = inf.NewDec(100, 0)
  644. m["testlist"] = []string{"quux", "foo", "bar", "baz", "quux"}
  645. m["testset"] = []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
  646. m["testmap"] = map[string]string{"field1": "val1", "field2": "val2", "field3": "val3"}
  647. m["testvarint"] = bigInt
  648. m["testinet"] = "213.212.2.19"
  649. sliceMap := []map[string]interface{}{m}
  650. if err := session.Query(`INSERT INTO slice_map_table (testuuid, testtimestamp, testvarchar, testbigint, testblob, testbool, testfloat, testdouble, testint, testdecimal, testlist, testset, testmap, testvarint, testinet) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
  651. m["testuuid"], m["testtimestamp"], m["testvarchar"], m["testbigint"], m["testblob"], m["testbool"], m["testfloat"], m["testdouble"], m["testint"], m["testdecimal"], m["testlist"], m["testset"], m["testmap"], m["testvarint"], m["testinet"]).Exec(); err != nil {
  652. t.Fatal("insert:", err)
  653. }
  654. if returned, retErr := session.Query(`SELECT * FROM slice_map_table`).Iter().SliceMap(); retErr != nil {
  655. t.Fatal("select:", retErr)
  656. } else {
  657. matchSliceMap(t, sliceMap, returned[0])
  658. }
  659. // Test for Iter.MapScan()
  660. {
  661. testMap := make(map[string]interface{})
  662. if !session.Query(`SELECT * FROM slice_map_table`).Iter().MapScan(testMap) {
  663. t.Fatal("MapScan failed to work with one row")
  664. }
  665. matchSliceMap(t, sliceMap, testMap)
  666. }
  667. // Test for Query.MapScan()
  668. {
  669. testMap := make(map[string]interface{})
  670. if session.Query(`SELECT * FROM slice_map_table`).MapScan(testMap) != nil {
  671. t.Fatal("MapScan failed to work with one row")
  672. }
  673. matchSliceMap(t, sliceMap, testMap)
  674. }
  675. }
  676. func matchSliceMap(t *testing.T, sliceMap []map[string]interface{}, testMap map[string]interface{}) {
  677. if sliceMap[0]["testuuid"] != testMap["testuuid"] {
  678. t.Fatal("returned testuuid did not match")
  679. }
  680. if sliceMap[0]["testtimestamp"] != testMap["testtimestamp"] {
  681. t.Fatal("returned testtimestamp did not match")
  682. }
  683. if sliceMap[0]["testvarchar"] != testMap["testvarchar"] {
  684. t.Fatal("returned testvarchar did not match")
  685. }
  686. if sliceMap[0]["testbigint"] != testMap["testbigint"] {
  687. t.Fatal("returned testbigint did not match")
  688. }
  689. if !reflect.DeepEqual(sliceMap[0]["testblob"], testMap["testblob"]) {
  690. t.Fatal("returned testblob did not match")
  691. }
  692. if sliceMap[0]["testbool"] != testMap["testbool"] {
  693. t.Fatal("returned testbool did not match")
  694. }
  695. if sliceMap[0]["testfloat"] != testMap["testfloat"] {
  696. t.Fatal("returned testfloat did not match")
  697. }
  698. if sliceMap[0]["testdouble"] != testMap["testdouble"] {
  699. t.Fatal("returned testdouble did not match")
  700. }
  701. if sliceMap[0]["testinet"] != testMap["testinet"] {
  702. t.Fatal("returned testinet did not match")
  703. }
  704. expectedDecimal := sliceMap[0]["testdecimal"].(*inf.Dec)
  705. returnedDecimal := testMap["testdecimal"].(*inf.Dec)
  706. if expectedDecimal.Cmp(returnedDecimal) != 0 {
  707. t.Fatal("returned testdecimal did not match")
  708. }
  709. if !reflect.DeepEqual(sliceMap[0]["testlist"], testMap["testlist"]) {
  710. t.Fatal("returned testlist did not match")
  711. }
  712. if !reflect.DeepEqual(sliceMap[0]["testset"], testMap["testset"]) {
  713. t.Fatal("returned testset did not match")
  714. }
  715. if !reflect.DeepEqual(sliceMap[0]["testmap"], testMap["testmap"]) {
  716. t.Fatal("returned testmap did not match")
  717. }
  718. if sliceMap[0]["testint"] != testMap["testint"] {
  719. t.Fatal("returned testint did not match")
  720. }
  721. }
  722. func TestScanWithNilArguments(t *testing.T) {
  723. session := createSession(t)
  724. defer session.Close()
  725. if err := createTable(session, `CREATE TABLE gocql_test.scan_with_nil_arguments (
  726. foo varchar,
  727. bar int,
  728. PRIMARY KEY (foo, bar)
  729. )`); err != nil {
  730. t.Fatal("create:", err)
  731. }
  732. for i := 1; i <= 20; i++ {
  733. if err := session.Query("INSERT INTO scan_with_nil_arguments (foo, bar) VALUES (?, ?)",
  734. "squares", i*i).Exec(); err != nil {
  735. t.Fatal("insert:", err)
  736. }
  737. }
  738. iter := session.Query("SELECT * FROM scan_with_nil_arguments WHERE foo = ?", "squares").Iter()
  739. var n int
  740. count := 0
  741. for iter.Scan(nil, &n) {
  742. count += n
  743. }
  744. if err := iter.Close(); err != nil {
  745. t.Fatal("close:", err)
  746. }
  747. if count != 2870 {
  748. t.Fatalf("expected %d, got %d", 2870, count)
  749. }
  750. }
  751. func TestScanCASWithNilArguments(t *testing.T) {
  752. if *flagProto == 1 {
  753. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  754. }
  755. session := createSession(t)
  756. defer session.Close()
  757. if err := createTable(session, `CREATE TABLE gocql_test.scan_cas_with_nil_arguments (
  758. foo varchar,
  759. bar varchar,
  760. PRIMARY KEY (foo, bar)
  761. )`); err != nil {
  762. t.Fatal("create:", err)
  763. }
  764. foo := "baz"
  765. var cas string
  766. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  767. VALUES (?, ?) IF NOT EXISTS`,
  768. foo, foo).ScanCAS(nil, nil); err != nil {
  769. t.Fatal("insert:", err)
  770. } else if !applied {
  771. t.Fatal("insert should have been applied")
  772. }
  773. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  774. VALUES (?, ?) IF NOT EXISTS`,
  775. foo, foo).ScanCAS(&cas, nil); err != nil {
  776. t.Fatal("insert:", err)
  777. } else if applied {
  778. t.Fatal("insert should not have been applied")
  779. } else if foo != cas {
  780. t.Fatalf("expected %v but got %v", foo, cas)
  781. }
  782. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  783. VALUES (?, ?) IF NOT EXISTS`,
  784. foo, foo).ScanCAS(nil, &cas); err != nil {
  785. t.Fatal("insert:", err)
  786. } else if applied {
  787. t.Fatal("insert should not have been applied")
  788. } else if foo != cas {
  789. t.Fatalf("expected %v but got %v", foo, cas)
  790. }
  791. }
  792. func TestRebindQueryInfo(t *testing.T) {
  793. session := createSession(t)
  794. defer session.Close()
  795. if err := createTable(session, "CREATE TABLE gocql_test.rebind_query (id int, value text, PRIMARY KEY (id))"); err != nil {
  796. t.Fatalf("failed to create table with error '%v'", err)
  797. }
  798. if err := session.Query("INSERT INTO rebind_query (id, value) VALUES (?, ?)", 23, "quux").Exec(); err != nil {
  799. t.Fatalf("insert into rebind_query failed, err '%v'", err)
  800. }
  801. if err := session.Query("INSERT INTO rebind_query (id, value) VALUES (?, ?)", 24, "w00t").Exec(); err != nil {
  802. t.Fatalf("insert into rebind_query failed, err '%v'", err)
  803. }
  804. q := session.Query("SELECT value FROM rebind_query WHERE ID = ?")
  805. q.Bind(23)
  806. iter := q.Iter()
  807. var value string
  808. for iter.Scan(&value) {
  809. }
  810. if value != "quux" {
  811. t.Fatalf("expected %v but got %v", "quux", value)
  812. }
  813. q.Bind(24)
  814. iter = q.Iter()
  815. for iter.Scan(&value) {
  816. }
  817. if value != "w00t" {
  818. t.Fatalf("expected %v but got %v", "quux", value)
  819. }
  820. }
  821. //TestStaticQueryInfo makes sure that the application can manually bind query parameters using the simplest possible static binding strategy
  822. func TestStaticQueryInfo(t *testing.T) {
  823. session := createSession(t)
  824. defer session.Close()
  825. if err := createTable(session, "CREATE TABLE gocql_test.static_query_info (id int, value text, PRIMARY KEY (id))"); err != nil {
  826. t.Fatalf("failed to create table with error '%v'", err)
  827. }
  828. if err := session.Query("INSERT INTO static_query_info (id, value) VALUES (?, ?)", 113, "foo").Exec(); err != nil {
  829. t.Fatalf("insert into static_query_info failed, err '%v'", err)
  830. }
  831. autobinder := func(q *QueryInfo) ([]interface{}, error) {
  832. values := make([]interface{}, 1)
  833. values[0] = 113
  834. return values, nil
  835. }
  836. qry := session.Bind("SELECT id, value FROM static_query_info WHERE id = ?", autobinder)
  837. if err := qry.Exec(); err != nil {
  838. t.Fatalf("expose query info failed, error '%v'", err)
  839. }
  840. iter := qry.Iter()
  841. var id int
  842. var value string
  843. iter.Scan(&id, &value)
  844. if err := iter.Close(); err != nil {
  845. t.Fatalf("query with exposed info failed, err '%v'", err)
  846. }
  847. if value != "foo" {
  848. t.Fatalf("Expected value %s, but got %s", "foo", value)
  849. }
  850. }
  851. type ClusteredKeyValue struct {
  852. Id int
  853. Cluster int
  854. Value string
  855. }
  856. func (kv *ClusteredKeyValue) Bind(q *QueryInfo) ([]interface{}, error) {
  857. values := make([]interface{}, len(q.Args))
  858. for i, info := range q.Args {
  859. fieldName := upcaseInitial(info.Name)
  860. value := reflect.ValueOf(kv)
  861. field := reflect.Indirect(value).FieldByName(fieldName)
  862. values[i] = field.Addr().Interface()
  863. }
  864. return values, nil
  865. }
  866. func upcaseInitial(str string) string {
  867. for i, v := range str {
  868. return string(unicode.ToUpper(v)) + str[i+1:]
  869. }
  870. return ""
  871. }
  872. //TestBoundQueryInfo makes sure that the application can manually bind query parameters using the query meta data supplied at runtime
  873. func TestBoundQueryInfo(t *testing.T) {
  874. session := createSession(t)
  875. defer session.Close()
  876. if err := createTable(session, "CREATE TABLE gocql_test.clustered_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
  877. t.Fatalf("failed to create table with error '%v'", err)
  878. }
  879. write := &ClusteredKeyValue{Id: 200, Cluster: 300, Value: "baz"}
  880. insert := session.Bind("INSERT INTO clustered_query_info (id, cluster, value) VALUES (?, ?,?)", write.Bind)
  881. if err := insert.Exec(); err != nil {
  882. t.Fatalf("insert into clustered_query_info failed, err '%v'", err)
  883. }
  884. read := &ClusteredKeyValue{Id: 200, Cluster: 300}
  885. qry := session.Bind("SELECT id, cluster, value FROM clustered_query_info WHERE id = ? and cluster = ?", read.Bind)
  886. iter := qry.Iter()
  887. var id, cluster int
  888. var value string
  889. iter.Scan(&id, &cluster, &value)
  890. if err := iter.Close(); err != nil {
  891. t.Fatalf("query with clustered_query_info info failed, err '%v'", err)
  892. }
  893. if value != "baz" {
  894. t.Fatalf("Expected value %s, but got %s", "baz", value)
  895. }
  896. }
  897. //TestBatchQueryInfo makes sure that the application can manually bind query parameters when executing in a batch
  898. func TestBatchQueryInfo(t *testing.T) {
  899. if *flagProto == 1 {
  900. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  901. }
  902. session := createSession(t)
  903. defer session.Close()
  904. if err := createTable(session, "CREATE TABLE gocql_test.batch_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
  905. t.Fatalf("failed to create table with error '%v'", err)
  906. }
  907. write := func(q *QueryInfo) ([]interface{}, error) {
  908. values := make([]interface{}, 3)
  909. values[0] = 4000
  910. values[1] = 5000
  911. values[2] = "bar"
  912. return values, nil
  913. }
  914. batch := session.NewBatch(LoggedBatch)
  915. batch.Bind("INSERT INTO batch_query_info (id, cluster, value) VALUES (?, ?,?)", write)
  916. if err := session.ExecuteBatch(batch); err != nil {
  917. t.Fatalf("batch insert into batch_query_info failed, err '%v'", err)
  918. }
  919. read := func(q *QueryInfo) ([]interface{}, error) {
  920. values := make([]interface{}, 2)
  921. values[0] = 4000
  922. values[1] = 5000
  923. return values, nil
  924. }
  925. qry := session.Bind("SELECT id, cluster, value FROM batch_query_info WHERE id = ? and cluster = ?", read)
  926. iter := qry.Iter()
  927. var id, cluster int
  928. var value string
  929. iter.Scan(&id, &cluster, &value)
  930. if err := iter.Close(); err != nil {
  931. t.Fatalf("query with batch_query_info info failed, err '%v'", err)
  932. }
  933. if value != "bar" {
  934. t.Fatalf("Expected value %s, but got %s", "bar", value)
  935. }
  936. }
  937. func injectInvalidPreparedStatement(t *testing.T, session *Session, table string) (string, *Conn) {
  938. if err := createTable(session, `CREATE TABLE gocql_test.`+table+` (
  939. foo varchar,
  940. bar int,
  941. PRIMARY KEY (foo, bar)
  942. )`); err != nil {
  943. t.Fatal("create:", err)
  944. }
  945. stmt := "INSERT INTO " + table + " (foo, bar) VALUES (?, 7)"
  946. conn := session.pool.Pick(nil)
  947. flight := new(inflightPrepare)
  948. stmtsLRU.Lock()
  949. stmtsLRU.lru.Add(conn.addr+stmt, flight)
  950. stmtsLRU.Unlock()
  951. flight.info = QueryInfo{
  952. Id: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
  953. Args: []ColumnInfo{
  954. {
  955. Keyspace: "gocql_test",
  956. Table: table,
  957. Name: "foo",
  958. TypeInfo: NativeType{
  959. typ: TypeVarchar,
  960. },
  961. },
  962. },
  963. }
  964. return stmt, conn
  965. }
  966. func TestMissingSchemaPrepare(t *testing.T) {
  967. s := createSession(t)
  968. conn := s.pool.Pick(nil)
  969. defer s.Close()
  970. insertQry := &Query{stmt: "INSERT INTO invalidschemaprep (val) VALUES (?)", values: []interface{}{5}, cons: s.cons,
  971. session: s, pageSize: s.pageSize, trace: s.trace,
  972. prefetch: s.prefetch, rt: s.cfg.RetryPolicy}
  973. if err := conn.executeQuery(insertQry).err; err == nil {
  974. t.Fatal("expected error, but got nil.")
  975. }
  976. if err := createTable(s, "CREATE TABLE gocql_test.invalidschemaprep (val int, PRIMARY KEY (val))"); err != nil {
  977. t.Fatal("create table:", err)
  978. }
  979. if err := conn.executeQuery(insertQry).err; err != nil {
  980. t.Fatal(err) // unconfigured columnfamily
  981. }
  982. }
  983. func TestReprepareStatement(t *testing.T) {
  984. session := createSession(t)
  985. defer session.Close()
  986. stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement")
  987. query := session.Query(stmt, "bar")
  988. if err := conn.executeQuery(query).Close(); err != nil {
  989. t.Fatalf("Failed to execute query for reprepare statement: %v", err)
  990. }
  991. }
  992. func TestReprepareBatch(t *testing.T) {
  993. if *flagProto == 1 {
  994. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  995. }
  996. session := createSession(t)
  997. defer session.Close()
  998. stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement_batch")
  999. batch := session.NewBatch(UnloggedBatch)
  1000. batch.Query(stmt, "bar")
  1001. if _, err := conn.executeBatch(batch); err != nil {
  1002. t.Fatalf("Failed to execute query for reprepare statement: %v", err)
  1003. }
  1004. }
  1005. func TestQueryInfo(t *testing.T) {
  1006. session := createSession(t)
  1007. defer session.Close()
  1008. conn := session.pool.Pick(nil)
  1009. info, err := conn.prepareStatement("SELECT release_version, host_id FROM system.local WHERE key = ?", nil)
  1010. if err != nil {
  1011. t.Fatalf("Failed to execute query for preparing statement: %v", err)
  1012. }
  1013. if x := len(info.Args); x != 1 {
  1014. t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, x)
  1015. }
  1016. if *flagProto > 1 {
  1017. if x := len(info.Rval); x != 2 {
  1018. t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, x)
  1019. }
  1020. }
  1021. }
  1022. //TestPreparedCacheEviction will make sure that the cache size is maintained
  1023. func TestPreparedCacheEviction(t *testing.T) {
  1024. session := createSession(t)
  1025. defer session.Close()
  1026. stmtsLRU.Lock()
  1027. stmtsLRU.Max(4)
  1028. stmtsLRU.Unlock()
  1029. if err := createTable(session, "CREATE TABLE gocql_test.prepcachetest (id int,mod int,PRIMARY KEY (id))"); err != nil {
  1030. t.Fatalf("failed to create table with error '%v'", err)
  1031. }
  1032. //Fill the table
  1033. for i := 0; i < 2; i++ {
  1034. if err := session.Query("INSERT INTO prepcachetest (id,mod) VALUES (?, ?)", i, 10000%(i+1)).Exec(); err != nil {
  1035. t.Fatalf("insert into prepcachetest failed, err '%v'", err)
  1036. }
  1037. }
  1038. //Populate the prepared statement cache with select statements
  1039. var id, mod int
  1040. for i := 0; i < 2; i++ {
  1041. err := session.Query("SELECT id,mod FROM prepcachetest WHERE id = "+strconv.FormatInt(int64(i), 10)).Scan(&id, &mod)
  1042. if err != nil {
  1043. t.Fatalf("select from prepcachetest failed, error '%v'", err)
  1044. }
  1045. }
  1046. //generate an update statement to test they are prepared
  1047. err := session.Query("UPDATE prepcachetest SET mod = ? WHERE id = ?", 1, 11).Exec()
  1048. if err != nil {
  1049. t.Fatalf("update prepcachetest failed, error '%v'", err)
  1050. }
  1051. //generate a delete statement to test they are prepared
  1052. err = session.Query("DELETE FROM prepcachetest WHERE id = ?", 1).Exec()
  1053. if err != nil {
  1054. t.Fatalf("delete from prepcachetest failed, error '%v'", err)
  1055. }
  1056. //generate an insert statement to test they are prepared
  1057. err = session.Query("INSERT INTO prepcachetest (id,mod) VALUES (?, ?)", 3, 11).Exec()
  1058. if err != nil {
  1059. t.Fatalf("insert into prepcachetest failed, error '%v'", err)
  1060. }
  1061. stmtsLRU.Lock()
  1062. //Make sure the cache size is maintained
  1063. if stmtsLRU.lru.Len() != stmtsLRU.lru.MaxEntries {
  1064. t.Fatalf("expected cache size of %v, got %v", stmtsLRU.lru.MaxEntries, stmtsLRU.lru.Len())
  1065. }
  1066. //Walk through all the configured hosts and test cache retention and eviction
  1067. var selFound, insFound, updFound, delFound, selEvict bool
  1068. for i := range session.cfg.Hosts {
  1069. _, ok := stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testSELECT id,mod FROM prepcachetest WHERE id = 1")
  1070. selFound = selFound || ok
  1071. _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testINSERT INTO prepcachetest (id,mod) VALUES (?, ?)")
  1072. insFound = insFound || ok
  1073. _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testUPDATE prepcachetest SET mod = ? WHERE id = ?")
  1074. updFound = updFound || ok
  1075. _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testDELETE FROM prepcachetest WHERE id = ?")
  1076. delFound = delFound || ok
  1077. _, ok = stmtsLRU.lru.Get(session.cfg.Hosts[i] + ":9042gocql_testSELECT id,mod FROM prepcachetest WHERE id = 0")
  1078. selEvict = selEvict || !ok
  1079. }
  1080. stmtsLRU.Unlock()
  1081. if !selEvict {
  1082. t.Fatalf("expected first select statement to be purged, but statement was found in the cache.")
  1083. }
  1084. if !selFound {
  1085. t.Fatalf("expected second select statement to be cached, but statement was purged or not prepared.")
  1086. }
  1087. if !insFound {
  1088. t.Fatalf("expected insert statement to be cached, but statement was purged or not prepared.")
  1089. }
  1090. if !updFound {
  1091. t.Fatalf("expected update statement to be cached, but statement was purged or not prepared.")
  1092. }
  1093. if !delFound {
  1094. t.Error("expected delete statement to be cached, but statement was purged or not prepared.")
  1095. }
  1096. }
  1097. func TestPreparedCacheKey(t *testing.T) {
  1098. session := createSession(t)
  1099. defer session.Close()
  1100. // create a second keyspace
  1101. cluster2 := createCluster()
  1102. createKeyspace(t, cluster2, "gocql_test2")
  1103. cluster2.Keyspace = "gocql_test2"
  1104. session2, err := cluster2.CreateSession()
  1105. if err != nil {
  1106. t.Fatal("create session:", err)
  1107. }
  1108. defer session2.Close()
  1109. // both keyspaces have a table named "test_stmt_cache_key"
  1110. if err := createTable(session, "CREATE TABLE gocql_test.test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
  1111. t.Fatal("create table:", err)
  1112. }
  1113. if err := createTable(session2, "CREATE TABLE gocql_test2.test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
  1114. t.Fatal("create table:", err)
  1115. }
  1116. // both tables have a single row with the same partition key but different column value
  1117. if err = session.Query(`INSERT INTO test_stmt_cache_key (id, field) VALUES (?, ?)`, "key", "one").Exec(); err != nil {
  1118. t.Fatal("insert:", err)
  1119. }
  1120. if err = session2.Query(`INSERT INTO test_stmt_cache_key (id, field) VALUES (?, ?)`, "key", "two").Exec(); err != nil {
  1121. t.Fatal("insert:", err)
  1122. }
  1123. // should be able to see different values in each keyspace
  1124. var value string
  1125. if err = session.Query("SELECT field FROM test_stmt_cache_key WHERE id = ?", "key").Scan(&value); err != nil {
  1126. t.Fatal("select:", err)
  1127. }
  1128. if value != "one" {
  1129. t.Errorf("Expected one, got %s", value)
  1130. }
  1131. if err = session2.Query("SELECT field FROM test_stmt_cache_key WHERE id = ?", "key").Scan(&value); err != nil {
  1132. t.Fatal("select:", err)
  1133. }
  1134. if value != "two" {
  1135. t.Errorf("Expected two, got %s", value)
  1136. }
  1137. }
  1138. //TestMarshalFloat64Ptr tests to see that a pointer to a float64 is marshalled correctly.
  1139. func TestMarshalFloat64Ptr(t *testing.T) {
  1140. session := createSession(t)
  1141. defer session.Close()
  1142. if err := createTable(session, "CREATE TABLE gocql_test.float_test (id double, test double, primary key (id))"); err != nil {
  1143. t.Fatal("create table:", err)
  1144. }
  1145. testNum := float64(7500)
  1146. if err := session.Query(`INSERT INTO float_test (id,test) VALUES (?,?)`, float64(7500.00), &testNum).Exec(); err != nil {
  1147. t.Fatal("insert float64:", err)
  1148. }
  1149. }
  1150. //TestMarshalInet tests to see that a pointer to a float64 is marshalled correctly.
  1151. func TestMarshalInet(t *testing.T) {
  1152. session := createSession(t)
  1153. defer session.Close()
  1154. if err := createTable(session, "CREATE TABLE gocql_test.inet_test (ip inet, name text, primary key (ip))"); err != nil {
  1155. t.Fatal("create table:", err)
  1156. }
  1157. stringIp := "123.34.45.56"
  1158. if err := session.Query(`INSERT INTO inet_test (ip,name) VALUES (?,?)`, stringIp, "Test IP 1").Exec(); err != nil {
  1159. t.Fatal("insert string inet:", err)
  1160. }
  1161. var stringResult string
  1162. if err := session.Query("SELECT ip FROM inet_test").Scan(&stringResult); err != nil {
  1163. t.Fatalf("select for string from inet_test 1 failed: %v", err)
  1164. }
  1165. if stringResult != stringIp {
  1166. t.Errorf("Expected %s, was %s", stringIp, stringResult)
  1167. }
  1168. var ipResult net.IP
  1169. if err := session.Query("SELECT ip FROM inet_test").Scan(&ipResult); err != nil {
  1170. t.Fatalf("select for net.IP from inet_test 1 failed: %v", err)
  1171. }
  1172. if ipResult.String() != stringIp {
  1173. t.Errorf("Expected %s, was %s", stringIp, ipResult.String())
  1174. }
  1175. if err := session.Query(`DELETE FROM inet_test WHERE ip = ?`, stringIp).Exec(); err != nil {
  1176. t.Fatal("delete inet table:", err)
  1177. }
  1178. netIp := net.ParseIP("222.43.54.65")
  1179. if err := session.Query(`INSERT INTO inet_test (ip,name) VALUES (?,?)`, netIp, "Test IP 2").Exec(); err != nil {
  1180. t.Fatal("insert netIp inet:", err)
  1181. }
  1182. if err := session.Query("SELECT ip FROM inet_test").Scan(&stringResult); err != nil {
  1183. t.Fatalf("select for string from inet_test 2 failed: %v", err)
  1184. }
  1185. if stringResult != netIp.String() {
  1186. t.Errorf("Expected %s, was %s", netIp.String(), stringResult)
  1187. }
  1188. if err := session.Query("SELECT ip FROM inet_test").Scan(&ipResult); err != nil {
  1189. t.Fatalf("select for net.IP from inet_test 2 failed: %v", err)
  1190. }
  1191. if ipResult.String() != netIp.String() {
  1192. t.Errorf("Expected %s, was %s", netIp.String(), ipResult.String())
  1193. }
  1194. }
  1195. func TestVarint(t *testing.T) {
  1196. session := createSession(t)
  1197. defer session.Close()
  1198. if err := createTable(session, "CREATE TABLE gocql_test.varint_test (id varchar, test varint, test2 varint, primary key (id))"); err != nil {
  1199. t.Fatalf("failed to create table with error '%v'", err)
  1200. }
  1201. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", 0).Exec(); err != nil {
  1202. t.Fatalf("insert varint: %v", err)
  1203. }
  1204. var result int
  1205. if err := session.Query("SELECT test FROM varint_test").Scan(&result); err != nil {
  1206. t.Fatalf("select from varint_test failed: %v", err)
  1207. }
  1208. if result != 0 {
  1209. t.Errorf("Expected 0, was %d", result)
  1210. }
  1211. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", -1).Exec(); err != nil {
  1212. t.Fatalf("insert varint: %v", err)
  1213. }
  1214. if err := session.Query("SELECT test FROM varint_test").Scan(&result); err != nil {
  1215. t.Fatalf("select from varint_test failed: %v", err)
  1216. }
  1217. if result != -1 {
  1218. t.Errorf("Expected -1, was %d", result)
  1219. }
  1220. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", int64(math.MaxInt32)+1).Exec(); err != nil {
  1221. t.Fatalf("insert varint: %v", err)
  1222. }
  1223. var result64 int64
  1224. if err := session.Query("SELECT test FROM varint_test").Scan(&result64); err != nil {
  1225. t.Fatalf("select from varint_test failed: %v", err)
  1226. }
  1227. if result64 != int64(math.MaxInt32)+1 {
  1228. t.Errorf("Expected %d, was %d", int64(math.MaxInt32)+1, result64)
  1229. }
  1230. biggie := new(big.Int)
  1231. biggie.SetString("36893488147419103232", 10) // > 2**64
  1232. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", biggie).Exec(); err != nil {
  1233. t.Fatalf("insert varint: %v", err)
  1234. }
  1235. resultBig := new(big.Int)
  1236. if err := session.Query("SELECT test FROM varint_test").Scan(resultBig); err != nil {
  1237. t.Fatalf("select from varint_test failed: %v", err)
  1238. }
  1239. if resultBig.String() != biggie.String() {
  1240. t.Errorf("Expected %s, was %s", biggie.String(), resultBig.String())
  1241. }
  1242. err := session.Query("SELECT test FROM varint_test").Scan(&result64)
  1243. if err == nil || strings.Index(err.Error(), "out of range") == -1 {
  1244. t.Errorf("expected out of range error since value is too big for int64")
  1245. }
  1246. // value not set in cassandra, leave bind variable empty
  1247. resultBig = new(big.Int)
  1248. if err := session.Query("SELECT test2 FROM varint_test").Scan(resultBig); err != nil {
  1249. t.Fatalf("select from varint_test failed: %v", err)
  1250. }
  1251. if resultBig.Int64() != 0 {
  1252. t.Errorf("Expected %s, was %s", biggie.String(), resultBig.String())
  1253. }
  1254. // can use double pointer to explicitly detect value is not set in cassandra
  1255. if err := session.Query("SELECT test2 FROM varint_test").Scan(&resultBig); err != nil {
  1256. t.Fatalf("select from varint_test failed: %v", err)
  1257. }
  1258. if resultBig != nil {
  1259. t.Errorf("Expected %v, was %v", nil, *resultBig)
  1260. }
  1261. }
  1262. //TestQueryStats confirms that the stats are returning valid data. Accuracy may be questionable.
  1263. func TestQueryStats(t *testing.T) {
  1264. session := createSession(t)
  1265. defer session.Close()
  1266. qry := session.Query("SELECT * FROM system.peers")
  1267. if err := qry.Exec(); err != nil {
  1268. t.Fatalf("query failed. %v", err)
  1269. } else {
  1270. if qry.Attempts() < 1 {
  1271. t.Fatal("expected at least 1 attempt, but got 0")
  1272. }
  1273. if qry.Latency() <= 0 {
  1274. t.Fatalf("expected latency to be greater than 0, but got %v instead.", qry.Latency())
  1275. }
  1276. }
  1277. }
  1278. //TestBatchStats confirms that the stats are returning valid data. Accuracy may be questionable.
  1279. func TestBatchStats(t *testing.T) {
  1280. if *flagProto == 1 {
  1281. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  1282. }
  1283. session := createSession(t)
  1284. defer session.Close()
  1285. if err := createTable(session, "CREATE TABLE gocql_test.batchStats (id int, PRIMARY KEY (id))"); err != nil {
  1286. t.Fatalf("failed to create table with error '%v'", err)
  1287. }
  1288. b := session.NewBatch(LoggedBatch)
  1289. b.Query("INSERT INTO batchStats (id) VALUES (?)", 1)
  1290. b.Query("INSERT INTO batchStats (id) VALUES (?)", 2)
  1291. if err := session.ExecuteBatch(b); err != nil {
  1292. t.Fatalf("query failed. %v", err)
  1293. } else {
  1294. if b.Attempts() < 1 {
  1295. t.Fatal("expected at least 1 attempt, but got 0")
  1296. }
  1297. if b.Latency() <= 0 {
  1298. t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
  1299. }
  1300. }
  1301. }
  1302. //TestNilInQuery tests to see that a nil value passed to a query is handled by Cassandra
  1303. //TODO validate the nil value by reading back the nil. Need to fix Unmarshalling.
  1304. func TestNilInQuery(t *testing.T) {
  1305. session := createSession(t)
  1306. defer session.Close()
  1307. if err := createTable(session, "CREATE TABLE gocql_test.testNilInsert (id int, count int, PRIMARY KEY (id))"); err != nil {
  1308. t.Fatalf("failed to create table with error '%v'", err)
  1309. }
  1310. if err := session.Query("INSERT INTO testNilInsert (id,count) VALUES (?,?)", 1, nil).Exec(); err != nil {
  1311. t.Fatalf("failed to insert with err: %v", err)
  1312. }
  1313. var id int
  1314. if err := session.Query("SELECT id FROM testNilInsert").Scan(&id); err != nil {
  1315. t.Fatalf("failed to select with err: %v", err)
  1316. } else if id != 1 {
  1317. t.Fatalf("expected id to be 1, got %v", id)
  1318. }
  1319. }
  1320. // Don't initialize time.Time bind variable if cassandra timestamp column is empty
  1321. func TestEmptyTimestamp(t *testing.T) {
  1322. session := createSession(t)
  1323. defer session.Close()
  1324. if err := createTable(session, "CREATE TABLE gocql_test.test_empty_timestamp (id int, time timestamp, num int, PRIMARY KEY (id))"); err != nil {
  1325. t.Fatalf("failed to create table with error '%v'", err)
  1326. }
  1327. if err := session.Query("INSERT INTO test_empty_timestamp (id, num) VALUES (?,?)", 1, 561).Exec(); err != nil {
  1328. t.Fatalf("failed to insert with err: %v", err)
  1329. }
  1330. var timeVal time.Time
  1331. if err := session.Query("SELECT time FROM test_empty_timestamp where id = ?", 1).Scan(&timeVal); err != nil {
  1332. t.Fatalf("failed to select with err: %v", err)
  1333. }
  1334. if !timeVal.IsZero() {
  1335. t.Errorf("time.Time bind variable should still be empty (was %s)", timeVal)
  1336. }
  1337. }
  1338. // Integration test of just querying for data from the system.schema_keyspace table
  1339. func TestGetKeyspaceMetadata(t *testing.T) {
  1340. session := createSession(t)
  1341. defer session.Close()
  1342. keyspaceMetadata, err := getKeyspaceMetadata(session, "gocql_test")
  1343. if err != nil {
  1344. t.Fatalf("failed to query the keyspace metadata with err: %v", err)
  1345. }
  1346. if keyspaceMetadata == nil {
  1347. t.Fatal("failed to query the keyspace metadata, nil returned")
  1348. }
  1349. if keyspaceMetadata.Name != "gocql_test" {
  1350. t.Errorf("Expected keyspace name to be 'gocql' but was '%s'", keyspaceMetadata.Name)
  1351. }
  1352. if keyspaceMetadata.StrategyClass != "org.apache.cassandra.locator.SimpleStrategy" {
  1353. t.Errorf("Expected replication strategy class to be 'org.apache.cassandra.locator.SimpleStrategy' but was '%s'", keyspaceMetadata.StrategyClass)
  1354. }
  1355. if keyspaceMetadata.StrategyOptions == nil {
  1356. t.Error("Expected replication strategy options map but was nil")
  1357. }
  1358. rfStr, ok := keyspaceMetadata.StrategyOptions["replication_factor"]
  1359. if !ok {
  1360. t.Fatalf("Expected strategy option 'replication_factor' but was not found in %v", keyspaceMetadata.StrategyOptions)
  1361. }
  1362. rfInt, err := strconv.Atoi(rfStr.(string))
  1363. if err != nil {
  1364. t.Fatalf("Error converting string to int with err: %v", err)
  1365. }
  1366. if rfInt != *flagRF {
  1367. t.Errorf("Expected replication factor to be %d but was %d", *flagRF, rfInt)
  1368. }
  1369. }
  1370. // Integration test of just querying for data from the system.schema_columnfamilies table
  1371. func TestGetTableMetadata(t *testing.T) {
  1372. session := createSession(t)
  1373. defer session.Close()
  1374. if err := createTable(session, "CREATE TABLE gocql_test.test_table_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
  1375. t.Fatalf("failed to create table with error '%v'", err)
  1376. }
  1377. tables, err := getTableMetadata(session, "gocql_test")
  1378. if err != nil {
  1379. t.Fatalf("failed to query the table metadata with err: %v", err)
  1380. }
  1381. if tables == nil {
  1382. t.Fatal("failed to query the table metadata, nil returned")
  1383. }
  1384. var testTable *TableMetadata
  1385. // verify all tables have minimum expected data
  1386. for i := range tables {
  1387. table := &tables[i]
  1388. if table.Name == "" {
  1389. t.Errorf("Expected table name to be set, but it was empty: index=%d metadata=%+v", i, table)
  1390. }
  1391. if table.Keyspace != "gocql_test" {
  1392. t.Errorf("Expected keyspace for '%d' table metadata to be 'gocql_test' but was '%s'", table.Name, table.Keyspace)
  1393. }
  1394. if table.KeyValidator == "" {
  1395. t.Errorf("Expected key validator to be set for table %s", table.Name)
  1396. }
  1397. if table.Comparator == "" {
  1398. t.Errorf("Expected comparator to be set for table %s", table.Name)
  1399. }
  1400. if table.DefaultValidator == "" {
  1401. t.Errorf("Expected default validator to be set for table %s", table.Name)
  1402. }
  1403. // these fields are not set until the metadata is compiled
  1404. if table.PartitionKey != nil {
  1405. t.Errorf("Did not expect partition key for table %s", table.Name)
  1406. }
  1407. if table.ClusteringColumns != nil {
  1408. t.Errorf("Did not expect clustering columns for table %s", table.Name)
  1409. }
  1410. if table.Columns != nil {
  1411. t.Errorf("Did not expect columns for table %s", table.Name)
  1412. }
  1413. // for the next part of the test after this loop, find the metadata for the test table
  1414. if table.Name == "test_table_metadata" {
  1415. testTable = table
  1416. }
  1417. }
  1418. // verify actual values on the test tables
  1419. if testTable == nil {
  1420. t.Fatal("Expected table metadata for name 'test_table_metadata'")
  1421. }
  1422. if testTable.KeyValidator != "org.apache.cassandra.db.marshal.Int32Type" {
  1423. t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.Int32Type' but was '%s'", testTable.KeyValidator)
  1424. }
  1425. if testTable.Comparator != "org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.UTF8Type)" {
  1426. t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.UTF8Type)' but was '%s'", testTable.Comparator)
  1427. }
  1428. if testTable.DefaultValidator != "org.apache.cassandra.db.marshal.BytesType" {
  1429. t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.BytesType' but was '%s'", testTable.DefaultValidator)
  1430. }
  1431. if *flagProto < protoVersion4 {
  1432. expectedKeyAliases := []string{"first_id"}
  1433. if !reflect.DeepEqual(testTable.KeyAliases, expectedKeyAliases) {
  1434. t.Errorf("Expected key aliases %v but was %v", expectedKeyAliases, testTable.KeyAliases)
  1435. }
  1436. expectedColumnAliases := []string{"second_id"}
  1437. if !reflect.DeepEqual(testTable.ColumnAliases, expectedColumnAliases) {
  1438. t.Errorf("Expected key aliases %v but was %v", expectedColumnAliases, testTable.ColumnAliases)
  1439. }
  1440. }
  1441. if testTable.ValueAlias != "" {
  1442. t.Errorf("Expected value alias '' but was '%s'", testTable.ValueAlias)
  1443. }
  1444. }
  1445. // Integration test of just querying for data from the system.schema_columns table
  1446. func TestGetColumnMetadata(t *testing.T) {
  1447. session := createSession(t)
  1448. defer session.Close()
  1449. if err := createTable(session, "CREATE TABLE gocql_test.test_column_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
  1450. t.Fatalf("failed to create table with error '%v'", err)
  1451. }
  1452. if err := session.Query("CREATE INDEX index_column_metadata ON test_column_metadata ( third_id )").Exec(); err != nil {
  1453. t.Fatalf("failed to create index with err: %v", err)
  1454. }
  1455. columns, err := getColumnMetadata(session, "gocql_test")
  1456. if err != nil {
  1457. t.Fatalf("failed to query column metadata with err: %v", err)
  1458. }
  1459. if columns == nil {
  1460. t.Fatal("failed to query column metadata, nil returned")
  1461. }
  1462. testColumns := map[string]*ColumnMetadata{}
  1463. // verify actual values on the test columns
  1464. for i := range columns {
  1465. column := &columns[i]
  1466. if column.Name == "" {
  1467. t.Errorf("Expected column name to be set, but it was empty: index=%d metadata=%+v", i, column)
  1468. }
  1469. if column.Table == "" {
  1470. t.Errorf("Expected column %s table name to be set, but it was empty", column.Name)
  1471. }
  1472. if column.Keyspace != "gocql_test" {
  1473. t.Errorf("Expected column %s keyspace name to be 'gocql_test', but it was '%s'", column.Name, column.Keyspace)
  1474. }
  1475. if column.Kind == "" {
  1476. t.Errorf("Expected column %s kind to be set, but it was empty", column.Name)
  1477. }
  1478. if session.cfg.ProtoVersion == 1 && column.Kind != "regular" {
  1479. t.Errorf("Expected column %s kind to be set to 'regular' for proto V1 but it was '%s'", column.Name, column.Kind)
  1480. }
  1481. if column.Validator == "" {
  1482. t.Errorf("Expected column %s validator to be set, but it was empty", column.Name)
  1483. }
  1484. // find the test table columns for the next step after this loop
  1485. if column.Table == "test_column_metadata" {
  1486. testColumns[column.Name] = column
  1487. }
  1488. }
  1489. if *flagProto == 1 {
  1490. // V1 proto only returns "regular columns"
  1491. if len(testColumns) != 1 {
  1492. t.Errorf("Expected 1 test columns but there were %d", len(testColumns))
  1493. }
  1494. thirdID, found := testColumns["third_id"]
  1495. if !found {
  1496. t.Fatalf("Expected to find column 'third_id' metadata but there was only %v", testColumns)
  1497. }
  1498. if thirdID.Kind != REGULAR {
  1499. t.Errorf("Expected %s column kind to be '%s' but it was '%s'", thirdID.Name, REGULAR, thirdID.Kind)
  1500. }
  1501. if thirdID.Index.Name != "index_column_metadata" {
  1502. t.Errorf("Expected %s column index name to be 'index_column_metadata' but it was '%s'", thirdID.Name, thirdID.Index.Name)
  1503. }
  1504. } else {
  1505. if len(testColumns) != 3 {
  1506. t.Errorf("Expected 3 test columns but there were %d", len(testColumns))
  1507. }
  1508. firstID, found := testColumns["first_id"]
  1509. if !found {
  1510. t.Fatalf("Expected to find column 'first_id' metadata but there was only %v", testColumns)
  1511. }
  1512. secondID, found := testColumns["second_id"]
  1513. if !found {
  1514. t.Fatalf("Expected to find column 'second_id' metadata but there was only %v", testColumns)
  1515. }
  1516. thirdID, found := testColumns["third_id"]
  1517. if !found {
  1518. t.Fatalf("Expected to find column 'third_id' metadata but there was only %v", testColumns)
  1519. }
  1520. if firstID.Kind != PARTITION_KEY {
  1521. t.Errorf("Expected %s column kind to be '%s' but it was '%s'", firstID.Name, PARTITION_KEY, firstID.Kind)
  1522. }
  1523. if secondID.Kind != CLUSTERING_KEY {
  1524. t.Errorf("Expected %s column kind to be '%s' but it was '%s'", secondID.Name, CLUSTERING_KEY, secondID.Kind)
  1525. }
  1526. if thirdID.Kind != REGULAR {
  1527. t.Errorf("Expected %s column kind to be '%s' but it was '%s'", thirdID.Name, REGULAR, thirdID.Kind)
  1528. }
  1529. if thirdID.Index.Name != "index_column_metadata" {
  1530. t.Errorf("Expected %s column index name to be 'index_column_metadata' but it was '%s'", thirdID.Name, thirdID.Index.Name)
  1531. }
  1532. }
  1533. }
  1534. // Integration test of querying and composition the keyspace metadata
  1535. func TestKeyspaceMetadata(t *testing.T) {
  1536. session := createSession(t)
  1537. defer session.Close()
  1538. if err := createTable(session, "CREATE TABLE gocql_test.test_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
  1539. t.Fatalf("failed to create table with error '%v'", err)
  1540. }
  1541. if err := session.Query("CREATE INDEX index_metadata ON test_metadata ( third_id )").Exec(); err != nil {
  1542. t.Fatalf("failed to create index with err: %v", err)
  1543. }
  1544. keyspaceMetadata, err := session.KeyspaceMetadata("gocql_test")
  1545. if err != nil {
  1546. t.Fatalf("failed to query keyspace metadata with err: %v", err)
  1547. }
  1548. if keyspaceMetadata == nil {
  1549. t.Fatal("expected the keyspace metadata to not be nil, but it was nil")
  1550. }
  1551. if keyspaceMetadata.Name != session.cfg.Keyspace {
  1552. t.Fatalf("Expected the keyspace name to be %s but was %s", session.cfg.Keyspace, keyspaceMetadata.Name)
  1553. }
  1554. if len(keyspaceMetadata.Tables) == 0 {
  1555. t.Errorf("Expected tables but there were none")
  1556. }
  1557. tableMetadata, found := keyspaceMetadata.Tables["test_metadata"]
  1558. if !found {
  1559. t.Fatalf("failed to find the test_metadata table metadata")
  1560. }
  1561. if len(tableMetadata.PartitionKey) != 1 {
  1562. t.Errorf("expected partition key length of 1, but was %d", len(tableMetadata.PartitionKey))
  1563. }
  1564. for i, column := range tableMetadata.PartitionKey {
  1565. if column == nil {
  1566. t.Errorf("partition key column metadata at index %d was nil", i)
  1567. }
  1568. }
  1569. if tableMetadata.PartitionKey[0].Name != "first_id" {
  1570. t.Errorf("Expected the first partition key column to be 'first_id' but was '%s'", tableMetadata.PartitionKey[0].Name)
  1571. }
  1572. if len(tableMetadata.ClusteringColumns) != 1 {
  1573. t.Fatalf("expected clustering columns length of 1, but was %d", len(tableMetadata.ClusteringColumns))
  1574. }
  1575. for i, column := range tableMetadata.ClusteringColumns {
  1576. if column == nil {
  1577. t.Fatalf("clustering column metadata at index %d was nil", i)
  1578. }
  1579. }
  1580. if tableMetadata.ClusteringColumns[0].Name != "second_id" {
  1581. t.Errorf("Expected the first clustering column to be 'second_id' but was '%s'", tableMetadata.ClusteringColumns[0].Name)
  1582. }
  1583. thirdColumn, found := tableMetadata.Columns["third_id"]
  1584. if !found {
  1585. t.Fatalf("Expected a column definition for 'third_id'")
  1586. }
  1587. if thirdColumn.Index.Name != "index_metadata" {
  1588. t.Errorf("Expected column index named 'index_metadata' but was '%s'", thirdColumn.Index.Name)
  1589. }
  1590. }
  1591. // Integration test of the routing key calculation
  1592. func TestRoutingKey(t *testing.T) {
  1593. session := createSession(t)
  1594. defer session.Close()
  1595. if err := createTable(session, "CREATE TABLE gocql_test.test_single_routing_key (first_id int, second_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
  1596. t.Fatalf("failed to create table with error '%v'", err)
  1597. }
  1598. if err := createTable(session, "CREATE TABLE gocql_test.test_composite_routing_key (first_id int, second_id int, PRIMARY KEY ((first_id, second_id)))"); err != nil {
  1599. t.Fatalf("failed to create table with error '%v'", err)
  1600. }
  1601. routingKeyInfo, err := session.routingKeyInfo("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?")
  1602. if err != nil {
  1603. t.Fatalf("failed to get routing key info due to error: %v", err)
  1604. }
  1605. if routingKeyInfo == nil {
  1606. t.Fatal("Expected routing key info, but was nil")
  1607. }
  1608. if len(routingKeyInfo.indexes) != 1 {
  1609. t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
  1610. }
  1611. if routingKeyInfo.indexes[0] != 1 {
  1612. t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
  1613. }
  1614. if len(routingKeyInfo.types) != 1 {
  1615. t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
  1616. }
  1617. if routingKeyInfo.types[0] == nil {
  1618. t.Fatal("Expected routing key types[0] to be non-nil")
  1619. }
  1620. if routingKeyInfo.types[0].Type() != TypeInt {
  1621. t.Fatalf("Expected routing key types[0].Type to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
  1622. }
  1623. // verify the cache is working
  1624. routingKeyInfo, err = session.routingKeyInfo("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?")
  1625. if err != nil {
  1626. t.Fatalf("failed to get routing key info due to error: %v", err)
  1627. }
  1628. if len(routingKeyInfo.indexes) != 1 {
  1629. t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
  1630. }
  1631. if routingKeyInfo.indexes[0] != 1 {
  1632. t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
  1633. }
  1634. if len(routingKeyInfo.types) != 1 {
  1635. t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
  1636. }
  1637. if routingKeyInfo.types[0] == nil {
  1638. t.Fatal("Expected routing key types[0] to be non-nil")
  1639. }
  1640. if routingKeyInfo.types[0].Type() != TypeInt {
  1641. t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
  1642. }
  1643. cacheSize := session.routingKeyInfoCache.lru.Len()
  1644. if cacheSize != 1 {
  1645. t.Errorf("Expected cache size to be 1 but was %d", cacheSize)
  1646. }
  1647. query := session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2)
  1648. routingKey, err := query.GetRoutingKey()
  1649. if err != nil {
  1650. t.Fatalf("Failed to get routing key due to error: %v", err)
  1651. }
  1652. expectedRoutingKey := []byte{0, 0, 0, 2}
  1653. if !reflect.DeepEqual(expectedRoutingKey, routingKey) {
  1654. t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
  1655. }
  1656. routingKeyInfo, err = session.routingKeyInfo("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?")
  1657. if err != nil {
  1658. t.Fatalf("failed to get routing key info due to error: %v", err)
  1659. }
  1660. if routingKeyInfo == nil {
  1661. t.Fatal("Expected routing key info, but was nil")
  1662. }
  1663. if len(routingKeyInfo.indexes) != 2 {
  1664. t.Fatalf("Expected routing key indexes length to be 2 but was %d", len(routingKeyInfo.indexes))
  1665. }
  1666. if routingKeyInfo.indexes[0] != 1 {
  1667. t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
  1668. }
  1669. if routingKeyInfo.indexes[1] != 0 {
  1670. t.Errorf("Expected routing key index[1] to be 0 but was %d", routingKeyInfo.indexes[1])
  1671. }
  1672. if len(routingKeyInfo.types) != 2 {
  1673. t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
  1674. }
  1675. if routingKeyInfo.types[0] == nil {
  1676. t.Fatal("Expected routing key types[0] to be non-nil")
  1677. }
  1678. if routingKeyInfo.types[0].Type() != TypeInt {
  1679. t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
  1680. }
  1681. if routingKeyInfo.types[1] == nil {
  1682. t.Fatal("Expected routing key types[1] to be non-nil")
  1683. }
  1684. if routingKeyInfo.types[1].Type() != TypeInt {
  1685. t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[1].Type())
  1686. }
  1687. query = session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2)
  1688. routingKey, err = query.GetRoutingKey()
  1689. if err != nil {
  1690. t.Fatalf("Failed to get routing key due to error: %v", err)
  1691. }
  1692. expectedRoutingKey = []byte{0, 4, 0, 0, 0, 2, 0, 0, 4, 0, 0, 0, 1, 0}
  1693. if !reflect.DeepEqual(expectedRoutingKey, routingKey) {
  1694. t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
  1695. }
  1696. // verify the cache is working
  1697. cacheSize = session.routingKeyInfoCache.lru.Len()
  1698. if cacheSize != 2 {
  1699. t.Errorf("Expected cache size to be 2 but was %d", cacheSize)
  1700. }
  1701. }
  1702. // Integration test of the token-aware policy-based connection pool
  1703. func TestTokenAwareConnPool(t *testing.T) {
  1704. cluster := createCluster()
  1705. cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
  1706. cluster.DiscoverHosts = true
  1707. session := createSessionFromCluster(cluster, t)
  1708. defer session.Close()
  1709. if session.pool.Size() != cluster.NumConns*len(cluster.Hosts) {
  1710. t.Errorf("Expected pool size %d but was %d", cluster.NumConns*len(cluster.Hosts), session.pool.Size())
  1711. }
  1712. if err := createTable(session, "CREATE TABLE gocql_test.test_token_aware (id int, data text, PRIMARY KEY (id))"); err != nil {
  1713. t.Fatalf("failed to create test_token_aware table with err: %v", err)
  1714. }
  1715. query := session.Query("INSERT INTO test_token_aware (id, data) VALUES (?,?)", 42, "8 * 6 =")
  1716. if err := query.Exec(); err != nil {
  1717. t.Fatalf("failed to insert with err: %v", err)
  1718. }
  1719. query = session.Query("SELECT data FROM test_token_aware where id = ?", 42).Consistency(One)
  1720. var data string
  1721. if err := query.Scan(&data); err != nil {
  1722. t.Error(err)
  1723. }
  1724. // TODO add verification that the query went to the correct host
  1725. }
  1726. type frameWriterFunc func(framer *framer, streamID int) error
  1727. func (f frameWriterFunc) writeFrame(framer *framer, streamID int) error {
  1728. return f(framer, streamID)
  1729. }
  1730. func TestStream0(t *testing.T) {
  1731. session := createSession(t)
  1732. defer session.Close()
  1733. var conn *Conn
  1734. for i := 0; i < 5; i++ {
  1735. if conn != nil {
  1736. break
  1737. }
  1738. conn = session.pool.Pick(nil)
  1739. }
  1740. if conn == nil {
  1741. t.Fatal("no connections available in the pool")
  1742. }
  1743. writer := frameWriterFunc(func(f *framer, streamID int) error {
  1744. if streamID == 0 {
  1745. t.Fatal("should not use stream 0 for requests")
  1746. }
  1747. f.writeHeader(0, opError, streamID)
  1748. f.writeString("i am a bad frame")
  1749. f.wbuf[0] = 0xFF
  1750. return f.finishWrite()
  1751. })
  1752. const expErr = "gocql: error on stream 0:"
  1753. // need to write out an invalid frame, which we need a connection to do
  1754. frame, err := conn.exec(writer, nil)
  1755. if err == nil {
  1756. t.Fatal("expected to get an error on stream 0")
  1757. } else if !strings.HasPrefix(err.Error(), expErr) {
  1758. t.Fatalf("expected to get error prefix %q got %q", expErr, err.Error())
  1759. } else if frame != nil {
  1760. t.Fatalf("expected to get nil frame got %+v", frame)
  1761. }
  1762. }
  1763. func TestNegativeStream(t *testing.T) {
  1764. session := createSession(t)
  1765. defer session.Close()
  1766. var conn *Conn
  1767. for i := 0; i < 5; i++ {
  1768. if conn != nil {
  1769. break
  1770. }
  1771. conn = session.pool.Pick(nil)
  1772. }
  1773. if conn == nil {
  1774. t.Fatal("no connections available in the pool")
  1775. }
  1776. const stream = -50
  1777. writer := frameWriterFunc(func(f *framer, streamID int) error {
  1778. f.writeHeader(0, opOptions, stream)
  1779. return f.finishWrite()
  1780. })
  1781. frame, err := conn.exec(writer, nil)
  1782. if err == nil {
  1783. t.Fatalf("expected to get an error on stream %d", stream)
  1784. } else if frame != nil {
  1785. t.Fatalf("expected to get nil frame got %+v", frame)
  1786. }
  1787. }
  1788. func TestManualQueryPaging(t *testing.T) {
  1789. const rowsToInsert = 5
  1790. session := createSession(t)
  1791. defer session.Close()
  1792. if err := createTable(session, "CREATE TABLE gocql_test.testManualPaging (id int, count int, PRIMARY KEY (id))"); err != nil {
  1793. t.Fatal(err)
  1794. }
  1795. for i := 0; i < rowsToInsert; i++ {
  1796. err := session.Query("INSERT INTO testManualPaging(id, count) VALUES(?, ?)", i, i*i).Exec()
  1797. if err != nil {
  1798. t.Fatal(err)
  1799. }
  1800. }
  1801. // disable auto paging, 1 page per iteration
  1802. query := session.Query("SELECT id, count FROM testManualPaging").PageState(nil).PageSize(2)
  1803. var id, count, fetched int
  1804. iter := query.Iter()
  1805. // NOTE: this isnt very indicitive of how it should be used, the idea is that
  1806. // the page state is returned to some client who will send it back to manually
  1807. // page through the results.
  1808. for {
  1809. for iter.Scan(&id, &count) {
  1810. if count != (id * id) {
  1811. t.Fatalf("got wrong value from iteration: got %d expected %d", count, id*id)
  1812. }
  1813. fetched++
  1814. }
  1815. if len(iter.PageState()) > 0 {
  1816. // more pages
  1817. iter = query.PageState(iter.PageState()).Iter()
  1818. } else {
  1819. break
  1820. }
  1821. }
  1822. if err := iter.Close(); err != nil {
  1823. t.Fatal(err)
  1824. }
  1825. if fetched != rowsToInsert {
  1826. t.Fatalf("expected to fetch %d rows got %d", fetched, rowsToInsert)
  1827. }
  1828. }
  1829. func TestLexicalUUIDType(t *testing.T) {
  1830. session := createSession(t)
  1831. defer session.Close()
  1832. if err := createTable(session, `CREATE TABLE gocql_test.test_lexical_uuid (
  1833. key varchar,
  1834. column1 'org.apache.cassandra.db.marshal.LexicalUUIDType',
  1835. value int,
  1836. PRIMARY KEY (key, column1)
  1837. )`); err != nil {
  1838. t.Fatal("create:", err)
  1839. }
  1840. key := TimeUUID().String()
  1841. column1 := TimeUUID()
  1842. err := session.Query("INSERT INTO test_lexical_uuid(key, column1, value) VALUES(?, ?, ?)", key, column1, 55).Exec()
  1843. if err != nil {
  1844. t.Fatal(err)
  1845. }
  1846. var gotUUID UUID
  1847. if err := session.Query("SELECT column1 from test_lexical_uuid where key = ? AND column1 = ?", key, column1).Scan(&gotUUID); err != nil {
  1848. t.Fatal(err)
  1849. }
  1850. if gotUUID != column1 {
  1851. t.Errorf("got %s, expected %s", gotUUID, column1)
  1852. }
  1853. }
  1854. // Issue 475
  1855. func TestSessionBindRoutingKey(t *testing.T) {
  1856. cluster := createCluster()
  1857. cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
  1858. session := createSessionFromCluster(cluster, t)
  1859. defer session.Close()
  1860. if err := createTable(session, `CREATE TABLE gocql_test.test_bind_routing_key (
  1861. key varchar,
  1862. value int,
  1863. PRIMARY KEY (key)
  1864. )`); err != nil {
  1865. t.Fatal(err)
  1866. }
  1867. const (
  1868. key = "routing-key"
  1869. value = 5
  1870. )
  1871. fn := func(info *QueryInfo) ([]interface{}, error) {
  1872. return []interface{}{key, value}, nil
  1873. }
  1874. q := session.Bind("INSERT INTO test_bind_routing_key(key, value) VALUES(?, ?)", fn)
  1875. if err := q.Exec(); err != nil {
  1876. t.Fatal(err)
  1877. }
  1878. }
  1879. func TestJSONSupport(t *testing.T) {
  1880. if *flagProto < 4 {
  1881. t.Skip("skipping JSON support on proto < 4")
  1882. }
  1883. session := createSession(t)
  1884. defer session.Close()
  1885. if err := createTable(session, `CREATE TABLE gocql_test.test_json (
  1886. id text PRIMARY KEY,
  1887. age int,
  1888. state text
  1889. )`); err != nil {
  1890. t.Fatal(err)
  1891. }
  1892. err := session.Query("INSERT INTO test_json JSON ?", `{"id": "user123", "age": 42, "state": "TX"}`).Exec()
  1893. if err != nil {
  1894. t.Fatal(err)
  1895. }
  1896. var (
  1897. id string
  1898. age int
  1899. state string
  1900. )
  1901. err = session.Query("SELECT id, age, state FROM test_json WHERE id = ?", "user123").Scan(&id, &age, &state)
  1902. if err != nil {
  1903. t.Fatal(err)
  1904. }
  1905. if id != "user123" {
  1906. t.Errorf("got id %q expected %q", id, "user123")
  1907. }
  1908. if age != 42 {
  1909. t.Errorf("got age %d expected %d", age, 42)
  1910. }
  1911. if state != "TX" {
  1912. t.Errorf("got state %q expected %q", state, "TX")
  1913. }
  1914. }