Add library for dashboard notification System 25/26725/1
authormaxbr <maxbr@mi.fu-berlin.de>
Thu, 5 Jan 2017 11:36:54 +0000 (12:36 +0100)
committermaxbr <maxbr@mi.fu-berlin.de>
Thu, 5 Jan 2017 11:36:54 +0000 (12:36 +0100)
JIRA: PHAROS-265

Change-Id: Ia33235c5160ef6b36b27a6fe1a2eb97a45e72367
Signed-off-by: maxbr <maxbr@mi.fu-berlin.de>
15 files changed:
tools/pharos-dashboard/config.env.sample
tools/pharos-dashboard/dashboard_notification/__init__.py [moved from tools/pharos-dashboard/src/notification_framework/__init__.py with 100% similarity]
tools/pharos-dashboard/dashboard_notification/notification.py [new file with mode: 0644]
tools/pharos-dashboard/docker-compose.yml
tools/pharos-dashboard/rabbitmq/Dockerfile [new file with mode: 0644]
tools/pharos-dashboard/rabbitmq/init.sh [new file with mode: 0755]
tools/pharos-dashboard/src/__init__.py [new file with mode: 0644]
tools/pharos-dashboard/src/notification/migrations/0001_initial.py
tools/pharos-dashboard/src/notification/models.py
tools/pharos-dashboard/src/notification/tasks.py
tools/pharos-dashboard/src/notification_framework/notification.py [deleted file]
tools/pharos-dashboard/web/Dockerfile
tools/pharos-dashboard/web/requirements.txt
tools/pharos-dashboard/worker/Dockerfile
tools/pharos-dashboard/worker/requirements.txt

index bd93616..892faac 100644 (file)
@@ -15,4 +15,8 @@ OAUTH_CONSUMER_SECRET=sample_secret
 
 JIRA_URL=sample_url
 JIRA_USER_NAME=sample_jira_user
-JIRA_USER_PASSWORD=sample_jira_pass
\ No newline at end of file
+JIRA_USER_PASSWORD=sample_jira_pass
+
+# Rabbitmq
+RABBITMQ_USER=opnfv
+RABBITMQ_PASSWORD=opnfvopnfv
diff --git a/tools/pharos-dashboard/dashboard_notification/notification.py b/tools/pharos-dashboard/dashboard_notification/notification.py
new file mode 100644 (file)
index 0000000..6843c76
--- /dev/null
@@ -0,0 +1,120 @@
+##############################################################################
+# Copyright (c) 2016 Max Breitenfeldt and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import jsonpickle
+import pika
+
+
+class Message(object):
+    def __init__(self, type, topic, content):
+        self.type = type
+        self.topic = topic
+        self.content = content
+
+
+class Notification(object):
+    """
+    This class can be used by the dashboard and the labs to exchange notifications about booking
+    events and pod status. It utilizes rabbitmq to communicate.
+
+    Notifications are associated to an event and to a topic.
+    Events are:
+    [ 'booking_start', 'booking_end']
+    The topic is usually a POD name, ie:
+    'Intel POD 2'
+    """
+
+    def __init__(self, dashboard_url, user=None, password=None, verbose=False):
+        self.rabbitmq_broker = dashboard_url
+        self.verbose = verbose
+        if user is None and password is None:
+            self._connection = pika.BlockingConnection(pika.ConnectionParameters(
+                host=self.rabbitmq_broker))
+        else:
+            self.credentials = pika.PlainCredentials(user, password)
+            self._connection = pika.BlockingConnection(pika.ConnectionParameters(
+                credentials=self.credentials,
+                host=self.rabbitmq_broker))
+        self._registry = {}
+        self._channel = self._connection.channel()
+        self._channel.exchange_declare(exchange='notifications', type='topic')
+        self._result = self._channel.queue_declare(exclusive=True, durable=True)
+        self._queue_name = self._result.method.queue
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self._connection.close()
+
+    def register(self, function, topic, type='all'):
+        """
+        Registers a function to be called for the specified event.
+        :param function: the function to register
+        :param event: the event type
+        :param regex: a regex to specify for wich topics the function will be called. Some
+        possible Expressions can be:
+        'Intel POD 2' : Intel POD 2
+        """
+
+        if topic not in self._registry:
+            self._registry[topic] = [(function, type)]
+        else:
+            self._registry[topic].append((function, type))
+
+    def receive(self):
+        """
+        Start receiving notifications. This is a blocking operation, if a notification is received,
+        the registered functions will be called.
+        """
+        if self.verbose:
+            print('Start receiving Notifications. Keys: ', self._registry.keys())
+        self._receive_message(self._registry.keys())
+
+    def send(self, message):
+        """
+        Send an event notification.
+        :param event: the event type
+        :param topic: the pod name
+        :param content: a JSON-serializable dictionary
+        """
+        self._send_message(message)
+
+    def _send_message(self, message):
+        routing_key = message.topic
+        message_json = jsonpickle.encode(message)
+        self._channel.basic_publish(exchange='notifications',
+                                    routing_key=routing_key,
+                                    body=message_json,
+                                    properties=pika.BasicProperties(
+                                        content_type='application/json',
+                                        delivery_mode=2,  # make message persistent
+                                    ))
+        if self.verbose:
+            print(" [x] Sent %r:%r" % (routing_key, message_json))
+
+    def _receive_message(self, binding_keys):
+        for key in binding_keys:
+            self._channel.queue_bind(exchange='notifications',
+                                     queue=self._queue_name,
+                                     routing_key=key)
+        self._channel.basic_consume(self._message_callback,
+                                    queue=self._queue_name)
+        self._channel.start_consuming()
+
+    def _message_callback(self, ch, method, properties, body):
+        if self.verbose:
+            print(" [x] Got %r:%r" % (method.routing_key, body))
+        if method.routing_key not in self._registry:
+            return
+        for func, type in self._registry[method.routing_key]:
+            message = jsonpickle.decode(body.decode())
+            if message.type == type:
+                func(message)
+        ch.basic_ack(delivery_tag=method.delivery_tag)
index b487620..d2d672a 100644 (file)
@@ -34,7 +34,7 @@ services:
             - postgres
         env_file: config.env
         volumes:
