cassandra_test.go 97 KB


  1. // +build all cassandra
  2. package gocql
  3. import (
  4. "bytes"
  5. "context"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "math"
  10. "math/big"
  11. "net"
  12. "reflect"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "testing"
  17. "time"
  18. "unicode"
  19. inf "gopkg.in/inf.v0"
  20. )
  21. func TestEmptyHosts(t *testing.T) {
  22. cluster := createCluster()
  23. cluster.Hosts = nil
  24. if session, err := cluster.CreateSession(); err == nil {
  25. session.Close()
  26. t.Error("expected err, got nil")
  27. }
  28. }
  29. func TestInvalidPeerEntry(t *testing.T) {
  30. t.Skip("dont mutate system tables, rewrite this to test what we mean to test")
  31. session := createSession(t)
  32. // rack, release_version, schema_version, tokens are all null
  33. query := session.Query("INSERT into system.peers (peer, data_center, host_id, rpc_address) VALUES (?, ?, ?, ?)",
  34. "169.254.235.45",
  35. "datacenter1",
  36. "35c0ec48-5109-40fd-9281-9e9d4add2f1e",
  37. "169.254.235.45",
  38. )
  39. if err := query.Exec(); err != nil {
  40. t.Fatal(err)
  41. }
  42. session.Close()
  43. cluster := createCluster()
  44. cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
  45. session = createSessionFromCluster(cluster, t)
  46. defer func() {
  47. session.Query("DELETE from system.peers where peer = ?", "169.254.235.45").Exec()
  48. session.Close()
  49. }()
  50. // check we can perform a query
  51. iter := session.Query("select peer from system.peers").Iter()
  52. var peer string
  53. for iter.Scan(&peer) {
  54. }
  55. if err := iter.Close(); err != nil {
  56. t.Fatal(err)
  57. }
  58. }
  59. //TestUseStatementError checks to make sure the correct error is returned when the user tries to execute a use statement.
  60. func TestUseStatementError(t *testing.T) {
  61. session := createSession(t)
  62. defer session.Close()
  63. if err := session.Query("USE gocql_test").Exec(); err != nil {
  64. if err != ErrUseStmt {
  65. t.Fatalf("expected ErrUseStmt, got " + err.Error())
  66. }
  67. } else {
  68. t.Fatal("expected err, got nil.")
  69. }
  70. }
  71. //TestInvalidKeyspace checks that an invalid keyspace will return promptly and without a flood of connections
  72. func TestInvalidKeyspace(t *testing.T) {
  73. cluster := createCluster()
  74. cluster.Keyspace = "invalidKeyspace"
  75. session, err := cluster.CreateSession()
  76. if err != nil {
  77. if err != ErrNoConnectionsStarted {
  78. t.Fatalf("Expected ErrNoConnections but got %v", err)
  79. }
  80. } else {
  81. session.Close() //Clean up the session
  82. t.Fatal("expected err, got nil.")
  83. }
  84. }
  85. func TestTracing(t *testing.T) {
  86. session := createSession(t)
  87. defer session.Close()
  88. if err := createTable(session, `CREATE TABLE gocql_test.trace (id int primary key)`); err != nil {
  89. t.Fatal("create:", err)
  90. }
  91. buf := &bytes.Buffer{}
  92. trace := &traceWriter{session: session, w: buf}
  93. if err := session.Query(`INSERT INTO trace (id) VALUES (?)`, 42).Trace(trace).Exec(); err != nil {
  94. t.Fatal("insert:", err)
  95. } else if buf.Len() == 0 {
  96. t.Fatal("insert: failed to obtain any tracing")
  97. }
  98. trace.mu.Lock()
  99. buf.Reset()
  100. trace.mu.Unlock()
  101. var value int
  102. if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Trace(trace).Scan(&value); err != nil {
  103. t.Fatal("select:", err)
  104. } else if value != 42 {
  105. t.Fatalf("value: expected %d, got %d", 42, value)
  106. } else if buf.Len() == 0 {
  107. t.Fatal("select: failed to obtain any tracing")
  108. }
  109. // also works from session tracer
  110. session.SetTrace(trace)
  111. trace.mu.Lock()
  112. buf.Reset()
  113. trace.mu.Unlock()
  114. if err := session.Query(`SELECT id FROM trace WHERE id = ?`, 42).Scan(&value); err != nil {
  115. t.Fatal("select:", err)
  116. }
  117. if buf.Len() == 0 {
  118. t.Fatal("select: failed to obtain any tracing")
  119. }
  120. }
  121. func TestObserve(t *testing.T) {
  122. session := createSession(t)
  123. defer session.Close()
  124. if err := createTable(session, `CREATE TABLE gocql_test.observe (id int primary key)`); err != nil {
  125. t.Fatal("create:", err)
  126. }
  127. var (
  128. observedErr error
  129. observedKeyspace string
  130. observedStmt string
  131. )
  132. const keyspace = "gocql_test"
  133. resetObserved := func() {
  134. observedErr = errors.New("placeholder only") // used to distinguish err=nil cases
  135. observedKeyspace = ""
  136. observedStmt = ""
  137. }
  138. observer := funcQueryObserver(func(ctx context.Context, o ObservedQuery) {
  139. observedKeyspace = o.Keyspace
  140. observedStmt = o.Statement
  141. observedErr = o.Err
  142. })
  143. // select before inserted, will error but the reporting is err=nil as the query is valid
  144. resetObserved()
  145. var value int
  146. if err := session.Query(`SELECT id FROM observe WHERE id = ?`, 43).Observer(observer).Scan(&value); err == nil {
  147. t.Fatal("select: expected error")
  148. } else if observedErr != nil {
  149. t.Fatalf("select: observed error expected nil, got %q", observedErr)
  150. } else if observedKeyspace != keyspace {
  151. t.Fatal("select: unexpected observed keyspace", observedKeyspace)
  152. } else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
  153. t.Fatal("select: unexpected observed stmt", observedStmt)
  154. }
  155. resetObserved()
  156. if err := session.Query(`INSERT INTO observe (id) VALUES (?)`, 42).Observer(observer).Exec(); err != nil {
  157. t.Fatal("insert:", err)
  158. } else if observedErr != nil {
  159. t.Fatal("insert:", observedErr)
  160. } else if observedKeyspace != keyspace {
  161. t.Fatal("insert: unexpected observed keyspace", observedKeyspace)
  162. } else if observedStmt != `INSERT INTO observe (id) VALUES (?)` {
  163. t.Fatal("insert: unexpected observed stmt", observedStmt)
  164. }
  165. resetObserved()
  166. value = 0
  167. if err := session.Query(`SELECT id FROM observe WHERE id = ?`, 42).Observer(observer).Scan(&value); err != nil {
  168. t.Fatal("select:", err)
  169. } else if value != 42 {
  170. t.Fatalf("value: expected %d, got %d", 42, value)
  171. } else if observedErr != nil {
  172. t.Fatal("select:", observedErr)
  173. } else if observedKeyspace != keyspace {
  174. t.Fatal("select: unexpected observed keyspace", observedKeyspace)
  175. } else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
  176. t.Fatal("select: unexpected observed stmt", observedStmt)
  177. }
  178. // also works from session observer
  179. resetObserved()
  180. oSession := createSession(t, func(config *ClusterConfig) { config.QueryObserver = observer })
  181. if err := oSession.Query(`SELECT id FROM observe WHERE id = ?`, 42).Scan(&value); err != nil {
  182. t.Fatal("select:", err)
  183. } else if observedErr != nil {
  184. t.Fatal("select:", err)
  185. } else if observedKeyspace != keyspace {
  186. t.Fatal("select: unexpected observed keyspace", observedKeyspace)
  187. } else if observedStmt != `SELECT id FROM observe WHERE id = ?` {
  188. t.Fatal("select: unexpected observed stmt", observedStmt)
  189. }
  190. // reports errors when the query is poorly formed
  191. resetObserved()
  192. value = 0
  193. if err := session.Query(`SELECT id FROM unknown_table WHERE id = ?`, 42).Observer(observer).Scan(&value); err == nil {
  194. t.Fatal("select: expecting error")
  195. } else if observedErr == nil {
  196. t.Fatal("select: expecting observed error")
  197. } else if observedKeyspace != keyspace {
  198. t.Fatal("select: unexpected observed keyspace", observedKeyspace)
  199. } else if observedStmt != `SELECT id FROM unknown_table WHERE id = ?` {
  200. t.Fatal("select: unexpected observed stmt", observedStmt)
  201. }
  202. }
  203. func TestObserve_Pagination(t *testing.T) {
  204. session := createSession(t)
  205. defer session.Close()
  206. if err := createTable(session, `CREATE TABLE gocql_test.observe2 (id int, PRIMARY KEY (id))`); err != nil {
  207. t.Fatal("create:", err)
  208. }
  209. var observedRows int
  210. resetObserved := func() {
  211. observedRows = -1
  212. }
  213. observer := funcQueryObserver(func(ctx context.Context, o ObservedQuery) {
  214. observedRows = o.Rows
  215. })
  216. // insert 100 entries, relevant for pagination
  217. for i := 0; i < 50; i++ {
  218. if err := session.Query(`INSERT INTO observe2 (id) VALUES (?)`, i).Exec(); err != nil {
  219. t.Fatal("insert:", err)
  220. }
  221. }
  222. resetObserved()
  223. // read the 100 entries in paginated entries of size 10. Expecting 5 observations, each with 10 rows
  224. scanner := session.Query(`SELECT id FROM observe2 LIMIT 100`).
  225. Observer(observer).
  226. PageSize(10).
  227. Iter().Scanner()
  228. for i := 0; i < 50; i++ {
  229. if !scanner.Next() {
  230. t.Fatalf("next: should still be true: %d: %v", i, scanner.Err())
  231. }
  232. if i%10 == 0 {
  233. if observedRows != 10 {
  234. t.Fatalf("next: expecting a paginated query with 10 entries, got: %d (%d)", observedRows, i)
  235. }
  236. } else if observedRows != -1 {
  237. t.Fatalf("next: not expecting paginated query (-1 entries), got: %d", observedRows)
  238. }
  239. resetObserved()
  240. }
  241. if scanner.Next() {
  242. t.Fatal("next: no more entries where expected")
  243. }
  244. }
  245. func TestPaging(t *testing.T) {
  246. session := createSession(t)
  247. defer session.Close()
  248. if session.cfg.ProtoVersion == 1 {
  249. t.Skip("Paging not supported. Please use Cassandra >= 2.0")
  250. }
  251. if err := createTable(session, "CREATE TABLE gocql_test.paging (id int primary key)"); err != nil {
  252. t.Fatal("create table:", err)
  253. }
  254. for i := 0; i < 100; i++ {
  255. if err := session.Query("INSERT INTO paging (id) VALUES (?)", i).Exec(); err != nil {
  256. t.Fatal("insert:", err)
  257. }
  258. }
  259. iter := session.Query("SELECT id FROM paging").PageSize(10).Iter()
  260. var id int
  261. count := 0
  262. for iter.Scan(&id) {
  263. count++
  264. }
  265. if err := iter.Close(); err != nil {
  266. t.Fatal("close:", err)
  267. }
  268. if count != 100 {
  269. t.Fatalf("expected %d, got %d", 100, count)
  270. }
  271. }
  272. func TestPagingWithBind(t *testing.T) {
  273. session := createSession(t)
  274. defer session.Close()
  275. if session.cfg.ProtoVersion == 1 {
  276. t.Skip("Paging not supported. Please use Cassandra >= 2.0")
  277. }
  278. if err := createTable(session, "CREATE TABLE gocql_test.paging_bind (id int, val int, primary key(id,val))"); err != nil {
  279. t.Fatal("create table:", err)
  280. }
  281. for i := 0; i < 100; i++ {
  282. if err := session.Query("INSERT INTO paging_bind (id,val) VALUES (?,?)", 1,i).Exec(); err != nil {
  283. t.Fatal("insert:", err)
  284. }
  285. }
  286. q := session.Query("SELECT val FROM paging_bind WHERE id = ? AND val < ?",1, 50).PageSize(10)
  287. iter := q.Iter()
  288. var id int
  289. count := 0
  290. for iter.Scan(&id) {
  291. count++
  292. }
  293. if err := iter.Close(); err != nil {
  294. t.Fatal("close:", err)
  295. }
  296. if count != 50 {
  297. t.Fatalf("expected %d, got %d", 50, count)
  298. }
  299. iter = q.Bind(1, 20).Iter()
  300. count = 0
  301. for iter.Scan(&id) {
  302. count++
  303. }
  304. if count != 20 {
  305. t.Fatalf("expected %d, got %d", 20, count)
  306. }
  307. if err := iter.Close(); err != nil {
  308. t.Fatal("close:", err)
  309. }
  310. }
  311. func TestCAS(t *testing.T) {
  312. cluster := createCluster()
  313. cluster.SerialConsistency = LocalSerial
  314. session := createSessionFromCluster(cluster, t)
  315. defer session.Close()
  316. if session.cfg.ProtoVersion == 1 {
  317. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  318. }
  319. if err := createTable(session, `CREATE TABLE gocql_test.cas_table (
  320. title varchar,
  321. revid timeuuid,
  322. last_modified timestamp,
  323. PRIMARY KEY (title, revid)
  324. )`); err != nil {
  325. t.Fatal("create:", err)
  326. }
  327. title, revid, modified := "baz", TimeUUID(), time.Now()
  328. var titleCAS string
  329. var revidCAS UUID
  330. var modifiedCAS time.Time
  331. if applied, err := session.Query(`INSERT INTO cas_table (title, revid, last_modified)
  332. VALUES (?, ?, ?) IF NOT EXISTS`,
  333. title, revid, modified).ScanCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
  334. t.Fatal("insert:", err)
  335. } else if !applied {
  336. t.Fatal("insert should have been applied")
  337. }
  338. if applied, err := session.Query(`INSERT INTO cas_table (title, revid, last_modified)
  339. VALUES (?, ?, ?) IF NOT EXISTS`,
  340. title, revid, modified).ScanCAS(&titleCAS, &revidCAS, &modifiedCAS); err != nil {
  341. t.Fatal("insert:", err)
  342. } else if applied {
  343. t.Fatal("insert should not have been applied")
  344. } else if title != titleCAS || revid != revidCAS {
  345. t.Fatalf("expected %s/%v/%v but got %s/%v/%v", title, revid, modified, titleCAS, revidCAS, modifiedCAS)
  346. }
  347. tenSecondsLater := modified.Add(10 * time.Second)
  348. if applied, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  349. title, revid, tenSecondsLater).ScanCAS(&modifiedCAS); err != nil {
  350. t.Fatal("delete:", err)
  351. } else if applied {
  352. t.Fatal("delete should have not been applied")
  353. }
  354. if modifiedCAS.Unix() != tenSecondsLater.Add(-10*time.Second).Unix() {
  355. t.Fatalf("Was expecting modified CAS to be %v; but was one second later", modifiedCAS.UTC())
  356. }
  357. if _, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  358. title, revid, tenSecondsLater).ScanCAS(); !strings.HasPrefix(err.Error(), "gocql: not enough columns to scan into") {
  359. t.Fatalf("delete: was expecting count mismatch error but got: %q", err.Error())
  360. }
  361. if applied, err := session.Query(`DELETE FROM cas_table WHERE title = ? and revid = ? IF last_modified = ?`,
  362. title, revid, modified).ScanCAS(&modifiedCAS); err != nil {
  363. t.Fatal("delete:", err)
  364. } else if !applied {
  365. t.Fatal("delete should have been applied")
  366. }
  367. if err := session.Query(`TRUNCATE cas_table`).Exec(); err != nil {
  368. t.Fatal("truncate:", err)
  369. }
  370. successBatch := session.NewBatch(LoggedBatch)
  371. successBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title, revid, modified)
  372. if applied, _, err := session.ExecuteBatchCAS(successBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
  373. t.Fatal("insert:", err)
  374. } else if !applied {
  375. t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
  376. }
  377. successBatch = session.NewBatch(LoggedBatch)
  378. successBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title+"_foo", revid, modified)
  379. casMap := make(map[string]interface{})
  380. if applied, _, err := session.MapExecuteBatchCAS(successBatch, casMap); err != nil {
  381. t.Fatal("insert:", err)
  382. } else if !applied {
  383. t.Fatal("insert should have been applied")
  384. }
  385. failBatch := session.NewBatch(LoggedBatch)
  386. failBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES (?, ?, ?) IF NOT EXISTS", title, revid, modified)
  387. if applied, _, err := session.ExecuteBatchCAS(successBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
  388. t.Fatal("insert:", err)
  389. } else if applied {
  390. t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
  391. }
  392. insertBatch := session.NewBatch(LoggedBatch)
  393. insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 2c3af400-73a4-11e5-9381-29463d90c3f0, DATEOF(NOW()))")
  394. insertBatch.Query("INSERT INTO cas_table (title, revid, last_modified) VALUES ('_foo', 3e4ad2f1-73a4-11e5-9381-29463d90c3f0, DATEOF(NOW()))")
  395. if err := session.ExecuteBatch(insertBatch); err != nil {
  396. t.Fatal("insert:", err)
  397. }
  398. failBatch = session.NewBatch(LoggedBatch)
  399. failBatch.Query("UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=2c3af400-73a4-11e5-9381-29463d90c3f0 IF last_modified=DATEOF(NOW());")
  400. failBatch.Query("UPDATE cas_table SET last_modified = DATEOF(NOW()) WHERE title='_foo' AND revid=3e4ad2f1-73a4-11e5-9381-29463d90c3f0 IF last_modified=DATEOF(NOW());")
  401. if applied, iter, err := session.ExecuteBatchCAS(failBatch, &titleCAS, &revidCAS, &modifiedCAS); err != nil {
  402. t.Fatal("insert:", err)
  403. } else if applied {
  404. t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
  405. } else {
  406. if scan := iter.Scan(&applied, &titleCAS, &revidCAS, &modifiedCAS); scan && applied {
  407. t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", titleCAS, revidCAS, modifiedCAS)
  408. } else if !scan {
  409. t.Fatal("should have scanned another row")
  410. }
  411. if err := iter.Close(); err != nil {
  412. t.Fatal("scan:", err)
  413. }
  414. }
  415. }
  416. func TestDurationType(t *testing.T) {
  417. session := createSession(t)
  418. defer session.Close()
  419. if session.cfg.ProtoVersion < 5 {
  420. t.Skip("Duration type is not supported. Please use protocol version >= 4 and cassandra version >= 3.11")
  421. }
  422. if err := createTable(session, `CREATE TABLE gocql_test.duration_table (
  423. k int primary key, v duration
  424. )`); err != nil {
  425. t.Fatal("create:", err)
  426. }
  427. durations := []Duration{
  428. Duration{
  429. Months: 250,
  430. Days: 500,
  431. Nanoseconds: 300010001,
  432. },
  433. Duration{
  434. Months: -250,
  435. Days: -500,
  436. Nanoseconds: -300010001,
  437. },
  438. Duration{
  439. Months: 0,
  440. Days: 128,
  441. Nanoseconds: 127,
  442. },
  443. Duration{
  444. Months: 0x7FFFFFFF,
  445. Days: 0x7FFFFFFF,
  446. Nanoseconds: 0x7FFFFFFFFFFFFFFF,
  447. },
  448. }
  449. for _, durationSend := range durations {
  450. if err := session.Query(`INSERT INTO gocql_test.duration_table (k, v) VALUES (1, ?)`, durationSend).Exec(); err != nil {
  451. t.Fatal(err)
  452. }
  453. var id int
  454. var duration Duration
  455. if err := session.Query(`SELECT k, v FROM gocql_test.duration_table`).Scan(&id, &duration); err != nil {
  456. t.Fatal(err)
  457. }
  458. if duration.Months != durationSend.Months || duration.Days != durationSend.Days || duration.Nanoseconds != durationSend.Nanoseconds {
  459. t.Fatalf("Unexpeted value returned, expected=%v, received=%v", durationSend, duration)
  460. }
  461. }
  462. }
  463. func TestMapScanCAS(t *testing.T) {
  464. session := createSession(t)
  465. defer session.Close()
  466. if session.cfg.ProtoVersion == 1 {
  467. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  468. }
  469. if err := createTable(session, `CREATE TABLE gocql_test.cas_table2 (
  470. title varchar,
  471. revid timeuuid,
  472. last_modified timestamp,
  473. deleted boolean,
  474. PRIMARY KEY (title, revid)
  475. )`); err != nil {
  476. t.Fatal("create:", err)
  477. }
  478. title, revid, modified, deleted := "baz", TimeUUID(), time.Now(), false
  479. mapCAS := map[string]interface{}{}
  480. if applied, err := session.Query(`INSERT INTO cas_table2 (title, revid, last_modified, deleted)
  481. VALUES (?, ?, ?, ?) IF NOT EXISTS`,
  482. title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
  483. t.Fatal("insert:", err)
  484. } else if !applied {
  485. t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", title, revid, modified)
  486. }
  487. mapCAS = map[string]interface{}{}
  488. if applied, err := session.Query(`INSERT INTO cas_table2 (title, revid, last_modified, deleted)
  489. VALUES (?, ?, ?, ?) IF NOT EXISTS`,
  490. title, revid, modified, deleted).MapScanCAS(mapCAS); err != nil {
  491. t.Fatal("insert:", err)
  492. } else if applied {
  493. t.Fatalf("insert should have been applied: title=%v revID=%v modified=%v", title, revid, modified)
  494. } else if title != mapCAS["title"] || revid != mapCAS["revid"] || deleted != mapCAS["deleted"] {
  495. t.Fatalf("expected %s/%v/%v/%v but got %s/%v/%v%v", title, revid, modified, false, mapCAS["title"], mapCAS["revid"], mapCAS["last_modified"], mapCAS["deleted"])
  496. }
  497. }
  498. func TestBatch(t *testing.T) {
  499. session := createSession(t)
  500. defer session.Close()
  501. if session.cfg.ProtoVersion == 1 {
  502. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  503. }
  504. if err := createTable(session, `CREATE TABLE gocql_test.batch_table (id int primary key)`); err != nil {
  505. t.Fatal("create table:", err)
  506. }
  507. batch := session.NewBatch(LoggedBatch)
  508. for i := 0; i < 100; i++ {
  509. batch.Query(`INSERT INTO batch_table (id) VALUES (?)`, i)
  510. }
  511. if err := session.ExecuteBatch(batch); err != nil {
  512. t.Fatal("execute batch:", err)
  513. }
  514. count := 0
  515. if err := session.Query(`SELECT COUNT(*) FROM batch_table`).Scan(&count); err != nil {
  516. t.Fatal("select count:", err)
  517. } else if count != 100 {
  518. t.Fatalf("count: expected %d, got %d\n", 100, count)
  519. }
  520. }
  521. func TestUnpreparedBatch(t *testing.T) {
  522. t.Skip("FLAKE skipping")
  523. session := createSession(t)
  524. defer session.Close()
  525. if session.cfg.ProtoVersion == 1 {
  526. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  527. }
  528. if err := createTable(session, `CREATE TABLE gocql_test.batch_unprepared (id int primary key, c counter)`); err != nil {
  529. t.Fatal("create table:", err)
  530. }
  531. var batch *Batch
  532. if session.cfg.ProtoVersion == 2 {
  533. batch = session.NewBatch(CounterBatch)
  534. } else {
  535. batch = session.NewBatch(UnloggedBatch)
  536. }
  537. for i := 0; i < 100; i++ {
  538. batch.Query(`UPDATE batch_unprepared SET c = c + 1 WHERE id = 1`)
  539. }
  540. if err := session.ExecuteBatch(batch); err != nil {
  541. t.Fatal("execute batch:", err)
  542. }
  543. count := 0
  544. if err := session.Query(`SELECT COUNT(*) FROM batch_unprepared`).Scan(&count); err != nil {
  545. t.Fatal("select count:", err)
  546. } else if count != 1 {
  547. t.Fatalf("count: expected %d, got %d\n", 100, count)
  548. }
  549. if err := session.Query(`SELECT c FROM batch_unprepared`).Scan(&count); err != nil {
  550. t.Fatal("select count:", err)
  551. } else if count != 100 {
  552. t.Fatalf("count: expected %d, got %d\n", 100, count)
  553. }
  554. }
  555. // TestBatchLimit tests gocql to make sure batch operations larger than the maximum
  556. // statement limit are not submitted to a cassandra node.
  557. func TestBatchLimit(t *testing.T) {
  558. session := createSession(t)
  559. defer session.Close()
  560. if session.cfg.ProtoVersion == 1 {
  561. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  562. }
  563. if err := createTable(session, `CREATE TABLE gocql_test.batch_table2 (id int primary key)`); err != nil {
  564. t.Fatal("create table:", err)
  565. }
  566. batch := session.NewBatch(LoggedBatch)
  567. for i := 0; i < 65537; i++ {
  568. batch.Query(`INSERT INTO batch_table2 (id) VALUES (?)`, i)
  569. }
  570. if err := session.ExecuteBatch(batch); err != ErrTooManyStmts {
  571. t.Fatal("gocql attempted to execute a batch larger than the support limit of statements.")
  572. }
  573. }
  574. func TestWhereIn(t *testing.T) {
  575. session := createSession(t)
  576. defer session.Close()
  577. if err := createTable(session, `CREATE TABLE gocql_test.where_in_table (id int, cluster int, primary key (id,cluster))`); err != nil {
  578. t.Fatal("create table:", err)
  579. }
  580. if err := session.Query("INSERT INTO where_in_table (id, cluster) VALUES (?,?)", 100, 200).Exec(); err != nil {
  581. t.Fatal("insert:", err)
  582. }
  583. iter := session.Query("SELECT * FROM where_in_table WHERE id = ? AND cluster IN (?)", 100, 200).Iter()
  584. var id, cluster int
  585. count := 0
  586. for iter.Scan(&id, &cluster) {
  587. count++
  588. }
  589. if id != 100 || cluster != 200 {
  590. t.Fatalf("Was expecting id and cluster to be (100,200) but were (%d,%d)", id, cluster)
  591. }
  592. }
  593. // TestTooManyQueryArgs tests to make sure the library correctly handles the application level bug
  594. // whereby too many query arguments are passed to a query
  595. func TestTooManyQueryArgs(t *testing.T) {
  596. session := createSession(t)
  597. defer session.Close()
  598. if session.cfg.ProtoVersion == 1 {
  599. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  600. }
  601. if err := createTable(session, `CREATE TABLE gocql_test.too_many_query_args (id int primary key, value int)`); err != nil {
  602. t.Fatal("create table:", err)
  603. }
  604. _, err := session.Query(`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2).Iter().SliceMap()
  605. if err == nil {
  606. t.Fatal("'`SELECT * FROM too_many_query_args WHERE id = ?`, 1, 2' should return an error")
  607. }
  608. batch := session.NewBatch(UnloggedBatch)
  609. batch.Query("INSERT INTO too_many_query_args (id, value) VALUES (?, ?)", 1, 2, 3)
  610. err = session.ExecuteBatch(batch)
  611. if err == nil {
  612. t.Fatal("'`INSERT INTO too_many_query_args (id, value) VALUES (?, ?)`, 1, 2, 3' should return an error")
  613. }
  614. // TODO: should indicate via an error code that it is an invalid arg?
  615. }
  616. // TestNotEnoughQueryArgs tests to make sure the library correctly handles the application level bug
  617. // whereby not enough query arguments are passed to a query
  618. func TestNotEnoughQueryArgs(t *testing.T) {
  619. session := createSession(t)
  620. defer session.Close()
  621. if session.cfg.ProtoVersion == 1 {
  622. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  623. }
  624. if err := createTable(session, `CREATE TABLE gocql_test.not_enough_query_args (id int, cluster int, value int, primary key (id, cluster))`); err != nil {
  625. t.Fatal("create table:", err)
  626. }
  627. _, err := session.Query(`SELECT * FROM not_enough_query_args WHERE id = ? and cluster = ?`, 1).Iter().SliceMap()
  628. if err == nil {
  629. t.Fatal("'`SELECT * FROM not_enough_query_args WHERE id = ? and cluster = ?`, 1' should return an error")
  630. }
  631. batch := session.NewBatch(UnloggedBatch)
  632. batch.Query("INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)", 1, 2)
  633. err = session.ExecuteBatch(batch)
  634. if err == nil {
  635. t.Fatal("'`INSERT INTO not_enough_query_args (id, cluster, value) VALUES (?, ?, ?)`, 1, 2' should return an error")
  636. }
  637. }
  638. // TestCreateSessionTimeout tests to make sure the CreateSession function timeouts out correctly
  639. // and prevents an infinite loop of connection retries.
  640. func TestCreateSessionTimeout(t *testing.T) {
  641. ctx, cancel := context.WithCancel(context.Background())
  642. defer cancel()
  643. go func() {
  644. select {
  645. case <-time.After(2 * time.Second):
  646. t.Error("no startup timeout")
  647. case <-ctx.Done():
  648. }
  649. }()
  650. cluster := createCluster()
  651. cluster.Hosts = []string{"127.0.0.1:1"}
  652. session, err := cluster.CreateSession()
  653. if err == nil {
  654. session.Close()
  655. t.Fatal("expected ErrNoConnectionsStarted, but no error was returned.")
  656. }
  657. }
  658. func TestReconnection(t *testing.T) {
  659. cluster := createCluster()
  660. cluster.ReconnectInterval = 1 * time.Second
  661. session := createSessionFromCluster(cluster, t)
  662. defer session.Close()
  663. h := session.ring.allHosts()[0]
  664. session.handleNodeDown(h.ConnectAddress(), h.Port())
  665. if h.State() != NodeDown {
  666. t.Fatal("Host should be NodeDown but not.")
  667. }
  668. time.Sleep(cluster.ReconnectInterval + h.Version().nodeUpDelay() + 1*time.Second)
  669. if h.State() != NodeUp {
  670. t.Fatal("Host should be NodeUp but not. Failed to reconnect.")
  671. }
  672. }
  673. type FullName struct {
  674. FirstName string
  675. LastName string
  676. }
  677. func (n FullName) MarshalCQL(info TypeInfo) ([]byte, error) {
  678. return []byte(n.FirstName + " " + n.LastName), nil
  679. }
  680. func (n *FullName) UnmarshalCQL(info TypeInfo, data []byte) error {
  681. t := strings.SplitN(string(data), " ", 2)
  682. n.FirstName, n.LastName = t[0], t[1]
  683. return nil
  684. }
  685. func TestMapScanWithRefMap(t *testing.T) {
  686. session := createSession(t)
  687. defer session.Close()
  688. if err := createTable(session, `CREATE TABLE gocql_test.scan_map_ref_table (
  689. testtext text PRIMARY KEY,
  690. testfullname text,
  691. testint int,
  692. )`); err != nil {
  693. t.Fatal("create table:", err)
  694. }
  695. m := make(map[string]interface{})
  696. m["testtext"] = "testtext"
  697. m["testfullname"] = FullName{"John", "Doe"}
  698. m["testint"] = 100
  699. if err := session.Query(`INSERT INTO scan_map_ref_table (testtext, testfullname, testint) values (?,?,?)`,
  700. m["testtext"], m["testfullname"], m["testint"]).Exec(); err != nil {
  701. t.Fatal("insert:", err)
  702. }
  703. var testText string
  704. var testFullName FullName
  705. ret := map[string]interface{}{
  706. "testtext": &testText,
  707. "testfullname": &testFullName,
  708. // testint is not set here.
  709. }
  710. iter := session.Query(`SELECT * FROM scan_map_ref_table`).Iter()
  711. if ok := iter.MapScan(ret); !ok {
  712. t.Fatal("select:", iter.Close())
  713. } else {
  714. if ret["testtext"] != "testtext" {
  715. t.Fatal("returned testtext did not match")
  716. }
  717. f := ret["testfullname"].(FullName)
  718. if f.FirstName != "John" || f.LastName != "Doe" {
  719. t.Fatal("returned testfullname did not match")
  720. }
  721. if ret["testint"] != 100 {
  722. t.Fatal("returned testinit did not match")
  723. }
  724. }
  725. if testText != "testtext" {
  726. t.Fatal("returned testtext did not match")
  727. }
  728. if testFullName.FirstName != "John" || testFullName.LastName != "Doe" {
  729. t.Fatal("returned testfullname did not match")
  730. }
  731. // using MapScan to read a nil int value
  732. intp := new(int64)
  733. ret = map[string]interface{}{
  734. "testint": &intp,
  735. }
  736. if err := session.Query("INSERT INTO scan_map_ref_table(testtext, testint) VALUES(?, ?)", "null-int", nil).Exec(); err != nil {
  737. t.Fatal(err)
  738. }
  739. err := session.Query(`SELECT testint FROM scan_map_ref_table WHERE testtext = ?`, "null-int").MapScan(ret)
  740. if err != nil {
  741. t.Fatal(err)
  742. } else if v := ret["testint"].(*int64); v != nil {
  743. t.Fatalf("testint should be nil got %+#v", v)
  744. }
  745. }
  746. func TestMapScan(t *testing.T) {
  747. session := createSession(t)
  748. defer session.Close()
  749. if err := createTable(session, `CREATE TABLE gocql_test.scan_map_table (
  750. fullname text PRIMARY KEY,
  751. age int,
  752. address inet,
  753. )`); err != nil {
  754. t.Fatal("create table:", err)
  755. }
  756. if err := session.Query(`INSERT INTO scan_map_table (fullname, age, address) values (?,?,?)`,
  757. "Grace Hopper", 31, net.ParseIP("10.0.0.1")).Exec(); err != nil {
  758. t.Fatal("insert:", err)
  759. }
  760. if err := session.Query(`INSERT INTO scan_map_table (fullname, age, address) values (?,?,?)`,
  761. "Ada Lovelace", 30, net.ParseIP("10.0.0.2")).Exec(); err != nil {
  762. t.Fatal("insert:", err)
  763. }
  764. iter := session.Query(`SELECT * FROM scan_map_table`).Iter()
  765. // First iteration
  766. row := make(map[string]interface{})
  767. if !iter.MapScan(row) {
  768. t.Fatal("select:", iter.Close())
  769. }
  770. assertEqual(t, "fullname", "Ada Lovelace", row["fullname"])
  771. assertEqual(t, "age", 30, row["age"])
  772. assertEqual(t, "address", "10.0.0.2", row["address"])
  773. // Second iteration using a new map
  774. row = make(map[string]interface{})
  775. if !iter.MapScan(row) {
  776. t.Fatal("select:", iter.Close())
  777. }
  778. assertEqual(t, "fullname", "Grace Hopper", row["fullname"])
  779. assertEqual(t, "age", 31, row["age"])
  780. assertEqual(t, "address", "10.0.0.1", row["address"])
  781. }
  782. func TestSliceMap(t *testing.T) {
  783. session := createSession(t)
  784. defer session.Close()
  785. if err := createTable(session, `CREATE TABLE gocql_test.slice_map_table (
  786. testuuid timeuuid PRIMARY KEY,
  787. testtimestamp timestamp,
  788. testvarchar varchar,
  789. testbigint bigint,
  790. testblob blob,
  791. testbool boolean,
  792. testfloat float,
  793. testdouble double,
  794. testint int,
  795. testdecimal decimal,
  796. testlist list<text>,
  797. testset set<int>,
  798. testmap map<varchar, varchar>,
  799. testvarint varint,
  800. testinet inet
  801. )`); err != nil {
  802. t.Fatal("create table:", err)
  803. }
  804. m := make(map[string]interface{})
  805. bigInt := new(big.Int)
  806. if _, ok := bigInt.SetString("830169365738487321165427203929228", 10); !ok {
  807. t.Fatal("Failed setting bigint by string")
  808. }
  809. m["testuuid"] = TimeUUID()
  810. m["testvarchar"] = "Test VarChar"
  811. m["testbigint"] = time.Now().Unix()
  812. m["testtimestamp"] = time.Now().Truncate(time.Millisecond).UTC()
  813. m["testblob"] = []byte("test blob")
  814. m["testbool"] = true
  815. m["testfloat"] = float32(4.564)
  816. m["testdouble"] = float64(4.815162342)
  817. m["testint"] = 2343
  818. m["testdecimal"] = inf.NewDec(100, 0)
  819. m["testlist"] = []string{"quux", "foo", "bar", "baz", "quux"}
  820. m["testset"] = []int{1, 2, 3, 4, 5, 6, 7, 8, 9}
  821. m["testmap"] = map[string]string{"field1": "val1", "field2": "val2", "field3": "val3"}
  822. m["testvarint"] = bigInt
  823. m["testinet"] = "213.212.2.19"
  824. sliceMap := []map[string]interface{}{m}
  825. if err := session.Query(`INSERT INTO slice_map_table (testuuid, testtimestamp, testvarchar, testbigint, testblob, testbool, testfloat, testdouble, testint, testdecimal, testlist, testset, testmap, testvarint, testinet) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
  826. m["testuuid"], m["testtimestamp"], m["testvarchar"], m["testbigint"], m["testblob"], m["testbool"], m["testfloat"], m["testdouble"], m["testint"], m["testdecimal"], m["testlist"], m["testset"], m["testmap"], m["testvarint"], m["testinet"]).Exec(); err != nil {
  827. t.Fatal("insert:", err)
  828. }
  829. if returned, retErr := session.Query(`SELECT * FROM slice_map_table`).Iter().SliceMap(); retErr != nil {
  830. t.Fatal("select:", retErr)
  831. } else {
  832. matchSliceMap(t, sliceMap, returned[0])
  833. }
  834. // Test for Iter.MapScan()
  835. {
  836. testMap := make(map[string]interface{})
  837. if !session.Query(`SELECT * FROM slice_map_table`).Iter().MapScan(testMap) {
  838. t.Fatal("MapScan failed to work with one row")
  839. }
  840. matchSliceMap(t, sliceMap, testMap)
  841. }
  842. // Test for Query.MapScan()
  843. {
  844. testMap := make(map[string]interface{})
  845. if session.Query(`SELECT * FROM slice_map_table`).MapScan(testMap) != nil {
  846. t.Fatal("MapScan failed to work with one row")
  847. }
  848. matchSliceMap(t, sliceMap, testMap)
  849. }
  850. }
  851. func matchSliceMap(t *testing.T, sliceMap []map[string]interface{}, testMap map[string]interface{}) {
  852. if sliceMap[0]["testuuid"] != testMap["testuuid"] {
  853. t.Fatal("returned testuuid did not match")
  854. }
  855. if sliceMap[0]["testtimestamp"] != testMap["testtimestamp"] {
  856. t.Fatal("returned testtimestamp did not match")
  857. }
  858. if sliceMap[0]["testvarchar"] != testMap["testvarchar"] {
  859. t.Fatal("returned testvarchar did not match")
  860. }
  861. if sliceMap[0]["testbigint"] != testMap["testbigint"] {
  862. t.Fatal("returned testbigint did not match")
  863. }
  864. if !reflect.DeepEqual(sliceMap[0]["testblob"], testMap["testblob"]) {
  865. t.Fatal("returned testblob did not match")
  866. }
  867. if sliceMap[0]["testbool"] != testMap["testbool"] {
  868. t.Fatal("returned testbool did not match")
  869. }
  870. if sliceMap[0]["testfloat"] != testMap["testfloat"] {
  871. t.Fatal("returned testfloat did not match")
  872. }
  873. if sliceMap[0]["testdouble"] != testMap["testdouble"] {
  874. t.Fatal("returned testdouble did not match")
  875. }
  876. if sliceMap[0]["testinet"] != testMap["testinet"] {
  877. t.Fatal("returned testinet did not match")
  878. }
  879. expectedDecimal := sliceMap[0]["testdecimal"].(*inf.Dec)
  880. returnedDecimal := testMap["testdecimal"].(*inf.Dec)
  881. if expectedDecimal.Cmp(returnedDecimal) != 0 {
  882. t.Fatal("returned testdecimal did not match")
  883. }
  884. if !reflect.DeepEqual(sliceMap[0]["testlist"], testMap["testlist"]) {
  885. t.Fatal("returned testlist did not match")
  886. }
  887. if !reflect.DeepEqual(sliceMap[0]["testset"], testMap["testset"]) {
  888. t.Fatal("returned testset did not match")
  889. }
  890. if !reflect.DeepEqual(sliceMap[0]["testmap"], testMap["testmap"]) {
  891. t.Fatal("returned testmap did not match")
  892. }
  893. if sliceMap[0]["testint"] != testMap["testint"] {
  894. t.Fatal("returned testint did not match")
  895. }
  896. }
  897. func TestSmallInt(t *testing.T) {
  898. session := createSession(t)
  899. defer session.Close()
  900. if session.cfg.ProtoVersion < protoVersion4 {
  901. t.Skip("smallint is only supported in cassandra 2.2+")
  902. }
  903. if err := createTable(session, `CREATE TABLE gocql_test.smallint_table (
  904. testsmallint smallint PRIMARY KEY,
  905. )`); err != nil {
  906. t.Fatal("create table:", err)
  907. }
  908. m := make(map[string]interface{})
  909. m["testsmallint"] = int16(2)
  910. sliceMap := []map[string]interface{}{m}
  911. if err := session.Query(`INSERT INTO smallint_table (testsmallint) VALUES (?)`,
  912. m["testsmallint"]).Exec(); err != nil {
  913. t.Fatal("insert:", err)
  914. }
  915. if returned, retErr := session.Query(`SELECT * FROM smallint_table`).Iter().SliceMap(); retErr != nil {
  916. t.Fatal("select:", retErr)
  917. } else {
  918. if sliceMap[0]["testsmallint"] != returned[0]["testsmallint"] {
  919. t.Fatal("returned testsmallint did not match")
  920. }
  921. }
  922. }
  923. func TestScanWithNilArguments(t *testing.T) {
  924. session := createSession(t)
  925. defer session.Close()
  926. if err := createTable(session, `CREATE TABLE gocql_test.scan_with_nil_arguments (
  927. foo varchar,
  928. bar int,
  929. PRIMARY KEY (foo, bar)
  930. )`); err != nil {
  931. t.Fatal("create:", err)
  932. }
  933. for i := 1; i <= 20; i++ {
  934. if err := session.Query("INSERT INTO scan_with_nil_arguments (foo, bar) VALUES (?, ?)",
  935. "squares", i*i).Exec(); err != nil {
  936. t.Fatal("insert:", err)
  937. }
  938. }
  939. iter := session.Query("SELECT * FROM scan_with_nil_arguments WHERE foo = ?", "squares").Iter()
  940. var n int
  941. count := 0
  942. for iter.Scan(nil, &n) {
  943. count += n
  944. }
  945. if err := iter.Close(); err != nil {
  946. t.Fatal("close:", err)
  947. }
  948. if count != 2870 {
  949. t.Fatalf("expected %d, got %d", 2870, count)
  950. }
  951. }
  952. func TestScanCASWithNilArguments(t *testing.T) {
  953. session := createSession(t)
  954. defer session.Close()
  955. if session.cfg.ProtoVersion == 1 {
  956. t.Skip("lightweight transactions not supported. Please use Cassandra >= 2.0")
  957. }
  958. if err := createTable(session, `CREATE TABLE gocql_test.scan_cas_with_nil_arguments (
  959. foo varchar,
  960. bar varchar,
  961. PRIMARY KEY (foo, bar)
  962. )`); err != nil {
  963. t.Fatal("create:", err)
  964. }
  965. foo := "baz"
  966. var cas string
  967. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  968. VALUES (?, ?) IF NOT EXISTS`,
  969. foo, foo).ScanCAS(nil, nil); err != nil {
  970. t.Fatal("insert:", err)
  971. } else if !applied {
  972. t.Fatal("insert should have been applied")
  973. }
  974. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  975. VALUES (?, ?) IF NOT EXISTS`,
  976. foo, foo).ScanCAS(&cas, nil); err != nil {
  977. t.Fatal("insert:", err)
  978. } else if applied {
  979. t.Fatal("insert should not have been applied")
  980. } else if foo != cas {
  981. t.Fatalf("expected %v but got %v", foo, cas)
  982. }
  983. if applied, err := session.Query(`INSERT INTO scan_cas_with_nil_arguments (foo, bar)
  984. VALUES (?, ?) IF NOT EXISTS`,
  985. foo, foo).ScanCAS(nil, &cas); err != nil {
  986. t.Fatal("insert:", err)
  987. } else if applied {
  988. t.Fatal("insert should not have been applied")
  989. } else if foo != cas {
  990. t.Fatalf("expected %v but got %v", foo, cas)
  991. }
  992. }
  993. func TestRebindQueryInfo(t *testing.T) {
  994. session := createSession(t)
  995. defer session.Close()
  996. if err := createTable(session, "CREATE TABLE gocql_test.rebind_query (id int, value text, PRIMARY KEY (id))"); err != nil {
  997. t.Fatalf("failed to create table with error '%v'", err)
  998. }
  999. if err := session.Query("INSERT INTO rebind_query (id, value) VALUES (?, ?)", 23, "quux").Exec(); err != nil {
  1000. t.Fatalf("insert into rebind_query failed, err '%v'", err)
  1001. }
  1002. if err := session.Query("INSERT INTO rebind_query (id, value) VALUES (?, ?)", 24, "w00t").Exec(); err != nil {
  1003. t.Fatalf("insert into rebind_query failed, err '%v'", err)
  1004. }
  1005. q := session.Query("SELECT value FROM rebind_query WHERE ID = ?")
  1006. q.Bind(23)
  1007. iter := q.Iter()
  1008. var value string
  1009. for iter.Scan(&value) {
  1010. }
  1011. if value != "quux" {
  1012. t.Fatalf("expected %v but got %v", "quux", value)
  1013. }
  1014. q.Bind(24)
  1015. iter = q.Iter()
  1016. for iter.Scan(&value) {
  1017. }
  1018. if value != "w00t" {
  1019. t.Fatalf("expected %v but got %v", "w00t", value)
  1020. }
  1021. }
  1022. //TestStaticQueryInfo makes sure that the application can manually bind query parameters using the simplest possible static binding strategy
  1023. func TestStaticQueryInfo(t *testing.T) {
  1024. session := createSession(t)
  1025. defer session.Close()
  1026. if err := createTable(session, "CREATE TABLE gocql_test.static_query_info (id int, value text, PRIMARY KEY (id))"); err != nil {
  1027. t.Fatalf("failed to create table with error '%v'", err)
  1028. }
  1029. if err := session.Query("INSERT INTO static_query_info (id, value) VALUES (?, ?)", 113, "foo").Exec(); err != nil {
  1030. t.Fatalf("insert into static_query_info failed, err '%v'", err)
  1031. }
  1032. autobinder := func(q *QueryInfo) ([]interface{}, error) {
  1033. values := make([]interface{}, 1)
  1034. values[0] = 113
  1035. return values, nil
  1036. }
  1037. qry := session.Bind("SELECT id, value FROM static_query_info WHERE id = ?", autobinder)
  1038. if err := qry.Exec(); err != nil {
  1039. t.Fatalf("expose query info failed, error '%v'", err)
  1040. }
  1041. iter := qry.Iter()
  1042. var id int
  1043. var value string
  1044. iter.Scan(&id, &value)
  1045. if err := iter.Close(); err != nil {
  1046. t.Fatalf("query with exposed info failed, err '%v'", err)
  1047. }
  1048. if value != "foo" {
  1049. t.Fatalf("Expected value %s, but got %s", "foo", value)
  1050. }
  1051. }
  // ClusteredKeyValue is a test fixture with two integer fields and one string
  // field. Its Bind method looks fields up by name via reflection, so the field
  // names must equal the query argument names with the first letter upper-cased.
  type ClusteredKeyValue struct {
  	Id, Cluster int
  	Value       string
  }
  1057. func (kv *ClusteredKeyValue) Bind(q *QueryInfo) ([]interface{}, error) {
  1058. values := make([]interface{}, len(q.Args))
  1059. for i, info := range q.Args {
  1060. fieldName := upcaseInitial(info.Name)
  1061. value := reflect.ValueOf(kv)
  1062. field := reflect.Indirect(value).FieldByName(fieldName)
  1063. values[i] = field.Addr().Interface()
  1064. }
  1065. return values, nil
  1066. }
  // upcaseInitial returns str with its first rune converted to upper case.
  // An empty string is returned unchanged.
  //
  // The remainder is sliced at the byte offset of the second rune, so an
  // initial rune that occupies more than one byte is handled correctly.
  func upcaseInitial(str string) string {
  	if str == "" {
  		return ""
  	}

  	var (
  		first rune
  		rest  string
  	)
  	for i, r := range str {
  		if i == 0 {
  			first = r
  			continue
  		}
  		// i is the byte offset where the second rune starts.
  		rest = str[i:]
  		break
  	}
  	return string(unicode.ToUpper(first)) + rest
  }
  1073. //TestBoundQueryInfo makes sure that the application can manually bind query parameters using the query meta data supplied at runtime
  1074. func TestBoundQueryInfo(t *testing.T) {
  1075. session := createSession(t)
  1076. defer session.Close()
  1077. if err := createTable(session, "CREATE TABLE gocql_test.clustered_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
  1078. t.Fatalf("failed to create table with error '%v'", err)
  1079. }
  1080. write := &ClusteredKeyValue{Id: 200, Cluster: 300, Value: "baz"}
  1081. insert := session.Bind("INSERT INTO clustered_query_info (id, cluster, value) VALUES (?, ?,?)", write.Bind)
  1082. if err := insert.Exec(); err != nil {
  1083. t.Fatalf("insert into clustered_query_info failed, err '%v'", err)
  1084. }
  1085. read := &ClusteredKeyValue{Id: 200, Cluster: 300}
  1086. qry := session.Bind("SELECT id, cluster, value FROM clustered_query_info WHERE id = ? and cluster = ?", read.Bind)
  1087. iter := qry.Iter()
  1088. var id, cluster int
  1089. var value string
  1090. iter.Scan(&id, &cluster, &value)
  1091. if err := iter.Close(); err != nil {
  1092. t.Fatalf("query with clustered_query_info info failed, err '%v'", err)
  1093. }
  1094. if value != "baz" {
  1095. t.Fatalf("Expected value %s, but got %s", "baz", value)
  1096. }
  1097. }
  1098. //TestBatchQueryInfo makes sure that the application can manually bind query parameters when executing in a batch
  1099. func TestBatchQueryInfo(t *testing.T) {
  1100. session := createSession(t)
  1101. defer session.Close()
  1102. if session.cfg.ProtoVersion == 1 {
  1103. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  1104. }
  1105. if err := createTable(session, "CREATE TABLE gocql_test.batch_query_info (id int, cluster int, value text, PRIMARY KEY (id, cluster))"); err != nil {
  1106. t.Fatalf("failed to create table with error '%v'", err)
  1107. }
  1108. write := func(q *QueryInfo) ([]interface{}, error) {
  1109. values := make([]interface{}, 3)
  1110. values[0] = 4000
  1111. values[1] = 5000
  1112. values[2] = "bar"
  1113. return values, nil
  1114. }
  1115. batch := session.NewBatch(LoggedBatch)
  1116. batch.Bind("INSERT INTO batch_query_info (id, cluster, value) VALUES (?, ?,?)", write)
  1117. if err := session.ExecuteBatch(batch); err != nil {
  1118. t.Fatalf("batch insert into batch_query_info failed, err '%v'", err)
  1119. }
  1120. read := func(q *QueryInfo) ([]interface{}, error) {
  1121. values := make([]interface{}, 2)
  1122. values[0] = 4000
  1123. values[1] = 5000
  1124. return values, nil
  1125. }
  1126. qry := session.Bind("SELECT id, cluster, value FROM batch_query_info WHERE id = ? and cluster = ?", read)
  1127. iter := qry.Iter()
  1128. var id, cluster int
  1129. var value string
  1130. iter.Scan(&id, &cluster, &value)
  1131. if err := iter.Close(); err != nil {
  1132. t.Fatalf("query with batch_query_info info failed, err '%v'", err)
  1133. }
  1134. if value != "bar" {
  1135. t.Fatalf("Expected value %s, but got %s", "bar", value)
  1136. }
  1137. }
  1138. func getRandomConn(t *testing.T, session *Session) *Conn {
  1139. conn := session.getConn()
  1140. if conn == nil {
  1141. t.Fatal("unable to get a connection")
  1142. }
  1143. return conn
  1144. }
  1145. func injectInvalidPreparedStatement(t *testing.T, session *Session, table string) (string, *Conn) {
  1146. if err := createTable(session, `CREATE TABLE gocql_test.`+table+` (
  1147. foo varchar,
  1148. bar int,
  1149. PRIMARY KEY (foo, bar)
  1150. )`); err != nil {
  1151. t.Fatal("create:", err)
  1152. }
  1153. stmt := "INSERT INTO " + table + " (foo, bar) VALUES (?, 7)"
  1154. conn := getRandomConn(t, session)
  1155. flight := new(inflightPrepare)
  1156. key := session.stmtsLRU.keyFor(conn.addr, "", stmt)
  1157. session.stmtsLRU.add(key, flight)
  1158. flight.preparedStatment = &preparedStatment{
  1159. id: []byte{'f', 'o', 'o', 'b', 'a', 'r'},
  1160. request: preparedMetadata{
  1161. resultMetadata: resultMetadata{
  1162. colCount: 1,
  1163. actualColCount: 1,
  1164. columns: []ColumnInfo{
  1165. {
  1166. Keyspace: "gocql_test",
  1167. Table: table,
  1168. Name: "foo",
  1169. TypeInfo: NativeType{
  1170. typ: TypeVarchar,
  1171. },
  1172. },
  1173. },
  1174. },
  1175. },
  1176. }
  1177. return stmt, conn
  1178. }
  1179. func TestPrepare_MissingSchemaPrepare(t *testing.T) {
  1180. ctx, cancel := context.WithCancel(context.Background())
  1181. defer cancel()
  1182. s := createSession(t)
  1183. conn := getRandomConn(t, s)
  1184. defer s.Close()
  1185. insertQry := s.Query("INSERT INTO invalidschemaprep (val) VALUES (?)", 5)
  1186. if err := conn.executeQuery(ctx, insertQry).err; err == nil {
  1187. t.Fatal("expected error, but got nil.")
  1188. }
  1189. if err := createTable(s, "CREATE TABLE gocql_test.invalidschemaprep (val int, PRIMARY KEY (val))"); err != nil {
  1190. t.Fatal("create table:", err)
  1191. }
  1192. if err := conn.executeQuery(ctx, insertQry).err; err != nil {
  1193. t.Fatal(err) // unconfigured columnfamily
  1194. }
  1195. }
  1196. func TestPrepare_ReprepareStatement(t *testing.T) {
  1197. ctx, cancel := context.WithCancel(context.Background())
  1198. defer cancel()
  1199. session := createSession(t)
  1200. defer session.Close()
  1201. stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement")
  1202. query := session.Query(stmt, "bar")
  1203. if err := conn.executeQuery(ctx, query).Close(); err != nil {
  1204. t.Fatalf("Failed to execute query for reprepare statement: %v", err)
  1205. }
  1206. }
  1207. func TestPrepare_ReprepareBatch(t *testing.T) {
  1208. ctx, cancel := context.WithCancel(context.Background())
  1209. defer cancel()
  1210. session := createSession(t)
  1211. defer session.Close()
  1212. if session.cfg.ProtoVersion == 1 {
  1213. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  1214. }
  1215. stmt, conn := injectInvalidPreparedStatement(t, session, "test_reprepare_statement_batch")
  1216. batch := session.NewBatch(UnloggedBatch)
  1217. batch.Query(stmt, "bar")
  1218. if err := conn.executeBatch(ctx, batch).Close(); err != nil {
  1219. t.Fatalf("Failed to execute query for reprepare statement: %v", err)
  1220. }
  1221. }
  1222. func TestQueryInfo(t *testing.T) {
  1223. session := createSession(t)
  1224. defer session.Close()
  1225. conn := getRandomConn(t, session)
  1226. info, err := conn.prepareStatement(context.Background(), "SELECT release_version, host_id FROM system.local WHERE key = ?", nil)
  1227. if err != nil {
  1228. t.Fatalf("Failed to execute query for preparing statement: %v", err)
  1229. }
  1230. if x := len(info.request.columns); x != 1 {
  1231. t.Fatalf("Was not expecting meta data for %d query arguments, but got %d\n", 1, x)
  1232. }
  1233. if session.cfg.ProtoVersion > 1 {
  1234. if x := len(info.response.columns); x != 2 {
  1235. t.Fatalf("Was not expecting meta data for %d result columns, but got %d\n", 2, x)
  1236. }
  1237. }
  1238. }
  1239. //TestPreparedCacheEviction will make sure that the cache size is maintained
  1240. func TestPrepare_PreparedCacheEviction(t *testing.T) {
  1241. const maxPrepared = 4
  1242. clusterHosts := getClusterHosts()
  1243. host := clusterHosts[0]
  1244. cluster := createCluster()
  1245. cluster.MaxPreparedStmts = maxPrepared
  1246. cluster.Events.DisableSchemaEvents = true
  1247. cluster.Hosts = []string{host}
  1248. cluster.HostFilter = WhiteListHostFilter(host)
  1249. session := createSessionFromCluster(cluster, t)
  1250. defer session.Close()
  1251. if err := createTable(session, "CREATE TABLE gocql_test.prepcachetest (id int,mod int,PRIMARY KEY (id))"); err != nil {
  1252. t.Fatalf("failed to create table with error '%v'", err)
  1253. }
  1254. // clear the cache
  1255. session.stmtsLRU.clear()
  1256. //Fill the table
  1257. for i := 0; i < 2; i++ {
  1258. if err := session.Query("INSERT INTO prepcachetest (id,mod) VALUES (?, ?)", i, 10000%(i+1)).Exec(); err != nil {
  1259. t.Fatalf("insert into prepcachetest failed, err '%v'", err)
  1260. }
  1261. }
  1262. //Populate the prepared statement cache with select statements
  1263. var id, mod int
  1264. for i := 0; i < 2; i++ {
  1265. err := session.Query("SELECT id,mod FROM prepcachetest WHERE id = "+strconv.FormatInt(int64(i), 10)).Scan(&id, &mod)
  1266. if err != nil {
  1267. t.Fatalf("select from prepcachetest failed, error '%v'", err)
  1268. }
  1269. }
  1270. //generate an update statement to test they are prepared
  1271. err := session.Query("UPDATE prepcachetest SET mod = ? WHERE id = ?", 1, 11).Exec()
  1272. if err != nil {
  1273. t.Fatalf("update prepcachetest failed, error '%v'", err)
  1274. }
  1275. //generate a delete statement to test they are prepared
  1276. err = session.Query("DELETE FROM prepcachetest WHERE id = ?", 1).Exec()
  1277. if err != nil {
  1278. t.Fatalf("delete from prepcachetest failed, error '%v'", err)
  1279. }
  1280. //generate an insert statement to test they are prepared
  1281. err = session.Query("INSERT INTO prepcachetest (id,mod) VALUES (?, ?)", 3, 11).Exec()
  1282. if err != nil {
  1283. t.Fatalf("insert into prepcachetest failed, error '%v'", err)
  1284. }
  1285. session.stmtsLRU.mu.Lock()
  1286. defer session.stmtsLRU.mu.Unlock()
  1287. //Make sure the cache size is maintained
  1288. if session.stmtsLRU.lru.Len() != session.stmtsLRU.lru.MaxEntries {
  1289. t.Fatalf("expected cache size of %v, got %v", session.stmtsLRU.lru.MaxEntries, session.stmtsLRU.lru.Len())
  1290. }
  1291. // Walk through all the configured hosts and test cache retention and eviction
  1292. for _, host := range session.cfg.Hosts {
  1293. _, ok := session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host+":9042", session.cfg.Keyspace, "SELECT id,mod FROM prepcachetest WHERE id = 0"))
  1294. if ok {
  1295. t.Errorf("expected first select to be purged but was in cache for host=%q", host)
  1296. }
  1297. _, ok = session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host+":9042", session.cfg.Keyspace, "SELECT id,mod FROM prepcachetest WHERE id = 1"))
  1298. if !ok {
  1299. t.Errorf("exepected second select to be in cache for host=%q", host)
  1300. }
  1301. _, ok = session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host+":9042", session.cfg.Keyspace, "INSERT INTO prepcachetest (id,mod) VALUES (?, ?)"))
  1302. if !ok {
  1303. t.Errorf("expected insert to be in cache for host=%q", host)
  1304. }
  1305. _, ok = session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host+":9042", session.cfg.Keyspace, "UPDATE prepcachetest SET mod = ? WHERE id = ?"))
  1306. if !ok {
  1307. t.Errorf("expected update to be in cached for host=%q", host)
  1308. }
  1309. _, ok = session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host+":9042", session.cfg.Keyspace, "DELETE FROM prepcachetest WHERE id = ?"))
  1310. if !ok {
  1311. t.Errorf("expected delete to be cached for host=%q", host)
  1312. }
  1313. }
  1314. }
  1315. func TestPrepare_PreparedCacheKey(t *testing.T) {
  1316. session := createSession(t)
  1317. defer session.Close()
  1318. // create a second keyspace
  1319. cluster2 := createCluster()
  1320. createKeyspace(t, cluster2, "gocql_test2")
  1321. cluster2.Keyspace = "gocql_test2"
  1322. session2, err := cluster2.CreateSession()
  1323. if err != nil {
  1324. t.Fatal("create session:", err)
  1325. }
  1326. defer session2.Close()
  1327. // both keyspaces have a table named "test_stmt_cache_key"
  1328. if err := createTable(session, "CREATE TABLE gocql_test.test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
  1329. t.Fatal("create table:", err)
  1330. }
  1331. if err := createTable(session2, "CREATE TABLE gocql_test2.test_stmt_cache_key (id varchar primary key, field varchar)"); err != nil {
  1332. t.Fatal("create table:", err)
  1333. }
  1334. // both tables have a single row with the same partition key but different column value
  1335. if err = session.Query(`INSERT INTO test_stmt_cache_key (id, field) VALUES (?, ?)`, "key", "one").Exec(); err != nil {
  1336. t.Fatal("insert:", err)
  1337. }
  1338. if err = session2.Query(`INSERT INTO test_stmt_cache_key (id, field) VALUES (?, ?)`, "key", "two").Exec(); err != nil {
  1339. t.Fatal("insert:", err)
  1340. }
  1341. // should be able to see different values in each keyspace
  1342. var value string
  1343. if err = session.Query("SELECT field FROM test_stmt_cache_key WHERE id = ?", "key").Scan(&value); err != nil {
  1344. t.Fatal("select:", err)
  1345. }
  1346. if value != "one" {
  1347. t.Errorf("Expected one, got %s", value)
  1348. }
  1349. if err = session2.Query("SELECT field FROM test_stmt_cache_key WHERE id = ?", "key").Scan(&value); err != nil {
  1350. t.Fatal("select:", err)
  1351. }
  1352. if value != "two" {
  1353. t.Errorf("Expected two, got %s", value)
  1354. }
  1355. }
  1356. //TestMarshalFloat64Ptr tests to see that a pointer to a float64 is marshalled correctly.
  1357. func TestMarshalFloat64Ptr(t *testing.T) {
  1358. session := createSession(t)
  1359. defer session.Close()
  1360. if err := createTable(session, "CREATE TABLE gocql_test.float_test (id double, test double, primary key (id))"); err != nil {
  1361. t.Fatal("create table:", err)
  1362. }
  1363. testNum := float64(7500)
  1364. if err := session.Query(`INSERT INTO float_test (id,test) VALUES (?,?)`, float64(7500.00), &testNum).Exec(); err != nil {
  1365. t.Fatal("insert float64:", err)
  1366. }
  1367. }
  1368. //TestMarshalInet tests to see that a pointer to a float64 is marshalled correctly.
  1369. func TestMarshalInet(t *testing.T) {
  1370. session := createSession(t)
  1371. defer session.Close()
  1372. if err := createTable(session, "CREATE TABLE gocql_test.inet_test (ip inet, name text, primary key (ip))"); err != nil {
  1373. t.Fatal("create table:", err)
  1374. }
  1375. stringIp := "123.34.45.56"
  1376. if err := session.Query(`INSERT INTO inet_test (ip,name) VALUES (?,?)`, stringIp, "Test IP 1").Exec(); err != nil {
  1377. t.Fatal("insert string inet:", err)
  1378. }
  1379. var stringResult string
  1380. if err := session.Query("SELECT ip FROM inet_test").Scan(&stringResult); err != nil {
  1381. t.Fatalf("select for string from inet_test 1 failed: %v", err)
  1382. }
  1383. if stringResult != stringIp {
  1384. t.Errorf("Expected %s, was %s", stringIp, stringResult)
  1385. }
  1386. var ipResult net.IP
  1387. if err := session.Query("SELECT ip FROM inet_test").Scan(&ipResult); err != nil {
  1388. t.Fatalf("select for net.IP from inet_test 1 failed: %v", err)
  1389. }
  1390. if ipResult.String() != stringIp {
  1391. t.Errorf("Expected %s, was %s", stringIp, ipResult.String())
  1392. }
  1393. if err := session.Query(`DELETE FROM inet_test WHERE ip = ?`, stringIp).Exec(); err != nil {
  1394. t.Fatal("delete inet table:", err)
  1395. }
  1396. netIp := net.ParseIP("222.43.54.65")
  1397. if err := session.Query(`INSERT INTO inet_test (ip,name) VALUES (?,?)`, netIp, "Test IP 2").Exec(); err != nil {
  1398. t.Fatal("insert netIp inet:", err)
  1399. }
  1400. if err := session.Query("SELECT ip FROM inet_test").Scan(&stringResult); err != nil {
  1401. t.Fatalf("select for string from inet_test 2 failed: %v", err)
  1402. }
  1403. if stringResult != netIp.String() {
  1404. t.Errorf("Expected %s, was %s", netIp.String(), stringResult)
  1405. }
  1406. if err := session.Query("SELECT ip FROM inet_test").Scan(&ipResult); err != nil {
  1407. t.Fatalf("select for net.IP from inet_test 2 failed: %v", err)
  1408. }
  1409. if ipResult.String() != netIp.String() {
  1410. t.Errorf("Expected %s, was %s", netIp.String(), ipResult.String())
  1411. }
  1412. }
  1413. func TestVarint(t *testing.T) {
  1414. session := createSession(t)
  1415. defer session.Close()
  1416. if err := createTable(session, "CREATE TABLE gocql_test.varint_test (id varchar, test varint, test2 varint, primary key (id))"); err != nil {
  1417. t.Fatalf("failed to create table with error '%v'", err)
  1418. }
  1419. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", 0).Exec(); err != nil {
  1420. t.Fatalf("insert varint: %v", err)
  1421. }
  1422. var result int
  1423. if err := session.Query("SELECT test FROM varint_test").Scan(&result); err != nil {
  1424. t.Fatalf("select from varint_test failed: %v", err)
  1425. }
  1426. if result != 0 {
  1427. t.Errorf("Expected 0, was %d", result)
  1428. }
  1429. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", -1).Exec(); err != nil {
  1430. t.Fatalf("insert varint: %v", err)
  1431. }
  1432. if err := session.Query("SELECT test FROM varint_test").Scan(&result); err != nil {
  1433. t.Fatalf("select from varint_test failed: %v", err)
  1434. }
  1435. if result != -1 {
  1436. t.Errorf("Expected -1, was %d", result)
  1437. }
  1438. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", nil).Exec(); err != nil {
  1439. t.Fatalf("insert varint: %v", err)
  1440. }
  1441. if err := session.Query("SELECT test FROM varint_test").Scan(&result); err != nil {
  1442. t.Fatalf("select from varint_test failed: %v", err)
  1443. }
  1444. if result != 0 {
  1445. t.Errorf("Expected 0, was %d", result)
  1446. }
  1447. var nullableResult *int
  1448. if err := session.Query("SELECT test FROM varint_test").Scan(&nullableResult); err != nil {
  1449. t.Fatalf("select from varint_test failed: %v", err)
  1450. }
  1451. if nullableResult != nil {
  1452. t.Errorf("Expected nil, was %d", nullableResult)
  1453. }
  1454. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", int64(math.MaxInt32)+1).Exec(); err != nil {
  1455. t.Fatalf("insert varint: %v", err)
  1456. }
  1457. var result64 int64
  1458. if err := session.Query("SELECT test FROM varint_test").Scan(&result64); err != nil {
  1459. t.Fatalf("select from varint_test failed: %v", err)
  1460. }
  1461. if result64 != int64(math.MaxInt32)+1 {
  1462. t.Errorf("Expected %d, was %d", int64(math.MaxInt32)+1, result64)
  1463. }
  1464. biggie := new(big.Int)
  1465. biggie.SetString("36893488147419103232", 10) // > 2**64
  1466. if err := session.Query(`INSERT INTO varint_test (id, test) VALUES (?, ?)`, "id", biggie).Exec(); err != nil {
  1467. t.Fatalf("insert varint: %v", err)
  1468. }
  1469. resultBig := new(big.Int)
  1470. if err := session.Query("SELECT test FROM varint_test").Scan(resultBig); err != nil {
  1471. t.Fatalf("select from varint_test failed: %v", err)
  1472. }
  1473. if resultBig.String() != biggie.String() {
  1474. t.Errorf("Expected %s, was %s", biggie.String(), resultBig.String())
  1475. }
  1476. err := session.Query("SELECT test FROM varint_test").Scan(&result64)
  1477. if err == nil || strings.Index(err.Error(), "out of range") == -1 {
  1478. t.Errorf("expected out of range error since value is too big for int64")
  1479. }
  1480. // value not set in cassandra, leave bind variable empty
  1481. resultBig = new(big.Int)
  1482. if err := session.Query("SELECT test2 FROM varint_test").Scan(resultBig); err != nil {
  1483. t.Fatalf("select from varint_test failed: %v", err)
  1484. }
  1485. if resultBig.Int64() != 0 {
  1486. t.Errorf("Expected %s, was %s", biggie.String(), resultBig.String())
  1487. }
  1488. // can use double pointer to explicitly detect value is not set in cassandra
  1489. if err := session.Query("SELECT test2 FROM varint_test").Scan(&resultBig); err != nil {
  1490. t.Fatalf("select from varint_test failed: %v", err)
  1491. }
  1492. if resultBig != nil {
  1493. t.Errorf("Expected %v, was %v", nil, *resultBig)
  1494. }
  1495. }
  1496. //TestQueryStats confirms that the stats are returning valid data. Accuracy may be questionable.
  1497. func TestQueryStats(t *testing.T) {
  1498. session := createSession(t)
  1499. defer session.Close()
  1500. qry := session.Query("SELECT * FROM system.peers")
  1501. if err := qry.Exec(); err != nil {
  1502. t.Fatalf("query failed. %v", err)
  1503. } else {
  1504. if qry.Attempts() < 1 {
  1505. t.Fatal("expected at least 1 attempt, but got 0")
  1506. }
  1507. if qry.Latency() <= 0 {
  1508. t.Fatalf("expected latency to be greater than 0, but got %v instead.", qry.Latency())
  1509. }
  1510. }
  1511. }
  1512. // TestIterHosts confirms that host is added to Iter when the query succeeds.
  1513. func TestIterHost(t *testing.T) {
  1514. session := createSession(t)
  1515. defer session.Close()
  1516. iter := session.Query("SELECT * FROM system.peers").Iter()
  1517. // check if Host method works
  1518. if iter.Host() == nil {
  1519. t.Error("No host in iter")
  1520. }
  1521. }
  1522. //TestBatchStats confirms that the stats are returning valid data. Accuracy may be questionable.
  1523. func TestBatchStats(t *testing.T) {
  1524. session := createSession(t)
  1525. defer session.Close()
  1526. if session.cfg.ProtoVersion == 1 {
  1527. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  1528. }
  1529. if err := createTable(session, "CREATE TABLE gocql_test.batchStats (id int, PRIMARY KEY (id))"); err != nil {
  1530. t.Fatalf("failed to create table with error '%v'", err)
  1531. }
  1532. b := session.NewBatch(LoggedBatch)
  1533. b.Query("INSERT INTO batchStats (id) VALUES (?)", 1)
  1534. b.Query("INSERT INTO batchStats (id) VALUES (?)", 2)
  1535. if err := session.ExecuteBatch(b); err != nil {
  1536. t.Fatalf("query failed. %v", err)
  1537. } else {
  1538. if b.Attempts() < 1 {
  1539. t.Fatal("expected at least 1 attempt, but got 0")
  1540. }
  1541. if b.Latency() <= 0 {
  1542. t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
  1543. }
  1544. }
  1545. }
  1546. type funcBatchObserver func(context.Context, ObservedBatch)
  1547. func (f funcBatchObserver) ObserveBatch(ctx context.Context, o ObservedBatch) {
  1548. f(ctx, o)
  1549. }
  1550. func TestBatchObserve(t *testing.T) {
  1551. session := createSession(t)
  1552. defer session.Close()
  1553. if session.cfg.ProtoVersion == 1 {
  1554. t.Skip("atomic batches not supported. Please use Cassandra >= 2.0")
  1555. }
  1556. if err := createTable(session, `CREATE TABLE gocql_test.batch_observe_table (id int, other int, PRIMARY KEY (id))`); err != nil {
  1557. t.Fatal("create table:", err)
  1558. }
  1559. type observation struct {
  1560. observedErr error
  1561. observedKeyspace string
  1562. observedStmts []string
  1563. }
  1564. var observedBatch *observation
  1565. batch := session.NewBatch(LoggedBatch)
  1566. batch.Observer(funcBatchObserver(func(ctx context.Context, o ObservedBatch) {
  1567. if observedBatch != nil {
  1568. t.Fatal("batch observe called more than once")
  1569. }
  1570. observedBatch = &observation{
  1571. observedKeyspace: o.Keyspace,
  1572. observedStmts: o.Statements,
  1573. observedErr: o.Err,
  1574. }
  1575. }))
  1576. for i := 0; i < 100; i++ {
  1577. // hard coding 'i' into one of the values for better testing of observation
  1578. batch.Query(fmt.Sprintf(`INSERT INTO batch_observe_table (id,other) VALUES (?,%d)`, i), i)
  1579. }
  1580. if err := session.ExecuteBatch(batch); err != nil {
  1581. t.Fatal("execute batch:", err)
  1582. }
  1583. if observedBatch == nil {
  1584. t.Fatal("batch observation has not been called")
  1585. }
  1586. if len(observedBatch.observedStmts) != 100 {
  1587. t.Fatal("expecting 100 observed statements, got", len(observedBatch.observedStmts))
  1588. }
  1589. if observedBatch.observedErr != nil {
  1590. t.Fatal("not expecting to observe an error", observedBatch.observedErr)
  1591. }
  1592. if observedBatch.observedKeyspace != "gocql_test" {
  1593. t.Fatalf("expecting keyspace 'gocql_test', got %q", observedBatch.observedKeyspace)
  1594. }
  1595. for i, stmt := range observedBatch.observedStmts {
  1596. if stmt != fmt.Sprintf(`INSERT INTO batch_observe_table (id,other) VALUES (?,%d)`, i) {
  1597. t.Fatal("unexpected query", stmt)
  1598. }
  1599. }
  1600. }
  1601. //TestNilInQuery tests to see that a nil value passed to a query is handled by Cassandra
  1602. //TODO validate the nil value by reading back the nil. Need to fix Unmarshalling.
  1603. func TestNilInQuery(t *testing.T) {
  1604. session := createSession(t)
  1605. defer session.Close()
  1606. if err := createTable(session, "CREATE TABLE gocql_test.testNilInsert (id int, count int, PRIMARY KEY (id))"); err != nil {
  1607. t.Fatalf("failed to create table with error '%v'", err)
  1608. }
  1609. if err := session.Query("INSERT INTO testNilInsert (id,count) VALUES (?,?)", 1, nil).Exec(); err != nil {
  1610. t.Fatalf("failed to insert with err: %v", err)
  1611. }
  1612. var id int
  1613. if err := session.Query("SELECT id FROM testNilInsert").Scan(&id); err != nil {
  1614. t.Fatalf("failed to select with err: %v", err)
  1615. } else if id != 1 {
  1616. t.Fatalf("expected id to be 1, got %v", id)
  1617. }
  1618. }
  1619. // Don't initialize time.Time bind variable if cassandra timestamp column is empty
  1620. func TestEmptyTimestamp(t *testing.T) {
  1621. session := createSession(t)
  1622. defer session.Close()
  1623. if err := createTable(session, "CREATE TABLE gocql_test.test_empty_timestamp (id int, time timestamp, num int, PRIMARY KEY (id))"); err != nil {
  1624. t.Fatalf("failed to create table with error '%v'", err)
  1625. }
  1626. if err := session.Query("INSERT INTO test_empty_timestamp (id, num) VALUES (?,?)", 1, 561).Exec(); err != nil {
  1627. t.Fatalf("failed to insert with err: %v", err)
  1628. }
  1629. var timeVal time.Time
  1630. if err := session.Query("SELECT time FROM test_empty_timestamp where id = ?", 1).Scan(&timeVal); err != nil {
  1631. t.Fatalf("failed to select with err: %v", err)
  1632. }
  1633. if !timeVal.IsZero() {
  1634. t.Errorf("time.Time bind variable should still be empty (was %s)", timeVal)
  1635. }
  1636. }
  1637. // Integration test of just querying for data from the system.schema_keyspace table where the keyspace DOES exist.
  1638. func TestGetKeyspaceMetadata(t *testing.T) {
  1639. session := createSession(t)
  1640. defer session.Close()
  1641. keyspaceMetadata, err := getKeyspaceMetadata(session, "gocql_test")
  1642. if err != nil {
  1643. t.Fatalf("failed to query the keyspace metadata with err: %v", err)
  1644. }
  1645. if keyspaceMetadata == nil {
  1646. t.Fatal("failed to query the keyspace metadata, nil returned")
  1647. }
  1648. if keyspaceMetadata.Name != "gocql_test" {
  1649. t.Errorf("Expected keyspace name to be 'gocql' but was '%s'", keyspaceMetadata.Name)
  1650. }
  1651. if keyspaceMetadata.StrategyClass != "org.apache.cassandra.locator.SimpleStrategy" {
  1652. t.Errorf("Expected replication strategy class to be 'org.apache.cassandra.locator.SimpleStrategy' but was '%s'", keyspaceMetadata.StrategyClass)
  1653. }
  1654. if keyspaceMetadata.StrategyOptions == nil {
  1655. t.Error("Expected replication strategy options map but was nil")
  1656. }
  1657. rfStr, ok := keyspaceMetadata.StrategyOptions["replication_factor"]
  1658. if !ok {
  1659. t.Fatalf("Expected strategy option 'replication_factor' but was not found in %v", keyspaceMetadata.StrategyOptions)
  1660. }
  1661. rfInt, err := strconv.Atoi(rfStr.(string))
  1662. if err != nil {
  1663. t.Fatalf("Error converting string to int with err: %v", err)
  1664. }
  1665. if rfInt != *flagRF {
  1666. t.Errorf("Expected replication factor to be %d but was %d", *flagRF, rfInt)
  1667. }
  1668. }
  1669. // Integration test of just querying for data from the system.schema_keyspace table where the keyspace DOES NOT exist.
  1670. func TestGetKeyspaceMetadataFails(t *testing.T) {
  1671. session := createSession(t)
  1672. defer session.Close()
  1673. _, err := getKeyspaceMetadata(session, "gocql_keyspace_does_not_exist")
  1674. if err != ErrKeyspaceDoesNotExist || err == nil {
  1675. t.Fatalf("Expected error of type ErrKeySpaceDoesNotExist. Instead, error was %v", err)
  1676. }
  1677. }
  1678. // Integration test of just querying for data from the system.schema_columnfamilies table
  1679. func TestGetTableMetadata(t *testing.T) {
  1680. session := createSession(t)
  1681. defer session.Close()
  1682. if err := createTable(session, "CREATE TABLE gocql_test.test_table_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
  1683. t.Fatalf("failed to create table with error '%v'", err)
  1684. }
  1685. tables, err := getTableMetadata(session, "gocql_test")
  1686. if err != nil {
  1687. t.Fatalf("failed to query the table metadata with err: %v", err)
  1688. }
  1689. if tables == nil {
  1690. t.Fatal("failed to query the table metadata, nil returned")
  1691. }
  1692. var testTable *TableMetadata
  1693. // verify all tables have minimum expected data
  1694. for i := range tables {
  1695. table := &tables[i]
  1696. if table.Name == "" {
  1697. t.Errorf("Expected table name to be set, but it was empty: index=%d metadata=%+v", i, table)
  1698. }
  1699. if table.Keyspace != "gocql_test" {
  1700. t.Errorf("Expected keyspace for '%s' table metadata to be 'gocql_test' but was '%s'", table.Name, table.Keyspace)
  1701. }
  1702. if session.cfg.ProtoVersion < 4 {
  1703. // TODO(zariel): there has to be a better way to detect what metadata version
  1704. // we are in, and a better way to structure the code so that it is abstracted away
  1705. // from us here
  1706. if table.KeyValidator == "" {
  1707. t.Errorf("Expected key validator to be set for table %s", table.Name)
  1708. }
  1709. if table.Comparator == "" {
  1710. t.Errorf("Expected comparator to be set for table %s", table.Name)
  1711. }
  1712. if table.DefaultValidator == "" {
  1713. t.Errorf("Expected default validator to be set for table %s", table.Name)
  1714. }
  1715. }
  1716. // these fields are not set until the metadata is compiled
  1717. if table.PartitionKey != nil {
  1718. t.Errorf("Did not expect partition key for table %s", table.Name)
  1719. }
  1720. if table.ClusteringColumns != nil {
  1721. t.Errorf("Did not expect clustering columns for table %s", table.Name)
  1722. }
  1723. if table.Columns != nil {
  1724. t.Errorf("Did not expect columns for table %s", table.Name)
  1725. }
  1726. // for the next part of the test after this loop, find the metadata for the test table
  1727. if table.Name == "test_table_metadata" {
  1728. testTable = table
  1729. }
  1730. }
  1731. // verify actual values on the test tables
  1732. if testTable == nil {
  1733. t.Fatal("Expected table metadata for name 'test_table_metadata'")
  1734. }
  1735. if session.cfg.ProtoVersion == protoVersion1 {
  1736. if testTable.KeyValidator != "org.apache.cassandra.db.marshal.Int32Type" {
  1737. t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.Int32Type' but was '%s'", testTable.KeyValidator)
  1738. }
  1739. if testTable.Comparator != "org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.UTF8Type)" {
  1740. t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.CompositeType(org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.UTF8Type)' but was '%s'", testTable.Comparator)
  1741. }
  1742. if testTable.DefaultValidator != "org.apache.cassandra.db.marshal.BytesType" {
  1743. t.Errorf("Expected test_table_metadata key validator to be 'org.apache.cassandra.db.marshal.BytesType' but was '%s'", testTable.DefaultValidator)
  1744. }
  1745. expectedKeyAliases := []string{"first_id"}
  1746. if !reflect.DeepEqual(testTable.KeyAliases, expectedKeyAliases) {
  1747. t.Errorf("Expected key aliases %v but was %v", expectedKeyAliases, testTable.KeyAliases)
  1748. }
  1749. expectedColumnAliases := []string{"second_id"}
  1750. if !reflect.DeepEqual(testTable.ColumnAliases, expectedColumnAliases) {
  1751. t.Errorf("Expected key aliases %v but was %v", expectedColumnAliases, testTable.ColumnAliases)
  1752. }
  1753. }
  1754. if testTable.ValueAlias != "" {
  1755. t.Errorf("Expected value alias '' but was '%s'", testTable.ValueAlias)
  1756. }
  1757. }
  1758. // Integration test of just querying for data from the system.schema_columns table
  1759. func TestGetColumnMetadata(t *testing.T) {
  1760. session := createSession(t)
  1761. defer session.Close()
  1762. if err := createTable(session, "CREATE TABLE gocql_test.test_column_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
  1763. t.Fatalf("failed to create table with error '%v'", err)
  1764. }
  1765. if err := session.Query("CREATE INDEX index_column_metadata ON test_column_metadata ( third_id )").Exec(); err != nil {
  1766. t.Fatalf("failed to create index with err: %v", err)
  1767. }
  1768. columns, err := getColumnMetadata(session, "gocql_test")
  1769. if err != nil {
  1770. t.Fatalf("failed to query column metadata with err: %v", err)
  1771. }
  1772. if columns == nil {
  1773. t.Fatal("failed to query column metadata, nil returned")
  1774. }
  1775. testColumns := map[string]*ColumnMetadata{}
  1776. // verify actual values on the test columns
  1777. for i := range columns {
  1778. column := &columns[i]
  1779. if column.Name == "" {
  1780. t.Errorf("Expected column name to be set, but it was empty: index=%d metadata=%+v", i, column)
  1781. }
  1782. if column.Table == "" {
  1783. t.Errorf("Expected column %s table name to be set, but it was empty", column.Name)
  1784. }
  1785. if column.Keyspace != "gocql_test" {
  1786. t.Errorf("Expected column %s keyspace name to be 'gocql_test', but it was '%s'", column.Name, column.Keyspace)
  1787. }
  1788. if column.Kind == ColumnUnkownKind {
  1789. t.Errorf("Expected column %s kind to be set, but it was empty", column.Name)
  1790. }
  1791. if session.cfg.ProtoVersion == 1 && column.Kind != ColumnRegular {
  1792. t.Errorf("Expected column %s kind to be set to 'regular' for proto V1 but it was '%s'", column.Name, column.Kind)
  1793. }
  1794. if column.Validator == "" {
  1795. t.Errorf("Expected column %s validator to be set, but it was empty", column.Name)
  1796. }
  1797. // find the test table columns for the next step after this loop
  1798. if column.Table == "test_column_metadata" {
  1799. testColumns[column.Name] = column
  1800. }
  1801. }
  1802. if session.cfg.ProtoVersion == 1 {
  1803. // V1 proto only returns "regular columns"
  1804. if len(testColumns) != 1 {
  1805. t.Errorf("Expected 1 test columns but there were %d", len(testColumns))
  1806. }
  1807. thirdID, found := testColumns["third_id"]
  1808. if !found {
  1809. t.Fatalf("Expected to find column 'third_id' metadata but there was only %v", testColumns)
  1810. }
  1811. if thirdID.Kind != ColumnRegular {
  1812. t.Errorf("Expected %s column kind to be '%s' but it was '%s'", thirdID.Name, ColumnRegular, thirdID.Kind)
  1813. }
  1814. if thirdID.Index.Name != "index_column_metadata" {
  1815. t.Errorf("Expected %s column index name to be 'index_column_metadata' but it was '%s'", thirdID.Name, thirdID.Index.Name)
  1816. }
  1817. } else {
  1818. if len(testColumns) != 3 {
  1819. t.Errorf("Expected 3 test columns but there were %d", len(testColumns))
  1820. }
  1821. firstID, found := testColumns["first_id"]
  1822. if !found {
  1823. t.Fatalf("Expected to find column 'first_id' metadata but there was only %v", testColumns)
  1824. }
  1825. secondID, found := testColumns["second_id"]
  1826. if !found {
  1827. t.Fatalf("Expected to find column 'second_id' metadata but there was only %v", testColumns)
  1828. }
  1829. thirdID, found := testColumns["third_id"]
  1830. if !found {
  1831. t.Fatalf("Expected to find column 'third_id' metadata but there was only %v", testColumns)
  1832. }
  1833. if firstID.Kind != ColumnPartitionKey {
  1834. t.Errorf("Expected %s column kind to be '%s' but it was '%s'", firstID.Name, ColumnPartitionKey, firstID.Kind)
  1835. }
  1836. if secondID.Kind != ColumnClusteringKey {
  1837. t.Errorf("Expected %s column kind to be '%s' but it was '%s'", secondID.Name, ColumnClusteringKey, secondID.Kind)
  1838. }
  1839. if thirdID.Kind != ColumnRegular {
  1840. t.Errorf("Expected %s column kind to be '%s' but it was '%s'", thirdID.Name, ColumnRegular, thirdID.Kind)
  1841. }
  1842. if !session.useSystemSchema && thirdID.Index.Name != "index_column_metadata" {
  1843. // TODO(zariel): update metadata to scan index from system_schema
  1844. t.Errorf("Expected %s column index name to be 'index_column_metadata' but it was '%s'", thirdID.Name, thirdID.Index.Name)
  1845. }
  1846. }
  1847. }
  1848. func TestViewMetadata(t *testing.T) {
  1849. session := createSession(t)
  1850. defer session.Close()
  1851. createViews(t, session)
  1852. views, err := getViewsMetadata(session, "gocql_test")
  1853. if err != nil {
  1854. t.Fatalf("failed to query view metadata with err: %v", err)
  1855. }
  1856. if views == nil {
  1857. t.Fatal("failed to query view metadata, nil returned")
  1858. }
  1859. if len(views) != 1 {
  1860. t.Fatal("expected one view")
  1861. }
  1862. textType := TypeText
  1863. if flagCassVersion.Before(3, 0, 0) {
  1864. textType = TypeVarchar
  1865. }
  1866. expectedView := ViewMetadata{
  1867. Keyspace: "gocql_test",
  1868. Name: "basicview",
  1869. FieldNames: []string{"birthday", "nationality", "weight", "height"},
  1870. FieldTypes: []TypeInfo{
  1871. NativeType{typ: TypeTimestamp},
  1872. NativeType{typ: textType},
  1873. NativeType{typ: textType},
  1874. NativeType{typ: textType},
  1875. },
  1876. }
  1877. if !reflect.DeepEqual(views[0], expectedView) {
  1878. t.Fatalf("view is %+v, but expected %+v", views[0], expectedView)
  1879. }
  1880. }
  1881. func TestAggregateMetadata(t *testing.T) {
  1882. session := createSession(t)
  1883. defer session.Close()
  1884. createAggregate(t, session)
  1885. aggregates, err := getAggregatesMetadata(session, "gocql_test")
  1886. if err != nil {
  1887. t.Fatalf("failed to query aggregate metadata with err: %v", err)
  1888. }
  1889. if aggregates == nil {
  1890. t.Fatal("failed to query aggregate metadata, nil returned")
  1891. }
  1892. if len(aggregates) != 1 {
  1893. t.Fatal("expected only a single aggregate")
  1894. }
  1895. aggregate := aggregates[0]
  1896. expectedAggregrate := AggregateMetadata{
  1897. Keyspace: "gocql_test",
  1898. Name: "average",
  1899. ArgumentTypes: []TypeInfo{NativeType{typ: TypeInt}},
  1900. InitCond: "(0, 0)",
  1901. ReturnType: NativeType{typ: TypeDouble},
  1902. StateType: TupleTypeInfo{
  1903. NativeType: NativeType{typ: TypeTuple},
  1904. Elems: []TypeInfo{
  1905. NativeType{typ: TypeInt},
  1906. NativeType{typ: TypeBigInt},
  1907. },
  1908. },
  1909. stateFunc: "avgstate",
  1910. finalFunc: "avgfinal",
  1911. }
  1912. // In this case cassandra is returning a blob
  1913. if flagCassVersion.Before(3, 0, 0) {
  1914. expectedAggregrate.InitCond = string([]byte{0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0})
  1915. }
  1916. if !reflect.DeepEqual(aggregate, expectedAggregrate) {
  1917. t.Fatalf("aggregate is %+v, but expected %+v", aggregate, expectedAggregrate)
  1918. }
  1919. }
  1920. func TestFunctionMetadata(t *testing.T) {
  1921. session := createSession(t)
  1922. defer session.Close()
  1923. createFunctions(t, session)
  1924. functions, err := getFunctionsMetadata(session, "gocql_test")
  1925. if err != nil {
  1926. t.Fatalf("failed to query function metadata with err: %v", err)
  1927. }
  1928. if functions == nil {
  1929. t.Fatal("failed to query function metadata, nil returned")
  1930. }
  1931. if len(functions) != 2 {
  1932. t.Fatal("expected two functions")
  1933. }
  1934. avgState := functions[1]
  1935. avgFinal := functions[0]
  1936. avgStateBody := "if (val !=null) {state.setInt(0, state.getInt(0)+1); state.setLong(1, state.getLong(1)+val.intValue());}return state;"
  1937. expectedAvgState := FunctionMetadata{
  1938. Keyspace: "gocql_test",
  1939. Name: "avgstate",
  1940. ArgumentTypes: []TypeInfo{
  1941. TupleTypeInfo{
  1942. NativeType: NativeType{typ: TypeTuple},
  1943. Elems: []TypeInfo{
  1944. NativeType{typ: TypeInt},
  1945. NativeType{typ: TypeBigInt},
  1946. },
  1947. },
  1948. NativeType{typ: TypeInt},
  1949. },
  1950. ArgumentNames: []string{"state", "val"},
  1951. ReturnType: TupleTypeInfo{
  1952. NativeType: NativeType{typ: TypeTuple},
  1953. Elems: []TypeInfo{
  1954. NativeType{typ: TypeInt},
  1955. NativeType{typ: TypeBigInt},
  1956. },
  1957. },
  1958. CalledOnNullInput: true,
  1959. Language: "java",
  1960. Body: avgStateBody,
  1961. }
  1962. if !reflect.DeepEqual(avgState, expectedAvgState) {
  1963. t.Fatalf("function is %+v, but expected %+v", avgState, expectedAvgState)
  1964. }
  1965. finalStateBody := "double r = 0; if (state.getInt(0) == 0) return null; r = state.getLong(1); r/= state.getInt(0); return Double.valueOf(r);"
  1966. expectedAvgFinal := FunctionMetadata{
  1967. Keyspace: "gocql_test",
  1968. Name: "avgfinal",
  1969. ArgumentTypes: []TypeInfo{
  1970. TupleTypeInfo{
  1971. NativeType: NativeType{typ: TypeTuple},
  1972. Elems: []TypeInfo{
  1973. NativeType{typ: TypeInt},
  1974. NativeType{typ: TypeBigInt},
  1975. },
  1976. },
  1977. },
  1978. ArgumentNames: []string{"state"},
  1979. ReturnType: NativeType{typ: TypeDouble},
  1980. CalledOnNullInput: true,
  1981. Language: "java",
  1982. Body: finalStateBody,
  1983. }
  1984. if !reflect.DeepEqual(avgFinal, expectedAvgFinal) {
  1985. t.Fatalf("function is %+v, but expected %+v", avgFinal, expectedAvgFinal)
  1986. }
  1987. }
  1988. // Integration test of querying and composition the keyspace metadata
  1989. func TestKeyspaceMetadata(t *testing.T) {
  1990. session := createSession(t)
  1991. defer session.Close()
  1992. if err := createTable(session, "CREATE TABLE gocql_test.test_metadata (first_id int, second_id int, third_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
  1993. t.Fatalf("failed to create table with error '%v'", err)
  1994. }
  1995. createAggregate(t, session)
  1996. createViews(t, session)
  1997. if err := session.Query("CREATE INDEX index_metadata ON test_metadata ( third_id )").Exec(); err != nil {
  1998. t.Fatalf("failed to create index with err: %v", err)
  1999. }
  2000. keyspaceMetadata, err := session.KeyspaceMetadata("gocql_test")
  2001. if err != nil {
  2002. t.Fatalf("failed to query keyspace metadata with err: %v", err)
  2003. }
  2004. if keyspaceMetadata == nil {
  2005. t.Fatal("expected the keyspace metadata to not be nil, but it was nil")
  2006. }
  2007. if keyspaceMetadata.Name != session.cfg.Keyspace {
  2008. t.Fatalf("Expected the keyspace name to be %s but was %s", session.cfg.Keyspace, keyspaceMetadata.Name)
  2009. }
  2010. if len(keyspaceMetadata.Tables) == 0 {
  2011. t.Errorf("Expected tables but there were none")
  2012. }
  2013. tableMetadata, found := keyspaceMetadata.Tables["test_metadata"]
  2014. if !found {
  2015. t.Fatalf("failed to find the test_metadata table metadata")
  2016. }
  2017. if len(tableMetadata.PartitionKey) != 1 {
  2018. t.Errorf("expected partition key length of 1, but was %d", len(tableMetadata.PartitionKey))
  2019. }
  2020. for i, column := range tableMetadata.PartitionKey {
  2021. if column == nil {
  2022. t.Errorf("partition key column metadata at index %d was nil", i)
  2023. }
  2024. }
  2025. if tableMetadata.PartitionKey[0].Name != "first_id" {
  2026. t.Errorf("Expected the first partition key column to be 'first_id' but was '%s'", tableMetadata.PartitionKey[0].Name)
  2027. }
  2028. if len(tableMetadata.ClusteringColumns) != 1 {
  2029. t.Fatalf("expected clustering columns length of 1, but was %d", len(tableMetadata.ClusteringColumns))
  2030. }
  2031. for i, column := range tableMetadata.ClusteringColumns {
  2032. if column == nil {
  2033. t.Fatalf("clustering column metadata at index %d was nil", i)
  2034. }
  2035. }
  2036. if tableMetadata.ClusteringColumns[0].Name != "second_id" {
  2037. t.Errorf("Expected the first clustering column to be 'second_id' but was '%s'", tableMetadata.ClusteringColumns[0].Name)
  2038. }
  2039. thirdColumn, found := tableMetadata.Columns["third_id"]
  2040. if !found {
  2041. t.Fatalf("Expected a column definition for 'third_id'")
  2042. }
  2043. if !session.useSystemSchema && thirdColumn.Index.Name != "index_metadata" {
  2044. // TODO(zariel): scan index info from system_schema
  2045. t.Errorf("Expected column index named 'index_metadata' but was '%s'", thirdColumn.Index.Name)
  2046. }
  2047. aggregate, found := keyspaceMetadata.Aggregates["average"]
  2048. if !found {
  2049. t.Fatal("failed to find the aggreate in metadata")
  2050. }
  2051. if aggregate.FinalFunc.Name != "avgfinal" {
  2052. t.Fatalf("expected final function %s, but got %s", "avgFinal", aggregate.FinalFunc.Name)
  2053. }
  2054. if aggregate.StateFunc.Name != "avgstate" {
  2055. t.Fatalf("expected state function %s, but got %s", "avgstate", aggregate.StateFunc.Name)
  2056. }
  2057. _, found = keyspaceMetadata.Views["basicview"]
  2058. if !found {
  2059. t.Fatal("failed to find the view in metadata")
  2060. }
  2061. }
  2062. // Integration test of the routing key calculation
  2063. func TestRoutingKey(t *testing.T) {
  2064. session := createSession(t)
  2065. defer session.Close()
  2066. if err := createTable(session, "CREATE TABLE gocql_test.test_single_routing_key (first_id int, second_id int, PRIMARY KEY (first_id, second_id))"); err != nil {
  2067. t.Fatalf("failed to create table with error '%v'", err)
  2068. }
  2069. if err := createTable(session, "CREATE TABLE gocql_test.test_composite_routing_key (first_id int, second_id int, PRIMARY KEY ((first_id, second_id)))"); err != nil {
  2070. t.Fatalf("failed to create table with error '%v'", err)
  2071. }
  2072. routingKeyInfo, err := session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?")
  2073. if err != nil {
  2074. t.Fatalf("failed to get routing key info due to error: %v", err)
  2075. }
  2076. if routingKeyInfo == nil {
  2077. t.Fatal("Expected routing key info, but was nil")
  2078. }
  2079. if len(routingKeyInfo.indexes) != 1 {
  2080. t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
  2081. }
  2082. if routingKeyInfo.indexes[0] != 1 {
  2083. t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
  2084. }
  2085. if len(routingKeyInfo.types) != 1 {
  2086. t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
  2087. }
  2088. if routingKeyInfo.types[0] == nil {
  2089. t.Fatal("Expected routing key types[0] to be non-nil")
  2090. }
  2091. if routingKeyInfo.types[0].Type() != TypeInt {
  2092. t.Fatalf("Expected routing key types[0].Type to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
  2093. }
  2094. // verify the cache is working
  2095. routingKeyInfo, err = session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?")
  2096. if err != nil {
  2097. t.Fatalf("failed to get routing key info due to error: %v", err)
  2098. }
  2099. if len(routingKeyInfo.indexes) != 1 {
  2100. t.Fatalf("Expected routing key indexes length to be 1 but was %d", len(routingKeyInfo.indexes))
  2101. }
  2102. if routingKeyInfo.indexes[0] != 1 {
  2103. t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
  2104. }
  2105. if len(routingKeyInfo.types) != 1 {
  2106. t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
  2107. }
  2108. if routingKeyInfo.types[0] == nil {
  2109. t.Fatal("Expected routing key types[0] to be non-nil")
  2110. }
  2111. if routingKeyInfo.types[0].Type() != TypeInt {
  2112. t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
  2113. }
  2114. cacheSize := session.routingKeyInfoCache.lru.Len()
  2115. if cacheSize != 1 {
  2116. t.Errorf("Expected cache size to be 1 but was %d", cacheSize)
  2117. }
  2118. query := session.Query("SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", 1, 2)
  2119. routingKey, err := query.GetRoutingKey()
  2120. if err != nil {
  2121. t.Fatalf("Failed to get routing key due to error: %v", err)
  2122. }
  2123. expectedRoutingKey := []byte{0, 0, 0, 2}
  2124. if !reflect.DeepEqual(expectedRoutingKey, routingKey) {
  2125. t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
  2126. }
  2127. routingKeyInfo, err = session.routingKeyInfo(context.Background(), "SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?")
  2128. if err != nil {
  2129. t.Fatalf("failed to get routing key info due to error: %v", err)
  2130. }
  2131. if routingKeyInfo == nil {
  2132. t.Fatal("Expected routing key info, but was nil")
  2133. }
  2134. if len(routingKeyInfo.indexes) != 2 {
  2135. t.Fatalf("Expected routing key indexes length to be 2 but was %d", len(routingKeyInfo.indexes))
  2136. }
  2137. if routingKeyInfo.indexes[0] != 1 {
  2138. t.Errorf("Expected routing key index[0] to be 1 but was %d", routingKeyInfo.indexes[0])
  2139. }
  2140. if routingKeyInfo.indexes[1] != 0 {
  2141. t.Errorf("Expected routing key index[1] to be 0 but was %d", routingKeyInfo.indexes[1])
  2142. }
  2143. if len(routingKeyInfo.types) != 2 {
  2144. t.Fatalf("Expected routing key types length to be 1 but was %d", len(routingKeyInfo.types))
  2145. }
  2146. if routingKeyInfo.types[0] == nil {
  2147. t.Fatal("Expected routing key types[0] to be non-nil")
  2148. }
  2149. if routingKeyInfo.types[0].Type() != TypeInt {
  2150. t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[0].Type())
  2151. }
  2152. if routingKeyInfo.types[1] == nil {
  2153. t.Fatal("Expected routing key types[1] to be non-nil")
  2154. }
  2155. if routingKeyInfo.types[1].Type() != TypeInt {
  2156. t.Fatalf("Expected routing key types[0] to be %v but was %v", TypeInt, routingKeyInfo.types[1].Type())
  2157. }
  2158. query = session.Query("SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?", 1, 2)
  2159. routingKey, err = query.GetRoutingKey()
  2160. if err != nil {
  2161. t.Fatalf("Failed to get routing key due to error: %v", err)
  2162. }
  2163. expectedRoutingKey = []byte{0, 4, 0, 0, 0, 2, 0, 0, 4, 0, 0, 0, 1, 0}
  2164. if !reflect.DeepEqual(expectedRoutingKey, routingKey) {
  2165. t.Errorf("Expected routing key %v but was %v", expectedRoutingKey, routingKey)
  2166. }
  2167. // verify the cache is working
  2168. cacheSize = session.routingKeyInfoCache.lru.Len()
  2169. if cacheSize != 2 {
  2170. t.Errorf("Expected cache size to be 2 but was %d", cacheSize)
  2171. }
  2172. }
  2173. // Integration test of the token-aware policy-based connection pool
  2174. func TestTokenAwareConnPool(t *testing.T) {
  2175. cluster := createCluster()
  2176. cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
  2177. // force metadata query to page
  2178. cluster.PageSize = 1
  2179. session := createSessionFromCluster(cluster, t)
  2180. defer session.Close()
  2181. expectedPoolSize := cluster.NumConns * len(session.ring.allHosts())
  2182. // wait for pool to fill
  2183. for i := 0; i < 10; i++ {
  2184. if session.pool.Size() == expectedPoolSize {
  2185. break
  2186. }
  2187. time.Sleep(100 * time.Millisecond)
  2188. }
  2189. if expectedPoolSize != session.pool.Size() {
  2190. t.Errorf("Expected pool size %d but was %d", expectedPoolSize, session.pool.Size())
  2191. }
  2192. // add another cf so there are two pages when fetching table metadata from our keyspace
  2193. if err := createTable(session, "CREATE TABLE gocql_test.test_token_aware_other_cf (id int, data text, PRIMARY KEY (id))"); err != nil {
  2194. t.Fatalf("failed to create test_token_aware table with err: %v", err)
  2195. }
  2196. if err := createTable(session, "CREATE TABLE gocql_test.test_token_aware (id int, data text, PRIMARY KEY (id))"); err != nil {
  2197. t.Fatalf("failed to create test_token_aware table with err: %v", err)
  2198. }
  2199. query := session.Query("INSERT INTO test_token_aware (id, data) VALUES (?,?)", 42, "8 * 6 =")
  2200. if err := query.Exec(); err != nil {
  2201. t.Fatalf("failed to insert with err: %v", err)
  2202. }
  2203. query = session.Query("SELECT data FROM test_token_aware where id = ?", 42).Consistency(One)
  2204. var data string
  2205. if err := query.Scan(&data); err != nil {
  2206. t.Error(err)
  2207. }
  2208. // TODO add verification that the query went to the correct host
  2209. }
  2210. func TestNegativeStream(t *testing.T) {
  2211. session := createSession(t)
  2212. defer session.Close()
  2213. conn := getRandomConn(t, session)
  2214. const stream = -50
  2215. writer := frameWriterFunc(func(f *framer, streamID int) error {
  2216. f.writeHeader(0, opOptions, stream)
  2217. return f.finishWrite()
  2218. })
  2219. frame, err := conn.exec(context.Background(), writer, nil)
  2220. if err == nil {
  2221. t.Fatalf("expected to get an error on stream %d", stream)
  2222. } else if frame != nil {
  2223. t.Fatalf("expected to get nil frame got %+v", frame)
  2224. }
  2225. }
  2226. func TestManualQueryPaging(t *testing.T) {
  2227. const rowsToInsert = 5
  2228. session := createSession(t)
  2229. defer session.Close()
  2230. if err := createTable(session, "CREATE TABLE gocql_test.testManualPaging (id int, count int, PRIMARY KEY (id))"); err != nil {
  2231. t.Fatal(err)
  2232. }
  2233. for i := 0; i < rowsToInsert; i++ {
  2234. err := session.Query("INSERT INTO testManualPaging(id, count) VALUES(?, ?)", i, i*i).Exec()
  2235. if err != nil {
  2236. t.Fatal(err)
  2237. }
  2238. }
  2239. // disable auto paging, 1 page per iteration
  2240. query := session.Query("SELECT id, count FROM testManualPaging").PageState(nil).PageSize(2)
  2241. var id, count, fetched int
  2242. iter := query.Iter()
  2243. // NOTE: this isnt very indicative of how it should be used, the idea is that
  2244. // the page state is returned to some client who will send it back to manually
  2245. // page through the results.
  2246. for {
  2247. for iter.Scan(&id, &count) {
  2248. if count != (id * id) {
  2249. t.Fatalf("got wrong value from iteration: got %d expected %d", count, id*id)
  2250. }
  2251. fetched++
  2252. }
  2253. if len(iter.PageState()) > 0 {
  2254. // more pages
  2255. iter = query.PageState(iter.PageState()).Iter()
  2256. } else {
  2257. break
  2258. }
  2259. }
  2260. if err := iter.Close(); err != nil {
  2261. t.Fatal(err)
  2262. }
  2263. if fetched != rowsToInsert {
  2264. t.Fatalf("expected to fetch %d rows got %d", rowsToInsert, fetched)
  2265. }
  2266. }
  2267. func TestLexicalUUIDType(t *testing.T) {
  2268. session := createSession(t)
  2269. defer session.Close()
  2270. if err := createTable(session, `CREATE TABLE gocql_test.test_lexical_uuid (
  2271. key varchar,
  2272. column1 'org.apache.cassandra.db.marshal.LexicalUUIDType',
  2273. value int,
  2274. PRIMARY KEY (key, column1)
  2275. )`); err != nil {
  2276. t.Fatal("create:", err)
  2277. }
  2278. key := TimeUUID().String()
  2279. column1 := TimeUUID()
  2280. err := session.Query("INSERT INTO test_lexical_uuid(key, column1, value) VALUES(?, ?, ?)", key, column1, 55).Exec()
  2281. if err != nil {
  2282. t.Fatal(err)
  2283. }
  2284. var gotUUID UUID
  2285. if err := session.Query("SELECT column1 from test_lexical_uuid where key = ? AND column1 = ?", key, column1).Scan(&gotUUID); err != nil {
  2286. t.Fatal(err)
  2287. }
  2288. if gotUUID != column1 {
  2289. t.Errorf("got %s, expected %s", gotUUID, column1)
  2290. }
  2291. }
  2292. // Issue 475
  2293. func TestSessionBindRoutingKey(t *testing.T) {
  2294. cluster := createCluster()
  2295. cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(RoundRobinHostPolicy())
  2296. session := createSessionFromCluster(cluster, t)
  2297. defer session.Close()
  2298. if err := createTable(session, `CREATE TABLE gocql_test.test_bind_routing_key (
  2299. key varchar,
  2300. value int,
  2301. PRIMARY KEY (key)
  2302. )`); err != nil {
  2303. t.Fatal(err)
  2304. }
  2305. const (
  2306. key = "routing-key"
  2307. value = 5
  2308. )
  2309. fn := func(info *QueryInfo) ([]interface{}, error) {
  2310. return []interface{}{key, value}, nil
  2311. }
  2312. q := session.Bind("INSERT INTO test_bind_routing_key(key, value) VALUES(?, ?)", fn)
  2313. if err := q.Exec(); err != nil {
  2314. t.Fatal(err)
  2315. }
  2316. }
  2317. func TestJSONSupport(t *testing.T) {
  2318. session := createSession(t)
  2319. defer session.Close()
  2320. if session.cfg.ProtoVersion < 4 {
  2321. t.Skip("skipping JSON support on proto < 4")
  2322. }
  2323. if err := createTable(session, `CREATE TABLE gocql_test.test_json (
  2324. id text PRIMARY KEY,
  2325. age int,
  2326. state text
  2327. )`); err != nil {
  2328. t.Fatal(err)
  2329. }
  2330. err := session.Query("INSERT INTO test_json JSON ?", `{"id": "user123", "age": 42, "state": "TX"}`).Exec()
  2331. if err != nil {
  2332. t.Fatal(err)
  2333. }
  2334. var (
  2335. id string
  2336. age int
  2337. state string
  2338. )
  2339. err = session.Query("SELECT id, age, state FROM test_json WHERE id = ?", "user123").Scan(&id, &age, &state)
  2340. if err != nil {
  2341. t.Fatal(err)
  2342. }
  2343. if id != "user123" {
  2344. t.Errorf("got id %q expected %q", id, "user123")
  2345. }
  2346. if age != 42 {
  2347. t.Errorf("got age %d expected %d", age, 42)
  2348. }
  2349. if state != "TX" {
  2350. t.Errorf("got state %q expected %q", state, "TX")
  2351. }
  2352. }
  2353. func TestDiscoverViaProxy(t *testing.T) {
  2354. // This (complicated) test tests that when the driver is given an initial host
  2355. // that is infact a proxy it discovers the rest of the ring behind the proxy
  2356. // and does not store the proxies address as a host in its connection pool.
  2357. // See https://github.com/gocql/gocql/issues/481
  2358. clusterHosts := getClusterHosts()
  2359. proxy, err := net.Listen("tcp", "localhost:0")
  2360. if err != nil {
  2361. t.Fatalf("unable to create proxy listener: %v", err)
  2362. }
  2363. ctx, cancel := context.WithCancel(context.Background())
  2364. defer cancel()
  2365. var (
  2366. mu sync.Mutex
  2367. proxyConns []net.Conn
  2368. closed bool
  2369. )
  2370. go func() {
  2371. cassandraAddr := JoinHostPort(clusterHosts[0], 9042)
  2372. cassandra := func() (net.Conn, error) {
  2373. return net.Dial("tcp", cassandraAddr)
  2374. }
  2375. proxyFn := func(errs chan error, from, to net.Conn) {
  2376. _, err := io.Copy(to, from)
  2377. if err != nil {
  2378. errs <- err
  2379. }
  2380. }
  2381. // handle dials cassandra and then proxies requests and reponsess. It waits
  2382. // for both the read and write side of the TCP connection to close before
  2383. // returning.
  2384. handle := func(conn net.Conn) error {
  2385. cass, err := cassandra()
  2386. if err != nil {
  2387. return err
  2388. }
  2389. defer cass.Close()
  2390. errs := make(chan error, 2)
  2391. go proxyFn(errs, conn, cass)
  2392. go proxyFn(errs, cass, conn)
  2393. select {
  2394. case <-ctx.Done():
  2395. return ctx.Err()
  2396. case err := <-errs:
  2397. return err
  2398. }
  2399. }
  2400. for {
  2401. // proxy just accepts connections and then proxies them to cassandra,
  2402. // it runs until it is closed.
  2403. conn, err := proxy.Accept()
  2404. if err != nil {
  2405. mu.Lock()
  2406. if !closed {
  2407. t.Error(err)
  2408. }
  2409. mu.Unlock()
  2410. return
  2411. }
  2412. mu.Lock()
  2413. proxyConns = append(proxyConns, conn)
  2414. mu.Unlock()
  2415. go func(conn net.Conn) {
  2416. defer conn.Close()
  2417. if err := handle(conn); err != nil {
  2418. mu.Lock()
  2419. if !closed {
  2420. t.Error(err)
  2421. }
  2422. mu.Unlock()
  2423. }
  2424. }(conn)
  2425. }
  2426. }()
  2427. proxyAddr := proxy.Addr().String()
  2428. cluster := createCluster()
  2429. cluster.NumConns = 1
  2430. // initial host is the proxy address
  2431. cluster.Hosts = []string{proxyAddr}
  2432. session := createSessionFromCluster(cluster, t)
  2433. defer session.Close()
  2434. // we shouldnt need this but to be safe
  2435. time.Sleep(1 * time.Second)
  2436. session.pool.mu.RLock()
  2437. for _, host := range clusterHosts {
  2438. if _, ok := session.pool.hostConnPools[host]; !ok {
  2439. t.Errorf("missing host in pool after discovery: %q", host)
  2440. }
  2441. }
  2442. session.pool.mu.RUnlock()
  2443. mu.Lock()
  2444. closed = true
  2445. if err := proxy.Close(); err != nil {
  2446. t.Log(err)
  2447. }
  2448. for _, conn := range proxyConns {
  2449. if err := conn.Close(); err != nil {
  2450. t.Log(err)
  2451. }
  2452. }
  2453. mu.Unlock()
  2454. }
  2455. func TestUnmarshallNestedTypes(t *testing.T) {
  2456. session := createSession(t)
  2457. defer session.Close()
  2458. if session.cfg.ProtoVersion < protoVersion3 {
  2459. t.Skip("can not have frozen types in cassandra < 2.1.3")
  2460. }
  2461. if err := createTable(session, `CREATE TABLE gocql_test.test_557 (
  2462. id text PRIMARY KEY,
  2463. val list<frozen<map<text, text> > >
  2464. )`); err != nil {
  2465. t.Fatal(err)
  2466. }
  2467. m := []map[string]string{
  2468. {"key1": "val1"},
  2469. {"key2": "val2"},
  2470. }
  2471. const id = "key"
  2472. err := session.Query("INSERT INTO test_557(id, val) VALUES(?, ?)", id, m).Exec()
  2473. if err != nil {
  2474. t.Fatal(err)
  2475. }
  2476. var data []map[string]string
  2477. if err := session.Query("SELECT val FROM test_557 WHERE id = ?", id).Scan(&data); err != nil {
  2478. t.Fatal(err)
  2479. }
  2480. if !reflect.DeepEqual(data, m) {
  2481. t.Fatalf("%+#v != %+#v", data, m)
  2482. }
  2483. }
  2484. func TestSchemaReset(t *testing.T) {
  2485. if flagCassVersion.Major == 0 || flagCassVersion.Before(2, 1, 3) {
  2486. t.Skipf("skipping TestSchemaReset due to CASSANDRA-7910 in Cassandra <2.1.3 version=%v", flagCassVersion)
  2487. }
  2488. cluster := createCluster()
  2489. cluster.NumConns = 1
  2490. session := createSessionFromCluster(cluster, t)
  2491. defer session.Close()
  2492. if err := createTable(session, `CREATE TABLE gocql_test.test_schema_reset (
  2493. id text PRIMARY KEY)`); err != nil {
  2494. t.Fatal(err)
  2495. }
  2496. const key = "test"
  2497. err := session.Query("INSERT INTO test_schema_reset(id) VALUES(?)", key).Exec()
  2498. if err != nil {
  2499. t.Fatal(err)
  2500. }
  2501. var id string
  2502. err = session.Query("SELECT * FROM test_schema_reset WHERE id=?", key).Scan(&id)
  2503. if err != nil {
  2504. t.Fatal(err)
  2505. } else if id != key {
  2506. t.Fatalf("expected to get id=%q got=%q", key, id)
  2507. }
  2508. if err := createTable(session, `ALTER TABLE gocql_test.test_schema_reset ADD val text`); err != nil {
  2509. t.Fatal(err)
  2510. }
  2511. const expVal = "test-val"
  2512. err = session.Query("INSERT INTO test_schema_reset(id, val) VALUES(?, ?)", key, expVal).Exec()
  2513. if err != nil {
  2514. t.Fatal(err)
  2515. }
  2516. var val string
  2517. err = session.Query("SELECT * FROM test_schema_reset WHERE id=?", key).Scan(&id, &val)
  2518. if err != nil {
  2519. t.Fatal(err)
  2520. }
  2521. if id != key {
  2522. t.Errorf("expected to get id=%q got=%q", key, id)
  2523. }
  2524. if val != expVal {
  2525. t.Errorf("expected to get val=%q got=%q", expVal, val)
  2526. }
  2527. }
  2528. func TestCreateSession_DontSwallowError(t *testing.T) {
  2529. t.Skip("This test is bad, and the resultant error from cassandra changes between versions")
  2530. cluster := createCluster()
  2531. cluster.ProtoVersion = 0x100
  2532. session, err := cluster.CreateSession()
  2533. if err == nil {
  2534. session.Close()
  2535. t.Fatal("expected to get an error for unsupported protocol")
  2536. }
  2537. if flagCassVersion.Major < 3 {
  2538. // TODO: we should get a distinct error type here which include the underlying
  2539. // cassandra error about the protocol version, for now check this here.
  2540. if !strings.Contains(err.Error(), "Invalid or unsupported protocol version") {
  2541. t.Fatalf(`expcted to get error "unsupported protocol version" got: %q`, err)
  2542. }
  2543. } else {
  2544. if !strings.Contains(err.Error(), "unsupported response version") {
  2545. t.Fatalf(`expcted to get error "unsupported response version" got: %q`, err)
  2546. }
  2547. }
  2548. }
  2549. func TestControl_DiscoverProtocol(t *testing.T) {
  2550. cluster := createCluster()
  2551. cluster.ProtoVersion = 0
  2552. session, err := cluster.CreateSession()
  2553. if err != nil {
  2554. t.Fatal(err)
  2555. }
  2556. defer session.Close()
  2557. if session.cfg.ProtoVersion == 0 {
  2558. t.Fatal("did not discovery protocol")
  2559. }
  2560. }
  2561. // TestUnsetCol verify unset column will not replace an existing column
  2562. func TestUnsetCol(t *testing.T) {
  2563. session := createSession(t)
  2564. defer session.Close()
  2565. if session.cfg.ProtoVersion < 4 {
  2566. t.Skip("Unset Values are not supported in protocol < 4")
  2567. }
  2568. if err := createTable(session, "CREATE TABLE gocql_test.testUnsetInsert (id int, my_int int, my_text text, PRIMARY KEY (id))"); err != nil {
  2569. t.Fatalf("failed to create table with error '%v'", err)
  2570. }
  2571. if err := session.Query("INSERT INTO testUnSetInsert (id,my_int,my_text) VALUES (?,?,?)", 1, 2, "3").Exec(); err != nil {
  2572. t.Fatalf("failed to insert with err: %v", err)
  2573. }
  2574. if err := session.Query("INSERT INTO testUnSetInsert (id,my_int,my_text) VALUES (?,?,?)", 1, UnsetValue, UnsetValue).Exec(); err != nil {
  2575. t.Fatalf("failed to insert with err: %v", err)
  2576. }
  2577. var id, mInt int
  2578. var mText string
  2579. if err := session.Query("SELECT id, my_int ,my_text FROM testUnsetInsert").Scan(&id, &mInt, &mText); err != nil {
  2580. t.Fatalf("failed to select with err: %v", err)
  2581. } else if id != 1 || mInt != 2 || mText != "3" {
  2582. t.Fatalf("Expected results: 1, 2, \"3\", got %v, %v, %v", id, mInt, mText)
  2583. }
  2584. }
  2585. // TestUnsetColBatch verify unset column will not replace a column in batch
  2586. func TestUnsetColBatch(t *testing.T) {
  2587. session := createSession(t)
  2588. defer session.Close()
  2589. if session.cfg.ProtoVersion < 4 {
  2590. t.Skip("Unset Values are not supported in protocol < 4")
  2591. }
  2592. if err := createTable(session, "CREATE TABLE gocql_test.batchUnsetInsert (id int, my_int int, my_text text, PRIMARY KEY (id))"); err != nil {
  2593. t.Fatalf("failed to create table with error '%v'", err)
  2594. }
  2595. b := session.NewBatch(LoggedBatch)
  2596. b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 1, 1, UnsetValue)
  2597. b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 1, UnsetValue, "")
  2598. b.Query("INSERT INTO gocql_test.batchUnsetInsert(id, my_int, my_text) VALUES (?,?,?)", 2, 2, UnsetValue)
  2599. if err := session.ExecuteBatch(b); err != nil {
  2600. t.Fatalf("query failed. %v", err)
  2601. } else {
  2602. if b.Attempts() < 1 {
  2603. t.Fatal("expected at least 1 attempt, but got 0")
  2604. }
  2605. if b.Latency() <= 0 {
  2606. t.Fatalf("expected latency to be greater than 0, but got %v instead.", b.Latency())
  2607. }
  2608. }
  2609. var id, mInt, count int
  2610. var mText string
  2611. if err := session.Query("SELECT count(*) FROM gocql_test.batchUnsetInsert;").Scan(&count); err != nil {
  2612. t.Fatalf("Failed to select with err: %v", err)
  2613. } else if count != 2 {
  2614. t.Fatalf("Expected Batch Insert count 2, got %v", count)
  2615. }
  2616. if err := session.Query("SELECT id, my_int ,my_text FROM gocql_test.batchUnsetInsert where id=1;").Scan(&id, &mInt, &mText); err != nil {
  2617. t.Fatalf("failed to select with err: %v", err)
  2618. } else if id != mInt {
  2619. t.Fatalf("expected id, my_int to be 1, got %v and %v", id, mInt)
  2620. }
  2621. }
  2622. func TestQuery_NamedValues(t *testing.T) {
  2623. session := createSession(t)
  2624. defer session.Close()
  2625. if session.cfg.ProtoVersion < 3 {
  2626. t.Skip("named Values are not supported in protocol < 3")
  2627. }
  2628. if err := createTable(session, "CREATE TABLE gocql_test.named_query(id int, value text, PRIMARY KEY (id))"); err != nil {
  2629. t.Fatal(err)
  2630. }
  2631. err := session.Query("INSERT INTO gocql_test.named_query(id, value) VALUES(:id, :value)", NamedValue("id", 1), NamedValue("value", "i am a value")).Exec()
  2632. if err != nil {
  2633. t.Fatal(err)
  2634. }
  2635. var value string
  2636. if err := session.Query("SELECT VALUE from gocql_test.named_query WHERE id = :id", NamedValue("id", 1)).Scan(&value); err != nil {
  2637. t.Fatal(err)
  2638. }
  2639. }