snapshot.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package migrate
  15. import (
  16. "encoding/json"
  17. "errors"
  18. "fmt"
  19. "hash/crc32"
  20. "io/ioutil"
  21. "log"
  22. "net/url"
  23. "os"
  24. "path"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "time"
  29. raftpb "github.com/coreos/etcd/raft/raftpb"
  30. )
  31. type Snapshot4 struct {
  32. State []byte `json:"state"`
  33. LastIndex uint64 `json:"lastIndex"`
  34. LastTerm uint64 `json:"lastTerm"`
  35. Peers []struct {
  36. Name string `json:"name"`
  37. ConnectionString string `json:"connectionString"`
  38. } `json:"peers"`
  39. }
  40. type Store4 struct {
  41. Root *node
  42. CurrentIndex uint64
  43. CurrentVersion int
  44. }
  45. type node struct {
  46. Path string
  47. CreatedIndex uint64
  48. ModifiedIndex uint64
  49. Parent *node `json:"-"` // should not encode this field! avoid circular dependency.
  50. ExpireTime time.Time
  51. ACL string
  52. Value string // for key-value pair
  53. Children map[string]*node // for directory
  54. }
  55. func replacePathNames(n *node, s1, s2 string) {
  56. n.Path = path.Clean(strings.Replace(n.Path, s1, s2, 1))
  57. for _, c := range n.Children {
  58. replacePathNames(c, s1, s2)
  59. }
  60. }
  61. func pullNodesFromEtcd(n *node) map[string]uint64 {
  62. out := make(map[string]uint64)
  63. machines := n.Children["machines"]
  64. for name, c := range machines.Children {
  65. q, err := url.ParseQuery(c.Value)
  66. if err != nil {
  67. log.Fatal("Couldn't parse old query string value")
  68. }
  69. etcdurl := q.Get("etcd")
  70. rafturl := q.Get("raft")
  71. m := generateNodeMember(name, rafturl, etcdurl)
  72. out[m.Name] = uint64(m.ID)
  73. }
  74. return out
  75. }
  76. func fixEtcd(n *node) {
  77. n.Path = "/0"
  78. machines := n.Children["machines"]
  79. n.Children["members"] = &node{
  80. Path: "/0/members",
  81. CreatedIndex: machines.CreatedIndex,
  82. ModifiedIndex: machines.ModifiedIndex,
  83. ExpireTime: machines.ExpireTime,
  84. ACL: machines.ACL,
  85. Children: make(map[string]*node),
  86. }
  87. for name, c := range machines.Children {
  88. q, err := url.ParseQuery(c.Value)
  89. if err != nil {
  90. log.Fatal("Couldn't parse old query string value")
  91. }
  92. etcdurl := q.Get("etcd")
  93. rafturl := q.Get("raft")
  94. m := generateNodeMember(name, rafturl, etcdurl)
  95. attrBytes, err := json.Marshal(m.attributes)
  96. if err != nil {
  97. log.Fatal("Couldn't marshal attributes")
  98. }
  99. raftBytes, err := json.Marshal(m.raftAttributes)
  100. if err != nil {
  101. log.Fatal("Couldn't marshal raft attributes")
  102. }
  103. newNode := &node{
  104. Path: path.Join("/0/members", m.ID.String()),
  105. CreatedIndex: c.CreatedIndex,
  106. ModifiedIndex: c.ModifiedIndex,
  107. ExpireTime: c.ExpireTime,
  108. ACL: c.ACL,
  109. Children: map[string]*node{
  110. "attributes": &node{
  111. Path: path.Join("/0/members", m.ID.String(), "attributes"),
  112. CreatedIndex: c.CreatedIndex,
  113. ModifiedIndex: c.ModifiedIndex,
  114. ExpireTime: c.ExpireTime,
  115. ACL: c.ACL,
  116. Value: string(attrBytes),
  117. },
  118. "raftAttributes": &node{
  119. Path: path.Join("/0/members", m.ID.String(), "raftAttributes"),
  120. CreatedIndex: c.CreatedIndex,
  121. ModifiedIndex: c.ModifiedIndex,
  122. ExpireTime: c.ExpireTime,
  123. ACL: c.ACL,
  124. Value: string(raftBytes),
  125. },
  126. },
  127. }
  128. n.Children["members"].Children[m.ID.String()] = newNode
  129. }
  130. delete(n.Children, "machines")
  131. }
  132. func mangleRoot(n *node) *node {
  133. newRoot := &node{
  134. Path: "/",
  135. CreatedIndex: n.CreatedIndex,
  136. ModifiedIndex: n.ModifiedIndex,
  137. ExpireTime: n.ExpireTime,
  138. ACL: n.ACL,
  139. Children: make(map[string]*node),
  140. }
  141. newRoot.Children["1"] = n
  142. etcd := n.Children["_etcd"]
  143. delete(n.Children, "_etcd")
  144. replacePathNames(n, "/", "/1/")
  145. fixEtcd(etcd)
  146. newRoot.Children["0"] = etcd
  147. return newRoot
  148. }
  149. func (s *Snapshot4) GetNodesFromStore() map[string]uint64 {
  150. st := &Store4{}
  151. if err := json.Unmarshal(s.State, st); err != nil {
  152. log.Fatal("Couldn't unmarshal snapshot")
  153. }
  154. etcd := st.Root.Children["_etcd"]
  155. return pullNodesFromEtcd(etcd)
  156. }
  157. func (s *Snapshot4) Snapshot2() *raftpb.Snapshot {
  158. st := &Store4{}
  159. if err := json.Unmarshal(s.State, st); err != nil {
  160. log.Fatal("Couldn't unmarshal snapshot")
  161. }
  162. st.Root = mangleRoot(st.Root)
  163. newState, err := json.Marshal(st)
  164. if err != nil {
  165. log.Fatal("Couldn't re-marshal new snapshot")
  166. }
  167. nodes := s.GetNodesFromStore()
  168. nodeList := make([]uint64, 0)
  169. for _, v := range nodes {
  170. nodeList = append(nodeList, v)
  171. }
  172. snap2 := raftpb.Snapshot{
  173. Data: newState,
  174. Metadata: raftpb.SnapshotMetadata{
  175. Index: s.LastIndex,
  176. Term: s.LastTerm + termOffset4to2,
  177. ConfState: raftpb.ConfState{
  178. Nodes: nodeList,
  179. },
  180. },
  181. }
  182. return &snap2
  183. }
  184. func DecodeLatestSnapshot4FromDir(snapdir string) (*Snapshot4, error) {
  185. fname, err := FindLatestFile(snapdir)
  186. if err != nil {
  187. return nil, err
  188. }
  189. if fname == "" {
  190. return nil, nil
  191. }
  192. snappath := path.Join(snapdir, fname)
  193. log.Printf("Decoding snapshot from %s", snappath)
  194. return DecodeSnapshot4FromFile(snappath)
  195. }
  196. // FindLatestFile identifies the "latest" filename in a given directory
  197. // by sorting all the files and choosing the highest value.
  198. func FindLatestFile(dirpath string) (string, error) {
  199. dir, err := os.OpenFile(dirpath, os.O_RDONLY, 0)
  200. if err != nil {
  201. if os.IsNotExist(err) {
  202. err = nil
  203. }
  204. return "", err
  205. }
  206. defer dir.Close()
  207. fnames, err := dir.Readdirnames(-1)
  208. if err != nil {
  209. return "", err
  210. }
  211. if len(fnames) == 0 {
  212. return "", nil
  213. }
  214. names, err := NewSnapshotFileNames(fnames)
  215. if err != nil {
  216. return "", err
  217. }
  218. return names[len(names)-1].FileName, nil
  219. }
  220. func DecodeSnapshot4FromFile(path string) (*Snapshot4, error) {
  221. // Read snapshot data.
  222. f, err := os.OpenFile(path, os.O_RDONLY, 0)
  223. if err != nil {
  224. return nil, err
  225. }
  226. defer f.Close()
  227. return DecodeSnapshot4(f)
  228. }
  229. func DecodeSnapshot4(f *os.File) (*Snapshot4, error) {
  230. // Verify checksum
  231. var checksum uint32
  232. n, err := fmt.Fscanf(f, "%08x\n", &checksum)
  233. if err != nil {
  234. return nil, err
  235. } else if n != 1 {
  236. return nil, errors.New("miss heading checksum")
  237. }
  238. // Load remaining snapshot contents.
  239. b, err := ioutil.ReadAll(f)
  240. if err != nil {
  241. return nil, err
  242. }
  243. // Generate checksum.
  244. byteChecksum := crc32.ChecksumIEEE(b)
  245. if uint32(checksum) != byteChecksum {
  246. return nil, errors.New("bad checksum")
  247. }
  248. // Decode snapshot.
  249. snapshot := new(Snapshot4)
  250. if err = json.Unmarshal(b, snapshot); err != nil {
  251. return nil, err
  252. }
  253. return snapshot, nil
  254. }
  255. func NewSnapshotFileNames(names []string) ([]SnapshotFileName, error) {
  256. s := make([]SnapshotFileName, 0)
  257. for _, n := range names {
  258. trimmed := strings.TrimSuffix(n, ".ss")
  259. if trimmed == n {
  260. return nil, fmt.Errorf("file %q does not have .ss extension", n)
  261. }
  262. parts := strings.SplitN(trimmed, "_", 2)
  263. if len(parts) != 2 {
  264. return nil, fmt.Errorf("unrecognized file name format %q", n)
  265. }
  266. fn := SnapshotFileName{FileName: n}
  267. var err error
  268. fn.Term, err = strconv.ParseUint(parts[0], 10, 64)
  269. if err != nil {
  270. return nil, fmt.Errorf("unable to parse term from filename %q: %v", n, err)
  271. }
  272. fn.Index, err = strconv.ParseUint(parts[1], 10, 64)
  273. if err != nil {
  274. return nil, fmt.Errorf("unable to parse index from filename %q: %v", n, err)
  275. }
  276. s = append(s, fn)
  277. }
  278. sortable := SnapshotFileNames(s)
  279. sort.Sort(&sortable)
  280. return s, nil
  281. }
  282. type SnapshotFileNames []SnapshotFileName
  283. type SnapshotFileName struct {
  284. FileName string
  285. Term uint64
  286. Index uint64
  287. }
  288. func (n *SnapshotFileNames) Less(i, j int) bool {
  289. iTerm, iIndex := (*n)[i].Term, (*n)[i].Index
  290. jTerm, jIndex := (*n)[j].Term, (*n)[j].Index
  291. return iTerm < jTerm || (iTerm == jTerm && iIndex < jIndex)
  292. }
  293. func (n *SnapshotFileNames) Swap(i, j int) {
  294. (*n)[i], (*n)[j] = (*n)[j], (*n)[i]
  295. }
  296. func (n *SnapshotFileNames) Len() int {
  297. return len([]SnapshotFileName(*n))
  298. }