src: Add DMA localagent
[barometer.git] / src / dma / vendor / github.com / streadway / amqp / confirms.go
diff --git a/src/dma/vendor/github.com/streadway/amqp/confirms.go b/src/dma/vendor/github.com/streadway/amqp/confirms.go
new file mode 100644 (file)
index 0000000..06cbaa7
--- /dev/null
@@ -0,0 +1,94 @@
+package amqp
+
+import "sync"
+
+// confirms resequences and notifies one or multiple publisher confirmation listeners
+type confirms struct {
+       m         sync.Mutex
+       listeners []chan Confirmation
+       sequencer map[uint64]Confirmation
+       published uint64
+       expecting uint64
+}
+
+// newConfirms allocates a confirms
+func newConfirms() *confirms {
+       return &confirms{
+               sequencer: map[uint64]Confirmation{},
+               published: 0,
+               expecting: 1,
+       }
+}
+
+func (c *confirms) Listen(l chan Confirmation) {
+       c.m.Lock()
+       defer c.m.Unlock()
+
+       c.listeners = append(c.listeners, l)
+}
+
+// publish increments the publishing counter
+func (c *confirms) Publish() uint64 {
+       c.m.Lock()
+       defer c.m.Unlock()
+
+       c.published++
+       return c.published
+}
+
+// confirm confirms one publishing, increments the expecting delivery tag, and
+// removes bookkeeping for that delivery tag.
+func (c *confirms) confirm(confirmation Confirmation) {
+       delete(c.sequencer, c.expecting)
+       c.expecting++
+       for _, l := range c.listeners {
+               l <- confirmation
+       }
+}
+
+// resequence confirms any out of order delivered confirmations
+func (c *confirms) resequence() {
+       for c.expecting <= c.published {
+               sequenced, found := c.sequencer[c.expecting]
+               if !found {
+                       return
+               }
+               c.confirm(sequenced)
+       }
+}
+
+// one confirms one publishing and all following in the publishing sequence
+func (c *confirms) One(confirmed Confirmation) {
+       c.m.Lock()
+       defer c.m.Unlock()
+
+       if c.expecting == confirmed.DeliveryTag {
+               c.confirm(confirmed)
+       } else {
+               c.sequencer[confirmed.DeliveryTag] = confirmed
+       }
+       c.resequence()
+}
+
+// multiple confirms all publishings up until the delivery tag
+func (c *confirms) Multiple(confirmed Confirmation) {
+       c.m.Lock()
+       defer c.m.Unlock()
+
+       for c.expecting <= confirmed.DeliveryTag {
+               c.confirm(Confirmation{c.expecting, confirmed.Ack})
+       }
+       c.resequence()
+}
+
+// Close closes all listeners, discarding any out of sequence confirmations
+func (c *confirms) Close() error {
+       c.m.Lock()
+       defer c.m.Unlock()
+
+       for _, l := range c.listeners {
+               close(l)
+       }
+       c.listeners = nil
+       return nil
+}