-            - ./src:/src
+            - ./:/pharos_dashboard
             - /var/lib/pharos_dashboard/static:/static
             - /var/lib/pharos_dashboard/media:/media
         expose:
@@ -51,19 +51,20 @@ services:
 
     rabbitmq:
         restart: always
-        image: rabbitmq:latest
+        build: ./rabbitmq/
         container_name: rm01
-        expose:
-          - "5672"
-
+        env_file: config.env
+        ports:
+          - "5672:5672"
+          
     worker:
         restart: always
         build: ./worker/
-        command: bash -c "celery -A pharos_dashboard worker -l info -B --schedule=~/celerybeat-schedule""
+        command: bash -c "celery -A pharos_dashboard worker -l info -B --schedule=~/celerybeat-schedule"
         env_file: config.env
         links:
             - postgres
             - rabbitmq
         volumes:
-            - ./src:/src
-     
\ No newline at end of file
+            - ./:/pharos_dashboard
+     
diff --git a/tools/pharos-dashboard/rabbitmq/Dockerfile b/tools/pharos-dashboard/rabbitmq/Dockerfile
new file mode 100644 (file)
index 0000000..71162a4
--- /dev/null
@@ -0,0 +1,4 @@
+FROM rabbitmq
+
+ADD init.sh /init.sh
+CMD ["/init.sh"]
\ No newline at end of file
diff --git a/tools/pharos-dashboard/rabbitmq/init.sh b/tools/pharos-dashboard/rabbitmq/init.sh
new file mode 100755 (executable)
index 0000000..f8ac089
--- /dev/null
@@ -0,0 +1,10 @@
+#!/bin/sh
+
+# Create Rabbitmq user
+( sleep 20 ; \
+rabbitmqctl add_user $RABBITMQ_USER $RABBITMQ_PASSWORD 2>/dev/null ; \
+rabbitmqctl set_user_tags $RABBITMQ_USER administrator ; \
+rabbitmqctl set_permissions -p / $RABBITMQ_USER  ".*" ".*" ".*" ; \
+echo "*** User '$RABBITMQ_USER' with password '$RABBITMQ_PASSWORD' completed. ***") &
+
+rabbitmq-server $@
diff --git a/tools/pharos-dashboard/src/__init__.py b/tools/pharos-dashboard/src/__init__.py
new file mode 100644 (file)
index 0000000..ce1acf3
--- /dev/null
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2016 Max Breitenfeldt and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
\ No newline at end of file
index d4af751..8b8414e 100644 (file)
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Generated by Django 1.10 on 2016-09-23 11:36
+# Generated by Django 1.10 on 2016-11-03 13:33
 from __future__ import unicode_literals
 
 from django.db import migrations, models
