JIRA: DOMINO-27 Extend publish RPC call 01/38101/1
authorUlas Kozat <ulas.kozat@gmail.com>
Tue, 25 Jul 2017 18:23:48 +0000 (11:23 -0700)
committerUlas Kozat <ulas.kozat@gmail.com>
Tue, 25 Jul 2017 18:30:34 +0000 (11:30 -0700)
Change-Id: If8cba13e3237c034ceace68cf4b14f3090612ab5
Signed-off-by: Ulas Kozat <ulas.kozat@gmail.com>
DominoServer.py
domino.thrift
lib/dominoRPC/ttypes.py

index 679ca42..8716799 100755 (executable)
@@ -237,7 +237,7 @@ class CommunicationHandler:
     pub_r.domino_udid = SERVER_UDID
     pub_r.seq_no = self.seqno
     pub_r.responseCode = SUCCESS
-    pub_r.template_UDID = pub_msg.template_UUID
+    pub_r.template_UUID = pub_msg.template_UUID
     self.seqno = self.seqno + 1
 
     if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False):
@@ -303,7 +303,7 @@ class CommunicationHandler:
     # Otherwise update the existing domains subscribed to TUID
     unsuccessful_updates = []
     if pub_msg.template_UUID is None:
-      pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID
+      pub_r.template_UUID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID
     else:
       logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID)
       if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject
@@ -311,16 +311,16 @@ class CommunicationHandler:
         pub_r.responseCode = FAILED
         return pub_r  
       else: #Template exists, we need to find clients that are no longer in the subscription list list
-        TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys()))
+        TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UUID]) - set(file_paths.keys()))
         if len(TUID_unsubscribed_list) > 0:
-          logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID)
+          logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UUID)
         # Send empty bodied templates to domains which no longer has any assigned resource
         template_lines = []
         for i in range(len(TUID_unsubscribed_list)):
           domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr
           domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport  
           try:
-            self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)        
+            self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UUID)        
           except:       
             logging.error('Error in pushing template: %s', sys.exc_info()[0]) 
             unsuccessful_updates.append(TUID_unsubscribed_list[i])
@@ -333,9 +333,11 @@ class CommunicationHandler:
     # Send domain templates to each domain agent/client 
     # FOR NOW: send untranslated but partitioned tosca files to scheduled sites
     # TBD: read from work-flow
+    domainInfo = []
     for site in file_paths:
       domino_client_ip = self.dominoServer.registration_record[site].ipaddr
       domino_client_port = self.dominoServer.registration_record[site].tcpport
+      domainInfo.append(DomainInfo(ipaddr=domino_client_ip,tcpport=domino_client_port))
       try:
         if 'hot' in self.dominoServer.subscribed_templateformats[site]:
           tosca = ToscaTemplate(file_paths[site])
@@ -345,7 +347,7 @@ class CommunicationHandler:
           template_lines = [ output ]
         else: 
           template_lines = miscutil.read_templatefile(file_paths[site]) 
-        self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID)
+        self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UUID)
       except IOError as e:
         logging.error('I/O error(%d): %s' , e.errno, e.strerror)
         pub_r.responseCode = FAILED
@@ -364,30 +366,32 @@ class CommunicationHandler:
     c = dbconn.cursor()
 
     if pub_r.responseCode == SUCCESS:
+      # send domain information only if all domains have received the domain templates
+      pub_r.domainInfo = domainInfo
       # update in memory database
-      self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
+      self.dominoServer.TUID2Publisher[pub_r.template_UUID] = pub_msg.domino_udid
       try:
-        c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) )
+        c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UUID,pub_msg.domino_udid) )
         dbconn.commit()
       except sqlite3.OperationalError as ex1:
-        logging.error('Could not add new TUID %s  DB for Domino Client %s :  %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
+        logging.error('Could not add new TUID %s  DB for Domino Client %s :  %s', pub_r.template_UUID, pub_msg.domino_udid, ex1.message)
       except:
-        logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
+        logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UUID, pub_msg.domino_udid)
         logging.error('Unexpected error: %s', sys.exc_info()[0])
       else:
-        self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid
+        self.dominoServer.TUID2Publisher[pub_r.template_UUID] = pub_msg.domino_udid
 
     # update in memory database
-    self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys()
-    logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID)
+    self.dominoServer.TUID2Subscribers[pub_r.template_UUID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys()
+    logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UUID], pub_r.template_UUID)
     try:
-      newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID])
-      c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) )
+      newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UUID])
+      c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UUID,newvalue) )
       dbconn.commit()
     except sqlite3.OperationalError as ex1:
