From 35641a3050f91e149cc1388340fbb3fdfc43310f Mon Sep 17 00:00:00 2001 From: asteroide Date: Mon, 6 Jul 2015 17:19:47 +0200 Subject: [PATCH] Add tests for Authz exceptions. Change-Id: Ib7d438148a4c45c79f541a370c0eee9a4519fecd Add the MetaRule Exceptions. Change-Id: I1d34246da04b0cdd53b8e85fb8ca112a23dc7ed8 --- .../source/extensions/moon/ExceptionHierarchy.pptx | Bin 34574 -> 34626 bytes keystone-moon/keystone/contrib/moon/core.py | 90 +- keystone-moon/keystone/contrib/moon/exception.py | 15 + .../unit/test_unit_core_intra_extension_admin.py | 1549 +++++++++++++++++++- .../unit/test_unit_core_intra_extension_authz.py | 713 +++++---- 5 files changed, 2075 insertions(+), 292 deletions(-) diff --git a/keystone-moon/doc/source/extensions/moon/ExceptionHierarchy.pptx b/keystone-moon/doc/source/extensions/moon/ExceptionHierarchy.pptx index f0f020fdceb36c19df3945534ce6e6bc1ccacb4b..af18d231386ec438eedc46677552ef5e62fb3a85 100644 GIT binary patch delta 5372 zcmZvgc|4R|*vH3^HOvggK8&3hyHNHpmdTPWB|>B^WFN-9W}P9jw`r7hkVrz43Mo;< zB+DpEWLK1TjOTr-=Y8*g&bfZ~_gvR?miwIZxid3?U70{uOEX$JP5>h#Bf!ICHiwl& zODXEVf@$_kEDdmrTJ8X4Fhxjc^n|nCT)5{@Xsc6wr4me1RW-FF4NLNrg%>$7HO2@Y zJF_$KfFE>&B$tEC2v&@)qtj=675Za*V6y*G76HEnx^o)$w32rF2v6AgF$efA-wnuVQmq#bSl# zH>|dem0HVQ1m#JZbbAF1E+Vdb#w&+=-dvb-X!lr?_u{DvUFqJr60Nm|D-9nm(_CDg zJT-6VX3`wX&7f>j(uw&tN;CYvk+jks*i%#y9vJ@o>zXsmNtoo^{f~&v=S3?p3kFEN zBeLFnm^@}zE8Ip*uynXl2H$wRJ)#=nV#0uA1d#i6`!FpLI>&uLK#`Z=yZ-bX zNmc<*E{ER;zfNB6cAv!`t&CR@Ak#MKgl%V&$rb4aLrd!6+^9<8C382wp1N`6vu77} zPmqSUO$4uqI+OiQ?p-CkjRg?JEBiAZjR-f6w0QY)cBt6zrHxq(p9VIWcW+fpAo zQrFmAtqfOZGQ#A(y!2M8z%H~f-==p!TfzjLbH}7=bC&FRsA6iRQkYfy8mD};cgv&I zN#Oh8FA$d-=>io;(2cs>G|$biXt<@Ue<_V(nihT~4>O)TYohv@$AH6a5$4vHlkKz3 z`%L@hBj?WVqhmX}OOc@nUxa3vp<1)E-ACI@Z21Mtbm z`~ndxjD4;y+>FP{UgK|C%6tY9wb6rSaSKU5JcG7TcxtJW{7NW;bj0E5R*r5SYzvN8 zFYgr5&rU3t)HWV7}(nhr>yEuLmWkqiI zWOSDX4c1}we%KvONun7OpluO744PV9uG1^$WBfHMTrL20xX9wq!^H!%Tx)2AzXBC*|Hqev#P~ zJADYlZRJX2)0qz zF=@Zbg5M@5*X?oMc_Z6ru!ZDc8NAPOi^;Wk9r*wrrDZx>5ro4Zy3oObyUpcaLGWXs(!4^XlHZ{wH@x@ zW!7Q$92jxCw%PN3dm>=9KimW7cq99@El-+F6epa7UoQQpnbw)Z9w*+^$64TqDm6cQ{Y@=+cUs$$;Km}HxP&mtGDX81Ow`S@N#ZA!eo_=`O_j=mDy z*W!|=5T~sK>17NwhuShST6OdzUaYF5_%G*=UTC?L7j7<}bwX|udnd_OpM5ngbK_SnKsjE~$)7wpUkGv~sW4humZ2sDAjK7C?Nj2QG?l2o4WDB1H#mgmIo& zQH<$QwNw}pz*YCQkLh)0*D@GJ)EdX6o_m>m#`OEV2Sd7PXNgN3`t02$>j9PtDVp0) zYun6t%h$j8$qRicDDPKX%_cv8sx2zymd;06@aBhwf4)Q?SG8hZFjIZ641X zQ|S`EqRueH&<*AY7m30{66x-2^UUCuTBe~r?f!NuIl?6Mr4+NCI#VfK=GfEa%$ExT zq;SatqhMKfQb3Pf=8mRiYrwdwrMhI_MMM9%I2y2Vo%NL-!3XL~!S)G;9>JJtQCE8K z8P2b|fP~4GUJD<~6e-a0E0zb$g6`Zi<00J~W;ZRVlsY$nVd!{3`t)Nq&TDd=e4I9y z4cdAw;CE3U6E`P3#q28m&}Jpf0d|$vhs*9Rij@^j$JM=@844JCU(}pqHil{%{`^g6 z>-^@Ul=Z}#{_EDQ=7Fjh6LFK9%SFw{n_hyoYVG(@VeX zJG?Q~Q8xl;T39+66QmWU_aTm3UN=Jv-cm!u$qai%R$Y0{aXY69>_`M9Uzn8LTNn@eO_0u2vFv9aiSt{h8ihc0` z*$kvFel^MR$TQono(rakDq4`6&%3$CS3#02PTd zBW~qOMn)wtFUfbvNYN_`p*AH*MhDzgNUWlQ5k$#(Nc?7TBcFJr-^6K0k5oebs1fKI zaH5#`BSU~7L%^0)MB7ER#*ODor4F@9ANn_5ntE5?_hoc=j0yI}Bsw%bb5yNoc5b;i zSZ~v`-eGP3v-Y|T@#?zu2AApZx$>oWU9Jddx+B2>6+7R6%Ldu$SS3UJPqrssZMbj= zu*p7pk}F@$b^d-vi?mf?^+#Wy9!Q zgRmnS(@%q5$>p!p2XDHFn4i{4$sD`GwWW+Nqa}^52b`ZXa;a$rUq_muURwxF3{w=Q z`^uj2;zd%Y-cf|P^#I10Lq7Iv;;Q!_-ezwT68RxwZ~|UO7gBidNllVcQhwK0*oE$b ziK@$(NWsB;#jL*3f*ubXlcH_vUuciqFgd08Kgo@Yju*u);b9q~4KTl;sG! zpo<5_3M+z;$MJ`OfSUits(5^U8;1Pezl{k&(18J#P{B$F4^TbBrIoQq;J@n%BERcp zG0a#UQR2Uxrr7UyI^xti0W3j#aGl?GWGeX^u#h?c;L^YYU_Yu5+aav8%+kLes76tp5_FHo(owzX9_7MX+OrU5j@j`4uRyT0 zcRn!S(tL!zK;wf%T$K5+3Rn0sfko;(?Gu%+zuX8#2-A&tAuIcxm9ZX#;7uCNY(EKo zOZeGi$Bq$p)W!-orB`)#n2*3y0;Uqw1CHY5b9Ph%y{&1=;9r3|GU2p&NkNjvoYhIM71e)HEe(kANncRsVvm9aQlB2sO-o$^i`7c)`g z697P&&TA|X3#SR??6~;cH43q+tcUksnX7o7{2f0+vtiMF@=hgCtRBJ+ zJZXI!+~^OO_cuGQ?L^m%sUCa5<&IwSa`#b#%xwg7S-C%vO{nq@ifYVX9xSBGc_>)j zDujw|e}GXL?Ksxm+@vDhe}gyhcF~*r56>}PII3Tw1bM^Go1T{{kT+6p&Ts=F;Q7Xo zChQLG&8keyus#p#hDI4cW58Cw5NYI7{Lrf@*>b4xo}PT;ny>Ge5eD!+=mX+d^k;ViZ}$7&l)! zxL(^hvmyjv9uK#W{Rh- zsdtj>CDw^kP}Q~wsyE*io^Bu~REW8%TMQ__1B6JPh(QYKZ)27xCcLdn>`oE9Z`$<^ zbLg69kX!rTT^I9x*{JA`2OW1z?0BqWb7;ufI1!$q1fwlAmzGvNjf#k>`J(oI@8;CS zY9iU(-6ZGiPwb%NT5uV@c~cXOIg-j*G?`w8f9$hDLT!HzR7SU##V#up`XyEGovOS2 z5G_F32%)(+jKDl0Jw*uTTa40f|BSX=X6B9r%uGFY{MHY9Vvy+y4xtXDAe9` zuYT{9mTqJzX^nj`9%jV0#2T`rC;!b)sG>c#Ju6OUvMIvE{Ndnoa*L_>;~z2&7Ys7i z-(P2NeCdcDu7?Umh)8JX3KVCRfRym()CYQy4~EJzi}4QV$uH1BIWAGF6Ax`xTHl02-Yy-OPQl0 zvwLqlX_nfwVN&+Kb1CX+Z>m3wUhR?fzKT4?&0VE_y+S+QhG+ zE=;toHTo??FmKVYW0p2fuX->_YZ--qr=*9bN~Pt z`kylZOWD{6yJ2z%+nsC#>0l2#j8>!Wa)xMAYMF~RW25|3F`^tM|C}Y*1+=R0zNByd zLm?9g0I*Yz7!kmq4-Xr*QxJwd7X_h$cM+FoOei3T2>^iq$pJP&RInxrOa-fXrG@vw z)c@l#lYGLsKp{9q62mTFpj2vK5);9~ zV>JIYrA4t|Q3MtrbKqy6K!{NrX^JubB`b?lWgT&5tW)gosOl0FSsYsv`#XfTB$YIk z{M&(n6jip6V#fN${SJqerpk}vphEj|hi~cSAD~Qy3T15M{+bdLb|vokzlnz)p~lU{ za8aeSczz&SmO4tr_=ABPrAQ43s>GRaFpZL{lh>Fijv49yJKyq(RM71o@U`sCw_pl5 zdFa1jvhqHtsEi2n^R+ai0dfNV{oQ9&LV~g=(f|OJl$L)lr_ZYUtrQghzgEGus;c^w z58(?hfa2@V#}liVC`#>KVIn`x0ygLm%N5)7k2RJ^Sq_2N&OpTe+*1ef&u{DRM6|G0 zNea{@{a_U4MoUSShLY^xVcEj56-gpgN1!M}rxFU32mt^%{!9&U3Okh~L9dA37yk#T CvNhTO delta 5243 zcmZvgcRZV28^dx*Pz?C7O{X9Adun%W-alWDItl$$-M&{# zRdZGDMn58@`dX{DP5lN2J7&31+XQy zd3G&$K$eAswc9I7rL#33xQVBE%D&EwT9SiLl&FdSiDpEtb&t|7@h;;KL;>anmp*dQvYFlMV^ii^9XJy)`4@&g%iWr}taPFETi%2BCL$d>y z@njb`M4uxK`t7@pNpafH*Yc?)ydj^^NC2#bye*;J-QSFs0HvPB18XMdCoe1rZLD%a zJMOJ~!HNoUjVWVDar)nA=~D9x=5Fh`z<@>%QP6ZD{ z@|T00;vm2jnJ+&QxcUnfo9Ml%=F=&8YG5Xx6gsNoBK>fnuQ%h5w_Q@0mfrr z{ryT^W9svo?~A%n+0{j+zqC=r7`!AKr|A>P)W~mrF`_MC4RI{H2ks#~%Ev2PH59Ik zYl71nm-oc-OAU3ekVqD0^)%gfRQx37nq}?Lb!fZF760f)?fix&;{z^$t{ka$a{S{* zM}DJU~o_Uc!1pPApk#}rj`mk!>%4PaSp6DO-XQUAM8^y$Qz`=!pFs-1qZ?(@Xiwc;) zOOcXKNAko5Q(J7>NXj=sJnByGUjS~Hl>8uIe`#b5EfzCqJuZY6HgrY>7scT@4}tm# zByI$vipEn~nEA-=usgV@j4lHma+$`Y2qwqCYlbFQ10QK%%o4ZZF=KBBRN?CEg}&@| z2Q8GWBIKsjZDrRJ>r{f#6UAug6XEsBFNk^D7Q8JEIq{|b`Q10(e?IL8u*y^;`WT*= zn(`)8Ysm&{>g9bfRVt*kfis~7-syEZqOSp5>X5{_&<|HpZLmQJ(M#9 z5j`wmcs)2LvH`D_Xn1w7YdpDEfvuQn7-Vc{XSn(z+7ve(w%_dR3)p7VNurlv*gD(Z?e1(L1Wkl=Y zkoN~3K>L8|`VJB3{w2~dz%N3OA(AxDi-L}Pn+W7bNg9omyDUj_4Nq_Lg_Ijq5mj!d zHoXl2`G=GI@%rMIe$=!k(@!tBZn6Pn&fDA#7NcCq$UBnY`es+azv14C|IAJep!s1f z5CQu@u$D}6jSeFUh2E{#Mh&4uQ*wYbP_6Kl#Z-W^xeZBmM?YE~TS96$e}xL*P;(mcxk`mABy74SqJU`i6`$7>NJmbhfG~bV{C-3@ zu_6ez`Ifz&V6{=@ioE6oA!rLx=%0R<%dDZPd-6nvrzV)VLuxLZ%C&*B9aW3>Fn4bK zr<#~q;>U`Baj-ja2_NGeW;alv_LDsne*K!McGXY`w145CWx&f~yuMd1MYaX+>v+8e zSRH8mX)GVW0tH$06@AqsU(3vvGK(7ACKn#o2K8$SU5%KgmJ}zFd54|nzo&q1yup?t zJ2P$5;Q3JYX`=Mp>lY*%hbV-WodR`z@1+#k`_t`hpOCvf_B~27ldHU5?*|3?y9NaW zrA~v`eG& zV&2DzM6=?-42D>3?>PT(Cp*KBKP*Wg^H32w>o{pIU_{%^r(2tz^7ST0&%Y_mxqrJ$ zn-oB1VO$}&H^%!iy)YkJ+t&GU$VusWp`NVTTgm2>U8aZ*BiEh{)5WOAyYB2i%D1nO z-(-q=DVQMBnhbo7YS{KP*lnJe6C1EpYZUX|OyC>F4wXu{RywBazI$H+?@2+64fIeU ze@04on{N<=l$vw(Ks}>wqv@9@`XKmo%RSd*BzOGq?WHQ?sOt71_=s+8|oL63<1XWLrQ1@H* z`kjV$A9^SEoBXxk^apm4z34mKF7Cw&w?hibirmO+XxbLiOvQqHvW=!@d`fDs5G^{f zB!-W^+g!MWYNH?8cQoHPOnf z4S6`qXyw2Fw0S!=^xBDKXhM9x`oq3C!<&e=Dr}N)R9k+uWOTHBCx=6JS4z9V-l7xz z17mAUZy|!tAimO!RNq}B+=$Yn+sR}}B_KVAvOu10_HvRB*OYa~hN_ST$AciW4xhG~ zw=nn!nethgFHTL>L_E>)`>)y|J!Iu@{1RiCnB!FJ^N2rP9fI(JIYC~(&Y?hl51h!&ylItz47Cn@Ed=OK;;o02)~*dLst>M$}N7=+P`Q4g6zWtc7z@#9BeZ!9SQKp6}G{Oj_I$)<&1Z(gbd;;y_k zOrltEYA0L=n@#)ZoSsQ+8T8I+4O7P7S_~%&gbiVJxB#A~fQf_b99RfRV=Op7<1$Nd zOJS_Ja4}Yb8+d-xK7-CY7XlC7i})Vjd3=@x!GFO+1TNx;%NOzQa8isjSNmBptc8%k zMS*(ZbJ5w{>O=_t9xHYQqJPnGmbiS;c_$@)(czMXohS1&rN_p}xe?&z<7vu?Wx9Hc z6<5#G)E5_EB)C~>?`fUOYnWIt_P(qrqj7nr=BcNBy9NGbC-xumGcNV1W#)^FS5tDj z@vU=qSBX3&QB=ep$c|@j3kKu`M;+OZwuOa~6p?+35=ANPSKMpi(b(P|H~-j|7SG)k z;571F6yK1;!MpWC{`$eFs+c&8izJz(Tb?5EifBT;;v2VlIOu_Ls+v(4CrGRg1$ELZ z>Z#Ym#_~(^wJ(c2Wm`>@MAML@+q!jxBf|y-rP#awd`jb;C_R{`T+Ai2fI;FSzgOvS zbENXJCZ@)=bQefQSt`WkRp?mgr8bG*54D!IuTfy!%gnp=a(5Fp_x|wKO5Xlc8-anF zVW~`QQd9zhPWBmc0KfWT@mvTHHU9J09DsQW$ugRis2&(>r4dS=6-7AcUX}FokCg)E za({iH@lIja^PTNv{cOLjf3n^ml{=IYTXu%!Ew;wWg8*heLu8Xb>W^yZRz5q9gk&=OwP=xv72^nciT zIVlxqH26}SSZbXg%TcrzY7=TE7xm$PD#zdCckA)9R5rqXC3^U{R9hdh=0(ZB<=|v9 zZIyTvk!#A5l*gT4BTUokf=W`mi&i`k*|Y4|O}ksHVro6B{wY`!onvtsf+py4qKxM( zT@v&=(b46|rBPK9{A_x?+}>#+I}yIRozVPIs+a$2VHzslVWmYmv^?l)#5aK5;o}_h zTJ5hd*>2}((&&>!tRvzdl+6+`r;7@Z^HIM?1m@HbRx5J>HN1!uQK*w+6U@>JmY!#c z7NRi%c^NLHUW+EPK@-(RF=Q77UiO-pO=vt$e>zp9-yEZrROTYGjx?ibO%=-VYxz?_R=;I*#ls zgWy@+AHG<5GEP^Ffl#lqM?h_V1cLy|#QR%SUNuBsmTYgspYH&2?8>)wM}QMnz6Vu& z2O`?eVK){SHwAS`-k2Yo>b1nh75$8S6yCzpEtd^xYJB!PfHn%eXtina&|hM;@%UP? z|1VV;Eo+NK@J8xyx|?$(iRCXt6Hu|DzlGod+0`2FW-yXg@FRg+ktx$paH$$7!g33UD647T448r7N zAyb79Pr-?@`^1N^Ib3`4)(j){t_zEu#d7!LD;o4vt*03Qh< z8gji8aj9ae^^!vM5yMcO!dq2VH~-M_hge0O`Fh>Ejax4F`3CqMm*K0usIy~$w%dC% zoB#lzX87-8;B0dfM~LFKF04)9xAQ$r1u2W$?GQ*ET-$-vy>z<4YF->C7;vx*=NVGd z+ZRs)o}MtLrwljXpT9W@j3J5xlNwBe6P7}}=Cn_RR3rcZ*FSZDRw|rOoEjto)5QbQ z0sj6(qglx;$a!Xp`=9I?!R_b=Ixp0RYygi}3Hs7h_;3aoj`B z3u6WT0As=;_)bqxQp{%*ISxM$X9X^Eo?>1MIF#XU^qUKZ5dT11+&DDw546vNLms@O znEmjJd?&m(920h)@2oNb|0&|b%>0GUs*(!eR3QTYGUHKVyu%g$9tWKuPV_tcJQYa< zE09?Tmnuo<-@faZ(1>$i%c%$|f?M>x2x?p^iqj7VArKsbMxG6tE z3Wt9Fh4xPoRtAS;qRx%lQLK1U7>6hxT;@-wy?V^kiyjbSiU~1gNR2*!9+c6nc*Pju zXhq_)g*yAEU_7E3fF!;cb6*}zSu{EB?X4f>u){xHK)ln1ru*01I~IWX8qJL>wI9t2 U1P5X~1L-j$F;s*{L1)7M0OR*2UH||9 diff --git a/keystone-moon/keystone/contrib/moon/core.py b/keystone-moon/keystone/contrib/moon/core.py index aa7fd884..69e8585b 100644 --- a/keystone-moon/keystone/contrib/moon/core.py +++ b/keystone-moon/keystone/contrib/moon/core.py @@ -245,6 +245,16 @@ class IntraExtensionManager(manager.Manager): :param obj: object of the request :param act: action of the request :return: True or False or raise an exception + :raises: (in that order) + SubjectUnknown + ObjectUnknown + ActionUnknown + SubjectCategoryAssignmentOutOfScope + ActionCategoryAssignmentOutOfScope + ObjectCategoryAssignmentOutOfScope + SubjectCategoryAssignmentUnknown + ObjectCategoryAssignmentUnknown + ActionCategoryAssignmentUnknown """ if not self.driver.get_intra_extension(uuid): raise IntraExtensionNotFound() @@ -1244,124 +1254,124 @@ class IntraExtensionAuthzManager(IntraExtensionManager): raise AdminException() def set_subject_dict(self, user_name, intra_extension_uuid, subject_dict): - raise AdminException() + raise SubjectAddNotAuthorized() def add_subject_dict(self, user_name, intra_extension_uuid, subject_uuid): - raise AdminException() + raise SubjectAddNotAuthorized() def del_subject(self, user_name, intra_extension_uuid, subject_uuid): - raise AdminException() + raise SubjectDelNotAuthorized() def set_object_dict(self, user_name, intra_extension_uuid, object_dict): - raise AdminException() + raise ObjectAddNotAuthorized() def add_object_dict(self, user_name, intra_extension_uuid, object_name): - raise AdminException() + raise ObjectAddNotAuthorized() def del_object(self, user_name, intra_extension_uuid, object_uuid): - raise AdminException() + raise ObjectDelNotAuthorized() def set_action_dict(self, user_name, intra_extension_uuid, action_dict): - raise AdminException() + raise ActionAddNotAuthorized() def add_action_dict(self, user_name, intra_extension_uuid, action_name): - raise AdminException() + raise ActionAddNotAuthorized() def del_action(self, user_name, intra_extension_uuid, action_uuid): - raise AdminException() + raise ActionDelNotAuthorized() def set_subject_category_dict(self, user_name, intra_extension_uuid, subject_category): - raise AdminException() + raise SubjectCategoryAddNotAuthorized() def add_subject_category_dict(self, user_name, intra_extension_uuid, subject_category_name): - raise AdminException() + raise SubjectCategoryAddNotAuthorized() def del_subject_category(self, user_name, intra_extension_uuid, subject_uuid): - raise AdminException() + raise SubjectCategoryDelNotAuthorized() def set_object_category_dict(self, user_name, intra_extension_uuid, object_category): - raise AdminException() + raise ObjectCategoryAddNotAuthorized() def add_object_category_dict(self, user_name, intra_extension_uuid, object_category_name): - raise AdminException() + raise ObjectCategoryAddNotAuthorized() def del_object_category(self, user_name, intra_extension_uuid, object_uuid): - raise AdminException() + raise ObjectCategoryDelNotAuthorized() def set_action_category_dict(self, user_name, intra_extension_uuid, action_category): - raise AdminException() + raise ActionCategoryAddNotAuthorized() def add_action_category_dict(self, user_name, intra_extension_uuid, action_category_name): - raise AdminException() + raise ActionCategoryAddNotAuthorized() def del_action_category(self, user_name, intra_extension_uuid, action_uuid): - raise AdminException() + raise ActionCategoryDelNotAuthorized() def set_subject_category_scope_dict(self, user_name, intra_extension_uuid, category, scope): - raise AdminException() + raise SubjectCategoryScopeAddNotAuthorized() def add_subject_category_scope_dict(self, user_name, intra_extension_uuid, subject_category, scope_name): - raise AdminException() + raise SubjectCategoryScopeAddNotAuthorized() def del_subject_category_scope(self, user_name, intra_extension_uuid, subject_category, subject_category_scope): - raise AdminException() + raise SubjectCategoryScopeDelNotAuthorized() def set_object_category_scope_dict(self, user_name, intra_extension_uuid, category, scope): - raise AdminException() + raise ObjectCategoryScopeAddNotAuthorized() def add_object_category_scope_dict(self, user_name, intra_extension_uuid, object_category, scope_name): - raise AdminException() + raise ObjectCategoryScopeAddNotAuthorized() def del_object_category_scope(self, user_name, intra_extension_uuid, object_category, object_category_scope): - raise AdminException() + raise ObjectCategoryScopeDelNotAuthorized() def set_action_category_scope_dict(self, user_name, intra_extension_uuid, category, scope): - raise AdminException() + raise ActionCategoryScopeAddNotAuthorized() def add_action_category_scope_dict(self, user_name, intra_extension_uuid, action_category, scope_name): - raise AdminException() + raise ActionCategoryScopeAddNotAuthorized() def del_action_category_scope(self, user_name, intra_extension_uuid, action_category, action_category_scope): - raise AdminException() + raise ActionCategoryScopeDelNotAuthorized() def set_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid, assignment_dict): - raise AdminException() + raise SubjectCategoryAssignmentAddNotAuthorized() def del_subject_category_assignment(self, user_name, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid): - raise AdminException() + raise SubjectCategoryAssignmentAddNotAuthorized() def add_subject_category_assignment_dict(self, user_name, intra_extension_uuid, subject_uuid, category_uuid, scope_uuid): - raise AdminException() + raise SubjectCategoryAssignmentDelNotAuthorized() def set_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid, assignment_dict): - raise AdminException() + raise ObjectCategoryAssignmentAddNotAuthorized() def del_object_category_assignment(self, user_name, intra_extension_uuid, object_uuid, category_uuid, scope_uuid): - raise AdminException() + raise ObjectCategoryAssignmentAddNotAuthorized() def add_object_category_assignment_dict(self, user_name, intra_extension_uuid, object_uuid, category_uuid, scope_uuid): - raise AdminException() + raise ObjectCategoryAssignmentDelNotAuthorized() def set_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid, assignment_dict): - raise AdminException() + raise ActionCategoryAssignmentAddNotAuthorized() def del_action_category_assignment(self, user_name, intra_extension_uuid, action_uuid, category_uuid, scope_uuid): - raise AdminException() + raise ActionCategoryAssignmentAddNotAuthorized() def add_action_category_assignment_dict(self, user_name, intra_extension_uuid, action_uuid, category_uuid, scope_uuid): - raise AdminException() + raise ActionCategoryAssignmentDelNotAuthorized() def set_aggregation_algorithm(self, user_name, intra_extension_uuid, aggregation_algorithm): - raise AdminException() + raise MetaRuleAddNotAuthorized() def set_sub_meta_rule(self, user_name, intra_extension_uuid, sub_meta_rules): - raise AdminException() + raise MetaRuleAddNotAuthorized() def set_sub_rule(self, user_name, intra_extension_uuid, relation, sub_rule): - raise AdminException() + raise RuleAddNotAuthorized() def del_sub_rule(self, user_name, intra_extension_uuid, relation_name, rule): - raise AdminException() + raise RuleAddNotAuthorized() @dependency.provider('admin_api') @dependency.requires('identity_api', 'moonlog_api', 'tenant_api') diff --git a/keystone-moon/keystone/contrib/moon/exception.py b/keystone-moon/keystone/contrib/moon/exception.py index b0ec740b..b206fc76 100644 --- a/keystone-moon/keystone/contrib/moon/exception.py +++ b/keystone-moon/keystone/contrib/moon/exception.py @@ -239,6 +239,9 @@ class AdminAssignment(AuthzException): class AdminRule(AuthzException): title = 'Rule Exception' +class AdminMetaRule(AuthzException): + title = 'MetaRule Exception' + class SubjectReadNotAuthorized(AdminPerimeter): title = 'Subject Read Not Authorized' @@ -395,3 +398,15 @@ class RuleAddNotAuthorized(AdminRule): class RuleDelNotAuthorized(AdminRule): title = 'Rule Del Not Authorized' + + +class MetaRuleReadNotAuthorized(AdminRule): + title = 'MetaRule Read Not Authorized' + + +class MetaRuleAddNotAuthorized(AdminRule): + title = 'MetaRule Add Not Authorized' + + +class MetaRuleDelNotAuthorized(AdminRule): + title = 'MetaRule Del Not Authorized' diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py index 29ebe7a2..6426bf84 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_admin.py @@ -19,7 +19,7 @@ from keystone.contrib.moon.core import LogManager, TenantManager CONF = cfg.CONF -USER_ADMIN = { +USER = { 'name': 'admin', 'domain_id': "default", 'password': 'admin' @@ -31,11 +31,11 @@ IE = { "description": "a simple description." } -class TestIntraExtensionAdminManager(tests.TestCase): +class TestIntraExtensionAdminManagerOK(tests.TestCase): def setUp(self): self.useFixture(database.Database()) - super(TestIntraExtensionAdminManager, self).setUp() + super(TestIntraExtensionAdminManagerOK, self).setUp() self.load_backends() self.load_fixtures(default_fixtures) self.manager = IntraExtensionAdminManager() @@ -51,7 +51,7 @@ class TestIntraExtensionAdminManager(tests.TestCase): } def config_overrides(self): - super(TestIntraExtensionAdminManager, self).config_overrides() + super(TestIntraExtensionAdminManagerOK, self).config_overrides() self.policy_directory = 'examples/moon/policies' self.config_fixture.config( group='moon', @@ -62,7 +62,7 @@ class TestIntraExtensionAdminManager(tests.TestCase): def create_intra_extension(self, policy_model="policy_rbac_admin"): # Create the admin user because IntraExtension needs it - self.admin = self.identity_api.create_user(USER_ADMIN) + self.admin = self.identity_api.create_user(USER) IE["policymodel"] = policy_model self.ref = self.manager.load_intra_extension(IE) self.assertIsInstance(self.ref, dict) @@ -1225,5 +1225,1544 @@ class TestIntraExtensionAdminManager(tests.TestCase): self.assertIn(a_scope, scope[func_name][cat_value]) +class TestIntraExtensionAdminManagerKO(tests.TestCase): + def setUp(self): + self.useFixture(database.Database()) + super(TestIntraExtensionAdminManagerKO, self).setUp() + self.load_backends() + self.load_fixtures(default_fixtures) + self.manager = IntraExtensionAuthzManager() + self.admin_manager = IntraExtensionAdminManager() + + def __get_key_from_value(self, value, values_dict): + return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] + + def load_extra_backends(self): + return { + "moonlog_api": LogManager(), + "tenant_api": TenantManager(), + "resource_api": resource.Manager(), + } + + def config_overrides(self): + super(TestIntraExtensionAdminManagerKO, self).config_overrides() + self.policy_directory = 'examples/moon/policies' + self.config_fixture.config( + group='moon', + intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector') + self.config_fixture.config( + group='moon', + policy_directory=self.policy_directory) + + def create_tenant(self): + tenant = { + "id": uuid.uuid4().hex, + "name": "TestIntraExtensionAuthzManager", + "enabled": True, + "description": "", + "domain_id": "default" + } + return self.resource_api.create_project(tenant["id"], tenant) + + def create_mapping(self, tenant, authz_uuid=None, admin_uuid=None): + + mapping = self.tenant_api.set_tenant_dict(tenant["id"], tenant["name"], authz_uuid, admin_uuid) + self.assertIsInstance(mapping, dict) + self.assertIn("authz", mapping) + self.assertEqual(mapping["authz"], authz_uuid) + return mapping + + def create_user(self, username="admin"): + + _USER = dict(USER) + _USER["name"] = username + return self.identity_api.create_user(_USER) + + def create_intra_extension(self, policy_model="policy_rbac_authz"): + + IE["policymodel"] = policy_model + ref = self.admin_manager.load_intra_extension(IE) + self.assertIsInstance(self.ref, dict) + return ref + + def test_subjects(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + SubjectReadNotAuthorized, + self.manager.get_subject_dict, + demo_user["id"], ref["id"]) + + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) + self.assertIsInstance(subjects["subjects"], dict) + + new_subject = self.create_user("new_user") + new_subjects = dict() + new_subjects[new_subject["id"]] = new_subject["name"] + + self.assertRaises( + SubjectAddNotAuthorized, + self.manager.set_subject_dict, + demo_user["id"], ref["id"], new_subjects) + + subjects = self.manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) + self.assertEqual(subjects["subjects"], new_subjects) + self.assertIn(new_subject["id"], subjects["subjects"]) + + # Delete the new subject + self.assertRaises( + SubjectDelNotAuthorized, + self.manager.del_subject_dict, + demo_user["id"], ref["id"], new_subject["id"]) + + self.manager.del_subject(admin_user["id"], ref["id"], new_subject["id"]) + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) + self.assertNotIn(new_subject["id"], subjects["subjects"]) + + # Add a particular subject + self.assertRaises( + SubjectAddNotAuthorized, + self.manager.add_subject_dict, + demo_user["id"], ref["id"], new_subject["id"]) + + subjects = self.manager.add_subject_dict(admin_user["id"], ref["id"], new_subject["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subject", subjects) + self.assertIn("uuid", subjects["subject"]) + self.assertEqual(new_subject["name"], subjects["subject"]["name"]) + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subjects, dict) + self.assertIn("subjects", subjects) + self.assertIn("id", subjects) + self.assertIn("intra_extension_uuid", subjects) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) + self.assertIn(new_subject["id"], subjects["subjects"]) + + def test_objects(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + ObjectReadNotAuthorized, + self.manager.get_object_dict, + demo_user["id"], ref["id"]) + + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) + self.assertIsInstance(objects["objects"], dict) + + new_object = {"id": uuid.uuid4().hex, "name": "my_object"} + new_objects = dict() + new_objects[new_object["id"]] = new_object["name"] + + self.assertRaises( + ObjectAddNotAuthorized, + self.manager.set_object_dict, + demo_user["id"], ref["id"], new_objects) + + objects = self.manager.set_object_dict(admin_user["id"], ref["id"], new_objects) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) + self.assertEqual(objects["objects"], new_objects) + self.assertIn(new_object["id"], objects["objects"]) + + # Delete the new object + self.assertRaises( + ObjectDelNotAuthorized, + self.manager.del_object_dict, + demo_user["id"], ref["id"], new_object["id"]) + + self.manager.del_object(admin_user["id"], ref["id"], new_object["id"]) + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) + self.assertNotIn(new_object["id"], objects["objects"]) + + # Add a particular object + self.assertRaises( + ObjectAddNotAuthorized, + self.manager.add_object_dict, + demo_user["id"], ref["id"], new_object["name"]) + + objects = self.manager.add_object_dict(admin_user["id"], ref["id"], new_object["name"]) + self.assertIsInstance(objects, dict) + self.assertIn("object", objects) + self.assertIn("uuid", objects["object"]) + self.assertEqual(new_object["name"], objects["object"]["name"]) + new_object["id"] = objects["object"]["uuid"] + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(objects, dict) + self.assertIn("objects", objects) + self.assertIn("id", objects) + self.assertIn("intra_extension_uuid", objects) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) + self.assertIn(new_object["id"], objects["objects"]) + + def test_actions(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + ActionReadNotAuthorized, + self.manager.get_action_dict, + demo_user["id"], ref["id"]) + + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) + self.assertIsInstance(actions["actions"], dict) + + new_action = {"id": uuid.uuid4().hex, "name": "my_action"} + new_actions = dict() + new_actions[new_action["id"]] = new_action["name"] + + self.assertRaises( + ActionAddNotAuthorized, + self.manager.set_action_dict, + demo_user["id"], ref["id"], new_actions) + + actions = self.manager.set_action_dict(admin_user["id"], ref["id"], new_actions) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) + self.assertEqual(actions["actions"], new_actions) + self.assertIn(new_action["id"], actions["actions"]) + + # Delete the new action + self.assertRaises( + ActionDelNotAuthorized, + self.manager.del_action_dict, + demo_user["id"], ref["id"], new_action["id"]) + + self.manager.del_action(admin_user["id"], ref["id"], new_action["id"]) + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) + self.assertNotIn(new_action["id"], actions["actions"]) + + # Add a particular action + self.assertRaises( + ActionAddNotAuthorized, + self.manager.add_action_dict, + demo_user["id"], ref["id"], new_action["name"]) + + actions = self.manager.add_action_dict(admin_user["id"], ref["id"], new_action["name"]) + self.assertIsInstance(actions, dict) + self.assertIn("action", actions) + self.assertIn("uuid", actions["action"]) + self.assertEqual(new_action["name"], actions["action"]["name"]) + new_action["id"] = actions["action"]["uuid"] + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(actions, dict) + self.assertIn("actions", actions) + self.assertIn("id", actions) + self.assertIn("intra_extension_uuid", actions) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) + self.assertIn(new_action["id"], actions["actions"]) + + def test_subject_categories(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + SubjectCategoryReadNotAuthorized, + self.manager.get_subject_category_dict, + demo_user["id"], ref["id"]) + + subject_categories = self.manager.get_subject_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertIsInstance(subject_categories["subject_categories"], dict) + + new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} + new_subject_categories = dict() + new_subject_categories[new_subject_category["id"]] = new_subject_category["name"] + + self.assertRaises( + SubjectCategoryAddNotAuthorized, + self.manager.set_subject_category_dict, + demo_user["id"], ref["id"], new_subject_categories) + + subject_categories = self.manager.set_subject_category_dict(admin_user["id"], ref["id"], new_subject_categories) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertEqual(subject_categories["subject_categories"], new_subject_categories) + self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + + # Delete the new subject_category + self.assertRaises( + SubjectCategoryDelNotAuthorized, + self.manager.del_subject_category_dict, + demo_user["id"], ref["id"], new_subject_category["id"]) + + self.manager.del_subject_category(admin_user["id"], ref["id"], new_subject_category["id"]) + subject_categories = self.manager.get_subject_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertNotIn(new_subject_category["id"], subject_categories["subject_categories"]) + + # Add a particular subject_category + self.assertRaises( + SubjectCategoryAddNotAuthorized, + self.manager.add_subject_category_dict, + demo_user["id"], ref["id"], new_subject_category["name"]) + + subject_categories = self.manager.add_subject_category_dict( + admin_user["id"], + ref["id"], + new_subject_category["name"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_category", subject_categories) + self.assertIn("uuid", subject_categories["subject_category"]) + self.assertEqual(new_subject_category["name"], subject_categories["subject_category"]["name"]) + new_subject_category["id"] = subject_categories["subject_category"]["uuid"] + subject_categories = self.manager.get_subject_category_dict( + admin_user["id"], + ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + + def test_object_categories(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + ObjectCategoryReadNotAuthorized, + self.manager.get_object_category_dict, + demo_user["id"], ref["id"]) + + object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) + self.assertIsInstance(object_categories["object_categories"], dict) + + new_object_category = {"id": uuid.uuid4().hex, "name": "object_category_test"} + new_object_categories = dict() + new_object_categories[new_object_category["id"]] = new_object_category["name"] + + self.assertRaises( + ObjectCategoryAddNotAuthorized, + self.manager.set_object_category_dict, + demo_user["id"], ref["id"], new_object_categories) + + object_categories = self.manager.set_object_category_dict(admin_user["id"], ref["id"], new_object_categories) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) + self.assertEqual(object_categories["object_categories"], new_object_categories) + self.assertIn(new_object_category["id"], object_categories["object_categories"]) + + # Delete the new object_category + self.assertRaises( + ObjectCategoryDelNotAuthorized, + self.manager.del_object_category, + demo_user["id"], ref["id"], new_object_category["id"]) + + self.manager.del_object_category(admin_user["id"], ref["id"], new_object_category["id"]) + object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) + self.assertNotIn(new_object_category["id"], object_categories["object_categories"]) + + # Add a particular object_category + self.assertRaises( + ObjectCategoryAddNotAuthorized, + self.manager.add_object_category, + demo_user["id"], ref["id"], new_object_category["name"]) + + object_categories = self.manager.add_object_category_dict( + admin_user["id"], + ref["id"], + new_object_category["name"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_category", object_categories) + self.assertIn("uuid", object_categories["object_category"]) + self.assertEqual(new_object_category["name"], object_categories["object_category"]["name"]) + new_object_category["id"] = object_categories["object_category"]["uuid"] + object_categories = self.manager.get_object_category_dict( + admin_user["id"], + ref["id"]) + self.assertIsInstance(object_categories, dict) + self.assertIn("object_categories", object_categories) + self.assertIn("id", object_categories) + self.assertIn("intra_extension_uuid", object_categories) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) + self.assertIn(new_object_category["id"], object_categories["object_categories"]) + + def test_action_categories(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + self.assertRaises( + ActionCategoryReadNotAuthorized, + self.manager.get_action_category_dict, + demo_user["id"], ref["id"]) + + action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) + self.assertIsInstance(action_categories["action_categories"], dict) + + new_action_category = {"id": uuid.uuid4().hex, "name": "action_category_test"} + new_action_categories = dict() + new_action_categories[new_action_category["id"]] = new_action_category["name"] + self.assertRaises( + ActionCategoryAddNotAuthorized, + self.manager.set_action_category_dict, + demo_user["id"], ref["id"], new_action_categories) + + action_categories = self.manager.set_action_category_dict(admin_user["id"], ref["id"], new_action_categories) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) + self.assertEqual(action_categories["action_categories"], new_action_categories) + self.assertIn(new_action_category["id"], action_categories["action_categories"]) + + # Delete the new action_category + self.assertRaises( + ActionCategoryDelNotAuthorized, + self.manager.del_action_category_dict, + demo_user["id"], ref["id"], new_action_category["id"]) + + self.manager.del_action_category(admin_user["id"], ref["id"], new_action_category["id"]) + action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) + self.assertNotIn(new_action_category["id"], action_categories["action_categories"]) + + # Add a particular action_category + self.assertRaises( + ActionCategoryAddNotAuthorized, + self.manager.add_action_category_dict, + demo_user["id"], ref["id"], new_action_category["name"]) + + action_categories = self.manager.add_action_category_dict( + admin_user["id"], + ref["id"], + new_action_category["name"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_category", action_categories) + self.assertIn("uuid", action_categories["action_category"]) + self.assertEqual(new_action_category["name"], action_categories["action_category"]["name"]) + new_action_category["id"] = action_categories["action_category"]["uuid"] + action_categories = self.manager.get_action_category_dict( + admin_user["id"], + ref["id"]) + self.assertIsInstance(action_categories, dict) + self.assertIn("action_categories", action_categories) + self.assertIn("id", action_categories) + self.assertIn("intra_extension_uuid", action_categories) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) + self.assertIn(new_action_category["id"], action_categories["action_categories"]) + + def test_subject_category_scope(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + subject_categories = self.manager.set_subject_category_dict( + admin_user["id"], + ref["id"], + { + uuid.uuid4().hex: admin_user["id"], + uuid.uuid4().hex: "dev", + } + ) + + for subject_category in subject_categories["subject_categories"]: + self.assertRaises( + SubjectCategoryScopeReadNotAuthorized, + self.manager.get_subject_category_scope_dict, + demo_user["id"], ref["id"], subject_category) + + subject_category_scope = self.manager.get_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) + + new_subject_category_scope = dict() + new_subject_category_scope_uuid = uuid.uuid4().hex + new_subject_category_scope[new_subject_category_scope_uuid] = "new_subject_category_scope" + + self.assertRaises( + SubjectCategoryScopeAddNotAuthorized, + self.manager.set_subject_category_scope_dict, + demo_user["id"], ref["id"], subject_category, new_subject_category_scope) + + subject_category_scope = self.manager.set_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], + subject_category_scope["subject_category_scope"][subject_category].values()) + + # Delete the new subject_category_scope + self.assertRaises( + SubjectCategoryScopeDelNotAuthorized, + self.manager.del_subject_category_scope_dict, + demo_user["id"], ref["id"], subject_category, new_subject_category_scope_uuid) + + self.manager.del_subject_category_scope( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope_uuid) + subject_category_scope = self.manager.get_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"]) + + # Add a particular subject_category_scope + self.assertRaises( + SubjectCategoryScopeAddNotAuthorized, + self.manager.add_subject_category_scope_dict, + demo_user["id"], ref["id"], subject_category, + new_subject_category_scope[new_subject_category_scope_uuid]) + + subject_category_scope = self.manager.add_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope[new_subject_category_scope_uuid]) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("uuid", subject_category_scope["subject_category_scope"]) + self.assertEqual(new_subject_category_scope[new_subject_category_scope_uuid], + subject_category_scope["subject_category_scope"]["name"]) + subject_category_scope = self.manager.get_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_subject_category_scope_uuid, subject_category_scope["subject_category_scope"]) + + def test_object_category_scope(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + object_categories = self.manager.set_object_category_dict( + admin_user["id"], + ref["id"], + { + uuid.uuid4().hex: "id", + uuid.uuid4().hex: "domain", + } + ) + + for object_category in object_categories["object_categories"]: + self.assertRaises( + ObjectCategoryScopeReadNotAuthorized, + self.manager.get_object_category_scope_dict, + demo_user["id"], ref["id"], object_category) + + object_category_scope = self.manager.get_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIsInstance(object_category_scope["object_category_scope"], dict) + + new_object_category_scope = dict() + new_object_category_scope_uuid = uuid.uuid4().hex + new_object_category_scope[new_object_category_scope_uuid] = "new_object_category_scope" + + self.assertRaises( + ObjectCategoryScopeAddNotAuthorized, + self.manager.set_object_category_scope_dict, + demo_user["id"], ref["id"], object_category, new_object_category_scope) + + object_category_scope = self.manager.set_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIn(new_object_category_scope[new_object_category_scope_uuid], + object_category_scope["object_category_scope"][object_category].values()) + + # Delete the new object_category_scope + self.assertRaises( + ObjectCategoryScopeDelNotAuthorized, + self.manager.del_object_category_scope_dict, + demo_user["id"], ref["id"], object_category, new_object_category_scope) + + self.manager.del_object_category_scope( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope_uuid) + object_category_scope = self.manager.get_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) + + # Add a particular object_category_scope + self.assertRaises( + ObjectCategoryScopeAddNotAuthorized, + self.manager.add_object_category_scope_dict, + demo_user["id"], ref["id"], object_category, + new_object_category_scope[new_object_category_scope_uuid] + ) + + object_category_scope = self.manager.add_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope[new_object_category_scope_uuid]) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("uuid", object_category_scope["object_category_scope"]) + self.assertEqual(new_object_category_scope[new_object_category_scope_uuid], + object_category_scope["object_category_scope"]["name"]) + object_category_scope = self.manager.get_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_object_category_scope_uuid, object_category_scope["object_category_scope"]) + + def test_action_category_scope(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + action_categories = self.manager.set_action_category_dict( + admin_user["id"], + ref["id"], + { + uuid.uuid4().hex: "compute", + uuid.uuid4().hex: "identity", + } + ) + + for action_category in action_categories["action_categories"]: + self.assertRaises( + ActionCategoryScopeReadNotAuthorized, + self.manager.get_object_category_scope_dict, + demo_user["id"], ref["id"], action_category) + + action_category_scope = self.manager.get_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIsInstance(action_category_scope["action_category_scope"], dict) + + new_action_category_scope = dict() + new_action_category_scope_uuid = uuid.uuid4().hex + new_action_category_scope[new_action_category_scope_uuid] = "new_action_category_scope" + + self.assertRaises( + ActionCategoryScopeAddNotAuthorized, + self.manager.set_action_category_scope_dict, + demo_user["id"], ref["id"], action_category, new_action_category_scope) + + action_category_scope = self.manager.set_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIn(new_action_category_scope[new_action_category_scope_uuid], + action_category_scope["action_category_scope"][action_category].values()) + + # Delete the new action_category_scope + self.assertRaises( + ActionCategoryScopeDelNotAuthorized, + self.manager.del_action_category_scope_dict, + demo_user["id"], ref["id"], action_category, + new_action_category_scope_uuid + ) + + self.manager.del_action_category_scope( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope_uuid) + action_category_scope = self.manager.get_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) + + # Add a particular action_category_scope + self.assertRaises( + ActionCategoryScopeAddNotAuthorized, + self.manager.add_action_category_scope_dict, + demo_user["id"], ref["id"], action_category, + new_action_category_scope[new_action_category_scope_uuid] + ) + + action_category_scope = self.manager.add_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope[new_action_category_scope_uuid]) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("uuid", action_category_scope["action_category_scope"]) + self.assertEqual(new_action_category_scope[new_action_category_scope_uuid], + action_category_scope["action_category_scope"]["name"]) + action_category_scope = self.manager.get_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertNotIn(new_action_category_scope_uuid, action_category_scope["action_category_scope"]) + + def test_subject_category_assignment(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + new_subject = self.create_user() + new_subjects = dict() + new_subjects[new_subject["id"]] = new_subject["name"] + subjects = self.manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) + + new_subject_category_uuid = uuid.uuid4().hex + new_subject_category_value = "role" + subject_categories = self.manager.set_subject_category_dict( + admin_user["id"], + ref["id"], + { + new_subject_category_uuid: new_subject_category_value + } + ) + + for subject_category in subject_categories["subject_categories"]: + subject_category_scope = self.manager.get_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) + + new_subject_category_scope = dict() + new_subject_category_scope_uuid = uuid.uuid4().hex + new_subject_category_scope[new_subject_category_scope_uuid] = admin_user["id"] + subject_category_scope = self.manager.set_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], + subject_category_scope["subject_category_scope"][subject_category].values()) + + new_subject_category_scope2 = dict() + new_subject_category_scope2_uuid = uuid.uuid4().hex + new_subject_category_scope2[new_subject_category_scope2_uuid] = "dev" + subject_category_scope = self.manager.set_subject_category_scope_dict( + admin_user["id"], + ref["id"], + subject_category, + new_subject_category_scope2) + self.assertIsInstance(subject_category_scope, dict) + self.assertIn("subject_category_scope", subject_category_scope) + self.assertIn("id", subject_category_scope) + self.assertIn("intra_extension_uuid", subject_category_scope) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertIn(new_subject_category_scope2[new_subject_category_scope2_uuid], + subject_category_scope["subject_category_scope"][subject_category].values()) + + self.assertRaises( + SubjectCategoryAssignmentReadNotAuthorized, + self.manager.get_subject_category_assignment_dict, + demo_user["id"], ref["id"], new_subject["id"]) + + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + self.assertRaises( + SubjectCategoryAssignmentAddNotAuthorized, + self.manager.set_subject_category_assignment_dict, + demo_user["id"], ref["id"], new_subject["id"], + { + new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], + } + ) + + subject_category_assignments = self.manager.set_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"], + { + new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], + } + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid]}, + subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid]}, + subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + self.assertRaises( + SubjectCategoryAssignmentDelNotAuthorized, + self.manager.del_subject_category_assignment_dict, + demo_user["id"], ref["id"], new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid + ) + + self.manager.del_subject_category_assignment( + admin_user["id"], + ref["id"], + new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid + ) + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_subject_category_uuid: [new_subject_category_scope2_uuid, ]}, + subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + self.assertRaises( + SubjectCategoryAssignmentAddNotAuthorized, + self.manager.add_subject_category_assignment_dict, + demo_user["id"], ref["id"], new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid + ) + + data = self.manager.add_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"], + new_subject_category_uuid, + new_subject_category_scope_uuid + ) + + subject_category_assignments = self.manager.get_subject_category_assignment_dict( + admin_user["id"], + ref["id"], + new_subject["id"] + ) + self.assertIsInstance(subject_category_assignments, dict) + self.assertIn("subject_category_assignments", subject_category_assignments) + self.assertIn("id", subject_category_assignments) + self.assertIn("intra_extension_uuid", subject_category_assignments) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_subject_category_uuid: [new_subject_category_scope2_uuid, new_subject_category_scope_uuid]}, + subject_category_assignments["subject_category_assignments"][new_subject["id"]]) + + def test_object_category_assignment(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + new_object = self.create_user() + new_objects = dict() + new_objects[new_object["id"]] = new_object["name"] + objects = self.manager.set_object_dict(admin_user["id"], ref["id"], new_objects) + + new_object_category_uuid = uuid.uuid4().hex + new_object_category_value = "role" + object_categories = self.manager.set_object_category_dict( + admin_user["id"], + ref["id"], + { + new_object_category_uuid: new_object_category_value + } + ) + + for object_category in object_categories["object_categories"]: + object_category_scope = self.manager.get_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIsInstance(object_category_scope["object_category_scope"], dict) + + new_object_category_scope = dict() + new_object_category_scope_uuid = uuid.uuid4().hex + new_object_category_scope[new_object_category_scope_uuid] = admin_user["id"] + object_category_scope = self.manager.set_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIn(new_object_category_scope[new_object_category_scope_uuid], + object_category_scope["object_category_scope"][object_category].values()) + + new_object_category_scope2 = dict() + new_object_category_scope2_uuid = uuid.uuid4().hex + new_object_category_scope2[new_object_category_scope2_uuid] = "dev" + object_category_scope = self.manager.set_object_category_scope_dict( + admin_user["id"], + ref["id"], + object_category, + new_object_category_scope2) + self.assertIsInstance(object_category_scope, dict) + self.assertIn("object_category_scope", object_category_scope) + self.assertIn("id", object_category_scope) + self.assertIn("intra_extension_uuid", object_category_scope) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], + object_category_scope["object_category_scope"][object_category].values()) + + self.assertRaises( + ObjectCategoryAssignmentReadNotAuthorized, + self.manager.get_object_category_assignment_dict, + demo_user["id"], ref["id"], new_object["id"] + ) + + object_category_assignments = self.manager.get_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, object_category_assignments["object_category_assignments"][new_object["id"]]) + + self.assertRaises( + ObjectCategoryAssignmentAddNotAuthorized, + self.manager.set_object_category_assignment_dict, + demo_user["id"], ref["id"], new_object["id"], + { + new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], + } + ) + + object_category_assignments = self.manager.set_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"], + { + new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], + } + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, + object_category_assignments["object_category_assignments"][new_object["id"]]) + object_category_assignments = self.manager.get_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid]}, + object_category_assignments["object_category_assignments"][new_object["id"]]) + + self.assertRaises( + ObjectCategoryAssignmentDelNotAuthorized, + self.manager.del_object_category_assignment_dict, + demo_user["id"], ref["id"], new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid + ) + + self.manager.del_object_category_assignment( + admin_user["id"], + ref["id"], + new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid + ) + object_category_assignments = self.manager.get_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_object_category_uuid: [new_object_category_scope2_uuid, ]}, + object_category_assignments["object_category_assignments"][new_object["id"]]) + + self.assertRaises( + ObjectCategoryAssignmentAddNotAuthorized, + self.manager.add_object_category_assignment_dict, + demo_user["id"], ref["id"], new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid + ) + + self.manager.add_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"], + new_object_category_uuid, + new_object_category_scope_uuid + ) + + object_category_assignments = self.manager.get_object_category_assignment_dict( + admin_user["id"], + ref["id"], + new_object["id"] + ) + self.assertIsInstance(object_category_assignments, dict) + self.assertIn("object_category_assignments", object_category_assignments) + self.assertIn("id", object_category_assignments) + self.assertIn("intra_extension_uuid", object_category_assignments) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_object_category_uuid: [new_object_category_scope2_uuid, new_object_category_scope_uuid]}, + object_category_assignments["object_category_assignments"][new_object["id"]]) + + def test_action_category_assignment(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + new_action = self.create_user() + new_actions = dict() + new_actions[new_action["id"]] = new_action["name"] + actions = self.manager.set_action_dict(admin_user["id"], ref["id"], new_actions) + + new_action_category_uuid = uuid.uuid4().hex + new_action_category_value = "role" + action_categories = self.manager.set_action_category_dict( + admin_user["id"], + ref["id"], + { + new_action_category_uuid: new_action_category_value + } + ) + + for action_category in action_categories["action_categories"]: + action_category_scope = self.manager.get_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIsInstance(action_category_scope["action_category_scope"], dict) + + new_action_category_scope = dict() + new_action_category_scope_uuid = uuid.uuid4().hex + new_action_category_scope[new_action_category_scope_uuid] = admin_user["id"] + action_category_scope = self.manager.set_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIn(new_action_category_scope[new_action_category_scope_uuid], + action_category_scope["action_category_scope"][action_category].values()) + + new_action_category_scope2 = dict() + new_action_category_scope2_uuid = uuid.uuid4().hex + new_action_category_scope2[new_action_category_scope2_uuid] = "dev" + action_category_scope = self.manager.set_action_category_scope_dict( + admin_user["id"], + ref["id"], + action_category, + new_action_category_scope2) + self.assertIsInstance(action_category_scope, dict) + self.assertIn("action_category_scope", action_category_scope) + self.assertIn("id", action_category_scope) + self.assertIn("intra_extension_uuid", action_category_scope) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], + action_category_scope["action_category_scope"][action_category].values()) + + self.assertRaises( + ActionCategoryAssignmentReadNotAuthorized, + self.manager.get_action_category_assignment_dict, + demo_user["id"], ref["id"], new_action["id"] + ) + + action_category_assignments = self.manager.get_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual({}, action_category_assignments["action_category_assignments"][new_action["id"]]) + + self.assertRaises( + ActionCategoryAssignmentAddNotAuthorized, + self.manager.set_action_category_assignment_dict, + demo_user["id"], ref["id"], new_action["id"], + { + new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], + } + ) + + action_category_assignments = self.manager.set_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"], + { + new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], + } + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, + action_category_assignments["action_category_assignments"][new_action["id"]]) + action_category_assignments = self.manager.get_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid]}, + action_category_assignments["action_category_assignments"][new_action["id"]]) + + self.assertRaises( + ActionCategoryAssignmentDelNotAuthorized, + self.manager.del_action_category_assignment_dict, + demo_user["id"], ref["id"], new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid + ) + + self.manager.del_action_category_assignment( + admin_user["id"], + ref["id"], + new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid + ) + action_category_assignments = self.manager.get_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_action_category_uuid: [new_action_category_scope2_uuid, ]}, + action_category_assignments["action_category_assignments"][new_action["id"]]) + + self.assertRaises( + ActionCategoryAssignmentAddNotAuthorized, + self.manager.add_action_category_assignment_dict, + demo_user["id"], ref["id"], new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid + ) + + self.manager.add_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"], + new_action_category_uuid, + new_action_category_scope_uuid + ) + + action_category_assignments = self.manager.get_action_category_assignment_dict( + admin_user["id"], + ref["id"], + new_action["id"] + ) + self.assertIsInstance(action_category_assignments, dict) + self.assertIn("action_category_assignments", action_category_assignments) + self.assertIn("id", action_category_assignments) + self.assertIn("intra_extension_uuid", action_category_assignments) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual( + {new_action_category_uuid: [new_action_category_scope2_uuid, new_action_category_scope_uuid]}, + action_category_assignments["action_category_assignments"][new_action["id"]]) + + def test_sub_meta_rules(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + aggregation_algorithms = self.manager.get_aggregation_algorithms(admin_user["id"], ref["id"]) + self.assertIsInstance(aggregation_algorithms, dict) + self.assertIsInstance(aggregation_algorithms["aggregation_algorithms"], list) + self.assertIn("and_true_aggregation", aggregation_algorithms["aggregation_algorithms"]) + self.assertIn("test_aggregation", aggregation_algorithms["aggregation_algorithms"]) + + self.assertRaises( + MetaRuleReadNotAuthorized, + self.manager.get_aggregation_algorithm, + demo_user["id"], ref["id"] + ) + + aggregation_algorithm = self.manager.get_aggregation_algorithm(admin_user["id"], ref["id"]) + self.assertIsInstance(aggregation_algorithm, dict) + self.assertIn("aggregation", aggregation_algorithm) + self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) + + _aggregation_algorithm = list(aggregation_algorithms["aggregation_algorithms"]) + _aggregation_algorithm.remove(aggregation_algorithm["aggregation"]) + + self.assertRaises( + MetaRuleAddNotAuthorized, + self.manager.set_aggregation_algorithms, + demo_user["id"], ref["id"], _aggregation_algorithm[0] + ) + + aggregation_algorithm = self.manager.set_aggregation_algorithm(admin_user["id"], ref["id"], _aggregation_algorithm[0]) + self.assertIsInstance(aggregation_algorithm, dict) + self.assertIn("aggregation", aggregation_algorithm) + self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) + + self.assertRaises( + MetaRuleReadNotAuthorized, + self.manager.get_sub_meta_rule, + demo_user["id"], ref["id"] + ) + + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) + self.assertIsInstance(sub_meta_rules, dict) + self.assertIn("sub_meta_rules", sub_meta_rules) + sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, ref["model"], "metarule.json"))) + metarule = dict() + categories = { + "subject_categories": self.manager.get_subject_category_dict(admin_user["id"], ref["id"]), + "object_categories": self.manager.get_object_category_dict(admin_user["id"], ref["id"]), + "action_categories": self.manager.get_action_category_dict(admin_user["id"], ref["id"]) + } + for relation in sub_meta_rules_conf["sub_meta_rules"]: + metarule[relation] = dict() + for item in ("subject_categories", "object_categories", "action_categories"): + metarule[relation][item] = list() + for element in sub_meta_rules_conf["sub_meta_rules"][relation][item]: + metarule[relation][item].append(self.__get_key_from_value( + element, + categories[item][item] + )) + + for relation in sub_meta_rules["sub_meta_rules"]: + self.assertIn(relation, metarule) + for item in ("subject_categories", "object_categories", "action_categories"): + self.assertEqual( + sub_meta_rules["sub_meta_rules"][relation][item], + metarule[relation][item] + ) + + new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} + # Add a particular subject_category + data = self.manager.add_subject_category_dict( + admin_user["id"], + ref["id"], + new_subject_category["name"]) + new_subject_category["id"] = data["subject_category"]["uuid"] + subject_categories = self.manager.get_subject_category_dict( + admin_user["id"], + ref["id"]) + self.assertIsInstance(subject_categories, dict) + self.assertIn("subject_categories", subject_categories) + self.assertIn("id", subject_categories) + self.assertIn("intra_extension_uuid", subject_categories) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) + self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) + metarule[relation]["subject_categories"].append(new_subject_category["id"]) + + self.assertRaises( + MetaRuleAddNotAuthorized, + self.manager.set_sub_meta_rule, + demo_user["id"], ref["id"], metarule + ) + + _sub_meta_rules = self.manager.set_sub_meta_rule(admin_user["id"], ref["id"], metarule) + self.assertIn(relation, metarule) + for item in ("subject_categories", "object_categories", "action_categories"): + self.assertEqual( + _sub_meta_rules["sub_meta_rules"][relation][item], + metarule[relation][item] + ) + + def test_sub_rules(self): + admin_user = self.create_user("admin") + ref = self.create_intra_extension() + demo_user = self.create_user("demo") + + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) + self.assertIsInstance(sub_meta_rules, dict) + self.assertIn("sub_meta_rules", sub_meta_rules) + + self.assertRaises( + RuleReadNotAuthorized, + self.manager.get_sub_rules, + demo_user["id"], ref["id"] + ) + + sub_rules = self.manager.get_sub_rules(admin_user["id"], ref["id"]) + self.assertIsInstance(sub_rules, dict) + self.assertIn("rules", sub_rules) + rules = dict() + for relation in sub_rules["rules"]: + self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) + rules[relation] = list() + for rule in sub_rules["rules"][relation]: + print(rule) + for cat, cat_func, func_name in ( + ("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"), + ("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_category_scope_dict, "object_category_scope"), + ): + for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + scope = cat_func( + admin_user["id"], + ref["id"], + cat_value + ) + a_scope = rule.pop(0) + print(a_scope) + if type(a_scope) is not bool: + self.assertIn(a_scope, scope[func_name][cat_value]) + + # add a new subrule + + relation = sub_rules["rules"].keys()[0] + sub_rule = [] + for cat, cat_func, func_name in ( + ("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"), + ("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_category_scope_dict, "object_category_scope"), + ): + for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + scope = cat_func( + admin_user["id"], + ref["id"], + cat_value + ) + sub_rule.append(scope[func_name][cat_value].keys()[0]) + + sub_rule.append(True) + self.assertRaises( + RuleAddNotAuthorized, + self.manager.set_sub_rules, + demo_user["id"], ref["id"], relation, sub_rule + ) + + sub_rules = self.manager.set_sub_rule(admin_user["id"], ref["id"], relation, sub_rule) + self.assertIsInstance(sub_rules, dict) + self.assertIn("rules", sub_rules) + rules = dict() + self.assertIn(sub_rule, sub_rules["rules"][relation]) + for relation in sub_rules["rules"]: + self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) + rules[relation] = list() + for rule in sub_rules["rules"][relation]: + for cat, cat_func, func_name in ( + ("subject_categories", self.manager.get_subject_category_scope_dict, "subject_category_scope"), + ("action_categories", self.manager.get_action_category_scope_dict, "action_category_scope"), + ("object_categories", self.manager.get_object_category_scope_dict, "object_category_scope"), + ): + for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: + scope = cat_func( + admin_user["id"], + ref["id"], + cat_value + ) + a_scope = rule.pop(0) + self.assertIn(a_scope, scope[func_name][cat_value]) diff --git a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py index 6d852780..64a2d38f 100644 --- a/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py +++ b/keystone-moon/keystone/tests/moon/unit/test_unit_core_intra_extension_authz.py @@ -19,7 +19,7 @@ from keystone.contrib.moon.core import LogManager, TenantManager CONF = cfg.CONF -USER_ADMIN = { +USER = { 'name': 'admin', 'domain_id': "default", 'password': 'admin' @@ -48,7 +48,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): return { "moonlog_api": LogManager(), "tenant_api": TenantManager(), - # "resource_api": resource.Manager(), + "resource_api": resource.Manager(), } def config_overrides(self): @@ -61,46 +61,7 @@ class TestIntraExtensionAuthzManagerAuthz(tests.TestCase): group='moon', policy_directory=self.policy_directory) - -class TestIntraExtensionAuthzManager(tests.TestCase): - - def setUp(self): - self.useFixture(database.Database()) - super(TestIntraExtensionAuthzManager, self).setUp() - self.load_backends() - self.load_fixtures(default_fixtures) - self.manager = IntraExtensionAuthzManager() - self.admin_manager = IntraExtensionAdminManager() - - def __get_key_from_value(self, value, values_dict): - return filter(lambda v: v[1] == value, values_dict.iteritems())[0][0] - - def load_extra_backends(self): - return { - "moonlog_api": LogManager(), - "tenant_api": TenantManager(), - # "resource_api": resource.Manager(), - } - - def config_overrides(self): - super(TestIntraExtensionAuthzManager, self).config_overrides() - self.policy_directory = 'examples/moon/policies' - self.config_fixture.config( - group='moon', - intraextension_driver='keystone.contrib.moon.backends.sql.IntraExtensionConnector') - self.config_fixture.config( - group='moon', - policy_directory=self.policy_directory) - - def create_intra_extension(self, policy_model="policy_rbac_authz"): - # Create the admin user because IntraExtension needs it - self.admin = self.identity_api.create_user(USER_ADMIN) - IE["policymodel"] = policy_model - self.ref = self.admin_manager.load_intra_extension(IE) - self.assertIsInstance(self.ref, dict) - self.create_tenant(self.ref["id"]) - - def create_tenant(self, authz_uuid): + def create_tenant(self): tenant = { "id": uuid.uuid4().hex, "name": "TestIntraExtensionAuthzManager", @@ -108,266 +69,517 @@ class TestIntraExtensionAuthzManager(tests.TestCase): "description": "", "domain_id": "default" } - project = self.resource_api.create_project(tenant["id"], tenant) - mapping = self.tenant_api.set_tenant_dict(project["id"], project["name"], authz_uuid, None) + return self.resource_api.create_project(tenant["id"], tenant) + + def create_mapping(self, tenant, authz_uuid=None, admin_uuid=None): + + mapping = self.tenant_api.set_tenant_dict(tenant["id"], tenant["name"], authz_uuid, admin_uuid) self.assertIsInstance(mapping, dict) self.assertIn("authz", mapping) self.assertEqual(mapping["authz"], authz_uuid) return mapping - def create_user(self, username="TestIntraExtensionAuthzManagerUser"): - user = { - "id": uuid.uuid4().hex, - "name": username, - "enabled": True, - "description": "", - "domain_id": "default" - } - _user = self.identity_api.create_user(user) - return _user + def create_user(self, username="admin"): + + _USER = dict(USER) + _USER["name"] = username + return self.identity_api.create_user(_USER) + + def create_intra_extension(self, policy_model="policy_rbac_authz"): + + IE["policymodel"] = policy_model + ref = self.admin_manager.load_intra_extension(IE) + self.assertIsInstance(self.ref, dict) + return ref + + def test_tenant_exceptions(self): + self.assertRaises( + TenantListEmpty, + self.manager.get_tenant_dict + ) + self.assertRaises( + TenantNotFound, + self.manager.get_tenant_name, + uuid.uuid4().hex + ) + self.assertRaises( + TenantNotFound, + self.manager.set_tenant_name, + uuid.uuid4().hex, uuid.uuid4().hex + ) + self.assertRaises( + TenantNotFound, + self.manager.get_extension_uuid, + uuid.uuid4().hex, "authz" + ) + self.assertRaises( + TenantNotFound, + self.manager.get_extension_uuid, + uuid.uuid4().hex, "admin" + ) - def delete_admin_intra_extension(self): + def test_intra_extension_exceptions(self): + + tenant = self.create_tenant() + self.assertRaises( + IntraExtensionNotFound, + self.manager.get_extension_uuid, + tenant["id"], "authz" + ) + self.assertRaises( + IntraExtensionNotFound, + self.manager.get_extension_uuid, + tenant["id"], "admin" + ) + # TODO + + def test_delete_admin_intra_extension(self): self.assertRaises( AdminException, self.manager.delete_intra_extension, self.ref["id"]) + def test_authz_exceptions(self): + self.assertRaises( + IntraExtensionNotFound, + self.manager.authz, + uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex + ) + + admin_user = self.create_user() + tenant = self.create_tenant() + ie_authz = self.create_intra_extension("policy_rbac_authz") + ie_admin = self.create_intra_extension("policy_rbac_admin") + mapping = self.create_mapping(tenant, ie_authz["id"], ie_admin["id"]) + + # Test when subject is unknown + self.assertRaises( + SubjectUnknown, + self.manager.authz, + ie_authz["id"], uuid.uuid4().hex, uuid.uuid4().hex, uuid.uuid4().hex + ) + + # Test when subject is known but not the object + demo_user = self.create_user("demo") + self.manager.add_subject_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"] + ) + + self.assertRaises( + ObjectUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], uuid.uuid4().hex, uuid.uuid4().hex + ) + + # Test when subject and object are known but not the action + _tmp = self.manager.add_object_dict( + admin_user['id'], + self.ref["id"], + "my_object" + ).items()[0] + my_object = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ActionUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], uuid.uuid4().hex + ) + + # Test when subject and object and action are known + _tmp = self.manager.add_action_dict( + admin_user['id'], + self.ref["id"], + "my_action" + ).items()[0] + my_action = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + SubjectCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add a subject scope and test ObjectCategoryAssignmentOutOfScope + _tmp = self.manager.add_subject_category_dict( + admin_user['id'], + self.ref["id"], + "my_subject_category" + ) + my_subject_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_subject_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_subject_category["id"], + "my_subject_scope", + ) + my_subject_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ObjectCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an object scope and test ActionCategoryAssignmentOutOfScope + _tmp = self.manager.add_object_category_dict( + admin_user['id'], + self.ref["id"], + "my_object_category" + ) + my_object_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_object_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_object_category["id"], + "my_object_scope", + ) + my_object_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + ActionCategoryAssignmentOutOfScope, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an action scope and test SubjectCategoryAssignmentUnknown + _tmp = self.manager.add_action_category_dict( + admin_user['id'], + self.ref["id"], + "my_action_category" + ) + my_action_category = {"id": _tmp[0], "name": _tmp[1]} + + _tmp = self.manager.add_action_category_scope_dict( + admin_user['id'], + self.ref["id"], + my_action_category["id"], + "my_action_scope", + ) + my_action_scope = {"id": _tmp[0], "name": _tmp[1]} + + self.assertRaises( + SubjectCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add a subject assignment and test ObjectCategoryAssignmentUnknown + self.manager.add_subject_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_subject_category["id"], + my_subject_scope["id"] + ) + + self.assertRaises( + ObjectCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an object assignment and test ActionCategoryAssignmentUnknown + self.manager.add_object_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_object_category["id"], + my_object_scope["id"] + ) + + self.assertRaises( + ActionCategoryAssignmentUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add an action assignment and test RuleUnknown + self.manager.add_action_category_assignment_dict( + admin_user['id'], + self.ref["id"], + demo_user["id"], + my_action_category["id"], + my_action_scope["id"] + ) + + self.assertRaises( + RuleUnknown, + self.manager.authz, + ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"] + ) + + # Add the correct rule and test that no exception is raised + my_meta_rule = { + "relation_super": { + "subject_categories": [my_subject_category["id"], ], + "action_categories": [my_object_category["id"], ], + "object_categories": [my_action_category["id"], ], + "relation": "relation_super" + } + } + self.manager.set_sub_meta_rule( + admin_user['id'], + self.ref["id"], + my_meta_rule + ) + self.manager.set_sub_rule( + admin_user['id'], + self.ref["id"], + "relation_super", + [my_subject_scope, my_object_scope, my_action_scope] + ) + + result = self.manager.authz(ie_authz["id"], demo_user["id"], my_object["id"], my_action["id"]) + self.assertEqual(True, result) + def test_subjects(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - subjects = self.manager.get_subject_dict("admin", self.ref["id"]) + subjects = self.manager.get_subject_dict(admin_user["id"], ref["id"]) self.assertIsInstance(subjects, dict) self.assertIn("subjects", subjects) self.assertIn("id", subjects) self.assertIn("intra_extension_uuid", subjects) - self.assertEqual(self.ref["id"], subjects["intra_extension_uuid"]) + self.assertEqual(ref["id"], subjects["intra_extension_uuid"]) self.assertIsInstance(subjects["subjects"], dict) - new_subject = self.create_user() + new_subject = self.create_user("my_user") new_subjects = dict() new_subjects[new_subject["id"]] = new_subject["name"] self.assertRaises( - AdminException, + SubjectAddNotAuthorized, self.manager.set_subject_dict, - "admin", self.ref["id"], new_subjects) + admin_user["id"], ref["id"], new_subjects) # Delete the new subject self.assertRaises( - AdminException, + SubjectDelNotAuthorized, self.manager.del_subject, - "admin", self.ref["id"], new_subject["id"]) + admin_user["id"], ref["id"], new_subject["id"]) # Add a particular subject self.assertRaises( - AdminException, + SubjectAddNotAuthorized, self.manager.add_subject_dict, - "admin", self.ref["id"], new_subject["id"]) + admin_user["id"], ref["id"], new_subject["id"]) def test_objects(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - objects = self.manager.get_object_dict("admin", self.ref["id"]) + objects = self.manager.get_object_dict(admin_user["id"], ref["id"]) self.assertIsInstance(objects, dict) self.assertIn("objects", objects) self.assertIn("id", objects) self.assertIn("intra_extension_uuid", objects) - self.assertEqual(self.ref["id"], objects["intra_extension_uuid"]) + self.assertEqual(ref["id"], objects["intra_extension_uuid"]) self.assertIsInstance(objects["objects"], dict) - new_object = self.create_user() + new_object = {"id": uuid.uuid4().hex, "name": "my_object"} new_objects = dict() new_objects[new_object["id"]] = new_object["name"] self.assertRaises( - AdminException, + ObjectAddNotAuthorized, self.manager.set_object_dict, - "admin", self.ref["id"], new_object["id"]) + admin_user["id"], ref["id"], new_object["id"]) # Delete the new object self.assertRaises( - AdminException, + ObjectDelNotAuthorized, self.manager.del_object, - "admin", self.ref["id"], new_object["id"]) + admin_user["id"], ref["id"], new_object["id"]) # Add a particular object self.assertRaises( - AdminException, + ObjectAddNotAuthorized, self.manager.add_object_dict, - "admin", self.ref["id"], new_object["name"]) + admin_user["id"], ref["id"], new_object["name"]) def test_actions(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - actions = self.manager.get_action_dict("admin", self.ref["id"]) + actions = self.manager.get_action_dict(admin_user["id"], ref["id"]) self.assertIsInstance(actions, dict) self.assertIn("actions", actions) self.assertIn("id", actions) self.assertIn("intra_extension_uuid", actions) - self.assertEqual(self.ref["id"], actions["intra_extension_uuid"]) + self.assertEqual(ref["id"], actions["intra_extension_uuid"]) self.assertIsInstance(actions["actions"], dict) - new_action = self.create_user() + new_action = {"id": uuid.uuid4().hex, "name": "my_action"} new_actions = dict() new_actions[new_action["id"]] = new_action["name"] self.assertRaises( - AdminException, + ActionAddNotAuthorized, self.manager.set_action_dict, - "admin", self.ref["id"], new_actions) + admin_user["id"], ref["id"], new_actions) # Delete the new action self.assertRaises( - AdminException, + ActionDelNotAuthorized, self.manager.del_action, - "admin", self.ref["id"], new_action["id"]) + admin_user["id"], ref["id"], new_action["id"]) # Add a particular action self.assertRaises( - AdminException, + ActionAddNotAuthorized, self.manager.add_action_dict, - "admin", self.ref["id"], new_action["id"]) + admin_user["id"], ref["id"], new_action["id"]) def test_subject_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - subject_categories = self.manager.get_subject_category_dict("admin", self.ref["id"]) + subject_categories = self.manager.get_subject_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(subject_categories, dict) self.assertIn("subject_categories", subject_categories) self.assertIn("id", subject_categories) self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) self.assertIsInstance(subject_categories["subject_categories"], dict) new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} new_subject_categories = dict() new_subject_categories[new_subject_category["id"]] = new_subject_category["name"] self.assertRaises( - AdminException, + SubjectCategoryAddNotAuthorized, self.manager.set_subject_category_dict, - "admin", self.ref["id"], new_subject_categories) + admin_user["id"], ref["id"], new_subject_categories) # Delete the new subject_category self.assertRaises( - AdminException, + SubjectCategoryDelNotAuthorized, self.manager.del_subject_category, - "admin", self.ref["id"], new_subject_category["id"]) + admin_user["id"], ref["id"], new_subject_category["id"]) # Add a particular subject_category self.assertRaises( - AdminException, + SubjectCategoryAddNotAuthorized, self.manager.add_subject_category_dict, - "admin", self.ref["id"], new_subject_category["name"]) + admin_user["id"], ref["id"], new_subject_category["name"]) def test_object_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - object_categories = self.manager.get_object_category_dict("admin", self.ref["id"]) + object_categories = self.manager.get_object_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(object_categories, dict) self.assertIn("object_categories", object_categories) self.assertIn("id", object_categories) self.assertIn("intra_extension_uuid", object_categories) - self.assertEqual(self.ref["id"], object_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_categories["intra_extension_uuid"]) self.assertIsInstance(object_categories["object_categories"], dict) new_object_category = {"id": uuid.uuid4().hex, "name": "object_category_test"} new_object_categories = dict() new_object_categories[new_object_category["id"]] = new_object_category["name"] self.assertRaises( - AdminException, + ObjectCategoryAddNotAuthorized, self.manager.set_object_category_dict, - "admin", self.ref["id"], new_object_categories) + admin_user["id"], ref["id"], new_object_categories) # Delete the new object_category self.assertRaises( - AdminException, + ObjectCategoryDelNotAuthorized, self.manager.del_object_category, - "admin", self.ref["id"], new_object_category["id"]) + admin_user["id"], ref["id"], new_object_category["id"]) # Add a particular object_category self.assertRaises( - AdminException, + ObjectCategoryAddNotAuthorized, self.manager.add_object_category_dict, - "admin", self.ref["id"], new_object_category["name"]) + admin_user["id"], ref["id"], new_object_category["name"]) def test_action_categories(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - action_categories = self.manager.get_action_category_dict("admin", self.ref["id"]) + action_categories = self.manager.get_action_category_dict(admin_user["id"], ref["id"]) self.assertIsInstance(action_categories, dict) self.assertIn("action_categories", action_categories) self.assertIn("id", action_categories) self.assertIn("intra_extension_uuid", action_categories) - self.assertEqual(self.ref["id"], action_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_categories["intra_extension_uuid"]) self.assertIsInstance(action_categories["action_categories"], dict) new_action_category = {"id": uuid.uuid4().hex, "name": "action_category_test"} new_action_categories = dict() new_action_categories[new_action_category["id"]] = new_action_category["name"] self.assertRaises( - AdminException, + ActionCategoryAddNotAuthorized, self.manager.set_action_category_dict, - "admin", self.ref["id"], new_action_categories) + admin_user["id"], ref["id"], new_action_categories) # Delete the new action_category self.assertRaises( - AdminException, + ActionCategoryDelNotAuthorized, self.manager.del_action_category, - "admin", self.ref["id"], new_action_category["id"]) + admin_user["id"], ref["id"], new_action_category["id"]) # Add a particular action_category self.assertRaises( - AdminException, + ActionCategoryAddNotAuthorized, self.manager.add_action_category_dict, - "admin", self.ref["id"], new_action_category["name"]) + admin_user["id"], ref["id"], new_action_category["name"]) def test_subject_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() subject_categories = self.admin_manager.set_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { - uuid.uuid4().hex: "admin", + uuid.uuid4().hex: admin_user["id"], uuid.uuid4().hex: "dev", } ) for subject_category in subject_categories["subject_categories"]: subject_category_scope = self.manager.get_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) new_subject_category_scope = dict() new_subject_category_scope_uuid = uuid.uuid4().hex new_subject_category_scope[new_subject_category_scope_uuid] = "new_subject_category_scope" self.assertRaises( - AdminException, + SubjectCategoryScopeAddNotAuthorized, self.manager.set_subject_category_scope_dict, - "admin", self.ref["id"], subject_category, new_subject_category_scope) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope) # Delete the new subject_category_scope self.assertRaises( - AdminException, + SubjectCategoryScopeDelNotAuthorized, self.manager.del_subject_category_scope, - "admin", self.ref["id"], subject_category, new_subject_category_scope_uuid) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope_uuid) # Add a particular subject_category_scope self.assertRaises( - AdminException, + SubjectCategoryScopeAddNotAuthorized, self.manager.add_subject_category_scope_dict, - "admin", self.ref["id"], subject_category, new_subject_category_scope[new_subject_category_scope_uuid]) + admin_user["id"], ref["id"], subject_category, new_subject_category_scope[new_subject_category_scope_uuid]) def test_object_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() object_categories = self.admin_manager.set_object_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { uuid.uuid4().hex: "id", uuid.uuid4().hex: "domain", @@ -376,42 +588,43 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for object_category in object_categories["object_categories"]: object_category_scope = self.manager.get_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIsInstance(object_category_scope["object_category_scope"], dict) new_object_category_scope = dict() new_object_category_scope_uuid = uuid.uuid4().hex new_object_category_scope[new_object_category_scope_uuid] = "new_object_category_scope" self.assertRaises( - AdminException, + ObjectCategoryScopeAddNotAuthorized, self.manager.set_object_category_scope_dict, - "admin", self.ref["id"], object_category, new_object_category_scope) + admin_user["id"], ref["id"], object_category, new_object_category_scope) # Delete the new object_category_scope self.assertRaises( - AdminException, + ObjectCategoryScopeDelNotAuthorized, self.manager.del_object_category_scope, - "admin", self.ref["id"], object_category, new_object_category_scope_uuid) + admin_user["id"], ref["id"], object_category, new_object_category_scope_uuid) # Add a particular object_category_scope self.assertRaises( - AdminException, + ObjectCategoryScopeAddNotAuthorized, self.manager.add_object_category_scope_dict, - "admin", self.ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid]) + admin_user["id"], ref["id"], object_category, new_object_category_scope[new_object_category_scope_uuid]) def test_action_category_scope(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() action_categories = self.admin_manager.set_action_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { uuid.uuid4().hex: "compute", uuid.uuid4().hex: "identity", @@ -420,49 +633,50 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for action_category in action_categories["action_categories"]: action_category_scope = self.manager.get_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIsInstance(action_category_scope["action_category_scope"], dict) new_action_category_scope = dict() new_action_category_scope_uuid = uuid.uuid4().hex new_action_category_scope[new_action_category_scope_uuid] = "new_action_category_scope" self.assertRaises( - AdminException, + ActionCategoryScopeAddNotAuthorized, self.manager.set_action_category_scope_dict, - "admin", self.ref["id"], action_category, new_action_category_scope) + admin_user["id"], ref["id"], action_category, new_action_category_scope) # Delete the new action_category_scope self.assertRaises( - AdminException, + ActionCategoryScopeDelNotAuthorized, self.manager.del_action_category_scope, - "admin", self.ref["id"], action_category, new_action_category_scope_uuid) + admin_user["id"], ref["id"], action_category, new_action_category_scope_uuid) # Add a particular action_category_scope self.assertRaises( - AdminException, + ActionCategoryScopeAddNotAuthorized, self.manager.add_action_category_scope_dict, - "admin", self.ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid]) + admin_user["id"], ref["id"], action_category, new_action_category_scope[new_action_category_scope_uuid]) def test_subject_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() new_subject = self.create_user() new_subjects = dict() new_subjects[new_subject["id"]] = new_subject["name"] - subjects = self.admin_manager.set_subject_dict("admin", self.ref["id"], new_subjects) + subjects = self.admin_manager.set_subject_dict(admin_user["id"], ref["id"], new_subjects) new_subject_category_uuid = uuid.uuid4().hex new_subject_category_value = "role" subject_categories = self.admin_manager.set_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_subject_category_uuid: new_subject_category_value } @@ -470,29 +684,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for subject_category in subject_categories["subject_categories"]: subject_category_scope = self.admin_manager.get_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIsInstance(subject_category_scope["subject_category_scope"], dict) new_subject_category_scope = dict() new_subject_category_scope_uuid = uuid.uuid4().hex - new_subject_category_scope[new_subject_category_scope_uuid] = "admin" + new_subject_category_scope[new_subject_category_scope_uuid] = admin_user["id"] subject_category_scope = self.admin_manager.set_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category, new_subject_category_scope) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIn(new_subject_category_scope[new_subject_category_scope_uuid], subject_category_scope["subject_category_scope"][subject_category].values()) @@ -500,65 +714,66 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_subject_category_scope2_uuid = uuid.uuid4().hex new_subject_category_scope2[new_subject_category_scope2_uuid] = "dev" subject_category_scope = self.admin_manager.set_subject_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], subject_category, new_subject_category_scope2) self.assertIsInstance(subject_category_scope, dict) self.assertIn("subject_category_scope", subject_category_scope) self.assertIn("id", subject_category_scope) self.assertIn("intra_extension_uuid", subject_category_scope) - self.assertEqual(self.ref["id"], subject_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_scope["intra_extension_uuid"]) self.assertIn(new_subject_category_scope2[new_subject_category_scope2_uuid], subject_category_scope["subject_category_scope"][subject_category].values()) subject_category_assignments = self.manager.get_subject_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_subject["id"] ) self.assertIsInstance(subject_category_assignments, dict) self.assertIn("subject_category_assignments", subject_category_assignments) self.assertIn("id", subject_category_assignments) self.assertIn("intra_extension_uuid", subject_category_assignments) - self.assertEqual(self.ref["id"], subject_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_category_assignments["intra_extension_uuid"]) self.assertEqual({}, subject_category_assignments["subject_category_assignments"][new_subject["id"]]) self.assertRaises( - AdminException, + SubjectCategoryAssignmentAddNotAuthorized, self.manager.set_subject_category_assignment_dict, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], { new_subject_category_uuid: [new_subject_category_scope_uuid, new_subject_category_scope2_uuid], }) self.assertRaises( - AdminException, + SubjectCategoryAssignmentDelNotAuthorized, self.manager.del_subject_category_assignment, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], new_subject_category_uuid, new_subject_category_scope_uuid) self.assertRaises( - AdminException, + SubjectCategoryAssignmentAddNotAuthorized, self.manager.add_subject_category_assignment_dict, - "admin", self.ref["id"], new_subject["id"], + admin_user["id"], ref["id"], new_subject["id"], new_subject_category_uuid, new_subject_category_scope_uuid) def test_object_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - new_object = self.create_user() + new_object = {"id": uuid.uuid4().hex, "name": "my_object"} new_objects = dict() new_objects[new_object["id"]] = new_object["name"] - objects = self.admin_manager.set_object_dict("admin", self.ref["id"], new_objects) + objects = self.admin_manager.set_object_dict(admin_user["id"], ref["id"], new_objects) new_object_category_uuid = uuid.uuid4().hex new_object_category_value = "role" object_categories = self.admin_manager.set_object_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_object_category_uuid: new_object_category_value } @@ -566,29 +781,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for object_category in object_categories["object_categories"]: object_category_scope = self.admin_manager.get_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIsInstance(object_category_scope["object_category_scope"], dict) new_object_category_scope = dict() new_object_category_scope_uuid = uuid.uuid4().hex - new_object_category_scope[new_object_category_scope_uuid] = "admin" + new_object_category_scope[new_object_category_scope_uuid] = admin_user["id"] object_category_scope = self.admin_manager.set_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category, new_object_category_scope) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIn(new_object_category_scope[new_object_category_scope_uuid], object_category_scope["object_category_scope"][object_category].values()) @@ -596,65 +811,66 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_object_category_scope2_uuid = uuid.uuid4().hex new_object_category_scope2[new_object_category_scope2_uuid] = "dev" object_category_scope = self.admin_manager.set_object_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], object_category, new_object_category_scope2) self.assertIsInstance(object_category_scope, dict) self.assertIn("object_category_scope", object_category_scope) self.assertIn("id", object_category_scope) self.assertIn("intra_extension_uuid", object_category_scope) - self.assertEqual(self.ref["id"], object_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_scope["intra_extension_uuid"]) self.assertIn(new_object_category_scope2[new_object_category_scope2_uuid], object_category_scope["object_category_scope"][object_category].values()) object_category_assignments = self.manager.get_object_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_object["id"] ) self.assertIsInstance(object_category_assignments, dict) self.assertIn("object_category_assignments", object_category_assignments) self.assertIn("id", object_category_assignments) self.assertIn("intra_extension_uuid", object_category_assignments) - self.assertEqual(self.ref["id"], object_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], object_category_assignments["intra_extension_uuid"]) self.assertEqual({}, object_category_assignments["object_category_assignments"][new_object["id"]]) self.assertRaises( - AdminException, + ObjectCategoryAssignmentAddNotAuthorized, self.manager.set_object_category_assignment_dict, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], { new_object_category_uuid: [new_object_category_scope_uuid, new_object_category_scope2_uuid], }) self.assertRaises( - AdminException, + ObjectCategoryAssignmentDelNotAuthorized, self.manager.del_object_category_assignment, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], new_object_category_uuid, new_object_category_scope_uuid) self.assertRaises( - AdminException, + ObjectCategoryAssignmentAddNotAuthorized, self.manager.add_object_category_assignment_dict, - "admin", self.ref["id"], new_object["id"], + admin_user["id"], ref["id"], new_object["id"], new_object_category_uuid, new_object_category_scope_uuid) def test_action_category_assignment(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - new_action = self.create_user() + new_action = {"id": uuid.uuid4().hex, "name": "my_action"} new_actions = dict() new_actions[new_action["id"]] = new_action["name"] - actions = self.admin_manager.set_action_dict("admin", self.ref["id"], new_actions) + actions = self.admin_manager.set_action_dict(admin_user["id"], ref["id"], new_actions) new_action_category_uuid = uuid.uuid4().hex new_action_category_value = "role" action_categories = self.admin_manager.set_action_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], { new_action_category_uuid: new_action_category_value } @@ -662,29 +878,29 @@ class TestIntraExtensionAuthzManager(tests.TestCase): for action_category in action_categories["action_categories"]: action_category_scope = self.admin_manager.get_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIsInstance(action_category_scope["action_category_scope"], dict) new_action_category_scope = dict() new_action_category_scope_uuid = uuid.uuid4().hex - new_action_category_scope[new_action_category_scope_uuid] = "admin" + new_action_category_scope[new_action_category_scope_uuid] = admin_user["id"] action_category_scope = self.admin_manager.set_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category, new_action_category_scope) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIn(new_action_category_scope[new_action_category_scope_uuid], action_category_scope["action_category_scope"][action_category].values()) @@ -692,62 +908,63 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_action_category_scope2_uuid = uuid.uuid4().hex new_action_category_scope2[new_action_category_scope2_uuid] = "dev" action_category_scope = self.admin_manager.set_action_category_scope_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], action_category, new_action_category_scope2) self.assertIsInstance(action_category_scope, dict) self.assertIn("action_category_scope", action_category_scope) self.assertIn("id", action_category_scope) self.assertIn("intra_extension_uuid", action_category_scope) - self.assertEqual(self.ref["id"], action_category_scope["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_scope["intra_extension_uuid"]) self.assertIn(new_action_category_scope2[new_action_category_scope2_uuid], action_category_scope["action_category_scope"][action_category].values()) action_category_assignments = self.manager.get_action_category_assignment_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_action["id"] ) self.assertIsInstance(action_category_assignments, dict) self.assertIn("action_category_assignments", action_category_assignments) self.assertIn("id", action_category_assignments) self.assertIn("intra_extension_uuid", action_category_assignments) - self.assertEqual(self.ref["id"], action_category_assignments["intra_extension_uuid"]) + self.assertEqual(ref["id"], action_category_assignments["intra_extension_uuid"]) self.assertEqual({}, action_category_assignments["action_category_assignments"][new_action["id"]]) self.assertRaises( - AdminException, + ActionCategoryAssignmentAddNotAuthorized, self.manager.set_action_category_assignment_dict, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], { new_action_category_uuid: [new_action_category_scope_uuid, new_action_category_scope2_uuid], }) self.assertRaises( - AdminException, + ActionCategoryAssignmentDelNotAuthorized, self.manager.del_action_category_assignment, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], new_action_category_uuid, new_action_category_scope_uuid) self.assertRaises( - AdminException, + ActionCategoryAssignmentAddNotAuthorized, self.manager.add_action_category_assignment_dict, - "admin", self.ref["id"], new_action["id"], + admin_user["id"], ref["id"], new_action["id"], new_action_category_uuid, new_action_category_scope_uuid) def test_sub_meta_rules(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - aggregation_algorithms = self.manager.get_aggregation_algorithms("admin", self.ref["id"]) + aggregation_algorithms = self.manager.get_aggregation_algorithms(admin_user["id"], ref["id"]) self.assertIsInstance(aggregation_algorithms, dict) self.assertIsInstance(aggregation_algorithms["aggregation_algorithms"], list) self.assertIn("and_true_aggregation", aggregation_algorithms["aggregation_algorithms"]) self.assertIn("test_aggregation", aggregation_algorithms["aggregation_algorithms"]) - aggregation_algorithm = self.manager.get_aggregation_algorithm("admin", self.ref["id"]) + aggregation_algorithm = self.manager.get_aggregation_algorithm(admin_user["id"], ref["id"]) self.assertIsInstance(aggregation_algorithm, dict) self.assertIn("aggregation", aggregation_algorithm) self.assertIn(aggregation_algorithm["aggregation"], aggregation_algorithms["aggregation_algorithms"]) @@ -755,19 +972,19 @@ class TestIntraExtensionAuthzManager(tests.TestCase): _aggregation_algorithm = list(aggregation_algorithms["aggregation_algorithms"]) _aggregation_algorithm.remove(aggregation_algorithm["aggregation"]) self.assertRaises( - AdminException, + MetaRuleAddNotAuthorized, self.manager.set_aggregation_algorithm, - "admin", self.ref["id"], _aggregation_algorithm[0]) + admin_user["id"], ref["id"], _aggregation_algorithm[0]) - sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"]) + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) self.assertIsInstance(sub_meta_rules, dict) self.assertIn("sub_meta_rules", sub_meta_rules) - sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, self.ref["model"], "metarule.json"))) + sub_meta_rules_conf = json.load(open(os.path.join(self.policy_directory, ref["model"], "metarule.json"))) metarule = dict() categories = { - "subject_categories": self.manager.get_subject_category_dict("admin", self.ref["id"]), - "object_categories": self.manager.get_object_category_dict("admin", self.ref["id"]), - "action_categories": self.manager.get_action_category_dict("admin", self.ref["id"]) + "subject_categories": self.manager.get_subject_category_dict(admin_user["id"], ref["id"]), + "object_categories": self.manager.get_object_category_dict(admin_user["id"], ref["id"]), + "action_categories": self.manager.get_action_category_dict(admin_user["id"], ref["id"]) } for relation in sub_meta_rules_conf["sub_meta_rules"]: metarule[relation] = dict() @@ -790,38 +1007,39 @@ class TestIntraExtensionAuthzManager(tests.TestCase): new_subject_category = {"id": uuid.uuid4().hex, "name": "subject_category_test"} # Add a particular subject_category data = self.admin_manager.add_subject_category_dict( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], new_subject_category["name"]) new_subject_category["id"] = data["subject_category"]["uuid"] subject_categories = self.manager.get_subject_category_dict( - "admin", - self.ref["id"]) + admin_user["id"], + ref["id"]) self.assertIsInstance(subject_categories, dict) self.assertIn("subject_categories", subject_categories) self.assertIn("id", subject_categories) self.assertIn("intra_extension_uuid", subject_categories) - self.assertEqual(self.ref["id"], subject_categories["intra_extension_uuid"]) + self.assertEqual(ref["id"], subject_categories["intra_extension_uuid"]) self.assertIn(new_subject_category["id"], subject_categories["subject_categories"]) metarule[relation]["subject_categories"].append(new_subject_category["id"]) - self.assertRaises( + self.MetaRuleAddNotAuthorized( AdminException, self.manager.set_sub_meta_rule, - "admin", self.ref["id"], metarule) + admin_user["id"], ref["id"], metarule) def test_sub_rules(self): - self.create_intra_extension() + ref = self.create_intra_extension() + admin_user = self.create_user() - sub_meta_rules = self.manager.get_sub_meta_rule("admin", self.ref["id"]) + sub_meta_rules = self.manager.get_sub_meta_rule(admin_user["id"], ref["id"]) self.assertIsInstance(sub_meta_rules, dict) self.assertIn("sub_meta_rules", sub_meta_rules) - sub_rules = self.manager.get_sub_rules("admin", self.ref["id"]) + sub_rules = self.manager.get_sub_rules(admin_user["id"], ref["id"]) self.assertIsInstance(sub_rules, dict) self.assertIn("rules", sub_rules) rules = dict() for relation in sub_rules["rules"]: - self.assertIn(relation, self.manager.get_sub_meta_rule_relations("admin", self.ref["id"])["sub_meta_rule_relations"]) + self.assertIn(relation, self.manager.get_sub_meta_rule_relations(admin_user["id"], ref["id"])["sub_meta_rule_relations"]) rules[relation] = list() for rule in sub_rules["rules"][relation]: for cat, cat_func, func_name in ( @@ -831,8 +1049,8 @@ class TestIntraExtensionAuthzManager(tests.TestCase): ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], cat_value ) a_scope = rule.pop(0) @@ -849,13 +1067,14 @@ class TestIntraExtensionAuthzManager(tests.TestCase): ): for cat_value in sub_meta_rules["sub_meta_rules"][relation][cat]: scope = cat_func( - "admin", - self.ref["id"], + admin_user["id"], + ref["id"], cat_value ) sub_rule.append(scope[func_name][cat_value].keys()[0]) self.assertRaises( - AdminException, + RuleAddNotAuthorized, self.manager.set_sub_rule, - "admin", self.ref["id"], relation, sub_rule) + admin_user["id"], ref["id"], relation, sub_rule) + -- 2.16.6