Gracefully Handle Missing Jenkins Utilization for Dev Pod
[laas.git] / dashboard_notification / notification.py
1 ##############################################################################
2 # Copyright (c) 2016 Max Breitenfeldt and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import jsonpickle
11 import pika
12
13
14 class Message(object):
15     def __init__(self, type, topic, content):
16         self.type = type
17         self.topic = topic
18         self.content = content
19
20
21 class Notification(object):
22     """
23     This class can be used by the dashboard and the labs to exchange notifications about booking
24     events and pod status. It utilizes rabbitmq to communicate.
25
26     Notifications are associated to an event and to a topic.
27     Events are:
28     [ 'booking_start', 'booking_end']
29     The topic is usually a POD name, ie:
30     'Intel POD 2'
31     """
32
33     def __init__(self, dashboard_url, user=None, password=None, verbose=False):
34         self.rabbitmq_broker = dashboard_url
35         self.verbose = verbose
36         if user is None and password is None:
37             self._connection = pika.BlockingConnection(pika.ConnectionParameters(
38                 host=self.rabbitmq_broker))
39         else:
40             self.credentials = pika.PlainCredentials(user, password)
41             self._connection = pika.BlockingConnection(pika.ConnectionParameters(
42                 credentials=self.credentials,
43                 host=self.rabbitmq_broker))
44         self._registry = {}
45         self._channel = self._connection.channel()
46         self._channel.exchange_declare(exchange='notifications', type='topic')
47         self._result = self._channel.queue_declare(exclusive=True, durable=True)
48         self._queue_name = self._result.method.queue
49
50     def __enter__(self):
51         return self
52
53     def __exit__(self, exc_type, exc_val, exc_tb):
54         self._connection.close()
55
56     def register(self, function, topic, type='all'):
57         """
58         Registers a function to be called for the specified event.
59         :param function: the function to register
60         :param event: the event type
61         :param regex: a regex to specify for wich topics the function will be called. Some
62         possible Expressions can be:
63         'Intel POD 2' : Intel POD 2
64         """
65
66         if topic not in self._registry:
67             self._registry[topic] = [(function, type)]
68         else:
69             self._registry[topic].append((function, type))
70
71     def receive(self):
72         """
73         Start receiving notifications. This is a blocking operation, if a notification is received,
74         the registered functions will be called.
75         """
76         if self.verbose:
77             print('Start receiving Notifications. Keys: ', self._registry.keys())
78         self._receive_message(self._registry.keys())
79
80     def send(self, message):
81         """
82         Send an event notification.
83         :param event: the event type
84         :param topic: the pod name
85         :param content: a JSON-serializable dictionary
86         """
87         self._send_message(message)
88
89     def _send_message(self, message):
90         routing_key = message.topic
91         message_json = jsonpickle.encode(message)
92         self._channel.basic_publish(exchange='notifications',
93                                     routing_key=routing_key,
94                                     body=message_json,
95                                     properties=pika.BasicProperties(
96                                         content_type='application/json',
97                                         delivery_mode=2,  # make message persistent
98                                     ))
99         if self.verbose:
100             print(" [x] Sent %r:%r" % (routing_key, message_json))
101
102     def _receive_message(self, binding_keys):
103         for key in binding_keys:
104             self._channel.queue_bind(exchange='notifications',
105                                      queue=self._queue_name,
106                                      routing_key=key)
107         self._channel.basic_consume(self._message_callback,
108                                     queue=self._queue_name)
109         self._channel.start_consuming()
110
111     def _message_callback(self, ch, method, properties, body):
112         if self.verbose:
113             print(" [x] Got %r:%r" % (method.routing_key, body))
114         if method.routing_key not in self._registry:
115             return
116         for func, type in self._registry[method.routing_key]:
117             message = jsonpickle.decode(body.decode())
118             if message.type == type:
119                 func(message)
120         ch.basic_ack(delivery_tag=method.delivery_tag)