1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
18 func (w *writer) WriteFrame(frame frame) (err error) {
19 if err = frame.write(w.w); err != nil {
23 if buf, ok := w.w.(*bufio.Writer); ok {
30 func (f *methodFrame) write(w io.Writer) (err error) {
31 var payload bytes.Buffer
34 return errors.New("malformed frame: missing method")
37 class, method := f.Method.id()
39 if err = binary.Write(&payload, binary.BigEndian, class); err != nil {
43 if err = binary.Write(&payload, binary.BigEndian, method); err != nil {
47 if err = f.Method.write(&payload); err != nil {
51 return writeFrame(w, frameMethod, f.ChannelId, payload.Bytes())
57 func (f *heartbeatFrame) write(w io.Writer) (err error) {
58 return writeFrame(w, frameHeartbeat, f.ChannelId, []byte{})
63 // +----------+--------+-----------+----------------+------------- - -
64 // | class-id | weight | body size | property flags | property list...
65 // +----------+--------+-----------+----------------+------------- - -
66 // short short long long short remainder...
68 func (f *headerFrame) write(w io.Writer) (err error) {
69 var payload bytes.Buffer
70 var zeroTime time.Time
72 if err = binary.Write(&payload, binary.BigEndian, f.ClassId); err != nil {
76 if err = binary.Write(&payload, binary.BigEndian, f.weight); err != nil {
80 if err = binary.Write(&payload, binary.BigEndian, f.Size); err != nil {
84 // First pass will build the mask to be serialized, second pass will serialize
85 // each of the fields that appear in the mask.
89 if len(f.Properties.ContentType) > 0 {
90 mask = mask | flagContentType
92 if len(f.Properties.ContentEncoding) > 0 {
93 mask = mask | flagContentEncoding
95 if f.Properties.Headers != nil && len(f.Properties.Headers) > 0 {
96 mask = mask | flagHeaders
98 if f.Properties.DeliveryMode > 0 {
99 mask = mask | flagDeliveryMode
101 if f.Properties.Priority > 0 {
102 mask = mask | flagPriority
104 if len(f.Properties.CorrelationId) > 0 {
105 mask = mask | flagCorrelationId
107 if len(f.Properties.ReplyTo) > 0 {
108 mask = mask | flagReplyTo
110 if len(f.Properties.Expiration) > 0 {
111 mask = mask | flagExpiration
113 if len(f.Properties.MessageId) > 0 {
114 mask = mask | flagMessageId
116 if f.Properties.Timestamp != zeroTime {
117 mask = mask | flagTimestamp
119 if len(f.Properties.Type) > 0 {
120 mask = mask | flagType
122 if len(f.Properties.UserId) > 0 {
123 mask = mask | flagUserId
125 if len(f.Properties.AppId) > 0 {
126 mask = mask | flagAppId
129 if err = binary.Write(&payload, binary.BigEndian, mask); err != nil {
133 if hasProperty(mask, flagContentType) {
134 if err = writeShortstr(&payload, f.Properties.ContentType); err != nil {
138 if hasProperty(mask, flagContentEncoding) {
139 if err = writeShortstr(&payload, f.Properties.ContentEncoding); err != nil {
143 if hasProperty(mask, flagHeaders) {
144 if err = writeTable(&payload, f.Properties.Headers); err != nil {
148 if hasProperty(mask, flagDeliveryMode) {
149 if err = binary.Write(&payload, binary.BigEndian, f.Properties.DeliveryMode); err != nil {
153 if hasProperty(mask, flagPriority) {
154 if err = binary.Write(&payload, binary.BigEndian, f.Properties.Priority); err != nil {
158 if hasProperty(mask, flagCorrelationId) {
159 if err = writeShortstr(&payload, f.Properties.CorrelationId); err != nil {
163 if hasProperty(mask, flagReplyTo) {
164 if err = writeShortstr(&payload, f.Properties.ReplyTo); err != nil {
168 if hasProperty(mask, flagExpiration) {
169 if err = writeShortstr(&payload, f.Properties.Expiration); err != nil {
173 if hasProperty(mask, flagMessageId) {
174 if err = writeShortstr(&payload, f.Properties.MessageId); err != nil {
178 if hasProperty(mask, flagTimestamp) {
179 if err = binary.Write(&payload, binary.BigEndian, uint64(f.Properties.Timestamp.Unix())); err != nil {
183 if hasProperty(mask, flagType) {
184 if err = writeShortstr(&payload, f.Properties.Type); err != nil {
188 if hasProperty(mask, flagUserId) {
189 if err = writeShortstr(&payload, f.Properties.UserId); err != nil {
193 if hasProperty(mask, flagAppId) {
194 if err = writeShortstr(&payload, f.Properties.AppId); err != nil {
199 return writeFrame(w, frameHeader, f.ChannelId, payload.Bytes())
204 // Payload is one byterange from the full body who's size is declared in the
206 func (f *bodyFrame) write(w io.Writer) (err error) {
207 return writeFrame(w, frameBody, f.ChannelId, f.Body)
210 func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) {
211 end := []byte{frameEnd}
212 size := uint(len(payload))
214 _, err = w.Write([]byte{
216 byte((channel & 0xff00) >> 8),
217 byte((channel & 0x00ff) >> 0),
218 byte((size & 0xff000000) >> 24),
219 byte((size & 0x00ff0000) >> 16),
220 byte((size & 0x0000ff00) >> 8),
221 byte((size & 0x000000ff) >> 0),
228 if _, err = w.Write(payload); err != nil {
232 if _, err = w.Write(end); err != nil {
239 func writeShortstr(w io.Writer, s string) (err error) {
242 var length = uint8(len(b))
244 if err = binary.Write(w, binary.BigEndian, length); err != nil {
248 if _, err = w.Write(b[:length]); err != nil {
255 func writeLongstr(w io.Writer, s string) (err error) {
258 var length = uint32(len(b))
260 if err = binary.Write(w, binary.BigEndian, length); err != nil {
264 if _, err = w.Write(b[:length]); err != nil {
287 func writeField(w io.Writer, value interface{}) (err error) {
291 switch v := value.(type) {
308 binary.BigEndian.PutUint16(buf[1:3], uint16(v))
313 binary.BigEndian.PutUint32(buf[1:5], uint32(v))
318 binary.BigEndian.PutUint32(buf[1:5], uint32(v))
323 binary.BigEndian.PutUint64(buf[1:9], uint64(v))
328 binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v))
333 binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v))
338 buf[1] = byte(v.Scale)
339 binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value))
344 binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
345 enc = append(buf[:5], []byte(v)...)
347 case []interface{}: // field-array
350 sec := new(bytes.Buffer)
351 for _, val := range v {
352 if err = writeField(sec, val); err != nil {
357 binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len()))
358 if _, err = w.Write(buf[:5]); err != nil {
362 if _, err = w.Write(sec.Bytes()); err != nil {
370 binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix()))
374 if _, err = w.Write([]byte{'F'}); err != nil {
377 return writeTable(w, v)
381 binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
382 if _, err = w.Write(buf[0:5]); err != nil {
385 if _, err = w.Write(v); err != nil {
398 _, err = w.Write(enc)
403 func writeTable(w io.Writer, table Table) (err error) {
406 for key, val := range table {
407 if err = writeShortstr(&buf, key); err != nil {
410 if err = writeField(&buf, val); err != nil {
415 return writeLongstr(w, string(buf.Bytes()))