util.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "os/signal"
  12. "runtime/pprof"
  13. "strconv"
  14. "time"
  15. "github.com/coreos/etcd/file_system"
  16. "github.com/coreos/etcd/web"
  17. )
  18. //--------------------------------------
  19. // etcd http Helper
  20. //--------------------------------------
  21. // Convert string duration to time format
  22. func durationToExpireTime(strDuration string) (time.Time, error) {
  23. if strDuration != "" {
  24. duration, err := strconv.Atoi(strDuration)
  25. if err != nil {
  26. return fileSystem.Permanent, err
  27. }
  28. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  29. } else {
  30. return fileSystem.Permanent, nil
  31. }
  32. }
  33. //--------------------------------------
  34. // Web Helper
  35. //--------------------------------------
  36. var storeMsg chan string
  37. // Help to send msg from store to webHub
  38. func webHelper() {
  39. storeMsg = make(chan string)
  40. etcdStore.SetMessager(storeMsg)
  41. for {
  42. // transfer the new msg to webHub
  43. web.Hub().Send(<-storeMsg)
  44. }
  45. }
  46. // startWebInterface starts web interface if webURL is not empty
  47. func startWebInterface() {
  48. if argInfo.WebURL != "" {
  49. // start web
  50. go webHelper()
  51. go web.Start(r.Server, argInfo.WebURL)
  52. }
  53. }
  54. //--------------------------------------
  55. // HTTP Utilities
  56. //--------------------------------------
  57. func dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
  58. if r.State() == raft.Leader {
  59. if body, err := r.Do(c); err != nil {
  60. return err
  61. } else {
  62. if body == nil {
  63. return etcdErr.NewError(300, "Empty result from raft")
  64. } else {
  65. body, _ := body.([]byte)
  66. w.WriteHeader(http.StatusOK)
  67. w.Write(body)
  68. return nil
  69. }
  70. }
  71. } else {
  72. leader := r.Leader()
  73. // current no leader
  74. if leader == "" {
  75. return etcdErr.NewError(300, "")
  76. }
  77. url, _ := toURL(leader)
  78. redirect(url, w, req)
  79. return nil
  80. }
  81. return etcdErr.NewError(300, "")
  82. }
  83. func redirect(hostname string, w http.ResponseWriter, req *http.Request) {
  84. path := req.URL.Path
  85. url := hostname + path
  86. debugf("Redirect to %s", url)
  87. http.Redirect(w, req, url, http.StatusTemporaryRedirect)
  88. }
  89. func decodeJsonRequest(req *http.Request, data interface{}) error {
  90. decoder := json.NewDecoder(req.Body)
  91. if err := decoder.Decode(&data); err != nil && err != io.EOF {
  92. warnf("Malformed json request: %v", err)
  93. return fmt.Errorf("Malformed json request: %v", err)
  94. }
  95. return nil
  96. }
  97. func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
  98. w.Header().Set("Content-Type", "application/json")
  99. w.WriteHeader(status)
  100. if data != nil {
  101. encoder := json.NewEncoder(w)
  102. encoder.Encode(data)
  103. }
  104. }
  105. // sanitizeURL will cleanup a host string in the format hostname:port and
  106. // attach a schema.
  107. func sanitizeURL(host string, defaultScheme string) string {
  108. // Blank URLs are fine input, just return it
  109. if len(host) == 0 {
  110. return host
  111. }
  112. p, err := url.Parse(host)
  113. if err != nil {
  114. fatal(err)
  115. }
  116. // Make sure the host is in Host:Port format
  117. _, _, err = net.SplitHostPort(host)
  118. if err != nil {
  119. fatal(err)
  120. }
  121. p = &url.URL{Host: host, Scheme: defaultScheme}
  122. return p.String()
  123. }
  124. // sanitizeListenHost cleans up the ListenHost parameter and appends a port
  125. // if necessary based on the advertised port.
  126. func sanitizeListenHost(listen string, advertised string) string {
  127. aurl, err := url.Parse(advertised)
  128. if err != nil {
  129. fatal(err)
  130. }
  131. ahost, aport, err := net.SplitHostPort(aurl.Host)
  132. if err != nil {
  133. fatal(err)
  134. }
  135. // If the listen host isn't set use the advertised host
  136. if listen == "" {
  137. listen = ahost
  138. }
  139. return net.JoinHostPort(listen, aport)
  140. }
  141. func check(err error) {
  142. if err != nil {
  143. fatal(err)
  144. }
  145. }
  146. //--------------------------------------
  147. // Log
  148. //--------------------------------------
  149. var logger *log.Logger
  150. func init() {
  151. logger = log.New(os.Stdout, "[etcd] ", log.Lmicroseconds)
  152. }
  153. func infof(msg string, v ...interface{}) {
  154. logger.Printf("INFO "+msg+"\n", v...)
  155. }
  156. func debugf(msg string, v ...interface{}) {
  157. if verbose {
  158. logger.Printf("DEBUG "+msg+"\n", v...)
  159. }
  160. }
  161. func debug(v ...interface{}) {
  162. if verbose {
  163. logger.Println("DEBUG " + fmt.Sprint(v...))
  164. }
  165. }
  166. func warnf(msg string, v ...interface{}) {
  167. logger.Printf("WARN "+msg+"\n", v...)
  168. }
  169. func warn(v ...interface{}) {
  170. logger.Println("WARN " + fmt.Sprint(v...))
  171. }
  172. func fatalf(msg string, v ...interface{}) {
  173. logger.Printf("FATAL "+msg+"\n", v...)
  174. os.Exit(1)
  175. }
  176. func fatal(v ...interface{}) {
  177. logger.Println("FATAL " + fmt.Sprint(v...))
  178. os.Exit(1)
  179. }
  180. //--------------------------------------
  181. // CPU profile
  182. //--------------------------------------
  183. func runCPUProfile() {
  184. f, err := os.Create(cpuprofile)
  185. if err != nil {
  186. fatal(err)
  187. }
  188. pprof.StartCPUProfile(f)
  189. c := make(chan os.Signal, 1)
  190. signal.Notify(c, os.Interrupt)
  191. go func() {
  192. for sig := range c {
  193. infof("captured %v, stopping profiler and exiting..", sig)
  194. pprof.StopCPUProfile()
  195. os.Exit(1)
  196. }
  197. }()
  198. }
  199. //--------------------------------------
  200. // Testing
  201. //--------------------------------------
  202. func directSet() {
  203. c := make(chan bool, 1000)
  204. for i := 0; i < 1000; i++ {
  205. go send(c)
  206. }
  207. for i := 0; i < 1000; i++ {
  208. <-c
  209. }
  210. }
  211. func send(c chan bool) {
  212. for i := 0; i < 10; i++ {
  213. command := &SetCommand{}
  214. command.Key = "foo"
  215. command.Value = "bar"
  216. command.ExpireTime = time.Unix(0, 0)
  217. r.Do(command)
  218. }
  219. c <- true
  220. }