123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- package main
- import (
- "github.com/Shopify/sarama"
- "crypto/tls"
- "crypto/x509"
- "encoding/json"
- "flag"
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- "os"
- "strings"
- "time"
- )
- var (
- addr = flag.String("addr", ":8080", "The address to bind to")
- brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
- verbose = flag.Bool("verbose", false, "Turn on Sarama logging")
- certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
- keyFile = flag.String("key", "", "The optional key file for client authentication")
- caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
- verifySsl = flag.Bool("verify", false, "Optional verify ssl certificates chain")
- )
- func main() {
- flag.Parse()
- if *verbose {
- sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
- }
- if *brokers == "" {
- flag.PrintDefaults()
- os.Exit(1)
- }
- brokerList := strings.Split(*brokers, ",")
- log.Printf("Kafka brokers: %s", strings.Join(brokerList, ", "))
- server := &Server{
- DataCollector: newDataCollector(brokerList),
- AccessLogProducer: newAccessLogProducer(brokerList),
- }
- defer func() {
- if err := server.Close(); err != nil {
- log.Println("Failed to close server", err)
- }
- }()
- log.Fatal(server.Run(*addr))
- }
- func createTlsConfiguration() (t *tls.Config) {
- if *certFile != "" && *keyFile != "" && *caFile != "" {
- cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
- if err != nil {
- log.Fatal(err)
- }
- caCert, err := ioutil.ReadFile(*caFile)
- if err != nil {
- log.Fatal(err)
- }
- caCertPool := x509.NewCertPool()
- caCertPool.AppendCertsFromPEM(caCert)
- t = &tls.Config{
- Certificates: []tls.Certificate{cert},
- RootCAs: caCertPool,
- InsecureSkipVerify: *verifySsl,
- }
- }
-
- return t
- }
- type Server struct {
- DataCollector sarama.SyncProducer
- AccessLogProducer sarama.AsyncProducer
- }
- func (s *Server) Close() error {
- if err := s.DataCollector.Close(); err != nil {
- log.Println("Failed to shut down data collector cleanly", err)
- }
- if err := s.AccessLogProducer.Close(); err != nil {
- log.Println("Failed to shut down access log producer cleanly", err)
- }
- return nil
- }
- func (s *Server) Handler() http.Handler {
- return s.withAccessLog(s.collectQueryStringData())
- }
- func (s *Server) Run(addr string) error {
- httpServer := &http.Server{
- Addr: addr,
- Handler: s.Handler(),
- }
- log.Printf("Listening for requests on %s...\n", addr)
- return httpServer.ListenAndServe()
- }
- func (s *Server) collectQueryStringData() http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != "/" {
- http.NotFound(w, r)
- return
- }
-
-
- partition, offset, err := s.DataCollector.SendMessage(&sarama.ProducerMessage{
- Topic: "important",
- Value: sarama.StringEncoder(r.URL.RawQuery),
- })
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- fmt.Fprintf(w, "Failed to store your data:, %s", err)
- } else {
-
-
- fmt.Fprintf(w, "Your data is stored with unique identifier important/%d/%d", partition, offset)
- }
- })
- }
- type accessLogEntry struct {
- Method string `json:"method"`
- Host string `json:"host"`
- Path string `json:"path"`
- IP string `json:"ip"`
- ResponseTime float64 `json:"response_time"`
- encoded []byte
- err error
- }
- func (ale *accessLogEntry) ensureEncoded() {
- if ale.encoded == nil && ale.err == nil {
- ale.encoded, ale.err = json.Marshal(ale)
- }
- }
- func (ale *accessLogEntry) Length() int {
- ale.ensureEncoded()
- return len(ale.encoded)
- }
- func (ale *accessLogEntry) Encode() ([]byte, error) {
- ale.ensureEncoded()
- return ale.encoded, ale.err
- }
- func (s *Server) withAccessLog(next http.Handler) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- started := time.Now()
- next.ServeHTTP(w, r)
- entry := &accessLogEntry{
- Method: r.Method,
- Host: r.Host,
- Path: r.RequestURI,
- IP: r.RemoteAddr,
- ResponseTime: float64(time.Since(started)) / float64(time.Second),
- }
-
-
-
- s.AccessLogProducer.Input() <- &sarama.ProducerMessage{
- Topic: "access_log",
- Key: sarama.StringEncoder(r.RemoteAddr),
- Value: entry,
- }
- })
- }
- func newDataCollector(brokerList []string) sarama.SyncProducer {
-
-
-
- config := sarama.NewConfig()
- config.Producer.RequiredAcks = sarama.WaitForAll
- config.Producer.Retry.Max = 10
- config.Producer.Return.Successes = true
- tlsConfig := createTlsConfiguration()
- if tlsConfig != nil {
- config.Net.TLS.Config = tlsConfig
- config.Net.TLS.Enable = true
- }
-
-
-
-
- producer, err := sarama.NewSyncProducer(brokerList, config)
- if err != nil {
- log.Fatalln("Failed to start Sarama producer:", err)
- }
- return producer
- }
- func newAccessLogProducer(brokerList []string) sarama.AsyncProducer {
-
-
- config := sarama.NewConfig()
- tlsConfig := createTlsConfiguration()
- if tlsConfig != nil {
- config.Net.TLS.Enable = true
- config.Net.TLS.Config = tlsConfig
- }
- config.Producer.RequiredAcks = sarama.WaitForLocal
- config.Producer.Compression = sarama.CompressionSnappy
- config.Producer.Flush.Frequency = 500 * time.Millisecond
- producer, err := sarama.NewAsyncProducer(brokerList, config)
- if err != nil {
- log.Fatalln("Failed to start Sarama producer:", err)
- }
-
-
- go func() {
- for err := range producer.Errors() {
- log.Println("Failed to write access log entry:", err)
- }
- }()
- return producer
- }
|