浏览代码

Merge pull request #608 from Zariel/schema-events-clear-prepared-cache

Schema events clear prepared cache
Chris Bannister 10 年之前
父节点
当前提交
4fd9e05d55
共有 6 个文件被更改,包括 57 次插入50 次删除
  1. 2 0
      cluster.go
  2. 1 1
      conn.go
  3. 3 0
      control.go
  4. 31 28
      events.go
  5. 17 20
      frame.go
  6. 3 1
      session.go

+ 2 - 0
cluster.go

@@ -149,6 +149,8 @@ type ClusterConfig struct {
 		DisableNodeStatusEvents bool
 		// disable registering for topology events (node added/removed/moved)
 		DisableTopologyEvents bool
+		// disable registering for schema events (keyspace/table/function removed/created/updated)
+		DisableSchemaEvents bool
 	}
 
 	// internal config for testing

+ 1 - 1
conn.go

@@ -784,7 +784,7 @@ func (c *Conn) executeQuery(qry *Query) *Iter {
 		return iter
 	case *resultKeyspaceFrame:
 		return &Iter{framer: framer}
-	case *resultSchemaChangeFrame, *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction:
+	case *schemaChangeKeyspace, *schemaChangeTable, *schemaChangeFunction:
 		// Clear the statments cache so that we dont use stale table info for requests.
 		// TODO: only reset a specific table/keyapce and only when it is changed.
 		c.session.stmtsLRU.clear()

+ 3 - 0
control.go

@@ -131,6 +131,9 @@ func (c *controlConn) registerEvents(conn *Conn) error {
 	if !c.session.cfg.Events.DisableNodeStatusEvents {
 		events = append(events, "STATUS_CHANGE")
 	}
+	if !c.session.cfg.Events.DisableSchemaEvents {
+		events = append(events, "SCHEMA_CHANGE")
+	}
 
 	if len(events) == 0 {
 		return nil

+ 31 - 28
events.go

@@ -80,6 +80,37 @@ func (e *eventDeouncer) debounce(frame frame) {
 	e.mu.Unlock()
 }
 
+func (s *Session) handleEvent(framer *framer) {
+	// TODO(zariel): need to debounce events frames, and possible also events
+	defer framerPool.Put(framer)
+
+	frame, err := framer.parseFrame()
+	if err != nil {
+		// TODO: logger
+		log.Printf("gocql: unable to parse event frame: %v\n", err)
+		return
+	}
+
+	if debug {
+		log.Printf("gocql: handling frame: %v\n", frame)
+	}
+
+	// TODO: handle medatadata events
+	switch f := frame.(type) {
+	case *schemaChangeKeyspace, *schemaChangeFunction, *schemaChangeTable:
+		s.schemaEvents.debounce(frame)
+	case *topologyChangeEventFrame, *statusChangeEventFrame:
+		s.nodeEvents.debounce(frame)
+	default:
+		log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
+	}
+}
+
+func (s *Session) handleSchemaEvent(frames []frame) {
+	// for now we dont care about them, just reset the prepared statements
+	s.stmtsLRU.clear()
+}
+
 func (s *Session) handleNodeEvent(frames []frame) {
 	type nodeEvent struct {
 		change string
@@ -131,34 +162,6 @@ func (s *Session) handleNodeEvent(frames []frame) {
 	}
 }
 
-func (s *Session) handleEvent(framer *framer) {
-	// TODO(zariel): need to debounce events frames, and possible also events
-	defer framerPool.Put(framer)
-
-	frame, err := framer.parseFrame()
-	if err != nil {
-		// TODO: logger
-		log.Printf("gocql: unable to parse event frame: %v\n", err)
-		return
-	}
-
-	if debug {
-		log.Printf("gocql: handling frame: %v\n", frame)
-	}
-
-	// TODO: handle medatadata events
-	switch f := frame.(type) {
-	case *schemaChangeKeyspace:
-	case *schemaChangeFunction:
-	case *schemaChangeTable:
-	case *topologyChangeEventFrame, *statusChangeEventFrame:
-		s.nodeEvents.debounce(frame)
-	default:
-		log.Printf("gocql: invalid event frame (%T): %v\n", f, f)
-	}
-
-}
-
 func (s *Session) handleNewNode(host net.IP, port int, waitForBinary bool) {
 	// TODO(zariel): need to be able to filter discovered nodes
 

+ 17 - 20
frame.go

@@ -1007,18 +1007,6 @@ func (f *framer) parseResultPrepared() frame {
 	return frame
 }
 
-type resultSchemaChangeFrame struct {
-	frameHeader
-
-	change   string
-	keyspace string
-	table    string
-}
-
-func (s *resultSchemaChangeFrame) String() string {
-	return fmt.Sprintf("[result_schema_change change=%s keyspace=%s table=%s]", s.change, s.keyspace, s.table)
-}
-
 type schemaChangeKeyspace struct {
 	frameHeader
 
@@ -1053,15 +1041,24 @@ type schemaChangeFunction struct {
 
 func (f *framer) parseResultSchemaChange() frame {
 	if f.proto <= protoVersion2 {
-		frame := &resultSchemaChangeFrame{
-			frameHeader: *f.header,
-		}
-
-		frame.change = f.readString()
-		frame.keyspace = f.readString()
-		frame.table = f.readString()
+		change := f.readString()
+		keyspace := f.readString()
+		table := f.readString()
 
-		return frame
+		if table != "" {
+			return &schemaChangeTable{
+				frameHeader: *f.header,
+				change:      change,
+				keyspace:    keyspace,
+				object:      table,
+			}
+		} else {
+			return &schemaChangeKeyspace{
+				frameHeader: *f.header,
+				change:      change,
+				keyspace:    keyspace,
+			}
+		}
 	} else {
 		change := f.readString()
 		target := f.readString()

+ 3 - 1
session.go

@@ -48,7 +48,8 @@ type Session struct {
 	control *controlConn
 
 	// event handlers
-	nodeEvents *eventDeouncer
+	nodeEvents   *eventDeouncer
+	schemaEvents *eventDeouncer
 
 	// ring metadata
 	hosts []HostInfo
@@ -102,6 +103,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
 	s.connCfg = connCfg
 
 	s.nodeEvents = newEventDeouncer("NodeEvents", s.handleNodeEvent)
+	s.schemaEvents = newEventDeouncer("SchemaEvents", s.handleSchemaEvent)
 
 	s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)