conn_test.go 15 KB


  1. // Copyright (c) 2012 The gocql Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // +build all unit
  5. package gocql
  6. import (
  7. "crypto/tls"
  8. "crypto/x509"
  9. "fmt"
  10. "io"
  11. "io/ioutil"
  12. "net"
  13. "os"
  14. "strings"
  15. "sync"
  16. "sync/atomic"
  17. "testing"
  18. "time"
  19. )
  20. const (
  21. defaultProto = protoVersion2
  22. )
  23. func TestJoinHostPort(t *testing.T) {
  24. tests := map[string]string{
  25. "127.0.0.1:0": JoinHostPort("127.0.0.1", 0),
  26. "127.0.0.1:1": JoinHostPort("127.0.0.1:1", 9142),
  27. "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:0": JoinHostPort("2001:0db8:85a3:0000:0000:8a2e:0370:7334", 0),
  28. "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1": JoinHostPort("[2001:0db8:85a3:0000:0000:8a2e:0370:7334]:1", 9142),
  29. }
  30. for k, v := range tests {
  31. if k != v {
  32. t.Fatalf("expected '%v', got '%v'", k, v)
  33. }
  34. }
  35. }
  36. func TestSimple(t *testing.T) {
  37. srv := NewTestServer(t, defaultProto)
  38. defer srv.Stop()
  39. cluster := NewCluster(srv.Address)
  40. cluster.ProtoVersion = int(defaultProto)
  41. db, err := cluster.CreateSession()
  42. if err != nil {
  43. t.Errorf("0x%x: NewCluster: %v", defaultProto, err)
  44. return
  45. }
  46. if err := db.Query("void").Exec(); err != nil {
  47. t.Errorf("0x%x: %v", defaultProto, err)
  48. }
  49. }
  50. func TestSSLSimple(t *testing.T) {
  51. srv := NewSSLTestServer(t, defaultProto)
  52. defer srv.Stop()
  53. db, err := createTestSslCluster(srv.Address, defaultProto, true).CreateSession()
  54. if err != nil {
  55. t.Fatalf("0x%x: NewCluster: %v", defaultProto, err)
  56. }
  57. if err := db.Query("void").Exec(); err != nil {
  58. t.Fatalf("0x%x: %v", defaultProto, err)
  59. }
  60. }
  61. func TestSSLSimpleNoClientCert(t *testing.T) {
  62. srv := NewSSLTestServer(t, defaultProto)
  63. defer srv.Stop()
  64. db, err := createTestSslCluster(srv.Address, defaultProto, false).CreateSession()
  65. if err != nil {
  66. t.Fatalf("0x%x: NewCluster: %v", defaultProto, err)
  67. }
  68. if err := db.Query("void").Exec(); err != nil {
  69. t.Fatalf("0x%x: %v", defaultProto, err)
  70. }
  71. }
  72. func createTestSslCluster(hosts string, proto uint8, useClientCert bool) *ClusterConfig {
  73. cluster := NewCluster(hosts)
  74. sslOpts := &SslOptions{
  75. CaPath: "testdata/pki/ca.crt",
  76. EnableHostVerification: false,
  77. }
  78. if useClientCert {
  79. sslOpts.CertPath = "testdata/pki/gocql.crt"
  80. sslOpts.KeyPath = "testdata/pki/gocql.key"
  81. }
  82. cluster.SslOpts = sslOpts
  83. cluster.ProtoVersion = int(proto)
  84. return cluster
  85. }
  86. func TestClosed(t *testing.T) {
  87. t.Skip("Skipping the execution of TestClosed for now to try to concentrate on more important test failures on Travis")
  88. srv := NewTestServer(t, defaultProto)
  89. defer srv.Stop()
  90. cluster := NewCluster(srv.Address)
  91. cluster.ProtoVersion = int(defaultProto)
  92. session, err := cluster.CreateSession()
  93. defer session.Close()
  94. if err != nil {
  95. t.Errorf("0x%x: NewCluster: %v", defaultProto, err)
  96. return
  97. }
  98. if err := session.Query("void").Exec(); err != ErrSessionClosed {
  99. t.Errorf("0x%x: expected %#v, got %#v", defaultProto, ErrSessionClosed, err)
  100. return
  101. }
  102. }
  103. func newTestSession(addr string, proto uint8) (*Session, error) {
  104. cluster := NewCluster(addr)
  105. cluster.ProtoVersion = int(proto)
  106. return cluster.CreateSession()
  107. }
  108. func TestTimeout(t *testing.T) {
  109. srv := NewTestServer(t, defaultProto)
  110. defer srv.Stop()
  111. db, err := newTestSession(srv.Address, defaultProto)
  112. if err != nil {
  113. t.Errorf("NewCluster: %v", err)
  114. return
  115. }
  116. defer db.Close()
  117. go func() {
  118. <-time.After(2 * time.Second)
  119. t.Errorf("no timeout")
  120. }()
  121. if err := db.Query("kill").Exec(); err == nil {
  122. t.Errorf("expected error")
  123. }
  124. }
  125. // TestQueryRetry will test to make sure that gocql will execute
  126. // the exact amount of retry queries designated by the user.
  127. func TestQueryRetry(t *testing.T) {
  128. srv := NewTestServer(t, defaultProto)
  129. defer srv.Stop()
  130. db, err := newTestSession(srv.Address, defaultProto)
  131. if err != nil {
  132. t.Fatalf("NewCluster: %v", err)
  133. }
  134. defer db.Close()
  135. go func() {
  136. <-time.After(5 * time.Second)
  137. t.Fatalf("no timeout")
  138. }()
  139. rt := &SimpleRetryPolicy{NumRetries: 1}
  140. qry := db.Query("kill").RetryPolicy(rt)
  141. if err := qry.Exec(); err == nil {
  142. t.Fatalf("expected error")
  143. }
  144. requests := atomic.LoadInt64(&srv.nKillReq)
  145. attempts := qry.Attempts()
  146. if requests != int64(attempts) {
  147. t.Fatalf("expected requests %v to match query attemps %v", requests, attempts)
  148. }
  149. //Minus 1 from the requests variable since there is the initial query attempt
  150. if requests-1 != int64(rt.NumRetries) {
  151. t.Fatalf("failed to retry the query %v time(s). Query executed %v times", rt.NumRetries, requests-1)
  152. }
  153. }
  154. func TestSlowQuery(t *testing.T) {
  155. srv := NewTestServer(t, defaultProto)
  156. defer srv.Stop()
  157. db, err := newTestSession(srv.Address, defaultProto)
  158. if err != nil {
  159. t.Errorf("NewCluster: %v", err)
  160. return
  161. }
  162. if err := db.Query("slow").Exec(); err != nil {
  163. t.Fatal(err)
  164. }
  165. }
  166. func TestSimplePoolRoundRobin(t *testing.T) {
  167. servers := make([]*TestServer, 5)
  168. addrs := make([]string, len(servers))
  169. for n := 0; n < len(servers); n++ {
  170. servers[n] = NewTestServer(t, defaultProto)
  171. addrs[n] = servers[n].Address
  172. defer servers[n].Stop()
  173. }
  174. cluster := NewCluster(addrs...)
  175. cluster.ProtoVersion = defaultProto
  176. db, err := cluster.CreateSession()
  177. time.Sleep(1 * time.Second) // Sleep to allow the Cluster.fillPool to complete
  178. if err != nil {
  179. t.Fatalf("NewCluster: %v", err)
  180. }
  181. var wg sync.WaitGroup
  182. wg.Add(5)
  183. for n := 0; n < 5; n++ {
  184. go func() {
  185. for j := 0; j < 5; j++ {
  186. if err := db.Query("void").Exec(); err != nil {
  187. t.Fatal(err)
  188. }
  189. }
  190. wg.Done()
  191. }()
  192. }
  193. wg.Wait()
  194. diff := 0
  195. for n := 1; n < len(servers); n++ {
  196. d := 0
  197. if servers[n].nreq > servers[n-1].nreq {
  198. d = int(servers[n].nreq - servers[n-1].nreq)
  199. } else {
  200. d = int(servers[n-1].nreq - servers[n].nreq)
  201. }
  202. if d > diff {
  203. diff = d
  204. }
  205. }
  206. if diff > 0 {
  207. t.Errorf("Expected 0 difference in usage but was %d", diff)
  208. }
  209. }
  210. func TestConnClosing(t *testing.T) {
  211. t.Skip("Skipping until test can be ran reliably")
  212. srv := NewTestServer(t, protoVersion2)
  213. defer srv.Stop()
  214. db, err := NewCluster(srv.Address).CreateSession()
  215. if err != nil {
  216. t.Errorf("NewCluster: %v", err)
  217. }
  218. defer db.Close()
  219. numConns := db.cfg.NumConns
  220. count := db.cfg.NumStreams * numConns
  221. wg := &sync.WaitGroup{}
  222. wg.Add(count)
  223. for i := 0; i < count; i++ {
  224. go func(wg *sync.WaitGroup) {
  225. wg.Done()
  226. db.Query("kill").Exec()
  227. }(wg)
  228. }
  229. wg.Wait()
  230. time.Sleep(1 * time.Second) //Sleep so the fillPool can complete.
  231. pool := db.Pool.(ConnectionPool)
  232. conns := pool.Size()
  233. if conns != numConns {
  234. t.Errorf("Expected to have %d connections but have %d", numConns, conns)
  235. }
  236. }
  237. func TestStreams_Protocol1(t *testing.T) {
  238. srv := NewTestServer(t, protoVersion1)
  239. defer srv.Stop()
  240. // TODO: these are more like session tests and should instead operate
  241. // on a single Conn
  242. cluster := NewCluster(srv.Address)
  243. cluster.NumConns = 1
  244. cluster.ProtoVersion = 1
  245. db, err := cluster.CreateSession()
  246. if err != nil {
  247. t.Fatal(err)
  248. }
  249. defer db.Close()
  250. var wg sync.WaitGroup
  251. for i := 0; i < db.cfg.NumStreams; i++ {
  252. // here were just validating that if we send NumStream request we get
  253. // a response for every stream and the lengths for the queries are set
  254. // correctly.
  255. wg.Add(1)
  256. go func() {
  257. defer wg.Done()
  258. if err := db.Query("void").Exec(); err != nil {
  259. t.Error(err)
  260. }
  261. }()
  262. }
  263. wg.Wait()
  264. }
  265. func TestStreams_Protocol2(t *testing.T) {
  266. srv := NewTestServer(t, protoVersion2)
  267. defer srv.Stop()
  268. // TODO: these are more like session tests and should instead operate
  269. // on a single Conn
  270. cluster := NewCluster(srv.Address)
  271. cluster.NumConns = 1
  272. cluster.ProtoVersion = 2
  273. db, err := cluster.CreateSession()
  274. if err != nil {
  275. t.Fatal(err)
  276. }
  277. defer db.Close()
  278. for i := 0; i < db.cfg.NumStreams; i++ {
  279. // the test server processes each conn synchronously
  280. // here were just validating that if we send NumStream request we get
  281. // a response for every stream and the lengths for the queries are set
  282. // correctly.
  283. if err = db.Query("void").Exec(); err != nil {
  284. t.Fatal(err)
  285. }
  286. }
  287. }
  288. func TestStreams_Protocol3(t *testing.T) {
  289. srv := NewTestServer(t, protoVersion3)
  290. defer srv.Stop()
  291. // TODO: these are more like session tests and should instead operate
  292. // on a single Conn
  293. cluster := NewCluster(srv.Address)
  294. cluster.NumConns = 1
  295. cluster.ProtoVersion = 3
  296. db, err := cluster.CreateSession()
  297. if err != nil {
  298. t.Fatal(err)
  299. }
  300. defer db.Close()
  301. for i := 0; i < db.cfg.NumStreams; i++ {
  302. // the test server processes each conn synchronously
  303. // here were just validating that if we send NumStream request we get
  304. // a response for every stream and the lengths for the queries are set
  305. // correctly.
  306. if err = db.Query("void").Exec(); err != nil {
  307. t.Fatal(err)
  308. }
  309. }
  310. }
  311. func BenchmarkProtocolV3(b *testing.B) {
  312. srv := NewTestServer(b, protoVersion3)
  313. defer srv.Stop()
  314. // TODO: these are more like session tests and should instead operate
  315. // on a single Conn
  316. cluster := NewCluster(srv.Address)
  317. cluster.NumConns = 1
  318. cluster.ProtoVersion = 3
  319. db, err := cluster.CreateSession()
  320. if err != nil {
  321. b.Fatal(err)
  322. }
  323. defer db.Close()
  324. b.ResetTimer()
  325. b.ReportAllocs()
  326. for i := 0; i < b.N; i++ {
  327. if err = db.Query("void").Exec(); err != nil {
  328. b.Fatal(err)
  329. }
  330. }
  331. }
  332. func TestRoundRobinConnPoolRoundRobin(t *testing.T) {
  333. // create 5 test servers
  334. servers := make([]*TestServer, 5)
  335. addrs := make([]string, len(servers))
  336. for n := 0; n < len(servers); n++ {
  337. servers[n] = NewTestServer(t, defaultProto)
  338. addrs[n] = servers[n].Address
  339. defer servers[n].Stop()
  340. }
  341. // create a new cluster using the policy-based round robin conn pool
  342. cluster := NewCluster(addrs...)
  343. cluster.ConnPoolType = NewRoundRobinConnPool
  344. db, err := cluster.CreateSession()
  345. if err != nil {
  346. t.Fatalf("failed to create a new session: %v", err)
  347. }
  348. // Sleep to allow the pool to fill
  349. time.Sleep(100 * time.Millisecond)
  350. // run concurrent queries against the pool, server usage should
  351. // be even
  352. var wg sync.WaitGroup
  353. wg.Add(5)
  354. for n := 0; n < 5; n++ {
  355. go func() {
  356. for j := 0; j < 5; j++ {
  357. if err := db.Query("void").Exec(); err != nil {
  358. t.Errorf("Query failed with error: %v", err)
  359. }
  360. }
  361. wg.Done()
  362. }()
  363. }
  364. wg.Wait()
  365. db.Close()
  366. // wait for the pool to drain
  367. time.Sleep(100 * time.Millisecond)
  368. size := db.Pool.Size()
  369. if size != 0 {
  370. t.Errorf("connection pool did not drain, still contains %d connections", size)
  371. }
  372. // verify that server usage is even
  373. diff := 0
  374. for n := 1; n < len(servers); n++ {
  375. d := 0
  376. if servers[n].nreq > servers[n-1].nreq {
  377. d = int(servers[n].nreq - servers[n-1].nreq)
  378. } else {
  379. d = int(servers[n-1].nreq - servers[n].nreq)
  380. }
  381. if d > diff {
  382. diff = d
  383. }
  384. }
  385. if diff > 0 {
  386. t.Errorf("expected 0 difference in usage but was %d", diff)
  387. }
  388. }
  389. // This tests that the policy connection pool handles SSL correctly
  390. func TestPolicyConnPoolSSL(t *testing.T) {
  391. srv := NewSSLTestServer(t, defaultProto)
  392. defer srv.Stop()
  393. cluster := createTestSslCluster(srv.Address, defaultProto, true)
  394. cluster.ConnPoolType = NewRoundRobinConnPool
  395. db, err := cluster.CreateSession()
  396. if err != nil {
  397. t.Fatalf("failed to create new session: %v", err)
  398. }
  399. if err := db.Query("void").Exec(); err != nil {
  400. t.Errorf("query failed due to error: %v", err)
  401. }
  402. db.Close()
  403. // wait for the pool to drain
  404. time.Sleep(100 * time.Millisecond)
  405. size := db.Pool.Size()
  406. if size != 0 {
  407. t.Errorf("connection pool did not drain, still contains %d connections", size)
  408. }
  409. }
  410. func NewTestServer(t testing.TB, protocol uint8) *TestServer {
  411. laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
  412. if err != nil {
  413. t.Fatal(err)
  414. }
  415. listen, err := net.ListenTCP("tcp", laddr)
  416. if err != nil {
  417. t.Fatal(err)
  418. }
  419. headerSize := 8
  420. if protocol > protoVersion2 {
  421. headerSize = 9
  422. }
  423. srv := &TestServer{
  424. Address: listen.Addr().String(),
  425. listen: listen,
  426. t: t,
  427. protocol: protocol,
  428. headerSize: headerSize,
  429. }
  430. go srv.serve()
  431. return srv
  432. }
  433. func NewSSLTestServer(t testing.TB, protocol uint8) *TestServer {
  434. pem, err := ioutil.ReadFile("testdata/pki/ca.crt")
  435. certPool := x509.NewCertPool()
  436. if !certPool.AppendCertsFromPEM(pem) {
  437. t.Errorf("Failed parsing or appending certs")
  438. }
  439. mycert, err := tls.LoadX509KeyPair("testdata/pki/cassandra.crt", "testdata/pki/cassandra.key")
  440. if err != nil {
  441. t.Errorf("could not load cert")
  442. }
  443. config := &tls.Config{
  444. Certificates: []tls.Certificate{mycert},
  445. RootCAs: certPool,
  446. }
  447. listen, err := tls.Listen("tcp", "127.0.0.1:0", config)
  448. if err != nil {
  449. t.Fatal(err)
  450. }
  451. headerSize := 8
  452. if protocol > protoVersion2 {
  453. headerSize = 9
  454. }
  455. srv := &TestServer{
  456. Address: listen.Addr().String(),
  457. listen: listen,
  458. t: t,
  459. protocol: protocol,
  460. headerSize: headerSize,
  461. }
  462. go srv.serve()
  463. return srv
  464. }
  465. type TestServer struct {
  466. Address string
  467. t testing.TB
  468. nreq uint64
  469. listen net.Listener
  470. nKillReq int64
  471. compressor Compressor
  472. protocol byte
  473. headerSize int
  474. }
  475. func (srv *TestServer) serve() {
  476. defer srv.listen.Close()
  477. for {
  478. conn, err := srv.listen.Accept()
  479. if err != nil {
  480. break
  481. }
  482. go func(conn net.Conn) {
  483. defer conn.Close()
  484. for {
  485. framer, err := srv.readFrame(conn)
  486. if err != nil {
  487. if err == io.EOF {
  488. return
  489. }
  490. srv.t.Error(err)
  491. return
  492. }
  493. atomic.AddUint64(&srv.nreq, 1)
  494. go srv.process(framer)
  495. }
  496. }(conn)
  497. }
  498. }
  499. func (srv *TestServer) Stop() {
  500. srv.listen.Close()
  501. }
  502. func (srv *TestServer) process(f *framer) {
  503. head := f.header
  504. if head == nil {
  505. srv.t.Error("process frame with a nil header")
  506. return
  507. }
  508. switch head.op {
  509. case opStartup:
  510. f.writeHeader(0, opReady, head.stream)
  511. case opOptions:
  512. f.writeHeader(0, opSupported, head.stream)
  513. f.writeShort(0)
  514. case opQuery:
  515. query := f.readLongString()
  516. first := query
  517. if n := strings.Index(query, " "); n > 0 {
  518. first = first[:n]
  519. }
  520. switch strings.ToLower(first) {
  521. case "kill":
  522. atomic.AddInt64(&srv.nKillReq, 1)
  523. f.writeHeader(0, opError, head.stream)
  524. f.writeInt(0x1001)
  525. f.writeString("query killed")
  526. case "slow":
  527. go func() {
  528. <-time.After(1 * time.Second)
  529. f.writeHeader(0, opResult, head.stream)
  530. f.wbuf[0] = srv.protocol | 0x80
  531. f.writeInt(resultKindVoid)
  532. if err := f.finishWrite(); err != nil {
  533. srv.t.Error(err)
  534. }
  535. }()
  536. return
  537. case "use":
  538. f.writeInt(resultKindKeyspace)
  539. f.writeString(strings.TrimSpace(query[3:]))
  540. case "void":
  541. f.writeHeader(0, opResult, head.stream)
  542. f.writeInt(resultKindVoid)
  543. default:
  544. f.writeHeader(0, opResult, head.stream)
  545. f.writeInt(resultKindVoid)
  546. }
  547. default:
  548. f.writeHeader(0, opError, head.stream)
  549. f.writeInt(0)
  550. f.writeString("not supported")
  551. }
  552. f.wbuf[0] = srv.protocol | 0x80
  553. if err := f.finishWrite(); err != nil {
  554. srv.t.Error(err)
  555. }
  556. }
  557. func (srv *TestServer) readFrame(conn net.Conn) (*framer, error) {
  558. buf := make([]byte, srv.headerSize)
  559. head, err := readHeader(conn, buf)
  560. if err != nil {
  561. return nil, err
  562. }
  563. framer := newFramer(conn, conn, nil, srv.protocol)
  564. err = framer.readFrame(&head)
  565. if err != nil {
  566. return nil, err
  567. }
  568. // should be a request frame
  569. if head.version.response() {
  570. return nil, fmt.Errorf("expected to read a request frame got version: %v", head.version)
  571. } else if head.version.version() != srv.protocol {
  572. return nil, fmt.Errorf("expected to read protocol version 0x%x got 0x%x", srv.protocol, head.version.version())
  573. }
  574. return framer, nil
  575. }