-      logging.error('Could not add new subscribers for TUID %s for Domino Client %s:  %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message)
+      logging.error('Could not add new subscribers for TUID %s for Domino Client %s:  %s', pub_r.template_UUID, pub_msg.domino_udid, ex1.message)
     except:
-      logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid)
+      logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UUID, pub_msg.domino_udid)
       logging.error('Unexpected error: %s', sys.exc_info()[0])
 
     dbconn.close()
index dd48942..4ce9dfa 100644 (file)
@@ -155,13 +155,19 @@ struct PublishMessage {
  6: optional string template_UUID
 }
 
+struct DomainInfo {
+ 1: string ipaddr,
+ 2: i16 tcpport
+}
+
 struct PublishResponseMessage {
  1: MessageType messageType = PUBLISH_RESPONSE,
  2: string domino_udid,
  3: i64 seq_no,
  4: ResponseCode responseCode,
  5: string template_UUID,
- 6: optional list<string> comments
+ 6: optional list<DomainInfo> domainInfo,
+ 7: optional list<string> comments
 }
 
 struct PushMessage {
index 47402f3..5d188cd 100644 (file)
@@ -825,6 +825,84 @@ class PublishMessage:
   def __ne__(self, other):
     return not (self == other)
 
+class DomainInfo:
+  """
+  Attributes:
+   - ipaddr
+   - tcpport
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'ipaddr', None, None, ), # 1
+    (2, TType.I16, 'tcpport', None, None, ), # 2
+  )
+
+  def __init__(self, ipaddr=None, tcpport=None,):
+    self.ipaddr = ipaddr
+    self.tcpport = tcpport
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.ipaddr = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I16:
+          self.tcpport = iprot.readI16()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('DomainInfo')
+    if self.ipaddr is not None:
+      oprot.writeFieldBegin('ipaddr', TType.STRING, 1)
+      oprot.writeString(self.ipaddr)
+      oprot.writeFieldEnd()
+    if self.tcpport is not None:
+      oprot.writeFieldBegin('tcpport', TType.I16, 2)
+      oprot.writeI16(self.tcpport)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.ipaddr)
+    value = (value * 31) ^ hash(self.tcpport)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class PublishResponseMessage:
   """
   Attributes:
@@ -833,6 +911,7 @@ class PublishResponseMessage:
    - seq_no
    - responseCode
    - template_UUID
+   - domainInfo
    - comments
   """
 
@@ -843,10 +922,11 @@ class PublishResponseMessage:
     (3, TType.I64, 'seq_no', None, None, ), # 3
     (4, TType.BYTE, 'responseCode', None, None, ), # 4
     (5, TType.STRING, 'template_UUID', None, None, ), # 5
-    (6, TType.LIST, 'comments', (TType.STRING,None), None, ), # 6
+    (6, TType.LIST, 'domainInfo', (TType.STRUCT,(DomainInfo, DomainInfo.thrift_spec)), None, ), # 6
+    (7, TType.LIST, 'comments', (TType.STRING,None), None, ), # 7
   )
 
-  def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, responseCode=None, template_UUID=None, comments=None,):
+  def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, responseCode=None, template_UUID=None, domainInfo=None, comments=None,):
     if messageType is self.thrift_spec[1][4]:
       messageType = 7
     self.messageType = messageType
@@ -854,6 +934,7 @@ class PublishResponseMessage:
     self.seq_no = seq_no
     self.responseCode = responseCode
     self.template_UUID = template_UUID
+    self.domainInfo = domainInfo
     self.comments = comments
 
   def read(self, iprot):
@@ -892,11 +973,22 @@ class PublishResponseMessage:
           iprot.skip(ftype)
       elif fid == 6:
         if ftype == TType.LIST:
-          self.comments = []
+          self.domainInfo = []
           (_etype45, _size42) = iprot.readListBegin()
           for _i46 in xrange(_size42):
-            _elem47 = iprot.readString()
-            self.comments.append(_elem47)
+            _elem47 = DomainInfo()
+            _elem47.read(iprot)
+            self.domainInfo.append(_elem47)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.LIST:
+          self.comments = []
+          (_etype51, _size48) = iprot.readListBegin()
+          for _i52 in xrange(_size48):
+            _elem53 = iprot.readString()
+            self.comments.append(_elem53)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -930,11 +1022,18 @@ class PublishResponseMessage:
       oprot.writeFieldBegin('template_UUID', TType.STRING, 5)
       oprot.writeString(self.template_UUID)
       oprot.writeFieldEnd()
+    if self.domainInfo is not None:
+      oprot.writeFieldBegin('domainInfo', TType.LIST, 6)
+      oprot.writeListBegin(TType.STRUCT, len(self.domainInfo))
+      for iter54 in self.domainInfo:
+        iter54.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
     if self.comments is not None:
-      oprot.writeFieldBegin('comments', TType.LIST, 6)
+      oprot.writeFieldBegin('comments', TType.LIST, 7)
       oprot.writeListBegin(TType.STRING, len(self.comments))
-      for iter48 in self.comments:
-        oprot.writeString(iter48)
+      for iter55 in self.comments:
+        oprot.writeString(iter55)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -951,6 +1050,7 @@ class PublishResponseMessage:
     value = (value * 31) ^ hash(self.seq_no)
     value = (value * 31) ^ hash(self.responseCode)
     value = (value * 31) ^ hash(self.template_UUID)
+    value = (value * 31) ^ hash(self.domainInfo)
     value = (value * 31) ^ hash(self.comments)
     return value
 
@@ -1028,10 +1128,10 @@ class PushMessage:
       elif fid == 5:
         if ftype == TType.LIST:
           self.template = []
-          (_etype52, _size49) = iprot.readListBegin()
-          for _i53 in xrange(_size49):
-            _elem54 = iprot.readString()
-            self.template.append(_elem54)
+          (_etype59, _size56) = iprot.readListBegin()
+          for _i60 in xrange(_size56):
+            _elem61 = iprot.readString()
+            self.template.append(_elem61)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -1069,8 +1169,8 @@ class PushMessage:
     if self.template is not None:
       oprot.writeFieldBegin('template', TType.LIST, 5)
       oprot.writeListBegin(TType.STRING, len(self.template))
-      for iter55 in self.template:
-        oprot.writeString(iter55)
+      for iter62 in self.template:
+        oprot.writeString(iter62)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.template_UUID is not None:
@@ -1165,10 +1265,10 @@ class PushResponseMessage:
       elif fid == 5:
         if ftype == TType.LIST:
           self.comments = []
-          (_etype59, _size56) = iprot.readListBegin()
-          for _i60 in xrange(_size56):
-            _elem61 = iprot.readString()
-            self.comments.append(_elem61)
+          (_etype66, _size63) = iprot.readListBegin()
+          for _i67 in xrange(_size63):
+            _elem68 = iprot.readString()
+            self.comments.append(_elem68)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -1201,8 +1301,8 @@ class PushResponseMessage:
     if self.comments is not None:
       oprot.writeFieldBegin('comments', TType.LIST, 5)
       oprot.writeListBegin(TType.STRING, len(self.comments))
-      for iter62 in self.comments:
-        oprot.writeString(iter62)
+      for iter69 in self.comments:
+        oprot.writeString(iter69)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -1287,10 +1387,10 @@ class QueryMessage:
       elif fid == 4:
         if ftype == TType.LIST:
           self.queryString = []
-          (_etype66, _size63) = iprot.readListBegin()
-          for _i67 in xrange(_size63):
-            _elem68 = iprot.readString()
-            self.queryString.append(_elem68)
+          (_etype73, _size70) = iprot.readListBegin()
+          for _i74 in xrange(_size70):
+            _elem75 = iprot.readString()
+            self.queryString.append(_elem75)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -1324,8 +1424,8 @@ class QueryMessage:
     if self.queryString is not None:
       oprot.writeFieldBegin('queryString', TType.LIST, 4)
       oprot.writeListBegin(TType.STRING, len(self.queryString))
-      for iter69 in self.queryString:
-        oprot.writeString(iter69)
+      for iter76 in self.queryString:
+        oprot.writeString(iter76)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.template_UUID is not None:
@@ -1419,10 +1519,10 @@ class QueryResponseMessage:
       elif fid == 5:
         if ftype == TType.LIST:
           self.queryResponse = []
-          (_etype73, _size70) = iprot.readListBegin()
-          for _i74 in xrange(_size70):
-            _elem75 = iprot.readString()
-            self.queryResponse.append(_elem75)
+          (_etype80, _size77) = iprot.readListBegin()
+          for _i81 in xrange(_size77):
+            _elem82 = iprot.readString()
+            self.queryResponse.append(_elem82)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -1455,8 +1555,8 @@ class QueryResponseMessage:
     if self.queryResponse is not None:
       oprot.writeFieldBegin('queryResponse', TType.LIST, 5)
       oprot.writeListBegin(TType.STRING, len(self.queryResponse))
-      for iter76 in self.queryResponse:
-        oprot.writeString(iter76)
+      for iter83 in self.queryResponse:
+        oprot.writeString(iter83)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()