|
@@ -256,6 +256,7 @@ func (s *Session) dial(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHa
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
go c.serve()
|
|
go c.serve()
|
|
|
|
|
+ go c.heartBeat()
|
|
|
|
|
|
|
|
return c, nil
|
|
return c, nil
|
|
|
}
|
|
}
|
|
@@ -506,6 +507,53 @@ func (p *protocolError) Error() string {
|
|
|
return fmt.Sprintf("gocql: received unexpected frame on stream %d: %v", p.frame.Header().stream, p.frame)
|
|
return fmt.Sprintf("gocql: received unexpected frame on stream %d: %v", p.frame.Header().stream, p.frame)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (c *Conn) heartBeat() {
|
|
|
|
|
+ sleepTime := 1 * time.Second
|
|
|
|
|
+ timer := time.NewTimer(sleepTime)
|
|
|
|
|
+ defer timer.Stop()
|
|
|
|
|
+
|
|
|
|
|
+ var failures int
|
|
|
|
|
+
|
|
|
|
|
+ for {
|
|
|
|
|
+ if failures > 5 {
|
|
|
|
|
+ c.closeWithError(fmt.Errorf("gocql: heartbeat failed"))
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ timer.Reset(sleepTime)
|
|
|
|
|
+
|
|
|
|
|
+ select {
|
|
|
|
|
+ case <-c.quit:
|
|
|
|
|
+ return
|
|
|
|
|
+ case <-timer.C:
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ framer, err := c.exec(context.Background(), &writeOptionsFrame{}, nil)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ failures++
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ resp, err := framer.parseFrame()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ // invalid frame
|
|
|
|
|
+ failures++
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ switch resp.(type) {
|
|
|
|
|
+ case *supportedFrame:
|
|
|
|
|
+ // Everything ok
|
|
|
|
|
+ sleepTime = 5 * time.Second
|
|
|
|
|
+ failures = 0
|
|
|
|
|
+ case error:
|
|
|
|
|
+ // TODO: should we do something here?
|
|
|
|
|
+ default:
|
|
|
|
|
+ panic(fmt.Sprintf("gocql: unknown frame in response to options: %T", resp))
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (c *Conn) recv() error {
|
|
func (c *Conn) recv() error {
|
|
|
// not safe for concurrent reads
|
|
// not safe for concurrent reads
|
|
|
|
|
|