Gary Burd 11 лет назад
Родитель
Commit
cdedc64e27
2 измененных файлов с 411 добавлено и 0 удалено
  1. 152 0
      redisx/connmux.go
  2. 259 0
      redisx/connmux_test.go

+ 152 - 0
redisx/connmux.go

@@ -0,0 +1,152 @@
+// Copyright 2014 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+package redisx
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/garyburd/redigo/internal"
+	"github.com/garyburd/redigo/redis"
+)
+
// ConnMux multiplexes one or more connections to a single underlying
// connection. The ConnMux connections do not support concurrency, commands
// that associate server side state with the connection or commands that put
// the connection in a special mode.
type ConnMux struct {
	c redis.Conn // the shared underlying connection

	sendMu sync.Mutex // serializes Send/Flush on c and protects sendID
	sendID uint       // sequence number assigned to the next request sent

	recvMu   sync.Mutex             // serializes Receive on c and protects recvID/recvWait
	recvID   uint                   // sequence number whose reply is next on the wire
	recvWait map[uint]chan struct{} // waiters keyed by sequence number, signaled when it is their turn
}
+
+func NewConnMux(c redis.Conn) *ConnMux {
+	return &ConnMux{c: c, recvWait: make(map[uint]chan struct{})}
+}
+
+// Get gets a connection. The application must close the returned connection.
+func (p *ConnMux) Get() redis.Conn {
+	c := &muxConn{p: p}
+	c.ids = c.buf[:0]
+	return c
+}
+
+// Close closes the underlying connection.
+func (p *ConnMux) Close() error {
+	return p.c.Close()
+}
+
// muxConn is one logical connection handed out by ConnMux.Get. It is not
// safe for concurrent use by multiple goroutines.
type muxConn struct {
	p   *ConnMux
	ids []uint  // sequence numbers of requests sent but not yet received, oldest first
	buf [8]uint // inline backing array for ids to avoid allocation for small pipelines
}
+
+func (c *muxConn) send(flush bool, cmd string, args ...interface{}) error {
+	if internal.LookupCommandInfo(cmd).Set != 0 {
+		return errors.New("command not supported by mux pool")
+	}
+	p := c.p
+	p.sendMu.Lock()
+	id := p.sendID
+	c.ids = append(c.ids, id)
+	p.sendID++
+	err := p.c.Send(cmd, args...)
+	if flush {
+		err = p.c.Flush()
+	}
+	p.sendMu.Unlock()
+	return err
+}
+
+func (c *muxConn) Send(cmd string, args ...interface{}) error {
+	return c.send(false, cmd, args...)
+}
+
+func (c *muxConn) Flush() error {
+	p := c.p
+	p.sendMu.Lock()
+	err := p.c.Flush()
+	p.sendMu.Unlock()
+	return err
+}
+
// Receive returns the reply for the oldest outstanding request on this
// logical connection. Replies arrive on the shared connection in the order
// requests were sent; each request was assigned a sequence number in send,
// and recvID is the sequence number whose reply is next on the wire. A
// caller whose turn has not yet come blocks on a per-id channel until the
// reader of the previous reply signals it.
func (c *muxConn) Receive() (interface{}, error) {
	if len(c.ids) == 0 {
		return nil, errors.New("mux pool underflow")
	}

	// Pop the oldest pending sequence number for this logical connection.
	id := c.ids[0]
	c.ids = c.ids[1:]
	if len(c.ids) == 0 {
		// All pending ids consumed: reset to the inline buffer so future
		// appends reuse it instead of allocating.
		c.ids = c.buf[:0]
	}

	p := c.p
	p.recvMu.Lock()
	if p.recvID != id {
		// Not our turn yet. Register a wake channel under our id, drop the
		// lock, and wait for the goroutine reading the previous reply.
		ch := make(chan struct{})
		p.recvWait[id] = ch
		p.recvMu.Unlock()
		<-ch
		// Re-acquire the lock and re-check the invariant; the signaler set
		// recvID to our id before sending on ch.
		p.recvMu.Lock()
		if p.recvID != id {
			panic("out of sync")
		}
	}

	v, err := p.c.Receive()

	// Advance the turn to the next sequence number and, if a goroutine is
	// already waiting on it, remove and signal its channel (after unlocking).
	id++
	p.recvID = id
	ch, ok := p.recvWait[id]
	if ok {
		delete(p.recvWait, id)
	}
	p.recvMu.Unlock()
	if ok {
		ch <- struct{}{}
	}

	return v, err
}
+
+func (c *muxConn) Close() error {
+	var err error
+	if len(c.ids) == 0 {
+		return nil
+	}
+	c.Flush()
+	for range c.ids {
+		_, err = c.Receive()
+	}
+	return err
+}
+
+func (c *muxConn) Do(cmd string, args ...interface{}) (interface{}, error) {
+	if err := c.send(true, cmd, args...); err != nil {
+		return nil, err
+	}
+	return c.Receive()
+}
+
+func (c *muxConn) Err() error {
+	return c.p.c.Err()
+}

+ 259 - 0
redisx/connmux_test.go

