Add library for dashboard API
[pharos.git] / tools / pharos-dashboard / src / notification_framework / notification.py
1 ##############################################################################
2 # Copyright (c) 2016 Max Breitenfeldt and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10
11 import json
12 import re
13
14 import pika
15
16
class Notification(object):
    """
    This class can be used by the dashboard and the labs to exchange notifications about booking
    events and pod status. It utilizes rabbitmq to communicate.

    Notifications are associated to an event and to a topic.
    Events are:
    [ 'booking_start', 'booking_stop', 'pod_status' ]
    The topic is usually a POD name, ie:
    'Intel POD 2'

    The event name is used as the rabbitmq routing key; the topic travels inside the JSON
    message body and is filtered on the receiving side with the registered regexes.
    """

    def __init__(self, dashboard_url, verbose=False):
        """
        Connect to the rabbitmq broker and prepare an exclusive receive queue.
        :param dashboard_url: host of the rabbitmq broker
        :param verbose: if True, print sent and received messages
        """
        self.rabbitmq_broker = dashboard_url
        self.verbose = verbose
        # Maps an event name to a list of (function, regex) tuples.
        self._registry = {}

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=self.rabbitmq_broker))
        self.channel = self.connection.channel()

        self.channel.exchange_declare(exchange='notifications', type='topic')

        # No queue name is given, so the broker-generated name is read back from the result.
        self.result = self.channel.queue_declare(exclusive=True)
        self.queue_name = self.result.method.queue

    def register(self, function, event, regex):
        """
        Registers a function to be called for the specified event.
        :param function: the function to register, it is called with the raw message body
        :param event: the event type
        :param regex: a regex to specify for which topics the function will be called. The
        expression has to match the whole topic. Some possible Expressions can be:
        'Intel POD 2' : Intel POD 2
        'Intel POD .*' : All Intel Pods
        '.*' : All Topics
        """
        self._registry.setdefault(event, []).append((function, regex))

    def receive(self):
        """
        Start receiving notifications. This is a blocking operation, if a notification is received,
        the registered functions will be called.
        """
        if self.verbose:
            print('Start receiving Notifications. Keys: ', self._registry.keys())
        self._receive_message(self._registry.keys())

    def send(self, event, topic, content):
        """
        Send an event notification.
        :param event: the event type
        :param topic: the pod name
        :param content: a JSON-serializable dictionary
        """
        message = {
            'event': event,
            'topic': topic,
            'content': content
        }
        self._send_message(message)

    def _send_message(self, event):
        """
        Publish a message dictionary on the 'notifications' exchange.
        :param event: the message dictionary as built by send()
        """
        # send() stores the event name under 'event'; it doubles as the routing key because
        # receivers bind their queue to, and dispatch on, the event name. The 'type' key is
        # only consulted as a fallback for dictionaries that do not carry 'event'.
        routing_key = event['event'] if 'event' in event else event['type']
        message = json.dumps(event)
        self.channel.basic_publish(exchange='notifications',
                                   routing_key=routing_key,
                                   body=message,
                                   properties=pika.BasicProperties(
                                       content_type='application/json'
                                   ))
        if self.verbose:
            print(" [x] Sent %r:%r" % (routing_key, message))

    def _receive_message(self, binding_keys):
        """
        Bind the receive queue to the given routing keys and consume messages (blocking).
        :param binding_keys: iterable of routing keys (event names) to bind to
        """
        for key in binding_keys:
            self.channel.queue_bind(exchange='notifications',
                                    queue=self.queue_name,
                                    routing_key=key)
        self.channel.basic_consume(self._message_callback,
                                   queue=self.queue_name,
                                   no_ack=True)
        self.channel.start_consuming()

    def _message_callback(self, ch, method, properties, body):
        """
        Dispatch a received message to the functions registered for its event.
        :param ch: the channel the message arrived on (unused)
        :param method: delivery information, its routing_key is the event name
        :param properties: message properties (unused)
        :param body: the JSON encoded message, it must contain a 'topic' entry
        """
        if self.verbose:
            print(" [x] Got %r:%r" % (method.routing_key, body))
        handlers = self._registry.get(method.routing_key)
        if not handlers:
            return
        # Decode once; the topic is the same for every registered handler.
        topic = json.loads(body.decode())['topic']
        for func, regex in handlers:
            # The whole topic has to match, otherwise 'Intel POD 2' would also
            # trigger for 'Intel POD 20'.
            if re.fullmatch(regex, topic):
                func(body)