integration_test.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. // +build all integration
  2. package gocql
  3. // This file groups integration tests where Cassandra has to be set up with some special integration variables
  4. import (
  5. "context"
  6. "reflect"
  7. "testing"
  8. "time"
  9. )
  10. // TestAuthentication verifies that gocql will work with a host configured to only accept authenticated connections
  11. func TestAuthentication(t *testing.T) {
  12. if *flagProto < 2 {
  13. t.Skip("Authentication is not supported with protocol < 2")
  14. }
  15. if !*flagRunAuthTest {
  16. t.Skip("Authentication is not configured in the target cluster")
  17. }
  18. cluster := createCluster()
  19. cluster.Authenticator = PasswordAuthenticator{
  20. Username: "cassandra",
  21. Password: "cassandra",
  22. }
  23. session, err := cluster.CreateSession()
  24. if err != nil {
  25. t.Fatalf("Authentication error: %s", err)
  26. }
  27. session.Close()
  28. }
  29. func TestGetHosts(t *testing.T) {
  30. clusterHosts := getClusterHosts()
  31. cluster := createCluster()
  32. session := createSessionFromCluster(cluster, t)
  33. hosts, partitioner, err := session.hostSource.GetHosts()
  34. assertTrue(t, "err == nil", err == nil)
  35. assertEqual(t, "len(hosts)", len(clusterHosts), len(hosts))
  36. assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
  37. }
  38. //TestRingDiscovery makes sure that you can autodiscover other cluster members when you seed a cluster config with just one node
  39. func TestRingDiscovery(t *testing.T) {
  40. clusterHosts := getClusterHosts()
  41. cluster := createCluster()
  42. cluster.Hosts = clusterHosts[:1]
  43. session := createSessionFromCluster(cluster, t)
  44. defer session.Close()
  45. if *clusterSize > 1 {
  46. // wait for autodiscovery to update the pool with the list of known hosts
  47. time.Sleep(*flagAutoWait)
  48. }
  49. session.pool.mu.RLock()
  50. defer session.pool.mu.RUnlock()
  51. size := len(session.pool.hostConnPools)
  52. if *clusterSize != size {
  53. for p, pool := range session.pool.hostConnPools {
  54. t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.ConnectAddress().String())
  55. }
  56. t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
  57. }
  58. }
  59. func TestWriteFailure(t *testing.T) {
  60. cluster := createCluster()
  61. createKeyspace(t, cluster, "test")
  62. cluster.Keyspace = "test"
  63. session, err := cluster.CreateSession()
  64. if err != nil {
  65. t.Fatal("create session:", err)
  66. }
  67. defer session.Close()
  68. if err := createTable(session, "CREATE TABLE test.test (id int,value int,PRIMARY KEY (id))"); err != nil {
  69. t.Fatalf("failed to create table with error '%v'", err)
  70. }
  71. if err := session.Query(`INSERT INTO test.test (id, value) VALUES (1, 1)`).Exec(); err != nil {
  72. errWrite, ok := err.(*RequestErrWriteFailure)
  73. if ok {
  74. if session.cfg.ProtoVersion >= 5 {
  75. // ErrorMap should be filled with some hosts that should've errored
  76. if len(errWrite.ErrorMap) == 0 {
  77. t.Fatal("errWrite.ErrorMap should have some failed hosts but it didn't have any")
  78. }
  79. } else {
  80. // Map doesn't get filled for V4
  81. if len(errWrite.ErrorMap) != 0 {
  82. t.Fatal("errWrite.ErrorMap should have length 0, it's: ", len(errWrite.ErrorMap))
  83. }
  84. }
  85. } else {
  86. t.Fatal("error should be RequestErrWriteFailure, it's: ", errWrite)
  87. }
  88. } else {
  89. t.Fatal("a write fail error should have happened when querying test keyspace")
  90. }
  91. if err = session.Query("DROP KEYSPACE test").Exec(); err != nil {
  92. t.Fatal(err)
  93. }
  94. }
  95. func TestCustomPayloadMessages(t *testing.T) {
  96. cluster := createCluster()
  97. session := createSessionFromCluster(cluster, t)
  98. defer session.Close()
  99. if err := createTable(session, "CREATE TABLE gocql_test.testCustomPayloadMessages (id int, value int, PRIMARY KEY (id))"); err != nil {
  100. t.Fatal(err)
  101. }
  102. // QueryMessage
  103. var customPayload = map[string][]byte{"a": []byte{10, 20}, "b": []byte{20, 30}}
  104. query := session.Query("SELECT id FROM testCustomPayloadMessages where id = ?", 42).Consistency(One).CustomPayload(customPayload)
  105. iter := query.Iter()
  106. rCustomPayload := iter.GetCustomPayload()
  107. if !reflect.DeepEqual(customPayload, rCustomPayload) {
  108. t.Fatal("The received custom payload should match the sent")
  109. }
  110. iter.Close()
  111. // Insert query
  112. query = session.Query("INSERT INTO testCustomPayloadMessages(id,value) VALUES(1, 1)").Consistency(One).CustomPayload(customPayload)
  113. iter = query.Iter()
  114. rCustomPayload = iter.GetCustomPayload()
  115. if !reflect.DeepEqual(customPayload, rCustomPayload) {
  116. t.Fatal("The received custom payload should match the sent")
  117. }
  118. iter.Close()
  119. // Batch Message
  120. b := session.NewBatch(LoggedBatch)
  121. b.CustomPayload = customPayload
  122. b.Query("INSERT INTO testCustomPayloadMessages(id,value) VALUES(1, 1)")
  123. if err := session.ExecuteBatch(b); err != nil {
  124. t.Fatalf("query failed. %v", err)
  125. }
  126. }
  127. func TestCustomPayloadValues(t *testing.T) {
  128. cluster := createCluster()
  129. session := createSessionFromCluster(cluster, t)
  130. defer session.Close()
  131. if err := createTable(session, "CREATE TABLE gocql_test.testCustomPayloadValues (id int, value int, PRIMARY KEY (id))"); err != nil {
  132. t.Fatal(err)
  133. }
  134. values := []map[string][]byte{map[string][]byte{"a": []byte{10, 20}, "b": []byte{20, 30}}, nil, map[string][]byte{"a": []byte{10, 20}, "b": nil}}
  135. for _, customPayload := range values {
  136. query := session.Query("SELECT id FROM testCustomPayloadValues where id = ?", 42).Consistency(One).CustomPayload(customPayload)
  137. iter := query.Iter()
  138. rCustomPayload := iter.GetCustomPayload()
  139. if !reflect.DeepEqual(customPayload, rCustomPayload) {
  140. t.Fatal("The received custom payload should match the sent")
  141. }
  142. }
  143. }
  144. func TestSessionAwaitSchemaAgreement(t *testing.T) {
  145. session := createSession(t)
  146. defer session.Close()
  147. if err := session.AwaitSchemaAgreement(context.Background()); err != nil {
  148. t.Fatalf("expected session.AwaitSchemaAgreement to not return an error but got '%v'", err)
  149. }
  150. }
  151. func TestUDF(t *testing.T) {
  152. session := createSession(t)
  153. defer session.Close()
  154. if session.cfg.ProtoVersion < 4 {
  155. t.Skip("skipping UDF support on proto < 4")
  156. }
  157. const query = `CREATE OR REPLACE FUNCTION uniq(state set<text>, val text)
  158. CALLED ON NULL INPUT RETURNS set<text> LANGUAGE java
  159. AS 'state.add(val); return state;'`
  160. err := session.Query(query).Exec()
  161. if err != nil {
  162. t.Fatal(err)
  163. }
  164. }