Переглянути джерело

added connection timeouts, simple round robin and a TestServer

Christoph Hack 12 роки тому
батько
коміт
fd3db9e196
4 змінених файлів з 407 додано та 197 видалено
  1. 14 3
      binary.go
  2. 135 56
      conn.go
  3. 87 51
      gocql.go
  4. 171 87
      gocql_test.go

+ 14 - 3
binary.go

@@ -1,3 +1,7 @@
+// Copyright (c) 2012 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
 package gocql
 
 import (
@@ -105,9 +109,6 @@ func (b *buffer) writeInet(ip net.IP, port int) {
 	b.writeInt(int32(port))
 }
 
-func (b *buffer) writeConsistency() {
-}
-
 func (b *buffer) writeStringMap(v map[string]string) {
 	b.writeShort(uint16(len(v)))
 	for key, value := range v {
@@ -185,6 +186,16 @@ func (b *buffer) readString() string {
 	return v
 }
 
+func (b *buffer) readLongString() string {
+	n := b.readInt()
+	if len(*b) < n {
+		panic(ErrInvalid)
+	}
+	v := string((*b)[:n])
+	*b = (*b)[n:]
+	return v
+}
+
 func (b *buffer) readBytes() []byte {
 	n := b.readInt()
 	if n < 0 {

+ 135 - 56
conn.go

@@ -1,27 +1,26 @@
+// Copyright (c) 2012 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
 package gocql
 
 import (
-	"io"
 	"net"
 	"sync"
 	"sync/atomic"
+	"time"
 )
 
-type queryInfo struct {
-	id    []byte
-	args  []columnInfo
-	rval  []columnInfo
-	avail chan bool
-}
-
 type connection struct {
-	conn    net.Conn
-	uniq    chan uint8
-	reply   []chan buffer
-	waiting uint64
+	conn     net.Conn
+	uniq     chan uint8
+	requests []frameRequest
+	nwait    int32
 
 	prepMu sync.Mutex
 	prep   map[string]*queryInfo
+
+	timeout time.Duration
 }
 
 func connect(addr string, cfg *Config) (*connection, error) {
@@ -30,16 +29,17 @@ func connect(addr string, cfg *Config) (*connection, error) {
 		return nil, err
 	}
 	c := &connection{
-		conn:  conn,
-		uniq:  make(chan uint8, 64),
-		reply: make([]chan buffer, 64),
-		prep:  make(map[string]*queryInfo),
+		conn:     conn,
+		uniq:     make(chan uint8, 64),
+		requests: make([]frameRequest, 64),
+		prep:     make(map[string]*queryInfo),
+		timeout:  cfg.Timeout,
 	}
 	for i := 0; i < cap(c.uniq); i++ {
 		c.uniq <- uint8(i)
 	}
 
-	go c.recv()
+	go c.run()
 
 	frame := make(buffer, headerSize)
 	frame.setHeader(protoRequest, 0, 0, opStartup)
@@ -48,7 +48,10 @@ func connect(addr string, cfg *Config) (*connection, error) {
 	})
 	frame.setLength(len(frame) - headerSize)
 
-	frame = c.request(frame)
+	frame, err = c.request(frame)
+	if err != nil {
+		return nil, err
+	}
 
 	if cfg.Keyspace != "" {
 		qry := &Query{stmt: "USE " + cfg.Keyspace}
@@ -58,56 +61,103 @@ func connect(addr string, cfg *Config) (*connection, error) {
 	return c, nil
 }
 
-func (c *connection) recv() {
+func (c *connection) run() {
+	var err error
 	for {
-		frame := make(buffer, headerSize, headerSize+512)
-		if _, err := io.ReadFull(c.conn, frame); err != nil {
-			return
+		var frame buffer
+		frame, err = c.recv()
+		if err != nil {
+			break
 		}
-		if frame[0] != protoResponse {
-			continue
+		c.dispatch(frame)
+	}
+
+	c.conn.Close()
+	for id := 0; id < len(c.requests); id++ {
+		req := &c.requests[id]
+		if atomic.LoadInt32(&req.active) == 1 {
+			req.reply <- frameReply{nil, err}
 		}
-		if length := frame.Length(); length > 0 {
+	}
+}
+
+func (c *connection) recv() (buffer, error) {
+	frame := make(buffer, headerSize, headerSize+512)
+	c.conn.SetReadDeadline(time.Now().Add(c.timeout))
+	n, last, pinged := 0, 0, false
+	for n < len(frame) {
+		nn, err := c.conn.Read(frame[n:])
+		n += nn
+		if err != nil {
+			if err, ok := err.(net.Error); ok && err.Timeout() {
+				if n > last {
+					// we hit the deadline but we made progress.
+					// simply extend the deadline
+					c.conn.SetReadDeadline(time.Now().Add(c.timeout))
+					last = n
+				} else if n == 0 && !pinged {
+					c.conn.SetReadDeadline(time.Now().Add(c.timeout))
+					if atomic.LoadInt32(&c.nwait) > 0 {
+						go c.ping()
+						pinged = true
+					}
+				} else {
+					return nil, err
+				}
+			} else {
+				return nil, err
+			}
+		}
+		if n == headerSize && len(frame) == headerSize {
+			if frame[0] != protoResponse {
+				return nil, ErrInvalid
+			}
 			frame.grow(frame.Length())
-			io.ReadFull(c.conn, frame[headerSize:])
 		}
-		c.dispatch(frame)
 	}
-	panic("not possible")
+	return frame, nil
+}
+
+func (c *connection) ping() error {
+	frame := make(buffer, headerSize, headerSize)
+	frame.setHeader(protoRequest, 0, 0, opOptions)
+	frame.setLength(0)
+
+	_, err := c.request(frame)
+	return err
 }
 
-func (c *connection) request(frame buffer) buffer {
+func (c *connection) request(frame buffer) (buffer, error) {
 	id := <-c.uniq
 	frame[2] = id
-	c.reply[id] = make(chan buffer, 1)
 
-	for {
-		w := atomic.LoadUint64(&c.waiting)
-		if atomic.CompareAndSwapUint64(&c.waiting, w, w|(1<<id)) {
-			break
-		}
+	req := &c.requests[id]
+	req.reply = make(chan frameReply, 1)
+	atomic.AddInt32(&c.nwait, 1)
+	atomic.StoreInt32(&req.active, 1)
+
+	if _, err := c.conn.Write(frame); err != nil {
+		return nil, err
 	}
-	c.conn.Write(frame)
-	resp := <-c.reply[id]
+
+	reply := <-req.reply
+	req.reply = nil
+
 	c.uniq <- id
-	return resp
+	return reply.buf, reply.err
 }
 
 func (c *connection) dispatch(frame buffer) {
-	id := frame[2]
-	if id >= 128 {
+	id := int(frame[2])
+	if id >= len(c.requests) {
 		return
 	}
-	for {
-		w := atomic.LoadUint64(&c.waiting)
-		if w&(1<<id) == 0 {
-			return
-		}
-		if atomic.CompareAndSwapUint64(&c.waiting, w, w&^(1<<id)) {
-			break
-		}
+	req := &c.requests[id]
+	if !atomic.CompareAndSwapInt32(&req.active, 1, 0) {
+		return
 	}
-	c.reply[id] <- frame
+	atomic.AddInt32(&c.nwait, -1)
+	req.reply <- frameReply{frame, nil}
 }
 
 func (c *connection) prepareQuery(stmt string) *queryInfo {
@@ -115,10 +165,11 @@ func (c *connection) prepareQuery(stmt string) *queryInfo {
 	info := c.prep[stmt]
 	if info != nil {
 		c.prepMu.Unlock()
-		<-info.avail
+		info.wg.Wait()
 		return info
 	}
-	info = &queryInfo{avail: make(chan bool)}
+	info = new(queryInfo)
+	info.wg.Add(1)
 	c.prep[stmt] = info
 	c.prepMu.Unlock()
 
@@ -127,13 +178,16 @@ func (c *connection) prepareQuery(stmt string) *queryInfo {
 	frame.writeLongString(stmt)
 	frame.setLength(len(frame) - headerSize)
 
-	frame = c.request(frame)
+	frame, err := c.request(frame)
+	if err != nil {
+		return nil
+	}
 	frame.skipHeader()
 	frame.readInt() // kind
 	info.id = frame.readShortBytes()
 	info.args = frame.readMetaData()
 	info.rval = frame.readMetaData()
-	close(info.avail)
+	info.wg.Done()
 	return info
 }
 
@@ -144,8 +198,13 @@ func (c *connection) executeQuery(query *Query) (buffer, error) {
 	}
 
 	frame := make(buffer, headerSize, headerSize+512)
-	frame.setHeader(protoRequest, 0, 0, opQuery)
-	frame.writeLongString(query.stmt)
+	if info == nil {
+		frame.setHeader(protoRequest, 0, 0, opQuery)
+		frame.writeLongString(query.stmt)
+	} else {
+		frame.setHeader(protoRequest, 0, 0, opExecute)
+		frame.writeShortBytes(info.id)
+	}
 	frame.writeShort(uint16(query.cons))
 	flags := uint8(0)
 	if len(query.args) > 0 {
@@ -164,7 +223,10 @@ func (c *connection) executeQuery(query *Query) (buffer, error) {
 	}
 	frame.setLength(len(frame) - headerSize)
 
-	frame = c.request(frame)
+	frame, err := c.request(frame)
+	if err != nil {
+		return nil, err
+	}
 
 	if frame[3] == opError {
 		frame.skipHeader()
@@ -174,3 +236,20 @@ func (c *connection) executeQuery(query *Query) (buffer, error) {
 	}
 	return frame, nil
 }
+
+type queryInfo struct {
+	id   []byte
+	args []columnInfo
+	rval []columnInfo
+	wg   sync.WaitGroup
+}
+
+type frameRequest struct {
+	active int32
+	reply  chan frameReply
+}
+
+type frameReply struct {
+	buf buffer
+	err error
+}

+ 87 - 51
gocql.go

@@ -8,6 +8,9 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
 )
 
 type Config struct {
@@ -16,6 +19,7 @@ type Config struct {
 	Keyspace    string
 	Consistency Consistency
 	DefaultPort int
+	Timeout     time.Duration
 }
 
 func (c *Config) normalize() {
@@ -25,6 +29,9 @@ func (c *Config) normalize() {
 	if c.DefaultPort == 0 {
 		c.DefaultPort = 9042
 	}
+	if c.Timeout <= 0 {
+		c.Timeout = 200 * time.Millisecond
+	}
 	for i := 0; i < len(c.Nodes); i++ {
 		c.Nodes[i] = strings.TrimSpace(c.Nodes[i])
 		if strings.IndexByte(c.Nodes[i], ':') < 0 {
@@ -34,20 +41,25 @@ func (c *Config) normalize() {
 }
 
 type Session struct {
-	cfg  *Config
-	pool []*connection
+	cfg      *Config
+	active   []*node
+	pos      uint32
+	mu       sync.RWMutex
+	keyspace string
 }
 
 func NewSession(cfg Config) *Session {
 	cfg.normalize()
-	pool := make([]*connection, 0, len(cfg.Nodes))
+	active := make([]*node, 0, len(cfg.Nodes))
 	for _, address := range cfg.Nodes {
 		con, err := connect(address, &cfg)
 		if err == nil {
-			pool = append(pool, con)
+			active = append(active, &node{con})
+		} else {
+			fmt.Println("connect", err)
 		}
 	}
-	return &Session{cfg: &cfg, pool: pool}
+	return &Session{cfg: &cfg, active: active}
 }
 
 func (s *Session) Query(stmt string, args ...interface{}) *Query {
@@ -59,43 +71,32 @@ func (s *Session) Query(stmt string, args ...interface{}) *Query {
 	}
 }
 
-func (s *Session) executeQuery(query *Query) (buffer, error) {
-	// TODO(tux21b): do something clever here
-	return s.pool[0].executeQuery(query)
-}
-
 func (s *Session) Close() {
 	return
 }
 
-type Consistency uint16
-
-const (
-	ConAny         Consistency = 0x0000
-	ConOne         Consistency = 0x0001
-	ConTwo         Consistency = 0x0002
-	ConThree       Consistency = 0x0003
-	ConQuorum      Consistency = 0x0004
-	ConAll         Consistency = 0x0005
-	ConLocalQuorum Consistency = 0x0006
-	ConEachQuorum  Consistency = 0x0007
-	ConSerial      Consistency = 0x0008
-	ConLocalSerial Consistency = 0x0009
-)
-
-var ErrNotFound = errors.New("not found")
+func (s *Session) executeQuery(query *Query) (buffer, error) {
+	pos := atomic.AddUint32(&s.pos, 1)
+	var conn *connection
+	//var keyspace string
+	s.mu.RLock()
+	if len(s.active) == 0 {
+		s.mu.Unlock()
+		return nil, errors.New("no active nodes")
+	}
+	conn = s.active[pos%uint32(len(s.active))].conn
+	//keyspace = s.keyspace
+	s.mu.RUnlock()
+	return conn.executeQuery(query)
+}
 
 type Query struct {
 	stmt string
 	args []interface{}
 	cons Consistency
-	ctx  interface {
-		executeQuery(query *Query) (buffer, error)
-	}
+	ctx  queryContext
 }
 
-var ErrQueryUnbound = errors.New("can not execute unbound query")
-
 func NewQuery(stmt string) *Query {
 	return &Query{stmt: stmt, cons: ConQuorum}
 }
@@ -117,13 +118,19 @@ func (q *Query) Exec() error {
 	return nil
 }
 
-func (q *Query) request() (buffer, error) {
-	return q.ctx.executeQuery(q)
-}
-
-func (q *Query) Consistency(cons Consistency) *Query {
-	q.cons = cons
-	return q
+func (q *Query) Iter() *Iter {
+	iter := new(Iter)
+	frame, err := q.request()
+	if err != nil {
+		iter.err = err
+		return iter
+	}
+	frame.skipHeader()
+	kind := frame.readInt()
+	if kind == resultKindRows {
+		iter.setFrame(frame)
+	}
+	return iter
 }
 
 func (q *Query) Scan(values ...interface{}) error {
@@ -140,19 +147,13 @@ func (q *Query) Scan(values ...interface{}) error {
 	return nil
 }
 
-func (q *Query) Iter() *Iter {
-	iter := new(Iter)
-	frame, err := q.request()
-	if err != nil {
-		iter.err = err
-		return iter
-	}
-	frame.skipHeader()
-	kind := frame.readInt()
-	if kind == resultKindRows {
-		iter.setFrame(frame)
-	}
-	return iter
+func (q *Query) Consistency(cons Consistency) *Query {
+	q.cons = cons
+	return q
+}
+
+func (q *Query) request() (buffer, error) {
+	return q.ctx.executeQuery(q)
 }
 
 type Iter struct {
@@ -197,6 +198,10 @@ func (iter *Iter) Close() error {
 	return iter.err
 }
 
+type queryContext interface {
+	executeQuery(query *Query) (buffer, error)
+}
+
 type columnInfo struct {
 	Keyspace string
 	Table    string
@@ -204,6 +209,21 @@ type columnInfo struct {
 	TypeInfo *TypeInfo
 }
 
+type Consistency uint16
+
+const (
+	ConAny         Consistency = 0x0000
+	ConOne         Consistency = 0x0001
+	ConTwo         Consistency = 0x0002
+	ConThree       Consistency = 0x0003
+	ConQuorum      Consistency = 0x0004
+	ConAll         Consistency = 0x0005
+	ConLocalQuorum Consistency = 0x0006
+	ConEachQuorum  Consistency = 0x0007
+	ConSerial      Consistency = 0x0008
+	ConLocalSerial Consistency = 0x0009
+)
+
 type Error struct {
 	Code    int
 	Message string
@@ -212,3 +232,19 @@ type Error struct {
 func (e Error) Error() string {
 	return e.Message
 }
+
+var ErrNotFound = errors.New("not found")
+
+var ErrQueryUnbound = errors.New("can not execute unbound query")
+
+// active (choose round robin)
+// connecting
+// down
+
+// getNode()
+// getNextNode() für failover
+// getNodeForShard(key) ...
+
+type node struct {
+	conn *connection
+}

+ 171 - 87
gocql_test.go

@@ -1,124 +1,208 @@
+// Copyright (c) 2012 The gocql Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
 package gocql
 
 import (
-	"bytes"
 	"fmt"
+	"io"
+	"net"
+	"strings"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 )
 
-func TestConnect(t *testing.T) {
-	db := NewSession(Config{
-		Nodes: []string{
-			"127.0.0.1",
-		},
-		Keyspace:    "system",
-		Consistency: ConQuorum,
-	})
-	defer db.Close()
+type TestServer struct {
+	Address string
+	t       *testing.T
+	nreq    uint64
+	listen  net.Listener
+}
 
-	for i := 0; i < 5; i++ {
-		db.Query("SELECT keyspace_name FROM schema_keyspaces WHERE keyspace_name = ?",
-			"system_auth").Exec()
+func NewTestServer(t *testing.T, address string) *TestServer {
+	listen, err := net.Listen("tcp", address)
+	if err != nil {
+		t.Fatal(err)
+	}
+	srv := &TestServer{Address: address, listen: listen, t: t}
+	go srv.serve()
+	return srv
+}
+
+func (srv *TestServer) serve() {
+	for {
+		conn, err := srv.listen.Accept()
+		if err != nil {
+			break
+		}
+		go func(conn net.Conn) {
+			defer conn.Close()
+			for {
+				frame := srv.readFrame(conn)
+				atomic.AddUint64(&srv.nreq, 1)
+				srv.process(frame, conn)
+			}
+		}(conn)
 	}
+}
+
+func (srv *TestServer) Stop() {
+	srv.listen.Close()
+}
 
-	var keyspace string
-	var durable bool
-	iter := db.Query("SELECT keyspace_name, durable_writes FROM schema_keyspaces").Iter()
-	for iter.Scan(&keyspace, &durable) {
-		fmt.Println("Keyspace:", keyspace, durable)
+func (srv *TestServer) process(frame buffer, conn net.Conn) {
+	switch frame[3] {
+	case opStartup:
+		frame = frame[:headerSize]
+		frame.setHeader(protoResponse, 0, frame[2], opReady)
+	case opQuery:
+		input := frame
+		input.skipHeader()
+		query := strings.TrimSpace(input.readLongString())
+		frame = frame[:headerSize]
+		frame.setHeader(protoResponse, 0, frame[2], opResult)
+		first := query
+		if n := strings.IndexByte(query, ' '); n > 0 {
+			first = first[:n]
+		}
+		switch strings.ToLower(first) {
+		case "kill":
+			select {}
+		case "delay":
+			go func() {
+				<-time.After(1 * time.Second)
+				frame.writeInt(0)
+				frame.setLength(len(frame) - headerSize)
+				if _, err := conn.Write(frame); err != nil {
+					return
+				}
+			}()
+			return
+		case "use":
+			frame.writeInt(3)
+			frame.writeString(strings.TrimSpace(query[3:]))
+		case "void":
+			frame.writeInt(0)
+		default:
+			frame.writeInt(0)
+		}
+	default:
+		frame = frame[:headerSize]
+		frame.setHeader(protoResponse, 0, frame[2], opError)
+		frame.writeInt(0)
+		frame.writeString("not supported")
 	}
-	if err := iter.Close(); err != nil {
-		fmt.Println(err)
+	frame.setLength(len(frame) - headerSize)
+	if _, err := conn.Write(frame); err != nil {
+		return
 	}
 }
 
-type Page struct {
-	Title      string
-	RevID      int
-	Body       string
-	Hits       int
-	Protected  bool
-	Modified   time.Time
-	Attachment []byte
+func (srv *TestServer) readFrame(conn net.Conn) buffer {
+	frame := make(buffer, headerSize, headerSize+512)
+	if _, err := io.ReadFull(conn, frame); err != nil {
+		srv.t.Fatal(err)
+	}
+	if n := frame.Length(); n > 0 {
+		frame.grow(n)
+		if _, err := io.ReadFull(conn, frame[headerSize:]); err != nil {
+			srv.t.Fatal(err)
+		}
+	}
+	return frame
 }
 
-var pages = []*Page{
-	&Page{"Frontpage", 1, "Hello world!", 0, false,
-		time.Date(2012, 8, 20, 10, 0, 0, 0, time.UTC), []byte{}},
-	&Page{"Frontpage", 2, "Hello modified world!", 0, false,
-		time.Date(2012, 8, 22, 10, 0, 0, 0, time.UTC), []byte("img data\x00")},
-	&Page{"LoremIpsum", 3, "Lorem ipsum dolor sit amet", 12,
-		true, time.Date(2012, 8, 22, 10, 0, 8, 0, time.UTC), []byte{}},
+func TestSimple(t *testing.T) {
+	srv := NewTestServer(t, "127.0.0.1:9051")
+	defer srv.Stop()
+
+	db := NewSession(Config{
+		Nodes:       []string{srv.Address},
+		Consistency: ConQuorum,
+	})
+	if err := db.Query("void").Exec(); err != nil {
+		//t.Error("Query", err)
+		return
+	}
 }
 
-func TestWiki(t *testing.T) {
+func TestTimeout(t *testing.T) {
+	srv := NewTestServer(t, "127.0.0.1:9051")
+	defer srv.Stop()
+
 	db := NewSession(Config{
-		Nodes:       []string{"localhost"},
+		Nodes:       []string{srv.Address},
 		Consistency: ConQuorum,
 	})
 
-	if err := db.Query("DROP KEYSPACE gocql_wiki").Exec(); err != nil {
-		t.Log("DROP KEYSPACE:", err)
-	}
+	go func() {
+		<-time.After(1 * time.Second)
+		t.Fatal("no timeout")
+	}()
 
-	if err := db.Query(`CREATE KEYSPACE gocql_wiki
-		WITH replication = {
-			'class' : 'SimpleStrategy',
-			'replication_factor' : 1
-		}`).Exec(); err != nil {
-		t.Fatal("CREATE KEYSPACE:", err)
+	if err := db.Query("kill").Exec(); err == nil {
+		t.Fatal("expected error")
 	}
+}
 
-	if err := db.Query("USE gocql_wiki").Exec(); err != nil {
-		t.Fatal("USE:", err)
-	}
+func TestLongQuery(t *testing.T) {
+	srv := NewTestServer(t, "127.0.0.1:9051")
+	defer srv.Stop()
 
-	if err := db.Query(`CREATE TABLE page (
-		title varchar,
-		revid int,
-		body varchar,
-		hits int,
-		protected boolean,
-		modified timestamp,
-		attachment blob,
-		PRIMARY KEY (title, revid)
-		)`).Exec(); err != nil {
-		t.Fatal("CREATE TABLE:", err)
-	}
+	db := NewSession(Config{
+		Nodes:       []string{srv.Address},
+		Consistency: ConQuorum,
+	})
 
-	for _, p := range pages {
-		if err := db.Query(`INSERT INTO page (title, revid, body, hits,
-			protected, modified, attachment) VALUES (?, ?, ?, ?, ?, ?, ?)`,
-			p.Title, p.RevID, p.Body, p.Hits, p.Protected, p.Modified,
-			p.Attachment).Exec(); err != nil {
-			t.Fatal("INSERT:", err)
-		}
+	if err := db.Query("delay").Exec(); err != nil {
+		t.Fatal(err)
 	}
+}
 
-	var count int
-	if err := db.Query("SELECT count(*) FROM page").Scan(&count); err != nil {
-		t.Fatal("COUNT:", err)
+func TestRoundRobin(t *testing.T) {
+	servers := make([]*TestServer, 5)
+	addrs := make([]string, len(servers))
+	for i := 0; i < len(servers); i++ {
+		addrs[i] = fmt.Sprintf("127.0.0.1:%d", 9051+i)
+		servers[i] = NewTestServer(t, addrs[i])
+		defer servers[i].Stop()
 	}
-	if count != len(pages) {
-		t.Fatalf("COUNT: expected %d got %d", len(pages), count)
+	db := NewSession(Config{
+		Nodes:       addrs,
+		Consistency: ConQuorum,
+	})
+
+	var wg sync.WaitGroup
+	wg.Add(5)
+	for i := 0; i < 5; i++ {
+		go func() {
+			for j := 0; j < 5; j++ {
+				if err := db.Query("void").Exec(); err != nil {
+					t.Fatal(err)
+				}
+			}
+			wg.Done()
+		}()
 	}
+	wg.Wait()
 
-	for _, page := range pages {
-		qry := db.Query(`SELECT title, revid, body, hits, protected,
-			modified, attachment
-		    FROM page WHERE title = ? AND revid = ?`, page.Title, page.RevID)
-		var p Page
-		if err := qry.Scan(&p.Title, &p.RevID, &p.Body, &p.Hits, &p.Protected,
-			&p.Modified, &p.Attachment); err != nil {
-			t.Fatal("SELECT PAGE:", err)
+	diff := 0
+	for i := 1; i < len(servers); i++ {
+		d := 0
+		if servers[i].nreq > servers[i-1].nreq {
+			d = int(servers[i].nreq - servers[i-1].nreq)
+		} else {
+			d = int(servers[i-1].nreq - servers[i].nreq)
 		}
-		p.Modified = p.Modified.In(time.UTC)
-		if page.Title != p.Title || page.RevID != p.RevID ||
-			page.Body != p.Body || page.Modified != p.Modified ||
-			page.Hits != p.Hits || page.Protected != p.Protected ||
-			!bytes.Equal(page.Attachment, p.Attachment) {
-			t.Errorf("expected %#v got %#v", *page, p)
+		if d > diff {
+			diff = d
 		}
 	}
+
+	if diff > 0 {
+		t.Fatal("diff:", diff)
+	}
 }