log.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package migrate
  15. import (
  16. "bytes"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "log"
  21. "os"
  22. "path"
  23. "time"
  24. etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  25. etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
  26. "github.com/coreos/etcd/pkg/types"
  27. raftpb "github.com/coreos/etcd/raft/raftpb"
  28. "github.com/coreos/etcd/store"
  29. )
  30. const etcdDefaultClusterName = "etcd-cluster"
  31. func UnixTimeOrPermanent(expireTime time.Time) int64 {
  32. expire := expireTime.Unix()
  33. if expireTime == store.Permanent {
  34. expire = 0
  35. }
  36. return expire
  37. }
  38. type Log4 []*etcd4pb.LogEntry
  39. func (l Log4) NodeIDs() map[string]uint64 {
  40. out := make(map[string]uint64)
  41. for _, e := range l {
  42. if e.GetCommandName() == "etcd:join" {
  43. cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
  44. if err != nil {
  45. log.Println("error converting an etcd:join to v2.0 format. Likely corrupt!")
  46. return nil
  47. }
  48. join := cmd4.(*JoinCommand)
  49. m := generateNodeMember(join.Name, join.RaftURL, "")
  50. out[join.Name] = uint64(m.ID)
  51. }
  52. if e.GetCommandName() == "etcd:remove" {
  53. cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
  54. if err != nil {
  55. return nil
  56. }
  57. name := cmd4.(*RemoveCommand).Name
  58. delete(out, name)
  59. }
  60. }
  61. return out
  62. }
  63. func StorePath(key string) string {
  64. return path.Join("/1", key)
  65. }
  66. func DecodeLog4FromFile(logpath string) (Log4, error) {
  67. file, err := os.OpenFile(logpath, os.O_RDONLY, 0600)
  68. if err != nil {
  69. return nil, err
  70. }
  71. defer file.Close()
  72. return DecodeLog4(file)
  73. }
  74. func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) {
  75. var readBytes int64
  76. entries := make([]*etcd4pb.LogEntry, 0)
  77. for {
  78. entry, n, err := DecodeNextEntry4(file)
  79. if err != nil {
  80. if err == io.EOF {
  81. break
  82. }
  83. return nil, fmt.Errorf("failed decoding next log entry: %v", err)
  84. }
  85. entries = append(entries, entry)
  86. readBytes += int64(n)
  87. }
  88. return entries, nil
  89. }
  90. // DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the
  91. // number of bytes read and any error that occurs.
  92. func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) {
  93. var length int
  94. _, err := fmt.Fscanf(r, "%8x\n", &length)
  95. if err != nil {
  96. return nil, -1, err
  97. }
  98. data := make([]byte, length)
  99. if _, err = io.ReadFull(r, data); err != nil {
  100. return nil, -1, err
  101. }
  102. ent4 := new(etcd4pb.LogEntry)
  103. if err = ent4.Unmarshal(data); err != nil {
  104. return nil, -1, err
  105. }
  106. // add width of scanner token to length
  107. length = length + 8 + 1
  108. return ent4, length, nil
  109. }
  110. func hashName(name string) uint64 {
  111. var sum uint64
  112. for _, ch := range name {
  113. sum = 131*sum + uint64(ch)
  114. }
  115. return sum
  116. }
  117. type Command4 interface {
  118. Type2() raftpb.EntryType
  119. Data2() ([]byte, error)
  120. }
  121. func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) {
  122. var cmd Command4
  123. switch name {
  124. case "etcd:remove":
  125. cmd = &RemoveCommand{}
  126. case "etcd:join":
  127. cmd = &JoinCommand{}
  128. case "etcd:setClusterConfig":
  129. cmd = &NOPCommand{}
  130. case "etcd:compareAndDelete":
  131. cmd = &CompareAndDeleteCommand{}
  132. case "etcd:compareAndSwap":
  133. cmd = &CompareAndSwapCommand{}
  134. case "etcd:create":
  135. cmd = &CreateCommand{}
  136. case "etcd:delete":
  137. cmd = &DeleteCommand{}
  138. case "etcd:set":
  139. cmd = &SetCommand{}
  140. case "etcd:sync":
  141. cmd = &SyncCommand{}
  142. case "etcd:update":
  143. cmd = &UpdateCommand{}
  144. case "raft:join":
  145. // These are subsumed by etcd:remove and etcd:join; we shouldn't see them.
  146. fallthrough
  147. case "raft:leave":
  148. return nil, fmt.Errorf("found a raft join/leave command; these shouldn't be in an etcd log")
  149. case "raft:nop":
  150. cmd = &NOPCommand{}
  151. default:
  152. return nil, fmt.Errorf("unregistered command type %s", name)
  153. }
  154. // If data for the command was passed in the decode it.
  155. if data != nil {
  156. if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil {
  157. return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err)
  158. }
  159. }
  160. switch name {
  161. case "etcd:join":
  162. c := cmd.(*JoinCommand)
  163. m := generateNodeMember(c.Name, c.RaftURL, c.EtcdURL)
  164. c.memb = *m
  165. if raftMap != nil {
  166. raftMap[c.Name] = uint64(m.ID)
  167. }
  168. case "etcd:remove":
  169. c := cmd.(*RemoveCommand)
  170. if raftMap != nil {
  171. m, ok := raftMap[c.Name]
  172. if !ok {
  173. return nil, fmt.Errorf("removing a node named %s before it joined", c.Name)
  174. }
  175. c.id = m
  176. delete(raftMap, c.Name)
  177. }
  178. }
  179. return cmd, nil
  180. }
  181. type RemoveCommand struct {
  182. Name string `json:"name"`
  183. id uint64
  184. }
  185. func (c *RemoveCommand) Type2() raftpb.EntryType {
  186. return raftpb.EntryConfChange
  187. }
  188. func (c *RemoveCommand) Data2() ([]byte, error) {
  189. req2 := raftpb.ConfChange{
  190. ID: 0,
  191. Type: raftpb.ConfChangeRemoveNode,
  192. NodeID: c.id,
  193. }
  194. return req2.Marshal()
  195. }
  196. type JoinCommand struct {
  197. Name string `json:"name"`
  198. RaftURL string `json:"raftURL"`
  199. EtcdURL string `json:"etcdURL"`
  200. memb member
  201. }
  202. func (c *JoinCommand) Type2() raftpb.EntryType {
  203. return raftpb.EntryConfChange
  204. }
  205. func (c *JoinCommand) Data2() ([]byte, error) {
  206. b, err := json.Marshal(c.memb)
  207. if err != nil {
  208. return nil, err
  209. }
  210. req2 := &raftpb.ConfChange{
  211. ID: 0,
  212. Type: raftpb.ConfChangeAddNode,
  213. NodeID: uint64(c.memb.ID),
  214. Context: b,
  215. }
  216. return req2.Marshal()
  217. }
  218. type SetClusterConfigCommand struct {
  219. Config *struct {
  220. ActiveSize int `json:"activeSize"`
  221. RemoveDelay float64 `json:"removeDelay"`
  222. SyncInterval float64 `json:"syncInterval"`
  223. } `json:"config"`
  224. }
  225. func (c *SetClusterConfigCommand) Type2() raftpb.EntryType {
  226. return raftpb.EntryNormal
  227. }
  228. func (c *SetClusterConfigCommand) Data2() ([]byte, error) {
  229. b, err := json.Marshal(c.Config)
  230. if err != nil {
  231. return nil, err
  232. }
  233. req2 := &etcdserverpb.Request{
  234. Method: "PUT",
  235. Path: "/v2/admin/config",
  236. Dir: false,
  237. Val: string(b),
  238. }
  239. return req2.Marshal()
  240. }
  241. type CompareAndDeleteCommand struct {
  242. Key string `json:"key"`
  243. PrevValue string `json:"prevValue"`
  244. PrevIndex uint64 `json:"prevIndex"`
  245. }
  246. func (c *CompareAndDeleteCommand) Type2() raftpb.EntryType {
  247. return raftpb.EntryNormal
  248. }
  249. func (c *CompareAndDeleteCommand) Data2() ([]byte, error) {
  250. req2 := &etcdserverpb.Request{
  251. Method: "DELETE",
  252. Path: StorePath(c.Key),
  253. PrevValue: c.PrevValue,
  254. PrevIndex: c.PrevIndex,
  255. }
  256. return req2.Marshal()
  257. }
  258. type CompareAndSwapCommand struct {
  259. Key string `json:"key"`
  260. Value string `json:"value"`
  261. ExpireTime time.Time `json:"expireTime"`
  262. PrevValue string `json:"prevValue"`
  263. PrevIndex uint64 `json:"prevIndex"`
  264. }
  265. func (c *CompareAndSwapCommand) Type2() raftpb.EntryType {
  266. return raftpb.EntryNormal
  267. }
  268. func (c *CompareAndSwapCommand) Data2() ([]byte, error) {
  269. req2 := &etcdserverpb.Request{
  270. Method: "PUT",
  271. Path: StorePath(c.Key),
  272. Val: c.Value,
  273. PrevValue: c.PrevValue,
  274. PrevIndex: c.PrevIndex,
  275. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  276. }
  277. return req2.Marshal()
  278. }
  279. type CreateCommand struct {
  280. Key string `json:"key"`
  281. Value string `json:"value"`
  282. ExpireTime time.Time `json:"expireTime"`
  283. Unique bool `json:"unique"`
  284. Dir bool `json:"dir"`
  285. }
  286. func (c *CreateCommand) Type2() raftpb.EntryType {
  287. return raftpb.EntryNormal
  288. }
  289. func (c *CreateCommand) Data2() ([]byte, error) {
  290. req2 := &etcdserverpb.Request{
  291. Path: StorePath(c.Key),
  292. Dir: c.Dir,
  293. Val: c.Value,
  294. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  295. }
  296. if c.Unique {
  297. req2.Method = "POST"
  298. } else {
  299. var prevExist = true
  300. req2.Method = "PUT"
  301. req2.PrevExist = &prevExist
  302. }
  303. return req2.Marshal()
  304. }
  305. type DeleteCommand struct {
  306. Key string `json:"key"`
  307. Recursive bool `json:"recursive"`
  308. Dir bool `json:"dir"`
  309. }
  310. func (c *DeleteCommand) Type2() raftpb.EntryType {
  311. return raftpb.EntryNormal
  312. }
  313. func (c *DeleteCommand) Data2() ([]byte, error) {
  314. req2 := &etcdserverpb.Request{
  315. Method: "DELETE",
  316. Path: StorePath(c.Key),
  317. Dir: c.Dir,
  318. Recursive: c.Recursive,
  319. }
  320. return req2.Marshal()
  321. }
  322. type SetCommand struct {
  323. Key string `json:"key"`
  324. Value string `json:"value"`
  325. ExpireTime time.Time `json:"expireTime"`
  326. Dir bool `json:"dir"`
  327. }
  328. func (c *SetCommand) Type2() raftpb.EntryType {
  329. return raftpb.EntryNormal
  330. }
  331. func (c *SetCommand) Data2() ([]byte, error) {
  332. req2 := &etcdserverpb.Request{
  333. Method: "PUT",
  334. Path: StorePath(c.Key),
  335. Dir: c.Dir,
  336. Val: c.Value,
  337. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  338. }
  339. return req2.Marshal()
  340. }
  341. type UpdateCommand struct {
  342. Key string `json:"key"`
  343. Value string `json:"value"`
  344. ExpireTime time.Time `json:"expireTime"`
  345. }
  346. func (c *UpdateCommand) Type2() raftpb.EntryType {
  347. return raftpb.EntryNormal
  348. }
  349. func (c *UpdateCommand) Data2() ([]byte, error) {
  350. exist := true
  351. req2 := &etcdserverpb.Request{
  352. Method: "PUT",
  353. Path: StorePath(c.Key),
  354. Val: c.Value,
  355. PrevExist: &exist,
  356. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  357. }
  358. return req2.Marshal()
  359. }
  360. type SyncCommand struct {
  361. Time time.Time `json:"time"`
  362. }
  363. func (c *SyncCommand) Type2() raftpb.EntryType {
  364. return raftpb.EntryNormal
  365. }
  366. func (c *SyncCommand) Data2() ([]byte, error) {
  367. req2 := &etcdserverpb.Request{
  368. Method: "SYNC",
  369. Time: c.Time.UnixNano(),
  370. }
  371. return req2.Marshal()
  372. }
  373. type DefaultJoinCommand struct {
  374. Name string `json:"name"`
  375. ConnectionString string `json:"connectionString"`
  376. }
  377. type DefaultLeaveCommand struct {
  378. Name string `json:"name"`
  379. id uint64
  380. }
  381. type NOPCommand struct{}
  382. //TODO(bcwaldon): Why is CommandName here?
  383. func (c NOPCommand) CommandName() string {
  384. return "raft:nop"
  385. }
  386. func (c *NOPCommand) Type2() raftpb.EntryType {
  387. return raftpb.EntryNormal
  388. }
  389. func (c *NOPCommand) Data2() ([]byte, error) {
  390. return nil, nil
  391. }
  392. func Entries4To2(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
  393. ents4Len := len(ents4)
  394. if ents4Len == 0 {
  395. return nil, nil
  396. }
  397. startIndex := ents4[0].GetIndex()
  398. for i, e := range ents4[1:] {
  399. eIndex := e.GetIndex()
  400. // ensure indexes are monotonically increasing
  401. wantIndex := startIndex + uint64(i+1)
  402. if wantIndex != eIndex {
  403. return nil, fmt.Errorf("skipped log index %d", wantIndex)
  404. }
  405. }
  406. raftMap := make(map[string]uint64)
  407. ents2 := make([]raftpb.Entry, 0)
  408. for i, e := range ents4 {
  409. ent, err := toEntry2(e, raftMap)
  410. if err != nil {
  411. log.Fatalf("Error converting entry %d, %s", i, err)
  412. } else {
  413. ents2 = append(ents2, *ent)
  414. }
  415. }
  416. return ents2, nil
  417. }
  418. func toEntry2(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) {
  419. cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap)
  420. if err != nil {
  421. return nil, err
  422. }
  423. data, err := cmd4.Data2()
  424. if err != nil {
  425. return nil, err
  426. }
  427. ent2 := raftpb.Entry{
  428. Term: ent4.GetTerm() + termOffset4to2,
  429. Index: ent4.GetIndex(),
  430. Type: cmd4.Type2(),
  431. Data: data,
  432. }
  433. return &ent2, nil
  434. }
  435. func generateNodeMember(name, rafturl, etcdurl string) *member {
  436. pURLs, err := types.NewURLs([]string{rafturl})
  437. if err != nil {
  438. log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl)
  439. }
  440. m := NewMember(name, pURLs, etcdDefaultClusterName)
  441. m.ClientURLs = []string{etcdurl}
  442. return m
  443. }