index 0ee275d..89b3023 100644 (file)
@@ -19,15 +19,15 @@ class BookingNotification(models.Model):
 
     def get_content(self):
         return {
-            'start': self.booking.start.isoformat(),
-            'end': self.booking.end.isoformat(),
+            'resource_id': self.booking.resource.id,
+            'booking_id': self.booking.id,
             'user': self.booking.user.username,
-            'purpose': self.booking.purpose
+            'user_id': self.booking.user.id,
         }
 
     def save(self, *args, **kwargs):
         notifications = self.booking.bookingnotification_set.filter(type=self.type).exclude(
             id=self.id)
-        if notifications.count() > 0:
-            raise ValueError('Doubled Notification')
-        return super(BookingNotification, self).save(*args, **kwargs)
\ No newline at end of file
+        #if notifications.count() > 0:
+        #    raise ValueError('Doubled Notification')
+        return super(BookingNotification, self).save(*args, **kwargs)
index 4173433..e2b34ca 100644 (file)
@@ -8,6 +8,8 @@
 ##############################################################################
 
 
+import os
+import sys
 from datetime import timedelta
 
 from celery import shared_task
@@ -15,19 +17,33 @@ from django.conf import settings
 from django.utils import timezone
 
 from notification.models import BookingNotification
-from notification_framework.notification import Notification
+
+# this adds the top level directory to the python path, this is needed so that we can access the
+# notification library
+sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
+
+from dashboard_notification.notification import Notification, Message
 
 
 @shared_task
 def send_booking_notifications():
