log_entry.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package raft
  2. import (
  3. "bytes"
  4. "code.google.com/p/goprotobuf/proto"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/benbjohnson/go-raft/protobuf"
  8. "io"
  9. )
  10. // A log entry stores a single item in the log.
  11. type LogEntry struct {
  12. log *Log
  13. Index uint64
  14. Term uint64
  15. CommandName string
  16. Command []byte
  17. Position int64 // position in the log file
  18. commit chan bool
  19. }
  20. // Creates a new log entry associated with a log.
  21. func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntry, error) {
  22. var buf bytes.Buffer
  23. var commandName string
  24. if command != nil {
  25. commandName = command.CommandName()
  26. if encoder, ok := command.(CommandEncoder); ok {
  27. if err := encoder.Encode(&buf); err != nil {
  28. return nil, err
  29. }
  30. } else {
  31. json.NewEncoder(&buf).Encode(command)
  32. }
  33. }
  34. e := &LogEntry{
  35. log: log,
  36. Index: index,
  37. Term: term,
  38. CommandName: commandName,
  39. Command: buf.Bytes(),
  40. commit: make(chan bool, 5),
  41. }
  42. return e, nil
  43. }
  44. // Encodes the log entry to a buffer. Returns the number of bytes
  45. // written and any error that may have occurred.
  46. func (e *LogEntry) encode(w io.Writer) (int, error) {
  47. defer e.log.pBuffer.Reset()
  48. e.log.pLogEntry.Index = proto.Uint64(e.Index)
  49. e.log.pLogEntry.Term = proto.Uint64(e.Term)
  50. e.log.pLogEntry.CommandName = proto.String(e.CommandName)
  51. e.log.pLogEntry.Command = e.Command
  52. err := e.log.pBuffer.Marshal(e.log.pLogEntry)
  53. if err != nil {
  54. return -1, err
  55. }
  56. if _, err = fmt.Fprintf(w, "%8x\n", len(e.log.pBuffer.Bytes())); err != nil {
  57. return -1, err
  58. }
  59. return w.Write(e.log.pBuffer.Bytes())
  60. }
  61. // Decodes the log entry from a buffer. Returns the number of bytes read and
  62. // any error that occurs.
  63. func (e *LogEntry) decode(r io.Reader) (int, error) {
  64. var length int
  65. _, err := fmt.Fscanf(r, "%8x\n", &length)
  66. if err != nil {
  67. return -1, err
  68. }
  69. data := make([]byte, length)
  70. _, err = r.Read(data)
  71. if err != nil {
  72. return -1, err
  73. }
  74. pb := &protobuf.ProtoLogEntry{}
  75. if err = proto.Unmarshal(data, pb); err != nil {
  76. return -1, err
  77. }
  78. e.Term = pb.GetTerm()
  79. e.Index = pb.GetIndex()
  80. e.CommandName = pb.GetCommandName()
  81. e.Command = pb.Command
  82. return length, nil
  83. }