@@ -0,0 +1,259 @@
+// Copyright 2014 Gary Burd
+//
+// Licensed under the Apache License, Version 2.0 (the "License"): you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// License for the specific language governing permissions and limitations
+// under the License.
+
+package redisx_test
+
+import (
+	"net/textproto"
+	"sync"
+	"testing"
+
+	"github.com/garyburd/redigo/internal/redistest"
+	"github.com/garyburd/redigo/redis"
+	"github.com/garyburd/redigo/redisx"
+)
+
+func TestConnMux(t *testing.T) {
+	c, err := redistest.Dial()
+	if err != nil {
+		t.Fatalf("error connection to database, %v", err)
+	}
+	m := redisx.NewConnMux(c)
+	defer m.Close()
+
+	c1 := m.Get()
+	c2 := m.Get()
+	c1.Send("ECHO", "hello")
+	c2.Send("ECHO", "world")
+	c1.Flush()
+	c2.Flush()
+	s, err := redis.String(c1.Receive())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if s != "hello" {
+		t.Fatalf("echo returned %q, want %q", s, "hello")
+	}
+	s, err = redis.String(c2.Receive())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if s != "world" {
+		t.Fatalf("echo returned %q, want %q", s, "world")
+	}
+	c1.Close()
+	c2.Close()
+}
+
+func TestConnMuxClose(t *testing.T) {
+	c, err := redistest.Dial()
+	if err != nil {
+		t.Fatalf("error connection to database, %v", err)
+	}
+	m := redisx.NewConnMux(c)
+	defer m.Close()
+
+	c1 := m.Get()
+	c2 := m.Get()
+
+	if err := c1.Send("ECHO", "hello"); err != nil {
+		t.Fatal(err)
+	}
+	if err := c1.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := c2.Send("ECHO", "world"); err != nil {
+		t.Fatal(err)
+	}
+	if err := c2.Flush(); err != nil {
+		t.Fatal(err)
+	}
+
+	s, err := redis.String(c2.Receive())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if s != "world" {
+		t.Fatalf("echo returned %q, want %q", s, "world")
+	}
+	c2.Close()
+}
+
+func BenchmarkConn(b *testing.B) {
+	b.StopTimer()
+	c, err := redistest.Dial()
+	if err != nil {
+		b.Fatalf("error connection to database, %v", err)
+	}
+	defer c.Close()
+	b.StartTimer()
+
+	for i := 0; i < b.N; i++ {
+		if _, err := c.Do("PING"); err != nil {
+			b.Fatal(err)
+		}
+	}
+}
+
+func BenchmarkConnMux(b *testing.B) {
+	b.StopTimer()
+	c, err := redistest.Dial()
+	if err != nil {
+		b.Fatalf("error connection to database, %v", err)
+	}
+	m := redisx.NewConnMux(c)
+	defer m.Close()
+
+	b.StartTimer()
+
+	for i := 0; i < b.N; i++ {
+		c := m.Get()
+		if _, err := c.Do("PING"); err != nil {
+			b.Fatal(err)
+		}
+		c.Close()
+	}
+}
+
+func BenchmarkPool(b *testing.B) {
+	b.StopTimer()
+
+	p := redis.Pool{Dial: redistest.Dial, MaxIdle: 1}
+	defer p.Close()
+
+	// Fill the pool.
+	c := p.Get()
+	if err := c.Err(); err != nil {
+		b.Fatal(err)
+	}
+	c.Close()
+
+	b.StartTimer()
+
+	for i := 0; i < b.N; i++ {
+		c := p.Get()
+		if _, err := c.Do("PING"); err != nil {
+			b.Fatal(err)
+		}
+		c.Close()
+	}
+}
+
// numConcurrent is the number of goroutines used by the concurrent benchmarks.
const numConcurrent = 10
+
+func BenchmarkConnMuxConcurrent(b *testing.B) {
+	b.StopTimer()
+	c, err := redistest.Dial()
+	if err != nil {
+		b.Fatalf("error connection to database, %v", err)
+	}
+	defer c.Close()
+
+	m := redisx.NewConnMux(c)
+
+	var wg sync.WaitGroup
+	wg.Add(numConcurrent)
+
+	b.StartTimer()
+
+	for i := 0; i < numConcurrent; i++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < b.N; i++ {
+				c := m.Get()
+				if _, err := c.Do("PING"); err != nil {
+					b.Fatal(err)
+				}
+				c.Close()
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func BenchmarkPoolConcurrent(b *testing.B) {
+	b.StopTimer()
+
+	p := redis.Pool{Dial: redistest.Dial, MaxIdle: numConcurrent}
+	defer p.Close()
+
+	// Fill the pool.
+	conns := make([]redis.Conn, numConcurrent)
+	for i := range conns {
+		c := p.Get()
+		if err := c.Err(); err != nil {
+			b.Fatal(err)
+		}
+		conns[i] = c
+	}
+	for _, c := range conns {
+		c.Close()
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(numConcurrent)
+
+	b.StartTimer()
+
+	for i := 0; i < numConcurrent; i++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < b.N; i++ {
+				c := p.Get()
+				if _, err := c.Do("PING"); err != nil {
+					b.Fatal(err)
+				}
+				c.Close()
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func BenchmarkPipelineConcurrency(b *testing.B) {
+	b.StopTimer()
+	c, err := redistest.Dial()
+	if err != nil {
+		b.Fatalf("error connection to database, %v", err)
+	}
+	defer c.Close()
+
+	var wg sync.WaitGroup
+	wg.Add(numConcurrent)
+
+	var pipeline textproto.Pipeline
+
+	b.StartTimer()
+
+	for i := 0; i < numConcurrent; i++ {
+		go func() {
+			defer wg.Done()
+			for i := 0; i < b.N; i++ {
+				id := pipeline.Next()
+				pipeline.StartRequest(id)
+				c.Send("PING")
+				c.Flush()
+				pipeline.EndRequest(id)
+				pipeline.StartResponse(id)
+				_, err := c.Receive()
+				if err != nil {
+					b.Fatal(err)
+				}
+				pipeline.EndResponse(id)
+			}
+		}()
+	}
+	wg.Wait()
+}