فهرست منبع

checkpoint wip message encoding/decoding

Evan Huus 12 سال پیش
والد
کامیت
6567b45aaf
2فایلهای تغییر یافته به همراه96 افزوده شده و 0 حذف شده
  1. 42 0
      message.go
  2. 54 0
      message_set.go

+ 42 - 0
message.go

@@ -0,0 +1,42 @@
+package kafka
+
+type topicMetadata struct {
+	err        KError
+	name       *string
+	partitions []partitionMetadata
+}
+
+func (tm *topicMetadata) encode(pe packetEncoder) {
+	pe.putError(tm.err)
+	pe.putString(tm.name)
+	pe.putArrayCount(len(tm.partitions))
+	for i := range tm.partitions {
+		(&tm.partitions[i]).encode(pe)
+	}
+}
+
+func (tm *topicMetadata) decode(pd packetDecoder) (err error) {
+	tm.err, err = pd.getError()
+	if err != nil {
+		return err
+	}
+
+	tm.name, err = pd.getString()
+	if err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayCount()
+	if err != nil {
+		return err
+	}
+	tm.partitions = make([]partitionMetadata, n)
+	for i := 0; i < n; i++ {
+		err = (&tm.partitions[i]).decode(pd)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}

+ 54 - 0
message_set.go

@@ -0,0 +1,54 @@
+package kafka
+
+type messageSetBlock struct {
+	offset int64
+	size int32
+	msg message
+}
+
+func (msb *messageSetBlock) encode(pe packetEncoder) {
+	pe.putInt64(msb.offset)
+	pe.putInt32(msb.size)
+	(&msb.msg).encode(pe)
+}
+
+func (msb *messageSetBlock) decode(pd packetDecoder) (err error) {
+	msb.offset, err = pd.getInt64()
+	if err != nil {
+		return err
+	}
+
+	msb.size, err = pd.getInt32()
+	if err != nil {
+		return err
+	}
+
+	err = (&msb.message).decode(pd)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+type messageSet struct {
+	msgs []*messageSetBlock
+}
+
+func (ms *messageSet) encode(pe packetEncoder) {
+	for i := range ms.msgs {
+		ms.msgs[i].encode(pe)
+	}
+}
+
+func (ms *messageSet) decode(pd packetDecoder) (err error) {
+	ms.msgs = make([]*messageSetBlock)
+
+	msb = new(messageSetBlock)
+	err = msb.decode(pd)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}