From: Ulas Kozat Date: Tue, 25 Jul 2017 18:23:48 +0000 (-0700) Subject: JIRA: DOMINO-27 Extend publish RPC call X-Git-Tag: opnfv-5.0.RC1~2 X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F01%2F38101%2F1;p=domino.git JIRA: DOMINO-27 Extend publish RPC call Change-Id: If8cba13e3237c034ceace68cf4b14f3090612ab5 Signed-off-by: Ulas Kozat --- diff --git a/DominoServer.py b/DominoServer.py index 679ca42..8716799 100755 --- a/DominoServer.py +++ b/DominoServer.py @@ -237,7 +237,7 @@ class CommunicationHandler: pub_r.domino_udid = SERVER_UDID pub_r.seq_no = self.seqno pub_r.responseCode = SUCCESS - pub_r.template_UDID = pub_msg.template_UUID + pub_r.template_UUID = pub_msg.template_UUID self.seqno = self.seqno + 1 if (pub_msg.template_UUID is not None) and (self.dominoServer.TUID2Publisher.has_key(pub_msg.template_UUID) == False): @@ -303,7 +303,7 @@ class CommunicationHandler: # Otherwise update the existing domains subscribed to TUID unsuccessful_updates = [] if pub_msg.template_UUID is None: - pub_r.template_UDID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID + pub_r.template_UUID = self.dominoServer.assign_tuid() #update response message with the newly assigned template UUID else: logging.debug('TEMPLATE UUID %s exists, verify publisher and update subscribers', pub_msg.template_UUID) if self.dominoServer.TUID2Publisher[pub_msg.template_UUID] != pub_msg.domino_udid: #publisher is not the owner, reject @@ -311,16 +311,16 @@ class CommunicationHandler: pub_r.responseCode = FAILED return pub_r else: #Template exists, we need to find clients that are no longer in the subscription list list - TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - set(file_paths.keys())) + TUID_unsubscribed_list = list(set(self.dominoServer.TUID2Subscribers[pub_r.template_UUID]) - set(file_paths.keys())) if len(TUID_unsubscribed_list) > 0: - logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UDID) + logging.debug('%s no longer host any nodes for TUID %s', TUID_unsubscribed_list, pub_r.template_UUID) # Send empty bodied templates to domains which no longer has any assigned resource template_lines = [] for i in range(len(TUID_unsubscribed_list)): domino_client_ip = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].ipaddr domino_client_port = self.dominoServer.registration_record[TUID_unsubscribed_list[i]].tcpport try: - self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID) + self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UUID) except: logging.error('Error in pushing template: %s', sys.exc_info()[0]) unsuccessful_updates.append(TUID_unsubscribed_list[i]) @@ -333,9 +333,11 @@ class CommunicationHandler: # Send domain templates to each domain agent/client # FOR NOW: send untranslated but partitioned tosca files to scheduled sites # TBD: read from work-flow + domainInfo = [] for site in file_paths: domino_client_ip = self.dominoServer.registration_record[site].ipaddr domino_client_port = self.dominoServer.registration_record[site].tcpport + domainInfo.append(DomainInfo(ipaddr=domino_client_ip,tcpport=domino_client_port)) try: if 'hot' in self.dominoServer.subscribed_templateformats[site]: tosca = ToscaTemplate(file_paths[site]) @@ -345,7 +347,7 @@ class CommunicationHandler: template_lines = [ output ] else: template_lines = miscutil.read_templatefile(file_paths[site]) - self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UDID) + self.push_template(template_lines, domino_client_ip, domino_client_port, pub_r.template_UUID) except IOError as e: logging.error('I/O error(%d): %s' , e.errno, e.strerror) pub_r.responseCode = FAILED @@ -364,30 +366,32 @@ class CommunicationHandler: c = dbconn.cursor() if pub_r.responseCode == SUCCESS: + # send domain information only if all domains have received the domain templates + pub_r.domainInfo = domainInfo # update in memory database - self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid + self.dominoServer.TUID2Publisher[pub_r.template_UUID] = pub_msg.domino_udid try: - c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UDID,pub_msg.domino_udid) ) + c.execute( "REPLACE INTO templates VALUES (?,?)", (pub_r.template_UUID,pub_msg.domino_udid) ) dbconn.commit() except sqlite3.OperationalError as ex1: - logging.error('Could not add new TUID %s DB for Domino Client %s : %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message) + logging.error('Could not add new TUID %s DB for Domino Client %s : %s', pub_r.template_UUID, pub_msg.domino_udid, ex1.message) except: - logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid) + logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UUID, pub_msg.domino_udid) logging.error('Unexpected error: %s', sys.exc_info()[0]) else: - self.dominoServer.TUID2Publisher[pub_r.template_UDID] = pub_msg.domino_udid + self.dominoServer.TUID2Publisher[pub_r.template_UUID] = pub_msg.domino_udid # update in memory database - self.dominoServer.TUID2Subscribers[pub_r.template_UDID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys() - logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UDID], pub_r.template_UDID) + self.dominoServer.TUID2Subscribers[pub_r.template_UUID] = list(set(unsuccessful_updates).union(set(file_paths.keys()))) #file_paths.keys() + logging.debug('Subscribers: %s for TUID: %s', self.dominoServer.TUID2Subscribers[pub_r.template_UUID], pub_r.template_UUID) try: - newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UDID]) - c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UDID,newvalue) ) + newvalue = ','.join(self.dominoServer.TUID2Subscribers[pub_r.template_UUID]) + c.execute( "REPLACE INTO subscribers VALUES (?,?)", (pub_r.template_UUID,newvalue) ) dbconn.commit() except sqlite3.OperationalError as ex1: - logging.error('Could not add new subscribers for TUID %s for Domino Client %s: %s', pub_r.template_UDID, pub_msg.domino_udid, ex1.message) + logging.error('Could not add new subscribers for TUID %s for Domino Client %s: %s', pub_r.template_UUID, pub_msg.domino_udid, ex1.message) except: - logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UDID, pub_msg.domino_udid) + logging.error('Could not add new TUID %s to DB for Domino Client %s', pub_r.template_UUID, pub_msg.domino_udid) logging.error('Unexpected error: %s', sys.exc_info()[0]) dbconn.close() diff --git a/domino.thrift b/domino.thrift index dd48942..4ce9dfa 100644 --- a/domino.thrift +++ b/domino.thrift @@ -155,13 +155,19 @@ struct PublishMessage { 6: optional string template_UUID } +struct DomainInfo { + 1: string ipaddr, + 2: i16 tcpport +} + struct PublishResponseMessage { 1: MessageType messageType = PUBLISH_RESPONSE, 2: string domino_udid, 3: i64 seq_no, 4: ResponseCode responseCode, 5: string template_UUID, - 6: optional list comments + 6: optional list domainInfo, + 7: optional list comments } struct PushMessage { diff --git a/lib/dominoRPC/ttypes.py b/lib/dominoRPC/ttypes.py index 47402f3..5d188cd 100644 --- a/lib/dominoRPC/ttypes.py +++ b/lib/dominoRPC/ttypes.py @@ -825,6 +825,84 @@ class PublishMessage: def __ne__(self, other): return not (self == other) +class DomainInfo: + """ + Attributes: + - ipaddr + - tcpport + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'ipaddr', None, None, ), # 1 + (2, TType.I16, 'tcpport', None, None, ), # 2 + ) + + def __init__(self, ipaddr=None, tcpport=None,): + self.ipaddr = ipaddr + self.tcpport = tcpport + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.ipaddr = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.I16: + self.tcpport = iprot.readI16() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('DomainInfo') + if self.ipaddr is not None: + oprot.writeFieldBegin('ipaddr', TType.STRING, 1) + oprot.writeString(self.ipaddr) + oprot.writeFieldEnd() + if self.tcpport is not None: + oprot.writeFieldBegin('tcpport', TType.I16, 2) + oprot.writeI16(self.tcpport) + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.ipaddr) + value = (value * 31) ^ hash(self.tcpport) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class PublishResponseMessage: """ Attributes: @@ -833,6 +911,7 @@ class PublishResponseMessage: - seq_no - responseCode - template_UUID + - domainInfo - comments """ @@ -843,10 +922,11 @@ class PublishResponseMessage: (3, TType.I64, 'seq_no', None, None, ), # 3 (4, TType.BYTE, 'responseCode', None, None, ), # 4 (5, TType.STRING, 'template_UUID', None, None, ), # 5 - (6, TType.LIST, 'comments', (TType.STRING,None), None, ), # 6 + (6, TType.LIST, 'domainInfo', (TType.STRUCT,(DomainInfo, DomainInfo.thrift_spec)), None, ), # 6 + (7, TType.LIST, 'comments', (TType.STRING,None), None, ), # 7 ) - def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, responseCode=None, template_UUID=None, comments=None,): + def __init__(self, messageType=thrift_spec[1][4], domino_udid=None, seq_no=None, responseCode=None, template_UUID=None, domainInfo=None, comments=None,): if messageType is self.thrift_spec[1][4]: messageType = 7 self.messageType = messageType @@ -854,6 +934,7 @@ class PublishResponseMessage: self.seq_no = seq_no self.responseCode = responseCode self.template_UUID = template_UUID + self.domainInfo = domainInfo self.comments = comments def read(self, iprot): @@ -892,11 +973,22 @@ class PublishResponseMessage: iprot.skip(ftype) elif fid == 6: if ftype == TType.LIST: - self.comments = [] + self.domainInfo = [] (_etype45, _size42) = iprot.readListBegin() for _i46 in xrange(_size42): - _elem47 = iprot.readString() - self.comments.append(_elem47) + _elem47 = DomainInfo() + _elem47.read(iprot) + self.domainInfo.append(_elem47) + iprot.readListEnd() + else: + iprot.skip(ftype) + elif fid == 7: + if ftype == TType.LIST: + self.comments = [] + (_etype51, _size48) = iprot.readListBegin() + for _i52 in xrange(_size48): + _elem53 = iprot.readString() + self.comments.append(_elem53) iprot.readListEnd() else: iprot.skip(ftype) @@ -930,11 +1022,18 @@ class PublishResponseMessage: oprot.writeFieldBegin('template_UUID', TType.STRING, 5) oprot.writeString(self.template_UUID) oprot.writeFieldEnd() + if self.domainInfo is not None: + oprot.writeFieldBegin('domainInfo', TType.LIST, 6) + oprot.writeListBegin(TType.STRUCT, len(self.domainInfo)) + for iter54 in self.domainInfo: + iter54.write(oprot) + oprot.writeListEnd() + oprot.writeFieldEnd() if self.comments is not None: - oprot.writeFieldBegin('comments', TType.LIST, 6) + oprot.writeFieldBegin('comments', TType.LIST, 7) oprot.writeListBegin(TType.STRING, len(self.comments)) - for iter48 in self.comments: - oprot.writeString(iter48) + for iter55 in self.comments: + oprot.writeString(iter55) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -951,6 +1050,7 @@ class PublishResponseMessage: value = (value * 31) ^ hash(self.seq_no) value = (value * 31) ^ hash(self.responseCode) value = (value * 31) ^ hash(self.template_UUID) + value = (value * 31) ^ hash(self.domainInfo) value = (value * 31) ^ hash(self.comments) return value @@ -1028,10 +1128,10 @@ class PushMessage: elif fid == 5: if ftype == TType.LIST: self.template = [] - (_etype52, _size49) = iprot.readListBegin() - for _i53 in xrange(_size49): - _elem54 = iprot.readString() - self.template.append(_elem54) + (_etype59, _size56) = iprot.readListBegin() + for _i60 in xrange(_size56): + _elem61 = iprot.readString() + self.template.append(_elem61) iprot.readListEnd() else: iprot.skip(ftype) @@ -1069,8 +1169,8 @@ class PushMessage: if self.template is not None: oprot.writeFieldBegin('template', TType.LIST, 5) oprot.writeListBegin(TType.STRING, len(self.template)) - for iter55 in self.template: - oprot.writeString(iter55) + for iter62 in self.template: + oprot.writeString(iter62) oprot.writeListEnd() oprot.writeFieldEnd() if self.template_UUID is not None: @@ -1165,10 +1265,10 @@ class PushResponseMessage: elif fid == 5: if ftype == TType.LIST: self.comments = [] - (_etype59, _size56) = iprot.readListBegin() - for _i60 in xrange(_size56): - _elem61 = iprot.readString() - self.comments.append(_elem61) + (_etype66, _size63) = iprot.readListBegin() + for _i67 in xrange(_size63): + _elem68 = iprot.readString() + self.comments.append(_elem68) iprot.readListEnd() else: iprot.skip(ftype) @@ -1201,8 +1301,8 @@ class PushResponseMessage: if self.comments is not None: oprot.writeFieldBegin('comments', TType.LIST, 5) oprot.writeListBegin(TType.STRING, len(self.comments)) - for iter62 in self.comments: - oprot.writeString(iter62) + for iter69 in self.comments: + oprot.writeString(iter69) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -1287,10 +1387,10 @@ class QueryMessage: elif fid == 4: if ftype == TType.LIST: self.queryString = [] - (_etype66, _size63) = iprot.readListBegin() - for _i67 in xrange(_size63): - _elem68 = iprot.readString() - self.queryString.append(_elem68) + (_etype73, _size70) = iprot.readListBegin() + for _i74 in xrange(_size70): + _elem75 = iprot.readString() + self.queryString.append(_elem75) iprot.readListEnd() else: iprot.skip(ftype) @@ -1324,8 +1424,8 @@ class QueryMessage: if self.queryString is not None: oprot.writeFieldBegin('queryString', TType.LIST, 4) oprot.writeListBegin(TType.STRING, len(self.queryString)) - for iter69 in self.queryString: - oprot.writeString(iter69) + for iter76 in self.queryString: + oprot.writeString(iter76) oprot.writeListEnd() oprot.writeFieldEnd() if self.template_UUID is not None: @@ -1419,10 +1519,10 @@ class QueryResponseMessage: elif fid == 5: if ftype == TType.LIST: self.queryResponse = [] - (_etype73, _size70) = iprot.readListBegin() - for _i74 in xrange(_size70): - _elem75 = iprot.readString() - self.queryResponse.append(_elem75) + (_etype80, _size77) = iprot.readListBegin() + for _i81 in xrange(_size77): + _elem82 = iprot.readString() + self.queryResponse.append(_elem82) iprot.readListEnd() else: iprot.skip(ftype) @@ -1455,8 +1555,8 @@ class QueryResponseMessage: if self.queryResponse is not None: oprot.writeFieldBegin('queryResponse', TType.LIST, 5) oprot.writeListBegin(TType.STRING, len(self.queryResponse)) - for iter76 in self.queryResponse: - oprot.writeString(iter76) + for iter83 in self.queryResponse: + oprot.writeString(iter83) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop()