package sarama import ( "sort" "time" ) // response message format is: // throttleMs(int32) [topic] // where topic is: // name(string) [partition] // where partition is: // id(int32) low_watermark(int64) error_code(int16) type DeleteRecordsResponse struct { Version int16 ThrottleTime time.Duration Topics map[string]*DeleteRecordsResponseTopic } func (d *DeleteRecordsResponse) encode(pe packetEncoder) error { pe.putInt32(int32(d.ThrottleTime / time.Millisecond)) if err := pe.putArrayLength(len(d.Topics)); err != nil { return err } keys := make([]string, 0, len(d.Topics)) for topic := range d.Topics { keys = append(keys, topic) } sort.Strings(keys) for _, topic := range keys { if err := pe.putString(topic); err != nil { return err } if err := d.Topics[topic].encode(pe); err != nil { return err } } return nil } func (d *DeleteRecordsResponse) decode(pd packetDecoder, version int16) error { d.Version = version throttleTime, err := pd.getInt32() if err != nil { return err } d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond n, err := pd.getArrayLength() if err != nil { return err } if n > 0 { d.Topics = make(map[string]*DeleteRecordsResponseTopic, n) for i := 0; i < n; i++ { topic, err := pd.getString() if err != nil { return err } details := new(DeleteRecordsResponseTopic) if err = details.decode(pd, version); err != nil { return err } d.Topics[topic] = details } } return nil } func (d *DeleteRecordsResponse) key() int16 { return 21 } func (d *DeleteRecordsResponse) version() int16 { return 0 } func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion { return V0_11_0_0 } type DeleteRecordsResponseTopic struct { Partitions map[int32]*DeleteRecordsResponsePartition } func (t *DeleteRecordsResponseTopic) encode(pe packetEncoder) error { if err := pe.putArrayLength(len(t.Partitions)); err != nil { return err } keys := make([]int32, 0, len(t.Partitions)) for partition := range t.Partitions { keys = append(keys, partition) } sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) for _, partition := range keys { pe.putInt32(partition) if err := t.Partitions[partition].encode(pe); err != nil { return err } } return nil } func (t *DeleteRecordsResponseTopic) decode(pd packetDecoder, version int16) error { n, err := pd.getArrayLength() if err != nil { return err } if n > 0 { t.Partitions = make(map[int32]*DeleteRecordsResponsePartition, n) for i := 0; i < n; i++ { partition, err := pd.getInt32() if err != nil { return err } details := new(DeleteRecordsResponsePartition) if err = details.decode(pd, version); err != nil { return err } t.Partitions[partition] = details } } return nil } type DeleteRecordsResponsePartition struct { LowWatermark int64 Err KError } func (t *DeleteRecordsResponsePartition) encode(pe packetEncoder) error { pe.putInt64(t.LowWatermark) pe.putInt16(int16(t.Err)) return nil } func (t *DeleteRecordsResponsePartition) decode(pd packetDecoder, version int16) error { lowWatermark, err := pd.getInt64() if err != nil { return err } t.LowWatermark = lowWatermark kErr, err := pd.getInt16() if err != nil { return err } t.Err = KError(kErr) return nil }