123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- package sarama
- import (
- "bufio"
- "net"
- "sort"
- )
- type none struct{}
- type int32Slice []int32
- func (slice int32Slice) Len() int {
- return len(slice)
- }
- func (slice int32Slice) Less(i, j int) bool {
- return slice[i] < slice[j]
- }
- func (slice int32Slice) Swap(i, j int) {
- slice[i], slice[j] = slice[j], slice[i]
- }
- func dupeAndSort(input []int32) []int32 {
- ret := make([]int32, 0, len(input))
- for _, val := range input {
- ret = append(ret, val)
- }
- sort.Sort(int32Slice(ret))
- return ret
- }
- func withRecover(fn func()) {
- defer func() {
- handler := PanicHandler
- if handler != nil {
- if err := recover(); err != nil {
- handler(err)
- }
- }
- }()
- fn()
- }
- func safeAsyncClose(b *Broker) {
- tmp := b
- go withRecover(func() {
- if connected, _ := tmp.Connected(); connected {
- if err := tmp.Close(); err != nil {
- Logger.Println("Error closing broker", tmp.ID(), ":", err)
- }
- }
- })
- }
- type Encoder interface {
- Encode() ([]byte, error)
- Length() int
- }
- type StringEncoder string
- func (s StringEncoder) Encode() ([]byte, error) {
- return []byte(s), nil
- }
- func (s StringEncoder) Length() int {
- return len(s)
- }
- type ByteEncoder []byte
- func (b ByteEncoder) Encode() ([]byte, error) {
- return b, nil
- }
- func (b ByteEncoder) Length() int {
- return len(b)
- }
- type bufConn struct {
- net.Conn
- buf *bufio.Reader
- }
- func newBufConn(conn net.Conn) *bufConn {
- return &bufConn{
- Conn: conn,
- buf: bufio.NewReader(conn),
- }
- }
- func (bc *bufConn) Read(b []byte) (n int, err error) {
- return bc.buf.Read(b)
- }
|