123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- package raft
- import (
- "fmt"
- "io/ioutil"
- "os"
- "time"
- )
- const (
- testHeartbeatInterval = 50 * time.Millisecond
- testElectionTimeout = 200 * time.Millisecond
- )
- const (
- testListenerLoggerEnabled = false
- )
- func init() {
- RegisterCommand(&testCommand1{})
- RegisterCommand(&testCommand2{})
- }
- //------------------------------------------------------------------------------
- //
- // Helpers
- //
- //------------------------------------------------------------------------------
- //--------------------------------------
- // Logs
- //--------------------------------------
- func getLogPath() string {
- f, _ := ioutil.TempFile("", "raft-log-")
- f.Close()
- os.Remove(f.Name())
- return f.Name()
- }
- func setupLog(entries []*LogEntry) (*Log, string) {
- f, _ := ioutil.TempFile("", "raft-log-")
- for _, entry := range entries {
- entry.Encode(f)
- }
- err := f.Close()
- if err != nil {
- panic(err)
- }
- log := newLog()
- log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
- return nil, nil
- }
- if err := log.open(f.Name()); err != nil {
- panic(err)
- }
- return log, f.Name()
- }
- //--------------------------------------
- // Servers
- //--------------------------------------
- func newTestServer(name string, transporter Transporter) Server {
- p, _ := ioutil.TempDir("", "raft-server-")
- if err := os.MkdirAll(p, 0644); err != nil {
- panic(err.Error())
- }
- server, _ := NewServer(name, p, transporter, nil, nil, "")
- if testListenerLoggerEnabled {
- fn := func(e Event) {
- server := e.Source().(Server)
- warnf("[%s] %s %v -> %v\n", server.Name(), e.Type(), e.PrevValue(), e.Value())
- }
- server.AddEventListener(StateChangeEventType, fn)
- server.AddEventListener(LeaderChangeEventType, fn)
- server.AddEventListener(TermChangeEventType, fn)
- }
- return server
- }
- func newTestServerWithPath(name string, transporter Transporter, p string) Server {
- server, _ := NewServer(name, p, transporter, nil, nil, "")
- return server
- }
- func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) Server {
- server := newTestServer(name, transporter)
- f, err := os.Create(server.LogPath())
- if err != nil {
- panic(err)
- }
- for _, entry := range entries {
- entry.Encode(f)
- }
- f.Close()
- return server
- }
- func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
- servers := []Server{}
- e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
- for _, name := range names {
- if lookup[name] != nil {
- panic(fmt.Sprintf("raft: Duplicate server in test cluster! %v", name))
- }
- server := newTestServerWithLog("1", transporter, []*LogEntry{e0})
- server.SetElectionTimeout(testElectionTimeout)
- servers = append(servers, server)
- lookup[name] = server
- }
- for _, server := range servers {
- server.SetHeartbeatInterval(testHeartbeatInterval)
- server.Start()
- for _, peer := range servers {
- server.AddPeer(peer.Name(), "")
- }
- }
- return servers
- }
- //--------------------------------------
- // Transporter
- //--------------------------------------
- type testTransporter struct {
- sendVoteRequestFunc func(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
- sendAppendEntriesRequestFunc func(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
- sendSnapshotRequestFunc func(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
- }
- func (t *testTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
- return t.sendVoteRequestFunc(server, peer, req)
- }
- func (t *testTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
- return t.sendAppendEntriesRequestFunc(server, peer, req)
- }
- func (t *testTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
- return t.sendSnapshotRequestFunc(server, peer, req)
- }
- func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
- return t.SendSnapshotRecoveryRequest(server, peer, req)
- }
- type testStateMachine struct {
- saveFunc func() ([]byte, error)
- recoveryFunc func([]byte) error
- }
- func (sm *testStateMachine) Save() ([]byte, error) {
- return sm.saveFunc()
- }
- func (sm *testStateMachine) Recovery(state []byte) error {
- return sm.recoveryFunc(state)
- }
- //--------------------------------------
- // Command1
- //--------------------------------------
- type testCommand1 struct {
- Val string `json:"val"`
- I int `json:"i"`
- }
- func (c *testCommand1) CommandName() string {
- return "cmd_1"
- }
- func (c *testCommand1) Apply(server Server) (interface{}, error) {
- return nil, nil
- }
- //--------------------------------------
- // Command2
- //--------------------------------------
- type testCommand2 struct {
- X int `json:"x"`
- }
- func (c *testCommand2) CommandName() string {
- return "cmd_2"
- }
- func (c *testCommand2) Apply(server Server) (interface{}, error) {
- return nil, nil
- }
|