package sarama

import (
	"encoding/binary"
	"time"
)

// Bit masks and a size bound related to the record encoding.
//
//   - isTransactionalMask (0x10) and controlMask (0x20) are single-bit masks
//     (bits 4 and 5). NOTE(review): presumably tested against a record
//     batch's attributes; neither is referenced in this part of the file -
//     confirm at the use sites.
//   - maximumRecordOverhead adds up five maximum-size 32-bit varints, one
//     maximum-size 64-bit varint and one extra byte. NOTE(review): presumably
//     an upper bound on the bytes Record.encode emits besides the key, value
//     and header payloads - confirm against callers.
const (
	isTransactionalMask   = 0x10
	controlMask           = 0x20
	maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
)

// RecordHeader stores the key and value of a single record header.
//
// Key and Value are raw byte slices; the encode and decode methods write and
// read them in that order (Key first, then Value).
type RecordHeader struct {
	Key   []byte
	Value []byte
}

// encode writes the header to pe: Key first, then Value, each through
// putVarintBytes. It stops at, and returns, the first error reported by pe.
func (h *RecordHeader) encode(pe packetEncoder) error {
	if err := pe.putVarintBytes(h.Key); err != nil {
		return err
	}
	if err := pe.putVarintBytes(h.Value); err != nil {
		return err
	}
	return nil
}

// decode fills the header from pd: Key first, then Value, each through
// getVarintBytes. It stops at, and returns, the first error reported by pd;
// the field being read at that point is still assigned whatever pd returned.
func (h *RecordHeader) decode(pd packetDecoder) (err error) {
	h.Key, err = pd.getVarintBytes()
	if err != nil {
		return err
	}

	h.Value, err = pd.getVarintBytes()
	return err
}

// Record is the Kafka record type.
//
// Its encode and decode methods handle the fields in this order: Attributes
// (int8), TimestampDelta, OffsetDelta, Key, Value, then the number of
// Headers followed by each header.
//
// TimestampDelta is a time.Duration in memory but is written and read in
// whole milliseconds, so sub-millisecond precision does not survive encode.
// NOTE(review): TimestampDelta and OffsetDelta are presumably relative to
// base values held by the enclosing batch - confirm.
//
// length is pushed onto the encoder/decoder before the fields and popped
// after them. NOTE(review): presumably it tracks the varint length prefix of
// the record - varintLengthField is defined elsewhere.
type Record struct {
	Headers []*RecordHeader

	Attributes     int8
	TimestampDelta time.Duration
	OffsetDelta    int64
	Key            []byte
	Value          []byte
	length         varintLengthField
}

// encode serializes the record to pe. r.length is pushed first and popped
// last, bracketing the fields, which are written in this order: Attributes,
// TimestampDelta, OffsetDelta, Key, Value, the header count and then every
// header. The first error from pe (or from a header) aborts the encode and is
// returned without popping.
func (r *Record) encode(pe packetEncoder) error {
	pe.push(&r.length)

	// The timestamp delta goes on the wire in whole milliseconds.
	millis := int64(r.TimestampDelta / time.Millisecond)

	pe.putInt8(r.Attributes)
	pe.putVarint(millis)
	pe.putVarint(r.OffsetDelta)

	if err := pe.putVarintBytes(r.Key); err != nil {
		return err
	}
	if err := pe.putVarintBytes(r.Value); err != nil {
		return err
	}

	headerCount := len(r.Headers)
	pe.putVarint(int64(headerCount))
	for i := 0; i < headerCount; i++ {
		if err := r.Headers[i].encode(pe); err != nil {
			return err
		}
	}

	return pe.pop()
}

// decode reads a single record from pd into r. r.length is pushed first and
// popped last, bracketing the fields, which are read in this order:
// Attributes, TimestampDelta (milliseconds on the wire), OffsetDelta, Key,
// Value, the header count and then every header. The first error from pd
// aborts the decode and is returned.
//
// The header count comes straight from the input and is not trusted for
// sizing: Headers is pre-allocated only up to a small bound and then grown
// as headers are actually decoded, so a corrupt or hostile count cannot
// trigger a huge allocation (or a makeslice panic) up front.
// NOTE(review): this relies on pd reporting an error once its input is
// exhausted, which ends the loop for a bogus count - confirm against the
// packetDecoder implementations.
func (r *Record) decode(pd packetDecoder) (err error) {
	// Upper bound on the number of header slots allocated before any header
	// has been successfully decoded.
	const maxHeaderPrealloc = 16

	if err = pd.push(&r.length); err != nil {
		return err
	}

	if r.Attributes, err = pd.getInt8(); err != nil {
		return err
	}

	timestamp, err := pd.getVarint()
	if err != nil {
		return err
	}
	r.TimestampDelta = time.Duration(timestamp) * time.Millisecond

	if r.OffsetDelta, err = pd.getVarint(); err != nil {
		return err
	}

	if r.Key, err = pd.getVarintBytes(); err != nil {
		return err
	}

	if r.Value, err = pd.getVarintBytes(); err != nil {
		return err
	}

	numHeaders, err := pd.getVarint()
	if err != nil {
		return err
	}

	// A negative count leaves r.Headers untouched; zero yields an empty,
	// non-nil slice.
	if numHeaders >= 0 {
		prealloc := numHeaders
		if prealloc > maxHeaderPrealloc {
			prealloc = maxHeaderPrealloc
		}
		r.Headers = make([]*RecordHeader, 0, prealloc)
	}
	for i := int64(0); i < numHeaders; i++ {
		hdr := new(RecordHeader)
		if err = hdr.decode(pd); err != nil {
			return err
		}
		r.Headers = append(r.Headers, hdr)
	}

	return pd.pop()
}