integration_test.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. // +build all integration
  2. package gocql
  3. // This file groups integration tests where Cassandra has to be set up with some special integration variables
  4. import (
  5. "reflect"
  6. "testing"
  7. "time"
  8. )
  9. // TestAuthentication verifies that gocql will work with a host configured to only accept authenticated connections
  10. func TestAuthentication(t *testing.T) {
  11. if *flagProto < 2 {
  12. t.Skip("Authentication is not supported with protocol < 2")
  13. }
  14. if !*flagRunAuthTest {
  15. t.Skip("Authentication is not configured in the target cluster")
  16. }
  17. cluster := createCluster()
  18. cluster.Authenticator = PasswordAuthenticator{
  19. Username: "cassandra",
  20. Password: "cassandra",
  21. }
  22. session, err := cluster.CreateSession()
  23. if err != nil {
  24. t.Fatalf("Authentication error: %s", err)
  25. }
  26. session.Close()
  27. }
  28. func TestGetHosts(t *testing.T) {
  29. clusterHosts := getClusterHosts()
  30. cluster := createCluster()
  31. session := createSessionFromCluster(cluster, t)
  32. hosts, partitioner, err := session.hostSource.GetHosts()
  33. assertTrue(t, "err == nil", err == nil)
  34. assertEqual(t, "len(hosts)", len(clusterHosts), len(hosts))
  35. assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
  36. }
  37. //TestRingDiscovery makes sure that you can autodiscover other cluster members when you seed a cluster config with just one node
  38. func TestRingDiscovery(t *testing.T) {
  39. clusterHosts := getClusterHosts()
  40. cluster := createCluster()
  41. cluster.Hosts = clusterHosts[:1]
  42. session := createSessionFromCluster(cluster, t)
  43. defer session.Close()
  44. if *clusterSize > 1 {
  45. // wait for autodiscovery to update the pool with the list of known hosts
  46. time.Sleep(*flagAutoWait)
  47. }
  48. session.pool.mu.RLock()
  49. defer session.pool.mu.RUnlock()
  50. size := len(session.pool.hostConnPools)
  51. if *clusterSize != size {
  52. for p, pool := range session.pool.hostConnPools {
  53. t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.ConnectAddress().String())
  54. }
  55. t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
  56. }
  57. }
  58. func TestWriteFailure(t *testing.T) {
  59. cluster := createCluster()
  60. createKeyspace(t, cluster, "test")
  61. cluster.Keyspace = "test"
  62. session, err := cluster.CreateSession()
  63. if err != nil {
  64. t.Fatal("create session:", err)
  65. }
  66. defer session.Close()
  67. if err := createTable(session, "CREATE TABLE test.test (id int,value int,PRIMARY KEY (id))"); err != nil {
  68. t.Fatalf("failed to create table with error '%v'", err)
  69. }
  70. if err := session.Query(`INSERT INTO test.test (id, value) VALUES (1, 1)`).Exec(); err != nil {
  71. errWrite, ok := err.(*RequestErrWriteFailure)
  72. if ok {
  73. if session.cfg.ProtoVersion >= 5 {
  74. // ErrorMap should be filled with some hosts that should've errored
  75. if len(errWrite.ErrorMap) == 0 {
  76. t.Fatal("errWrite.ErrorMap should have some failed hosts but it didn't have any")
  77. }
  78. } else {
  79. // Map doesn't get filled for V4
  80. if len(errWrite.ErrorMap) != 0 {
  81. t.Fatal("errWrite.ErrorMap should have length 0, it's: ", len(errWrite.ErrorMap))
  82. }
  83. }
  84. } else {
  85. t.Fatal("error should be RequestErrWriteFailure, it's: ", errWrite)
  86. }
  87. } else {
  88. t.Fatal("a write fail error should have happened when querying test keyspace")
  89. }
  90. if err = session.Query("DROP KEYSPACE test").Exec(); err != nil {
  91. t.Fatal(err)
  92. }
  93. }
  94. func TestCustomPayloadMessages(t *testing.T) {
  95. cluster := createCluster()
  96. session := createSessionFromCluster(cluster, t)
  97. defer session.Close()
  98. if err := createTable(session, "CREATE TABLE gocql_test.testCustomPayloadMessages (id int, value int, PRIMARY KEY (id))"); err != nil {
  99. t.Fatal(err)
  100. }
  101. // QueryMessage
  102. var customPayload = map[string][]byte{"a": []byte{10, 20}, "b": []byte{20, 30}}
  103. query := session.Query("SELECT id FROM testCustomPayloadMessages where id = ?", 42).Consistency(One).CustomPayload(customPayload)
  104. iter := query.Iter()
  105. rCustomPayload := iter.GetCustomPayload()
  106. if !reflect.DeepEqual(customPayload, rCustomPayload) {
  107. t.Fatal("The received custom payload should match the sent")
  108. }
  109. iter.Close()
  110. // Insert query
  111. query = session.Query("INSERT INTO testCustomPayloadMessages(id,value) VALUES(1, 1)").Consistency(One).CustomPayload(customPayload)
  112. iter = query.Iter()
  113. rCustomPayload = iter.GetCustomPayload()
  114. if !reflect.DeepEqual(customPayload, rCustomPayload) {
  115. t.Fatal("The received custom payload should match the sent")
  116. }
  117. iter.Close()
  118. // Batch Message
  119. b := session.NewBatch(LoggedBatch)
  120. b.CustomPayload = customPayload
  121. b.Query("INSERT INTO testCustomPayloadMessages(id,value) VALUES(1, 1)")
  122. if err := session.ExecuteBatch(b); err != nil {
  123. t.Fatalf("query failed. %v", err)
  124. }
  125. }
  126. func TestCustomPayloadValues(t *testing.T) {
  127. cluster := createCluster()
  128. session := createSessionFromCluster(cluster, t)
  129. defer session.Close()
  130. if err := createTable(session, "CREATE TABLE gocql_test.testCustomPayloadValues (id int, value int, PRIMARY KEY (id))"); err != nil {
  131. t.Fatal(err)
  132. }
  133. values := []map[string][]byte{map[string][]byte{"a": []byte{10, 20}, "b": []byte{20, 30}}, nil, map[string][]byte{"a": []byte{10, 20}, "b": nil}}
  134. for _, customPayload := range values {
  135. query := session.Query("SELECT id FROM testCustomPayloadValues where id = ?", 42).Consistency(One).CustomPayload(customPayload)
  136. iter := query.Iter()
  137. rCustomPayload := iter.GetCustomPayload()
  138. if !reflect.DeepEqual(customPayload, rCustomPayload) {
  139. t.Fatal("The received custom payload should match the sent")
  140. }
  141. }
  142. }
  143. func TestUDF(t *testing.T) {
  144. session := createSession(t)
  145. defer session.Close()
  146. if session.cfg.ProtoVersion < 4 {
  147. t.Skip("skipping UDF support on proto < 4")
  148. }
  149. const query = `CREATE OR REPLACE FUNCTION uniq(state set<text>, val text)
  150. CALLED ON NULL INPUT RETURNS set<text> LANGUAGE java
  151. AS 'state.add(val); return state;'`
  152. err := session.Query(query).Exec()
  153. if err != nil {
  154. t.Fatal(err)
  155. }
  156. }