main.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package main
  15. import (
  16. "bufio"
  17. "bytes"
  18. "encoding/hex"
  19. "flag"
  20. "fmt"
  21. "io"
  22. "log"
  23. "os"
  24. "os/exec"
  25. "path/filepath"
  26. "strings"
  27. "time"
  28. "go.etcd.io/etcd/etcdserver/api/snap"
  29. "go.etcd.io/etcd/etcdserver/etcdserverpb"
  30. "go.etcd.io/etcd/pkg/pbutil"
  31. "go.etcd.io/etcd/pkg/types"
  32. "go.etcd.io/etcd/raft/raftpb"
  33. "go.etcd.io/etcd/wal"
  34. "go.etcd.io/etcd/wal/walpb"
  35. "go.uber.org/zap"
  36. )
  37. func main() {
  38. snapfile := flag.String("start-snap", "", "The base name of snapshot file to start dumping")
  39. index := flag.Uint64("start-index", 0, "The index to start dumping")
  40. entrytype := flag.String("entry-type", "", `If set, filters output by entry type. Must be one or more than one of:
  41. ConfigChange, Normal, Request, InternalRaftRequest,
  42. IRRRange, IRRPut, IRRDeleteRange, IRRTxn,
  43. IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`)
  44. streamdecoder := flag.String("stream-decoder", "", `The name of an executable decoding tool, the executable must process
  45. hex encoded lines of binary input (from etcd-dump-logs)
  46. and output a hex encoded line of binary for each input line`)
  47. flag.Parse()
  48. if len(flag.Args()) != 1 {
  49. log.Fatalf("Must provide data-dir argument (got %+v)", flag.Args())
  50. }
  51. dataDir := flag.Args()[0]
  52. if *snapfile != "" && *index != 0 {
  53. log.Fatal("start-snap and start-index flags cannot be used together.")
  54. }
  55. var (
  56. walsnap walpb.Snapshot
  57. snapshot *raftpb.Snapshot
  58. err error
  59. )
  60. isIndex := *index != 0
  61. if isIndex {
  62. fmt.Printf("Start dumping log entries from index %d.\n", *index)
  63. walsnap.Index = *index
  64. } else {
  65. if *snapfile == "" {
  66. ss := snap.New(zap.NewExample(), snapDir(dataDir))
  67. snapshot, err = ss.Load()
  68. } else {
  69. snapshot, err = snap.Read(zap.NewExample(), filepath.Join(snapDir(dataDir), *snapfile))
  70. }
  71. switch err {
  72. case nil:
  73. walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
  74. nodes := genIDSlice(snapshot.Metadata.ConfState.Voters)
  75. fmt.Printf("Snapshot:\nterm=%d index=%d nodes=%s\n",
  76. walsnap.Term, walsnap.Index, nodes)
  77. case snap.ErrNoSnapshot:
  78. fmt.Printf("Snapshot:\nempty\n")
  79. default:
  80. log.Fatalf("Failed loading snapshot: %v", err)
  81. }
  82. fmt.Println("Start dupmping log entries from snapshot.")
  83. }
  84. w, err := wal.OpenForRead(zap.NewExample(), walDir(dataDir), walsnap)
  85. if err != nil {
  86. log.Fatalf("Failed opening WAL: %v", err)
  87. }
  88. wmetadata, state, ents, err := w.ReadAll()
  89. w.Close()
  90. if err != nil && (!isIndex || err != wal.ErrSnapshotNotFound) {
  91. log.Fatalf("Failed reading WAL: %v", err)
  92. }
  93. id, cid := parseWALMetadata(wmetadata)
  94. vid := types.ID(state.Vote)
  95. fmt.Printf("WAL metadata:\nnodeID=%s clusterID=%s term=%d commitIndex=%d vote=%s\n",
  96. id, cid, state.Term, state.Commit, vid)
  97. fmt.Printf("WAL entries:\n")
  98. fmt.Printf("lastIndex=%d\n", ents[len(ents)-1].Index)
  99. fmt.Printf("%4s\t%10s\ttype\tdata", "term", "index")
  100. if *streamdecoder != "" {
  101. fmt.Printf("\tdecoder_status\tdecoded_data")
  102. }
  103. fmt.Println()
  104. listEntriesType(*entrytype, *streamdecoder, ents)
  105. }
  106. func walDir(dataDir string) string { return filepath.Join(dataDir, "member", "wal") }
  107. func snapDir(dataDir string) string { return filepath.Join(dataDir, "member", "snap") }
  108. func parseWALMetadata(b []byte) (id, cid types.ID) {
  109. var metadata etcdserverpb.Metadata
  110. pbutil.MustUnmarshal(&metadata, b)
  111. id = types.ID(metadata.NodeID)
  112. cid = types.ID(metadata.ClusterID)
  113. return id, cid
  114. }
  115. func genIDSlice(a []uint64) []types.ID {
  116. ids := make([]types.ID, len(a))
  117. for i, id := range a {
  118. ids[i] = types.ID(id)
  119. }
  120. return ids
  121. }
  122. // excerpt replaces middle part with ellipsis and returns a double-quoted
  123. // string safely escaped with Go syntax.
  124. func excerpt(str string, pre, suf int) string {
  125. if pre+suf > len(str) {
  126. return fmt.Sprintf("%q", str)
  127. }
  128. return fmt.Sprintf("%q...%q", str[:pre], str[len(str)-suf:])
  129. }
  130. type EntryFilter func(e raftpb.Entry) (bool, string)
  131. // The 9 pass functions below takes the raftpb.Entry and return if the entry should be printed and the type of entry,
  132. // the type of the entry will used in the following print function
  133. func passConfChange(entry raftpb.Entry) (bool, string) {
  134. return entry.Type == raftpb.EntryConfChange, "ConfigChange"
  135. }
  136. func passInternalRaftRequest(entry raftpb.Entry) (bool, string) {
  137. var rr etcdserverpb.InternalRaftRequest
  138. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil, "InternalRaftRequest"
  139. }
  140. func passUnknownNormal(entry raftpb.Entry) (bool, string) {
  141. var rr1 etcdserverpb.Request
  142. var rr2 etcdserverpb.InternalRaftRequest
  143. return (entry.Type == raftpb.EntryNormal) && (rr1.Unmarshal(entry.Data) != nil) && (rr2.Unmarshal(entry.Data) != nil), "UnknownNormal"
  144. }
  145. func passIRRRange(entry raftpb.Entry) (bool, string) {
  146. var rr etcdserverpb.InternalRaftRequest
  147. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.Range != nil, "InternalRaftRequest"
  148. }
  149. func passIRRPut(entry raftpb.Entry) (bool, string) {
  150. var rr etcdserverpb.InternalRaftRequest
  151. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.Put != nil, "InternalRaftRequest"
  152. }
  153. func passIRRDeleteRange(entry raftpb.Entry) (bool, string) {
  154. var rr etcdserverpb.InternalRaftRequest
  155. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.DeleteRange != nil, "InternalRaftRequest"
  156. }
  157. func passIRRTxn(entry raftpb.Entry) (bool, string) {
  158. var rr etcdserverpb.InternalRaftRequest
  159. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.Txn != nil, "InternalRaftRequest"
  160. }
  161. func passIRRCompaction(entry raftpb.Entry) (bool, string) {
  162. var rr etcdserverpb.InternalRaftRequest
  163. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.Compaction != nil, "InternalRaftRequest"
  164. }
  165. func passIRRLeaseGrant(entry raftpb.Entry) (bool, string) {
  166. var rr etcdserverpb.InternalRaftRequest
  167. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.LeaseGrant != nil, "InternalRaftRequest"
  168. }
  169. func passIRRLeaseRevoke(entry raftpb.Entry) (bool, string) {
  170. var rr etcdserverpb.InternalRaftRequest
  171. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.LeaseRevoke != nil, "InternalRaftRequest"
  172. }
  173. func passIRRLeaseCheckpoint(entry raftpb.Entry) (bool, string) {
  174. var rr etcdserverpb.InternalRaftRequest
  175. return entry.Type == raftpb.EntryNormal && rr.Unmarshal(entry.Data) == nil && rr.LeaseCheckpoint != nil, "InternalRaftRequest"
  176. }
  177. func passRequest(entry raftpb.Entry) (bool, string) {
  178. var rr1 etcdserverpb.Request
  179. var rr2 etcdserverpb.InternalRaftRequest
  180. return entry.Type == raftpb.EntryNormal && rr1.Unmarshal(entry.Data) == nil && rr2.Unmarshal(entry.Data) != nil, "Request"
  181. }
  182. type EntryPrinter func(e raftpb.Entry)
  183. // The 4 print functions below print the entry format based on there types
  184. // printInternalRaftRequest is used to print entry information for IRRRange, IRRPut,
  185. // IRRDeleteRange and IRRTxn entries
  186. func printInternalRaftRequest(entry raftpb.Entry) {
  187. var rr etcdserverpb.InternalRaftRequest
  188. if err := rr.Unmarshal(entry.Data); err == nil {
  189. fmt.Printf("%4d\t%10d\tnorm\t%s", entry.Term, entry.Index, rr.String())
  190. }
  191. }
  192. func printUnknownNormal(entry raftpb.Entry) {
  193. fmt.Printf("%4d\t%10d\tnorm\t???", entry.Term, entry.Index)
  194. }
  195. func printConfChange(entry raftpb.Entry) {
  196. fmt.Printf("%4d\t%10d", entry.Term, entry.Index)
  197. fmt.Printf("\tconf")
  198. var r raftpb.ConfChange
  199. if err := r.Unmarshal(entry.Data); err != nil {
  200. fmt.Printf("\t???")
  201. } else {
  202. fmt.Printf("\tmethod=%s id=%s", r.Type, types.ID(r.NodeID))
  203. }
  204. }
  205. func printRequest(entry raftpb.Entry) {
  206. var r etcdserverpb.Request
  207. if err := r.Unmarshal(entry.Data); err == nil {
  208. fmt.Printf("%4d\t%10d\tnorm", entry.Term, entry.Index)
  209. switch r.Method {
  210. case "":
  211. fmt.Printf("\tnoop")
  212. case "SYNC":
  213. fmt.Printf("\tmethod=SYNC time=%q", time.Unix(0, r.Time))
  214. case "QGET", "DELETE":
  215. fmt.Printf("\tmethod=%s path=%s", r.Method, excerpt(r.Path, 64, 64))
  216. default:
  217. fmt.Printf("\tmethod=%s path=%s val=%s", r.Method, excerpt(r.Path, 64, 64), excerpt(r.Val, 128, 0))
  218. }
  219. }
  220. }
  221. // evaluateEntrytypeFlag evaluates entry-type flag and choose proper filter/filters to filter entries
  222. func evaluateEntrytypeFlag(entrytype string) []EntryFilter {
  223. var entrytypelist []string
  224. if entrytype != "" {
  225. entrytypelist = strings.Split(entrytype, ",")
  226. }
  227. validRequest := map[string][]EntryFilter{"ConfigChange": {passConfChange},
  228. "Normal": {passInternalRaftRequest, passRequest, passUnknownNormal},
  229. "Request": {passRequest},
  230. "InternalRaftRequest": {passInternalRaftRequest},
  231. "IRRRange": {passIRRRange},
  232. "IRRPut": {passIRRPut},
  233. "IRRDeleteRange": {passIRRDeleteRange},
  234. "IRRTxn": {passIRRTxn},
  235. "IRRCompaction": {passIRRCompaction},
  236. "IRRLeaseGrant": {passIRRLeaseGrant},
  237. "IRRLeaseRevoke": {passIRRLeaseRevoke},
  238. "IRRLeaseCheckpoint": {passIRRLeaseCheckpoint},
  239. }
  240. filters := make([]EntryFilter, 0)
  241. if len(entrytypelist) == 0 {
  242. filters = append(filters, passInternalRaftRequest)
  243. filters = append(filters, passRequest)
  244. filters = append(filters, passUnknownNormal)
  245. filters = append(filters, passConfChange)
  246. }
  247. for _, et := range entrytypelist {
  248. if f, ok := validRequest[et]; ok {
  249. filters = append(filters, f...)
  250. } else {
  251. log.Printf(`[%+v] is not a valid entry-type, ignored.
  252. Please set entry-type to one or more of the following:
  253. ConfigChange, Normal, Request, InternalRaftRequest,
  254. IRRRange, IRRPut, IRRDeleteRange, IRRTxn,
  255. IRRCompaction, IRRLeaseGrant, IRRLeaseRevoke, IRRLeaseCheckpoint`, et)
  256. }
  257. }
  258. return filters
  259. }
  260. // listEntriesType filters and prints entries based on the entry-type flag,
  261. func listEntriesType(entrytype string, streamdecoder string, ents []raftpb.Entry) {
  262. entryFilters := evaluateEntrytypeFlag(entrytype)
  263. printerMap := map[string]EntryPrinter{"InternalRaftRequest": printInternalRaftRequest,
  264. "Request": printRequest,
  265. "ConfigChange": printConfChange,
  266. "UnknownNormal": printUnknownNormal}
  267. var stderr bytes.Buffer
  268. args := strings.Split(streamdecoder, " ")
  269. cmd := exec.Command(args[0], args[1:]...)
  270. stdin, err := cmd.StdinPipe()
  271. if err != nil {
  272. log.Panic(err)
  273. }
  274. stdout, err := cmd.StdoutPipe()
  275. if err != nil {
  276. log.Panic(err)
  277. }
  278. cmd.Stderr = &stderr
  279. if streamdecoder != "" {
  280. err = cmd.Start()
  281. if err != nil {
  282. log.Panic(err)
  283. }
  284. }
  285. cnt := 0
  286. for _, e := range ents {
  287. passed := false
  288. currtype := ""
  289. for _, filter := range entryFilters {
  290. passed, currtype = filter(e)
  291. if passed {
  292. cnt++
  293. break
  294. }
  295. }
  296. if passed {
  297. printer := printerMap[currtype]
  298. printer(e)
  299. if streamdecoder == "" {
  300. fmt.Println()
  301. continue
  302. }
  303. // if decoder is set, pass the e.Data to stdin and read the stdout from decoder
  304. io.WriteString(stdin, hex.EncodeToString(e.Data))
  305. io.WriteString(stdin, "\n")
  306. outputReader := bufio.NewReader(stdout)
  307. decoderoutput, currerr := outputReader.ReadString('\n')
  308. if currerr != nil {
  309. fmt.Println(currerr)
  310. return
  311. }
  312. decoder_status, decoded_data := parseDecoderOutput(decoderoutput)
  313. fmt.Printf("\t%s\t%s", decoder_status, decoded_data)
  314. }
  315. }
  316. stdin.Close()
  317. err = cmd.Wait()
  318. if streamdecoder != "" {
  319. if err != nil {
  320. log.Panic(err)
  321. }
  322. if stderr.String() != "" {
  323. os.Stderr.WriteString("decoder stderr: " + stderr.String())
  324. }
  325. }
  326. fmt.Printf("\nEntry types (%s) count is : %d", entrytype, cnt)
  327. }
  328. func parseDecoderOutput(decoderoutput string) (string, string) {
  329. var decoder_status string
  330. var decoded_data string
  331. output := strings.Split(decoderoutput, "|")
  332. switch len(output) {
  333. case 1:
  334. decoder_status = "decoder output format is not right, print output anyway"
  335. decoded_data = decoderoutput
  336. case 2:
  337. decoder_status = output[0]
  338. decoded_data = output[1]
  339. default:
  340. decoder_status = output[0] + "(*WARNING: data might contain deliminator used by etcd-dump-logs)"
  341. decoded_data = strings.Join(output[1:], "")
  342. }
  343. return decoder_status, decoded_data
  344. }