log.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. package migrate
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "os"
  9. "path"
  10. "time"
  11. etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  12. etcd4pb "github.com/coreos/etcd/migrate/etcd4pb"
  13. "github.com/coreos/etcd/pkg/types"
  14. raftpb "github.com/coreos/etcd/raft/raftpb"
  15. "github.com/coreos/etcd/store"
  16. )
  17. const etcdDefaultClusterName = "etcd-cluster"
  18. func UnixTimeOrPermanent(expireTime time.Time) int64 {
  19. expire := expireTime.Unix()
  20. if expireTime == store.Permanent {
  21. expire = 0
  22. }
  23. return expire
  24. }
  25. type Log4 []*etcd4pb.LogEntry
  26. func (l Log4) NodeIDs() map[string]uint64 {
  27. out := make(map[string]uint64)
  28. for _, e := range l {
  29. if e.GetCommandName() == "etcd:join" {
  30. cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
  31. if err != nil {
  32. log.Println("error converting an etcd:join to v0.5 format. Likely corrupt!")
  33. return nil
  34. }
  35. join := cmd4.(*JoinCommand)
  36. m := generateNodeMember(join.Name, join.RaftURL, "")
  37. out[join.Name] = uint64(m.ID)
  38. }
  39. if e.GetCommandName() == "etcd:remove" {
  40. cmd4, err := NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
  41. if err != nil {
  42. return nil
  43. }
  44. name := cmd4.(*RemoveCommand).Name
  45. delete(out, name)
  46. }
  47. }
  48. return out
  49. }
  50. func StorePath(key string) string {
  51. return path.Join("/1", key)
  52. }
  53. func DecodeLog4FromFile(logpath string) (Log4, error) {
  54. file, err := os.OpenFile(logpath, os.O_RDONLY, 0600)
  55. if err != nil {
  56. return nil, err
  57. }
  58. defer file.Close()
  59. return DecodeLog4(file)
  60. }
  61. func DecodeLog4(file *os.File) ([]*etcd4pb.LogEntry, error) {
  62. var readBytes int64
  63. entries := make([]*etcd4pb.LogEntry, 0)
  64. for {
  65. entry, n, err := DecodeNextEntry4(file)
  66. if err != nil {
  67. if err == io.EOF {
  68. break
  69. }
  70. return nil, fmt.Errorf("failed decoding next log entry: %v", err)
  71. }
  72. entries = append(entries, entry)
  73. readBytes += int64(n)
  74. }
  75. return entries, nil
  76. }
  77. // DecodeNextEntry4 unmarshals a v0.4 log entry from a reader. Returns the
  78. // number of bytes read and any error that occurs.
  79. func DecodeNextEntry4(r io.Reader) (*etcd4pb.LogEntry, int, error) {
  80. var length int
  81. _, err := fmt.Fscanf(r, "%8x\n", &length)
  82. if err != nil {
  83. return nil, -1, err
  84. }
  85. data := make([]byte, length)
  86. if _, err = io.ReadFull(r, data); err != nil {
  87. return nil, -1, err
  88. }
  89. ent4 := new(etcd4pb.LogEntry)
  90. if err = ent4.Unmarshal(data); err != nil {
  91. return nil, -1, err
  92. }
  93. // add width of scanner token to length
  94. length = length + 8 + 1
  95. return ent4, length, nil
  96. }
  97. func hashName(name string) uint64 {
  98. var sum uint64
  99. for _, ch := range name {
  100. sum = 131*sum + uint64(ch)
  101. }
  102. return sum
  103. }
  104. type Command4 interface {
  105. Type5() raftpb.EntryType
  106. Data5() ([]byte, error)
  107. }
  108. func NewCommand4(name string, data []byte, raftMap map[string]uint64) (Command4, error) {
  109. var cmd Command4
  110. switch name {
  111. case "etcd:remove":
  112. cmd = &RemoveCommand{}
  113. case "etcd:join":
  114. cmd = &JoinCommand{}
  115. case "etcd:setClusterConfig":
  116. cmd = &NOPCommand{}
  117. case "etcd:compareAndDelete":
  118. cmd = &CompareAndDeleteCommand{}
  119. case "etcd:compareAndSwap":
  120. cmd = &CompareAndSwapCommand{}
  121. case "etcd:create":
  122. cmd = &CreateCommand{}
  123. case "etcd:delete":
  124. cmd = &DeleteCommand{}
  125. case "etcd:set":
  126. cmd = &SetCommand{}
  127. case "etcd:sync":
  128. cmd = &SyncCommand{}
  129. case "etcd:update":
  130. cmd = &UpdateCommand{}
  131. case "raft:join":
  132. // These are subsumed by etcd:remove and etcd:join; we shouldn't see them.
  133. fallthrough
  134. case "raft:leave":
  135. return nil, fmt.Errorf("found a raft join/leave command; these shouldn't be in an etcd log")
  136. case "raft:nop":
  137. cmd = &NOPCommand{}
  138. default:
  139. return nil, fmt.Errorf("unregistered command type %s", name)
  140. }
  141. // If data for the command was passed in the decode it.
  142. if data != nil {
  143. if err := json.NewDecoder(bytes.NewReader(data)).Decode(cmd); err != nil {
  144. return nil, fmt.Errorf("unable to decode bytes %q: %v", data, err)
  145. }
  146. }
  147. switch name {
  148. case "etcd:join":
  149. c := cmd.(*JoinCommand)
  150. m := generateNodeMember(c.Name, c.RaftURL, c.EtcdURL)
  151. c.memb = *m
  152. if raftMap != nil {
  153. raftMap[c.Name] = uint64(m.ID)
  154. }
  155. case "etcd:remove":
  156. c := cmd.(*RemoveCommand)
  157. if raftMap != nil {
  158. m, ok := raftMap[c.Name]
  159. if !ok {
  160. return nil, fmt.Errorf("removing a node named %s before it joined", c.Name)
  161. }
  162. c.id = m
  163. delete(raftMap, c.Name)
  164. }
  165. }
  166. return cmd, nil
  167. }
  168. type RemoveCommand struct {
  169. Name string `json:"name"`
  170. id uint64
  171. }
  172. func (c *RemoveCommand) Type5() raftpb.EntryType {
  173. return raftpb.EntryConfChange
  174. }
  175. func (c *RemoveCommand) Data5() ([]byte, error) {
  176. req5 := raftpb.ConfChange{
  177. ID: 0,
  178. Type: raftpb.ConfChangeRemoveNode,
  179. NodeID: c.id,
  180. }
  181. return req5.Marshal()
  182. }
  183. type JoinCommand struct {
  184. Name string `json:"name"`
  185. RaftURL string `json:"raftURL"`
  186. EtcdURL string `json:"etcdURL"`
  187. memb member
  188. }
  189. func (c *JoinCommand) Type5() raftpb.EntryType {
  190. return raftpb.EntryConfChange
  191. }
  192. func (c *JoinCommand) Data5() ([]byte, error) {
  193. b, err := json.Marshal(c.memb)
  194. if err != nil {
  195. return nil, err
  196. }
  197. req5 := &raftpb.ConfChange{
  198. ID: 0,
  199. Type: raftpb.ConfChangeAddNode,
  200. NodeID: uint64(c.memb.ID),
  201. Context: b,
  202. }
  203. return req5.Marshal()
  204. }
  205. type SetClusterConfigCommand struct {
  206. Config *struct {
  207. ActiveSize int `json:"activeSize"`
  208. RemoveDelay float64 `json:"removeDelay"`
  209. SyncInterval float64 `json:"syncInterval"`
  210. } `json:"config"`
  211. }
  212. func (c *SetClusterConfigCommand) Type5() raftpb.EntryType {
  213. return raftpb.EntryNormal
  214. }
  215. func (c *SetClusterConfigCommand) Data5() ([]byte, error) {
  216. b, err := json.Marshal(c.Config)
  217. if err != nil {
  218. return nil, err
  219. }
  220. req5 := &etcdserverpb.Request{
  221. Method: "PUT",
  222. Path: "/v2/admin/config",
  223. Dir: false,
  224. Val: string(b),
  225. }
  226. return req5.Marshal()
  227. }
  228. type CompareAndDeleteCommand struct {
  229. Key string `json:"key"`
  230. PrevValue string `json:"prevValue"`
  231. PrevIndex uint64 `json:"prevIndex"`
  232. }
  233. func (c *CompareAndDeleteCommand) Type5() raftpb.EntryType {
  234. return raftpb.EntryNormal
  235. }
  236. func (c *CompareAndDeleteCommand) Data5() ([]byte, error) {
  237. req5 := &etcdserverpb.Request{
  238. Method: "DELETE",
  239. Path: StorePath(c.Key),
  240. PrevValue: c.PrevValue,
  241. PrevIndex: c.PrevIndex,
  242. }
  243. return req5.Marshal()
  244. }
  245. type CompareAndSwapCommand struct {
  246. Key string `json:"key"`
  247. Value string `json:"value"`
  248. ExpireTime time.Time `json:"expireTime"`
  249. PrevValue string `json:"prevValue"`
  250. PrevIndex uint64 `json:"prevIndex"`
  251. }
  252. func (c *CompareAndSwapCommand) Type5() raftpb.EntryType {
  253. return raftpb.EntryNormal
  254. }
  255. func (c *CompareAndSwapCommand) Data5() ([]byte, error) {
  256. req5 := &etcdserverpb.Request{
  257. Method: "PUT",
  258. Path: StorePath(c.Key),
  259. Val: c.Value,
  260. PrevValue: c.PrevValue,
  261. PrevIndex: c.PrevIndex,
  262. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  263. }
  264. return req5.Marshal()
  265. }
  266. type CreateCommand struct {
  267. Key string `json:"key"`
  268. Value string `json:"value"`
  269. ExpireTime time.Time `json:"expireTime"`
  270. Unique bool `json:"unique"`
  271. Dir bool `json:"dir"`
  272. }
  273. func (c *CreateCommand) Type5() raftpb.EntryType {
  274. return raftpb.EntryNormal
  275. }
  276. func (c *CreateCommand) Data5() ([]byte, error) {
  277. req5 := &etcdserverpb.Request{
  278. Path: StorePath(c.Key),
  279. Dir: c.Dir,
  280. Val: c.Value,
  281. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  282. }
  283. if c.Unique {
  284. req5.Method = "POST"
  285. } else {
  286. var prevExist = true
  287. req5.Method = "PUT"
  288. req5.PrevExist = &prevExist
  289. }
  290. return req5.Marshal()
  291. }
  292. type DeleteCommand struct {
  293. Key string `json:"key"`
  294. Recursive bool `json:"recursive"`
  295. Dir bool `json:"dir"`
  296. }
  297. func (c *DeleteCommand) Type5() raftpb.EntryType {
  298. return raftpb.EntryNormal
  299. }
  300. func (c *DeleteCommand) Data5() ([]byte, error) {
  301. req5 := &etcdserverpb.Request{
  302. Method: "DELETE",
  303. Path: StorePath(c.Key),
  304. Dir: c.Dir,
  305. Recursive: c.Recursive,
  306. }
  307. return req5.Marshal()
  308. }
  309. type SetCommand struct {
  310. Key string `json:"key"`
  311. Value string `json:"value"`
  312. ExpireTime time.Time `json:"expireTime"`
  313. Dir bool `json:"dir"`
  314. }
  315. func (c *SetCommand) Type5() raftpb.EntryType {
  316. return raftpb.EntryNormal
  317. }
  318. func (c *SetCommand) Data5() ([]byte, error) {
  319. req5 := &etcdserverpb.Request{
  320. Method: "PUT",
  321. Path: StorePath(c.Key),
  322. Dir: c.Dir,
  323. Val: c.Value,
  324. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  325. }
  326. return req5.Marshal()
  327. }
  328. type UpdateCommand struct {
  329. Key string `json:"key"`
  330. Value string `json:"value"`
  331. ExpireTime time.Time `json:"expireTime"`
  332. }
  333. func (c *UpdateCommand) Type5() raftpb.EntryType {
  334. return raftpb.EntryNormal
  335. }
  336. func (c *UpdateCommand) Data5() ([]byte, error) {
  337. exist := true
  338. req5 := &etcdserverpb.Request{
  339. Method: "PUT",
  340. Path: StorePath(c.Key),
  341. Val: c.Value,
  342. PrevExist: &exist,
  343. Expiration: UnixTimeOrPermanent(c.ExpireTime),
  344. }
  345. return req5.Marshal()
  346. }
  347. type SyncCommand struct {
  348. Time time.Time `json:"time"`
  349. }
  350. func (c *SyncCommand) Type5() raftpb.EntryType {
  351. return raftpb.EntryNormal
  352. }
  353. func (c *SyncCommand) Data5() ([]byte, error) {
  354. req5 := &etcdserverpb.Request{
  355. Method: "SYNC",
  356. Time: c.Time.UnixNano(),
  357. }
  358. return req5.Marshal()
  359. }
  360. type DefaultJoinCommand struct {
  361. Name string `json:"name"`
  362. ConnectionString string `json:"connectionString"`
  363. }
  364. type DefaultLeaveCommand struct {
  365. Name string `json:"name"`
  366. id uint64
  367. }
  368. type NOPCommand struct{}
  369. //TODO(bcwaldon): Why is CommandName here?
  370. func (c NOPCommand) CommandName() string {
  371. return "raft:nop"
  372. }
  373. func (c *NOPCommand) Type5() raftpb.EntryType {
  374. return raftpb.EntryNormal
  375. }
  376. func (c *NOPCommand) Data5() ([]byte, error) {
  377. return nil, nil
  378. }
  379. func Entries4To5(ents4 []*etcd4pb.LogEntry) ([]raftpb.Entry, error) {
  380. ents4Len := len(ents4)
  381. if ents4Len == 0 {
  382. return nil, nil
  383. }
  384. startIndex := ents4[0].GetIndex()
  385. for i, e := range ents4[1:] {
  386. eIndex := e.GetIndex()
  387. // ensure indexes are monotonically increasing
  388. wantIndex := startIndex + uint64(i+1)
  389. if wantIndex != eIndex {
  390. return nil, fmt.Errorf("skipped log index %d", wantIndex)
  391. }
  392. }
  393. raftMap := make(map[string]uint64)
  394. ents5 := make([]raftpb.Entry, 0)
  395. for i, e := range ents4 {
  396. ent, err := toEntry5(e, raftMap)
  397. if err != nil {
  398. log.Fatalf("Error converting entry %d, %s", i, err)
  399. } else {
  400. ents5 = append(ents5, *ent)
  401. }
  402. }
  403. return ents5, nil
  404. }
  405. func toEntry5(ent4 *etcd4pb.LogEntry, raftMap map[string]uint64) (*raftpb.Entry, error) {
  406. cmd4, err := NewCommand4(ent4.GetCommandName(), ent4.GetCommand(), raftMap)
  407. if err != nil {
  408. return nil, err
  409. }
  410. data, err := cmd4.Data5()
  411. if err != nil {
  412. return nil, err
  413. }
  414. ent5 := raftpb.Entry{
  415. Term: ent4.GetTerm(),
  416. Index: ent4.GetIndex(),
  417. Type: cmd4.Type5(),
  418. Data: data,
  419. }
  420. return &ent5, nil
  421. }
  422. func generateNodeMember(name, rafturl, etcdurl string) *member {
  423. pURLs, err := types.NewURLs([]string{rafturl})
  424. if err != nil {
  425. log.Fatalf("Invalid Raft URL %s -- this log could never have worked", rafturl)
  426. }
  427. m := NewMember(name, pURLs, etcdDefaultClusterName)
  428. m.ClientURLs = []string{etcdurl}
  429. return m
  430. }