-    messaging = Notification(dashboard_url=settings.RABBITMQ_URL)
-
-    now = timezone.now()
-    notifications = BookingNotification.objects.filter(submitted=False,
-                                                       submit_time__gt=now,
-                                                       submit_time__lt=now + timedelta(minutes=5))
-    for notification in notifications:
-        messaging.send(notification.type, notification.booking.resource.name,
-                       notification.get_content())
-        notification.submitted = True
-        notification.save()
+    with Notification(dashboard_url=settings.RABBITMQ_URL) as messaging:
+        now = timezone.now()
+        notifications = BookingNotification.objects.filter(submitted=False,
+                                                           submit_time__gt=now - timedelta(minutes=1),
+                                                           submit_time__lt=now + timedelta(minutes=5))
+        for notification in notifications:
+            message = Message(type=notification.type, topic=notification.booking.resource.name,
+                              content=notification.get_content())
+            messaging.send(message)
+            notification.submitted = True
+            notification.save()
+
+@shared_task
+def notification_debug():
+    with Notification(dashboard_url=settings.RABBITMQ_URL) as messaging:
+        notifications = BookingNotification.objects.all()
+        for notification in notifications:
+            message = Message(type=notification.type, topic=notification.booking.resource.name,
+                              content=notification.get_content())
+            messaging.send(message)
diff --git a/tools/pharos-dashboard/src/notification_framework/notification.py b/tools/pharos-dashboard/src/notification_framework/notification.py
deleted file mode 100644 (file)
index 84fbcff..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-##############################################################################
-# Copyright (c) 2016 Max Breitenfeldt and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-
-
-import json
-import re
-
-import pika
-
-
-class Notification(object):
-    """
-    This class can be used by the dashboard and the labs to exchange notifications about booking
-    events and pod status. It utilizes rabbitmq to communicate.
-
-    Notifications are associated to an event and to a topic.
-    Events are:
-    [ 'booking_start', 'booking_stop', 'pod_status' ]
-    The topic is usually a POD name, ie:
-    'Intel POD 2'
-    """
-
-    def __init__(self, dashboard_url, verbose=False):
-        self.rabbitmq_broker = dashboard_url
-        self.verbose = verbose
-        self._registry = {}
-
-        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
-            host=self.rabbitmq_broker))
-        self.channel = self.connection.channel()
-
-        self.channel.exchange_declare(exchange='notifications', type='topic')
-
-        self.result = self.channel.queue_declare(exclusive=True)
-        self.queue_name = self.result.method.queue
-
-    def register(self, function, event, regex):
-        """
-        Registers a function to be called for the specified event.
-        :param function: the function to register
-        :param event: the event type
-        :param regex: a regex to specify for wich topics the function will be called. Some
-        possible Expressions can be:
-        'Intel POD 2' : Intel POD 2
-        'Intel POD .*' : All Intel Pods
-        '.*' : All Topics
-        """
-
-        if event not in self._registry:
-            self._registry[event] = [(function, regex)]
-        else:
-            self._registry[event].append((function, regex))
-
-    def receive(self):
-        """
-        Start receiving notifications. This is a blocking operation, if a notification is received,
-        the registered functions will be called.
-        """
-        if self.verbose:
-            print('Start receiving Notifications. Keys: ', self._registry.keys())
-        self._receive_message(self._registry.keys())
-
-    def send(self, event, topic, content):
-        """
-        Send an event notification.
-        :param event: the event type
-        :param topic: the pod name
-        :param content: a JSON-serializable dictionary
-        """
-        message = {
-            'event': event,
-            'topic': topic,
-            'content': content
-        }
-        self._send_message(message)
-
-    def _send_message(self, event):
-        routing_key = event['type']
-        message = json.dumps(event)
-        self.channel.basic_publish(exchange='notifications',
-                                   routing_key=routing_key,
-                                   body=message,
-                                   properties=pika.BasicProperties(
-                                       content_type='application/json'
-                                   ))
-        if self.verbose:
-            print(" [x] Sent %r:%r" % (routing_key, message))
-
-    def _receive_message(self, binding_keys):
-        for key in binding_keys:
-            self.channel.queue_bind(exchange='notifications',
-                                    queue=self.queue_name,
-                                    routing_key=key)
-        self.channel.basic_consume(self._message_callback,
-                                   queue=self.queue_name,
-                                   no_ack=True)
-        self.channel.start_consuming()
-
-    def _message_callback(self, ch, method, properties, body):
-        if self.verbose:
-            print(" [x] Got %r:%r" % (method.routing_key, body))
-        if method.routing_key not in self._registry:
-            return
-        for func, regex in self._registry[method.routing_key]:
-            message = json.loads(body.decode())
-            match = re.match(regex, message['topic'])
-            if match:
-                func(body)
index d543235..228b0b0 100644 (file)
@@ -3,5 +3,5 @@ ENV PYTHONUNBUFFERED 1
 RUN mkdir /config
 ADD ./requirements.txt /config/
 RUN pip install -r /config/requirements.txt
-RUN mkdir /src;
-WORKDIR /src
+RUN mkdir -p /pharos_dashboard/src
+WORKDIR /pharos_dashboard/src
index edb20d0..f80f1c0 100644 (file)
@@ -8,6 +8,7 @@ django-registration==2.1.2
 djangorestframework==3.4.6
 gunicorn==19.6.0
 jira==1.0.7
+jsonpickle==0.9.3
 oauth2==1.9.0.post1
 oauthlib==1.1.2
 pika==0.10.0
index 86395e0..c1e8aff 100644 (file)
@@ -5,4 +5,4 @@ ADD ./requirements.txt /config/
 RUN pip install -r /config/requirements.txt
 RUN useradd -ms /bin/bash celery
 USER celery
-WORKDIR /src
+WORKDIR /pharos_dashboard/src
index edb20d0..f80f1c0 100644 (file)
@@ -8,6 +8,7 @@ django-registration==2.1.2
 djangorestframework==3.4.6
 gunicorn==19.6.0
 jira==1.0.7
+jsonpickle==0.9.3
 oauth2==1.9.0.post1
 oauthlib==1.1.2
 pika==0.10.0