فهرست منبع

Add AwaitSchemaAgreement to Session (#1395)

Daniel Lohse 5 سال پیش
والد
کامیت
92af2e0885
2فایلهای تغییر یافته به همراه25 افزوده شده و 0 حذف شده
  1. 10 0
      integration_test.go
  2. 15 0
      session.go

+ 10 - 0
integration_test.go

@@ -4,6 +4,7 @@ package gocql
 
 // This file groups integration tests where Cassandra has to be set up with some special integration variables
 import (
+	"context"
 	"reflect"
 	"testing"
 	"time"
@@ -172,6 +173,15 @@ func TestCustomPayloadValues(t *testing.T) {
 	}
 }
 
+func TestSessionAwaitSchemaAgreement(t *testing.T) {
+	session := createSession(t)
+	defer session.Close()
+
+	if err := session.AwaitSchemaAgreement(context.Background()); err != nil {
+		t.Fatalf("expected session.AwaitSchemaAgreement to not return an error but got '%v'", err)
+	}
+}
+
 func TestUDF(t *testing.T) {
 	session := createSession(t)
 	defer session.Close()

+ 15 - 0
session.go

@@ -283,6 +283,21 @@ func (s *Session) init() error {
 	return nil
 }
 
+// AwaitSchemaAgreement will wait until schema versions across all nodes in the
+// cluster are the same (as seen from the point of view of the control connection).
+// The maximum amount of time this takes is governed
+// by the MaxWaitSchemaAgreement setting in the configuration (default: 60s).
+// AwaitSchemaAgreement returns an error in case schema versions are not the same
+// after the timeout specified in MaxWaitSchemaAgreement elapses.
+func (s *Session) AwaitSchemaAgreement(ctx context.Context) error {
+	if s.cfg.disableControlConn {
+		return errNoControl
+	}
+	return s.control.withConn(func(conn *Conn) *Iter {
+		return &Iter{err: conn.awaitSchemaAgreement(ctx)}
+	}).err
+}
+
 func (s *Session) reconnectDownedHosts(intv time.Duration) {
 	reconnectTicker := time.NewTicker(intv)
 	defer reconnectTicker.Stop()