From ad64dd5d0e1b39b11d4f152b06d5023d314d0647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Juri=C4=8D?= <74237898+BorisYourich@users.noreply.github.com> Date: Mon, 12 May 2025 13:36:57 +0200 Subject: [PATCH 01/10] Update event_actions.py --- tesp_api/service/event_actions.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index 09c837e..dc0506e 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -93,16 +93,24 @@ async def setup_data(job_id: ObjectId, print(inputs) - for i in range(0, len(inputs)): - content = inputs[i].content + for i, input_item in enumerate(inputs): pulsar_path = payload['task_config']['inputs_directory'] + f'/input_file_{i}' - if content is not None and inputs[i].url is None: - #content = await file_transfer_service.download_file(inputs[i].url) + + if input_item.type == TesTaskIOType.DIRECTORY: + pulsar_path = input_item.url # Presumably already valid + elif input_item.content is not None and input_item.url is None: pulsar_path = await pulsar_operations.upload( job_id, DataType.INPUT, - file_content=Just(content), - file_path=f'input_file_{i}') - input_confs.append({'container_path': inputs[i].path, 'pulsar_path': pulsar_path, 'url':inputs[i].url}) + file_content=Just(input_item.content), + file_path=f'input_file_{i}' + ) + + input_confs.append({ + 'container_path': input_item.path, + 'pulsar_path': pulsar_path, + 'url': input_item.url, + 'type': input_item.type + }) return resource_conf, volume_confs, input_confs, output_confs From c01de5cf9be75406ed00009e6171087515a1566f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Boris=20Juri=C4=8D?= <74237898+BorisYourich@users.noreply.github.com> Date: Mon, 19 May 2025 12:05:13 +0200 Subject: [PATCH 02/10] Update task.py --- tesp_api/repository/model/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tesp_api/repository/model/task.py b/tesp_api/repository/model/task.py index 7a31702..7f64889 100644 --- a/tesp_api/repository/model/task.py +++ b/tesp_api/repository/model/task.py @@ -52,7 +52,7 @@ class TesTaskInput(BaseModel): ..., description="Path of the file inside the container. Must be an absolute path.", example="/data/file1") - type: TesTaskIOType = Field(..., example=TesTaskIOType.FILE) + type: TesTaskIOType = TesTaskIOType.FILE content: str = Field( None, description="File content literal. Implementations should support a minimum of 128 KiB " "in this field and may define their own maximum. UTF-8 encoded If content " From 462b8ed01bf008341bbaf2c19f4aeb029032a755 Mon Sep 17 00:00:00 2001 From: Debian Date: Mon, 16 Jun 2025 12:26:37 +0000 Subject: [PATCH 03/10] Input directory processing finnished --- .../dts/s3/data/tests-data/sample.jpg/xl.meta | Bin 0 -> 7342 bytes docker/dts/sample.jpg | Bin 0 -> 6920 bytes public-ftp-test.json | 16 +++ tesp_api/service/event_actions.py | 13 +- tesp_api/utils/docker.py | 111 +++++++++++++----- 5 files changed, 107 insertions(+), 33 deletions(-) create mode 100644 docker/dts/s3/data/tests-data/sample.jpg/xl.meta create mode 100644 docker/dts/sample.jpg create mode 100644 public-ftp-test.json diff --git a/docker/dts/s3/data/tests-data/sample.jpg/xl.meta b/docker/dts/s3/data/tests-data/sample.jpg/xl.meta new file mode 100644 index 0000000000000000000000000000000000000000..99da7635bb4f77e84fe338fa20bcb63b3d0c3331 GIT binary patch literal 7342 zcmc(EcTg1FvnWX<2qLH;K|p3fkRUmUWY}4vB$X&(L9%4Yf}|z63>gFkb|vSWRX`;w z3n&?dB`3)l-hSWvz5A-(`}@|Nnwsi#PWS2Vb52k76HO5gVj@zaFGNJd4y2^So!qIN zbVUEpGsL7A$jsyJ?$=3%Gk)7iOio1HMf~73#=!Rlk{D|wqGRPSO>`|6RQdmhVpLS@ z-8qkB|R?|BUxY7-XR+`iG(T zUnu4uh(N|_8Q41`|7$h*T1ZbzeP54|mX`v~_Ad6W0`@MRNOu=YCj#JP?}8MtvGla; zB=dClLjEgj=;0n(V(m&0a`6=KBrLwj-r3R)Ddg}1X@?=qWjDqlEG#X2-^yA_RLa)I z%Fy zE@S=;YigdXbn0tGU98>zBJngRxPaZg z4}Em};QNCeRvBW&mTPQ5BxDdBT#18Hk%KY>_!s^c{TKcpW^DtS&-gDUL)cgz^B?|_ z{}6cir|;=bO39h`!ImSoWKER}!t?f=BhS-Z`dNM!voYMbZ6$Flj@#pjil?cSUSi#w zwmoE%;j^q{>RoOG@h|>y5q4Ng!{^w5OP0Oga~*VfvBsa z6uzvFDD+867K*fvrAl6NcptZ~4DDN+u?kXlA8J^-Eo?}iv8dd-Zhhy&=t74PYd2DL z2jKZJc$u0L+>stdH1#a|RlkcwItA2POFc4RRox4)Tcj~+JMZs0hMbo1^l+SC1T*Hu zcj56KlktGd5~IP8JL*STg1mWIldSc3-EJRM*DxT?ho~eMN8*qlHC8 z=556QGUa2J1>@lp!f>2Z^l^j3iAo9NWOz0%DOKeAYj7vHbs()XcLLelG*PyZA$$5= z>rU=vEPU`;=AK}NK?GI;j&m?dGXp54nj>8KdfW?fzTfAf zXw2YbRGwhull-QpF@3)uTr{R^LoK7^U`Fh`Kxzj7p2aw@iI;}=3ci+jipF%aYI%<- zt1z2BUMRRz`@E2L zNE$<4;JIV#*p;l}a6=jT0ExencZm~wSTYczf7F{p-*giaOJ;DniQxWZGTa!Gp*q#K zar;>aAYkait}mH)$ljEzr!mvduZ6=)0rq!)#-GYW)Et@#bhJw@vdgT9l7r*jyywyM zz^us#tyz;4G%Ihud;<61T1C zeaIlO#OivFa%3OEi@m#?YJn}g>=n>&D#;> z|7vmbQ;rH$g2p6VkHs6gWEoiWF9kSe180<1N4*;K}$K zjj?^mq?yIsS*fY60I5~+#PZnJTs7*l%6bWnn{a?EG#lbA)0iJR0WVfN7q*J(C(0Bc ziEWAUR+9^Li?>QmsQOHh3b5yT?F$*dJLuxIsl{t1L{F1};{{XWTg<$z0V2|0DEnmy5;XNRM;%u_l#ZNz?8`GqE8wDqw{}Sv`^MuDMisfOox4LgJCq*I)irGP?sZAy zJ?FJZHhgapEEX^YOL2j2wCQoadF#h!uK|I{=Jh{Q63dYH(JV0VSBCf-nw2niz#}y^ zYQJkcz=^cpl+}<7G=@LATVCsU_>r-r?~D5;JI^==$S}eZjDLkYRvnLUyOTRMuSu$Osz4#Q`#4ZLRCJIezo+ z2T>R0bWGN5Z2r_*mD=pjVo&L@aVg1YmMk>_+3EM@tGuB>f9)dDWm5r3b?!H|%8<7^ z*Z9;Pa5nrB-K%HP#7pC)xM{)(D&Jb$2&EVKTXMjOm3#g|UCOeU2xNA+>+Hq@s$V%1 z@;xbN%nFTa?%=iv2L$G=L}ydzmpy(KXAxshGW6=zN80ZwKq-3n$DMO9Ts0$%=B&ET zW`2>e*39}GpvlOD-J{MkBy5X;s&$0rg)cnzglpwF4>e$a&kjPGK4#0&s-O4!MBi&Z z6V)UA-ZG8|9FKN>;Kl#yE;fu#9q~se5bKGi?0q77KkXYdAwo#;6zkcEyYx6aTM$f022+Zg;f0QX(X0>NyJ0&`5ZwuoT%RTYWq(%A<+f5cb`S&>ddz(Lyi8?pPdMjyRJzrE1tpg9xlzx$CH%@U3zkR@JEdc5(H78#Y*9huC>0~qu29eDrHActUm?2vImz3gl00Q5JR5De{%ISeO# zTXn8()w+gz+3zKKUx5mHzc(DW)E1PXFFTdp33{1DA<&rIYeKTKq7P~vb00v%9A>Y8 zxWyhWa&>-oL-9NjyJ_o!jZ$tEi_L54^((R~z9hU3iG=B0J@WyXcwa~TRvKsf@y`Ql z#``UC^vok?@BH!cezK#Jq;N%jXEgqDl9B9myJ{e}V69R6HvCOn$A^pG-BOeKrT90n zoWODlH2!M(^*e1Vo@vh+-&PU7C|P6A4p zTqPcT{u+?)E>xRUuV(65| z;um8w0lvdem z31&nlO<#d&S3~s2?ah9_KF#syT|GaYZ~t;oAm8l>MdkD#5gmJxf{^Zm=J+8hhA_G! z$#C`X^d83d@68iccBl5K33Ya5N$I9-HuF{rpu{d?{TjIFuctgk#<##wX_1#lg;<_L zVM*ECxorUb&VUzud$Xdt!#C;MsySNad28Gy65z4fuMJ?pVJcD_;X$3j+n1k2oXrE-?6RWO?%>n4gY*j5eF4BkvKsto} zhngP~^{L;it6*cn(M0;lvHM-01ID`Tq_Pj5o zlZdr;ELssN*K4tS5|}FcgI?(~dselsn7i=+q)Tv@%18`MK@SnD)+flE=Cyegmhn4=HJ zB410k$dM6p|`zNME7Rk?7w9BUJp2fBQVr<>`8Om+V{QL#k=%XCMmi3yl;NhW6QC1bYf+_Fd7udq3 z>C$={h{Ob1-{bh2%*xQ-ml#l7`|&)(>#v~ENLtAqR3`onUH3&%X0{&l z!`*s%Vu-{HM@rl5@Mhb|869KtGbnW)uGolq!wEV?I@yimOGU%U4dHkxxKDmjFQglDDyE5wmkJR zb}l&mGCAzZtsdjpSL7r2u2cn=B0uArdY-=~?}n#5`9+R|BYKW)1Gx00NsC$aU$*Z( zMdRbHt3?Z@2gRRWsES&vh^q8Nl8g?N``7vu8^_1Ic-&gp55k}?(cEvzN~eOPlTmZGWiYjLC<*RozKi+J zVXWrkl~&MrP@4qsH6~Iy2P7*(vrdeR?B0254uE|_reX^Ot(;Z$^12pqbdM~_K>>t) ziYuX$|FL--sRzn@hg}-+(vIB(mOUdBQ#nClAsLqf)cuJ8@n`1cGiDh=MpOozoWMEygd<<$HfHju4?u+Iwv&RXRSXhJO~SfVwsX@r|z3+%m) zD2h_gLj-X1+E$yHd~4EexlUsCk)t~XS~#i2l&c2s?a!__lK{Nna^7q{eKmXKL{w=CZM?1vJ~hY)JhR=OT26%)P;U}0gNW_d&nw|(v)YA{SuXRb&`koBV3|AO zLHtC++e@*4r8M9x+~qcV-k9+7BX44g0>s$i7X_O#VlAjOc`gj_Hv4)!q4s5?W&(3> zN}`m4p(@s!I{J6IB}vq9o`Xlf!U1HZ7`+DW`&A||QDt$4l@Rf?j=oi+ucY(S&X}ds z&2S)GYvcRtu5f6pagvBPu_!|rz;+&lxj0B-Q9XZ2xi?CHiCnI<#+Xb z2{!)5o)klV#L#u_88<~2HcrlUHGydl&U`8PgOz2uRf{(Vy0|~Upa$6PtBxbv`WPN> zS2W|Aj`Y6AyG?!4xFXvnn=mqU35=eY^oL1)6lT&|_@-MuXd}k-fbCt9;EFK^o^y*z zRXlmu9DO+b%(jcfGd9al2gX>*#4~ zL{hr?^i8%M{Ain!JxINqPy=rfA{UN3UpN@HKJh+p4sPhkPKO|hoZ5Hhb)xe>{shVo zG<8@WPR|{o@!>%mHg|lA6?`G5t1`8-c`lCA4d$LF9SC6kzD$`6leho2SzK=6&~z~b zxmZQJjGo4Ssf-V{oV(Juu&?Aq-O+jv)w*tFF@<@T4DX>x@y{p&8ci+lti2}!&EZ>c zuB{J%Ww{~^sPwo90>-DvL34268^^2`h;=qdsL_Ujoh=%#fKB}Pi?Jj;+{$8U1Z^-G zTfk*saRX{eF0oU_n`6y4V3g@o1yoSjk!@}$pcnjxFuBI3k`GluKFl(zFNC`58fhOX z;;?Ogv=pBcFu$6`;7T7D6(L6JDOq&WDh)@Ngn5R5GNdbJvC^D`u(}MHPTp9dpmA}0 zAyBxrqbPXp9@dt`r2zRw?6IXn zLx*I-L=R~5(COl9P57`2!@lZO%E}3b(_Ly*r~S?~Na}?>ac{{avc-J{nj2x+dzjK& z7Bno-vlSL~uA@Q43C7`X9|ssvi^7tO9^B=f(T&j6j}b1uigF%tqpArmxo+`=e?o1?Nd!Ics!AKW9DzFGqT)bRHp12tj6ITt5uWFA27~|y+9g7nkvG6-nz;Yp8ooE|EE1aEuC6>+j0dOMA5E6lH1DYroVoCR=?4< zhiC|8*I4?~aPz5|1jP@=~?Li E0&sH0lK=n! literal 0 HcmV?d00001 diff --git a/docker/dts/sample.jpg b/docker/dts/sample.jpg new file mode 100644 index 0000000000000000000000000000000000000000..b44fda912ac6b7fe154fe83e6e6d83900e3554f8 GIT binary patch literal 6920 zcmc(EXH!&B^CwB71VJSyxdVa(5tJm7VeSwGL}ds{lq^}YB58=jC4+##jAW23Sp`(0 zFyyQ-h9C0PyJAOIvTXpY}7;_>djp43pYp7N0o2^`zr<5W`8Yx~Yj0bG%RDpGhzS_a$FFwz z6*{UtNx!Yl;w}9f58?tD3 z9&&vrih6!?^vTx(My7z})Ef|q%*tiUaoU)@gVALlDtSaLDw-e za_}9?WVuuHjGLh|$Sq#a0Fj6$4~>3DN%1C)x|WVEDeJ$$zA&q-<4i(E`ta)&e<4cR z(MZ>aj5VriNp>dg!HrIB-Sr!0T=i zRbfe3u{(kg`n-NE+wc0Kq&_6&y!wc@pj3FAjR(z|n!>t|tYb>Q78*-@a``a$j8ns;XcAP+t zD|kjY&Nqw=bLzABbf3kz&v}{IaL^6apBk@VR@NYA`CG5kTg@c|i18*W$;FY_=SR%c zmLuWX!q)r1mLGUnH^7)Z}^W(g~7|nmXFL2%178526HGH z3u-;A?bCnofNhPSztdVv?o0`$LoSFAIZzk4gxdq?$M7W zmDr%yYqa?)2kx$4Wq_FMZ_E(I1o@FHVhG~F-^`v8u!H%c`DBQL=drXc^5g$v=j(C`i8u5{XC zT=_=K>-$1~E+o*HfvJc*(eP*a4GrT4zWsc(CY*y!W8`3Z^nyrA8vve1*>m164eb_v z|KK?q)5)RjHLjw{X7Y5cARpPtRU{d%1d&^2I(bazjr|ET4woNHuF`o*4%pot=QQ;; zPB=*`N&>yzvr4q8l%Ru3{s^h1`6j})fe-Fu2TMGlyWj#{Vu?Gvbe^lMTgDNOAP0@R zZ8pzp$F_!KH#Zq(m}c{LctJ}+4!95fcH5)JD_)$rG4+TvioC#M*T$hEN!9*_3iJuO|5m{{M)FZf-#dfj?i|L3n~-D@lk-gk z|2O1NeN?*Ibk8Q&i+6y?6K8G%sk|fZhFpEk*s zquusg^79Y4EE&DYAjyQPI`?wx9)u@%XF2sEXI9x;px4Ts<4}JlC2#X@`Tk$73qe90 zR!CD>d!`Mn-cS5Azxgdk6)Hhv;;%>Jjhr(L&3l*q9kPIPDmV@rJ)F^R|-T6*j3XTby#G+fhLeVAUo~m{iYdAzpnr5 zwf4pBqPod4Md-oKgYstNrH1(j^u#us zV6*?dSK>iDOB~-XXu%1%NZ*qog{=gG+kv5jdiPo=z;t+Q{Ak8rj&9#K*-SoEiz6Yz zhR$|PNU4S?Dcg$FwzR8v-U1@<6V#m*_Sp{|LOR(ueS~SCYV^JdDCh6axRxs|Fd#rm zV8EvGom-D0By{x;6?j0Jw7#Uhyf{G3U1v_0P(w66;dfU2WKfYaF<8btqxvl4i+ZMOL1P{nG%pQb=cpp78 z$=pr{8~(780CvjyQe}h;r#g9kXk>iWo{z@FM9+yX+w6V&0;QRNuCpr8gD%YTGc2)2 z@r=~-p_s`(Wf|1q1iCu3wcoOO_3%$)!k2xAd4dz_bnUw-E9M_}wnZd@>q9Mc2?Y*6 znLfLpC|C5bX5Rj3;J)nhw}2D=M6m^P6jOAQdfkt&RqKo#l7t{C--{Vu-D$nue^(B- zC+)tap)pCqSC?E=n9egJLm4w{|1hIADh^6*DvM(?oL)gs{7vGs+7+ZTZ!1DZ&^Rsz zkOgaMTz1UxTmL>uIIE;#GP$t%)9aP$b0ftbGUF4{QjzSL>IAmqpRKofg9Cm#MWibx zB2pUsaW*QDmmA;2^geJt^akCnk8BX6^;FtI@`5U?>zl!h_x_a}@?zy*z1EO^Sxf}7 z+23|@6#zA^o{RaM7Sv}3M>Mwan}-4-3lc(f&(0S|~Ap0s`3er^rhW}Dh6tR9bb9D1{Ps3Bgh2!GS2qCbfu`zyCUH0Q7c_Z~P~yp! zbCb6jakjYVm3cYV!Zw&dwO5_)ZlZL<#lZPMCC7@#Nhb`AsZYku=PC>O(=f3xjdfI1 z#11p?N^WOXe@~Zu@yza`A|2j!Ay;OxZ)GzrF=l52jXiTp^uQM{?*Ep?%um!nkv+PbFO&cI|d?w*g4RL zH4eECp&=gAw?NEN7azHXF!z)DdG~B*EDJVE`BlxguBFwj%B}j42-+tQ)Vp}(1G2F` z4hGG%PIeRD`_zpOnqnB)hR?J8@UgyfV^gGXB?Biko+8mmZl+Z&fM2x6sC5S(*V6Xs z@=vGqltC#z4we&8PKn0Tr(M6*vg(ogiupq&@#~Ux?kxYM(Api?{R_3XKB z$sllq(m+az_kfkB*7M91v$qG z170aNsCj~6bj&P-$u&|Tnjr;oQY+C{D2%E-7186lf(CNl7^fc5#ga*})DP-U#~F@b zt-24h0Od9A8iE_)i8EJWI#m$kNo%9;=(l+Refo>Dh1MU31qz)GP()7eG0}-9DG2F3 zY>XYGW(r{_k_y!bP3vO*_|Ysub#MBBhA?MWl$37SWU*~01Ipa8mhXYf-a4u?>)0k3 zDm9#9OpN^%6q1<5pW6a3?)G`YceXxPwfQ9eS~EkdzG{xSLINE7l?-Z^<+H@6SOsb} ztg*VN&R$3}@dy${VoNzq`_=^V2%}or;W%d!*V|Tm9t5MK-MqHw=HKg@wpjj)rWIe=sf_O&sssoD*9?>oOJTx)g4Ezb4q~QAxkXWfaXswEmU=gZ zJtq#Btn&8;t#Rvbx|Cp&plr=W=VjEFjeU>t(5U^T7V$;?@(YiO>#bY>ikknM&Hm@u z%EY@v_UI5rMiP(`;3a(jo4!1K8T_LA;mPjP;-_UM^==zKHd-^AoYh+C0H5^;Q{??h zPi22C)n`86^DG>O?FKG4oW_^=LxAIGx7gk-d(rG~y6x_#g7>{YG%QtmbG>^`y|Zy^ zKQa5pWNY3J(kaAxD;BK;mFqXXd=`)**UzZ@ojWzKqyFmd5lNE&qn-NI=p(~95!;{R z&GYr0_f1?Hk7!R_e8jIn{)wdW7Srqd@L*&Vh3LKRwD-(TT*h7N8tckP*`N<1Ba^K# zeCz!?FXLD@TGbQ=slgWbX49M1!2M$QshX4KK};-PqU4ERgT{P++5XJ{Yd$K_UCq{_ zWE{7#{`S?qyT=TWe>@Xku=RS04#h+Our{q~{J2C|Fc^Rf{r+}ZSkhs`{M7v5Ls9P- z6)F=+4WbQ4zBkAHGQOO6le5ZK8|oEB3^n-iQbA;Y-MD?w@a|P(VT0;)$^b}-5m=zb z-aGVq^QlM$6(BMZ9Tk$v>`gm%1tL*nJe4-w^pdD5cEPF3ICj*0-=rCg%aHsmTudjum6EA*f*-nf`$e&Bsj#x8Y|uV z3Z}h+USkWFXG-g6Ard6CvCs1}iG!)NC&9nC=Icc|^3&vSB;ZT9K;Mb1_}hlxLe(Cb znhDHvpBJgqq5JD1c?7@0&KcG|8yQWux%n0A#!wc&2}Z633_$1T>u;g4a5|}7R0ckd zq4TmRBTJv{(Ow-RF+^gDBc*G!kK1u{LPsG-0#g>?O7)mHUeEz@vuuav1|K#V>jxE> z;FRIGRG@Y`C&;x@Dy;WT${xxdO8qK&xziCWLo;&b0cuuRUo3TFP;<8gr{4FLMY^p+ z4wsjHkVggooNHPWq?I0 z4T#tcgF}JUrsBW*Hjd1M6iog}fwItRD(JdR^zt0pdGAH7bF>Wnmiq3i)&xu*9}6J3 zrsJ!=hh(>%Xih%-9A(}tuqy`wC(?jm~W(7mq;Fpu^k($_4 z!cDIYc3(#nMQG$9{P_iKs!Wl;8uXg3lbC+x>5PIFPirvcYC-!4bE{4yfFQV%HmEzr>%wIvGe{_1>AH_r%)=>c_9V5 zN#GJZb3;6go&56QN;L3N2JjK@aGkrTk01Fen2@XpF}L|fz^08j3TlwA!~wxZANOZ8 zKAbcxVD3#RlyVSM$#GNH;8v#;i8{_>;CM6?u&$6~)WrR!X8{vbmu5K#8BcBNSu^@c zx-jE}Sw7ne1wyqqf4}btg|_Rbhy)WFqWXduvXXE2#3$2*>i=4{5YzP4v znU~UV{nYQa1@i^R#B@=vcM?@+^)zsfYS5U{_7NsX5yClMbvL0>pLOo+)E7gr>8)}S z0tBpN*9pa2`5Ajq4*C*9*ZF5%m7FKc zN4EVnG}g9g)+No_>yF?K&1wCrT!&oz@bncha&pQKCiPXEMSJm=Ue$n=B+En2>_pL3 zV;(&3HnrOQq&+kA(aZ~*4ib;(OkdSAz*wI#=A5V8&5cTKJUJpK8l)pFS1GA(zQayz~n-XumY9&@pHIAnS1b(tHbS;U`{?C*t82 z=F7uq!>Q;3KD*C1pr)h}TNS(+)@&0-l{Q^K4TT)r#VJ(d#r@Rw)oOfevikDHcG;kKQSvo%$Cz~=!P{~o{$G-=^`qSo}~F|V3i&jo8;boZ4-7U(BgTv{6LQ?AZ5+dKD53a>Qzi@i{eIU zd9GqlUMhZRlZv100<9i7UjD2O9dc$mP@|`+m}ENJqd~RX?OubVU)vFPmrPkVxy?fJ z!|eNylDo?SheW!zLn1D8HK}>Q7(CaBzafnTEYaxUZNXW+Fg=4P@nU+EJ#qKRHI<@W zM3F>zbm?!HVW7cr3ee(#ZHjx*Yrg*Ict{8}2z!_g_$Q z&tGYjNaOh&ce1xK=<+8oj+V>2id^($GU3MTzZ56ya`Q3SvTLwv`)m%22JgRMj3ak} z3}}s985ea)9cu(bWX2rtrS*LK=Gho+?%l~hg!fVY8`~+*u1=BB8?Q~#JGE7sd5j1Q zsL~7`cYKi6%Uh>Vfp^=_4eFWZ9UEY!7YI#z|GxLzzOS}!jh#)oA}yk5&oI$-b!^Me zAU3ntXvbaR9aYD8+VfDe>DhRyVA$^70I7zKt;8a$EB{?&Uq^z#Z(C$qvS+|CvPp3Ml>;&iOBdLx6Msi|+hir04&sNOk{AVgE None: run_command = (f"""set -xe && {stage_in_command} && {run_commands} && {stage_out_command}""") + print(run_command) + command_start_time = datetime.datetime.now(datetime.timezone.utc) # start the task (docker container/s) in the pulsar diff --git a/tesp_api/utils/docker.py b/tesp_api/utils/docker.py index 1e9e942..f7abe78 100644 --- a/tesp_api/utils/docker.py +++ b/tesp_api/utils/docker.py @@ -1,9 +1,11 @@ import os +import shlex +from urllib.parse import urlparse from typing import Dict, List, Tuple from pymonad.maybe import Nothing, Maybe, Just -from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput +from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput, TesTaskIOType from tesp_api.utils.functional import get_else_throw, maybe_of @@ -140,51 +142,104 @@ def docker_run_command(executor: TesTaskExecutor, job_id: str, resource_conf: di return command_builder.get_run_command_script(inputs_directory, i) -def docker_stage_in_command(executor: TesTaskExecutor, resource_conf: dict, - bind_mount: str, input_confs: List[dict]) -> str: - command_builder = DockerRunCommandBuilder() \ - .with_image(executor.image) \ - .with_workdir(executor.workdir) \ +def docker_stage_in_command( + executor: TesTaskExecutor, + resource_conf: dict, + bind_mount: str, + input_confs: List[dict] +) -> str: + command_builder = ( + DockerRunCommandBuilder() + .with_image(executor.image) + .with_workdir(executor.workdir) .with_resource(resource_conf) + ) + + stage_in_commands = [] + + for input_conf in input_confs: + url = input_conf.get('url') + input_type = input_conf.get('type', TesTaskIOType.FILE) + pulsar_path = os.path.basename(input_conf['pulsar_path']) - command = "" + if not url: + continue - for input in input_confs: - if (input['url']): - command += "curl -o " + os.path.basename(input['pulsar_path']) + " '" + input['url'] + "' && " - command = command[:-3] + scheme = urlparse(url).scheme - command_builder._command = Just('sh -c "' + command + '"') + if input_type == TesTaskIOType.DIRECTORY: + if scheme in ('http', 'https', 'ftp'): + url_quoted = shlex.quote(url) + cmd = ( + f"wget --mirror --no-parent --no-host-directories " + f"--directory-prefix={pulsar_path} '{url_quoted}'" + ) + else: + raise ValueError(f"Unsupported scheme for directory input: {scheme}") + else: + cmd = f"curl -o {pulsar_path} '{url}'" + + stage_in_commands.append(cmd) + + if stage_in_commands: + full_command = " && ".join(stage_in_commands) + command_builder._command = Just(f'sh -c "{full_command}"') command_builder.with_bind_mount(executor.workdir, bind_mount) + if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] + for env_name, env_value in executor.env.items(): + command_builder.with_env(env_name, env_value) return command_builder.get_run_command() -def docker_stage_out_command(executor: TesTaskExecutor, resource_conf: dict, - output_confs: List[dict], volume_confs: List[dict]) -> str: - command_builder = DockerRunCommandBuilder() \ - .with_image(executor.image) \ - .with_workdir(executor.workdir) \ +def docker_stage_out_command( + executor: TesTaskExecutor, + resource_conf: dict, + output_confs: List[dict], + volume_confs: List[dict] +) -> str: + command_builder = ( + DockerRunCommandBuilder() + .with_image(executor.image) + .with_workdir(executor.workdir) .with_resource(resource_conf) - - command = "" + ) + stage_out_commands = [] for output in output_confs: - command += "curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@" \ - + output['container_path'] + "' '" + output['url'] + "' && " - command = command[:-3] - - command_builder._command = Just('sh -c "' + command + '"') + path = output['container_path'] + url = output['url'] + output_type = output.get('type', TesTaskIOType.FILE) + + if output_type == TesTaskIOType.DIRECTORY: + # bash find + curl for recursive upload inside container + cmd = ( + f"base={shlex.quote(path)}; " + f"url={shlex.quote(url)}; " + f"find \"$base\" -type f -exec sh -c '" + f"for filepath do " + f"relpath=\"${{filepath#$base/}}\"; " + f"curl -X POST -F \"file=@${{filepath}}\" -F \"path=${{relpath}}\" \"$url\"; " + f"done' sh {{}} +" + ) + stage_out_commands.append(cmd) + else: + safe_path = shlex.quote(path) + safe_url = shlex.quote(url) + cmd = f"curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@{safe_path}' {safe_url}" + stage_out_commands.append(cmd) + + if stage_out_commands: + full_command = " && ".join(stage_out_commands) + command_builder._command = Just(f"sh -c {shlex.quote(full_command)}") for volume_conf in volume_confs: command_builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] + for env_name, env_value in executor.env.items(): + command_builder.with_env(env_name, env_value) return command_builder.get_run_command() From a46daf111f6a9edf3d2e984ab91d868ef750bbfa Mon Sep 17 00:00:00 2001 From: Debian Date: Mon, 16 Jun 2025 12:27:46 +0000 Subject: [PATCH 04/10] Input directory processing finnished --- .../dts/s3/data/tests-data/sample.jpg/xl.meta | Bin 7342 -> 0 bytes docker/dts/sample.jpg | Bin 6920 -> 0 bytes public-ftp-test.json | 16 ---------------- 3 files changed, 16 deletions(-) delete mode 100644 docker/dts/s3/data/tests-data/sample.jpg/xl.meta delete mode 100644 docker/dts/sample.jpg delete mode 100644 public-ftp-test.json diff --git a/docker/dts/s3/data/tests-data/sample.jpg/xl.meta b/docker/dts/s3/data/tests-data/sample.jpg/xl.meta deleted file mode 100644 index 99da7635bb4f77e84fe338fa20bcb63b3d0c3331..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 7342 zcmc(EcTg1FvnWX<2qLH;K|p3fkRUmUWY}4vB$X&(L9%4Yf}|z63>gFkb|vSWRX`;w z3n&?dB`3)l-hSWvz5A-(`}@|Nnwsi#PWS2Vb52k76HO5gVj@zaFGNJd4y2^So!qIN zbVUEpGsL7A$jsyJ?$=3%Gk)7iOio1HMf~73#=!Rlk{D|wqGRPSO>`|6RQdmhVpLS@ z-8qkB|R?|BUxY7-XR+`iG(T zUnu4uh(N|_8Q41`|7$h*T1ZbzeP54|mX`v~_Ad6W0`@MRNOu=YCj#JP?}8MtvGla; zB=dClLjEgj=;0n(V(m&0a`6=KBrLwj-r3R)Ddg}1X@?=qWjDqlEG#X2-^yA_RLa)I z%Fy zE@S=;YigdXbn0tGU98>zBJngRxPaZg z4}Em};QNCeRvBW&mTPQ5BxDdBT#18Hk%KY>_!s^c{TKcpW^DtS&-gDUL)cgz^B?|_ z{}6cir|;=bO39h`!ImSoWKER}!t?f=BhS-Z`dNM!voYMbZ6$Flj@#pjil?cSUSi#w zwmoE%;j^q{>RoOG@h|>y5q4Ng!{^w5OP0Oga~*VfvBsa z6uzvFDD+867K*fvrAl6NcptZ~4DDN+u?kXlA8J^-Eo?}iv8dd-Zhhy&=t74PYd2DL z2jKZJc$u0L+>stdH1#a|RlkcwItA2POFc4RRox4)Tcj~+JMZs0hMbo1^l+SC1T*Hu zcj56KlktGd5~IP8JL*STg1mWIldSc3-EJRM*DxT?ho~eMN8*qlHC8 z=556QGUa2J1>@lp!f>2Z^l^j3iAo9NWOz0%DOKeAYj7vHbs()XcLLelG*PyZA$$5= z>rU=vEPU`;=AK}NK?GI;j&m?dGXp54nj>8KdfW?fzTfAf zXw2YbRGwhull-QpF@3)uTr{R^LoK7^U`Fh`Kxzj7p2aw@iI;}=3ci+jipF%aYI%<- zt1z2BUMRRz`@E2L zNE$<4;JIV#*p;l}a6=jT0ExencZm~wSTYczf7F{p-*giaOJ;DniQxWZGTa!Gp*q#K zar;>aAYkait}mH)$ljEzr!mvduZ6=)0rq!)#-GYW)Et@#bhJw@vdgT9l7r*jyywyM zz^us#tyz;4G%Ihud;<61T1C zeaIlO#OivFa%3OEi@m#?YJn}g>=n>&D#;> z|7vmbQ;rH$g2p6VkHs6gWEoiWF9kSe180<1N4*;K}$K zjj?^mq?yIsS*fY60I5~+#PZnJTs7*l%6bWnn{a?EG#lbA)0iJR0WVfN7q*J(C(0Bc ziEWAUR+9^Li?>QmsQOHh3b5yT?F$*dJLuxIsl{t1L{F1};{{XWTg<$z0V2|0DEnmy5;XNRM;%u_l#ZNz?8`GqE8wDqw{}Sv`^MuDMisfOox4LgJCq*I)irGP?sZAy zJ?FJZHhgapEEX^YOL2j2wCQoadF#h!uK|I{=Jh{Q63dYH(JV0VSBCf-nw2niz#}y^ zYQJkcz=^cpl+}<7G=@LATVCsU_>r-r?~D5;JI^==$S}eZjDLkYRvnLUyOTRMuSu$Osz4#Q`#4ZLRCJIezo+ z2T>R0bWGN5Z2r_*mD=pjVo&L@aVg1YmMk>_+3EM@tGuB>f9)dDWm5r3b?!H|%8<7^ z*Z9;Pa5nrB-K%HP#7pC)xM{)(D&Jb$2&EVKTXMjOm3#g|UCOeU2xNA+>+Hq@s$V%1 z@;xbN%nFTa?%=iv2L$G=L}ydzmpy(KXAxshGW6=zN80ZwKq-3n$DMO9Ts0$%=B&ET zW`2>e*39}GpvlOD-J{MkBy5X;s&$0rg)cnzglpwF4>e$a&kjPGK4#0&s-O4!MBi&Z z6V)UA-ZG8|9FKN>;Kl#yE;fu#9q~se5bKGi?0q77KkXYdAwo#;6zkcEyYx6aTM$f022+Zg;f0QX(X0>NyJ0&`5ZwuoT%RTYWq(%A<+f5cb`S&>ddz(Lyi8?pPdMjyRJzrE1tpg9xlzx$CH%@U3zkR@JEdc5(H78#Y*9huC>0~qu29eDrHActUm?2vImz3gl00Q5JR5De{%ISeO# zTXn8()w+gz+3zKKUx5mHzc(DW)E1PXFFTdp33{1DA<&rIYeKTKq7P~vb00v%9A>Y8 zxWyhWa&>-oL-9NjyJ_o!jZ$tEi_L54^((R~z9hU3iG=B0J@WyXcwa~TRvKsf@y`Ql z#``UC^vok?@BH!cezK#Jq;N%jXEgqDl9B9myJ{e}V69R6HvCOn$A^pG-BOeKrT90n zoWODlH2!M(^*e1Vo@vh+-&PU7C|P6A4p zTqPcT{u+?)E>xRUuV(65| z;um8w0lvdem z31&nlO<#d&S3~s2?ah9_KF#syT|GaYZ~t;oAm8l>MdkD#5gmJxf{^Zm=J+8hhA_G! z$#C`X^d83d@68iccBl5K33Ya5N$I9-HuF{rpu{d?{TjIFuctgk#<##wX_1#lg;<_L zVM*ECxorUb&VUzud$Xdt!#C;MsySNad28Gy65z4fuMJ?pVJcD_;X$3j+n1k2oXrE-?6RWO?%>n4gY*j5eF4BkvKsto} zhngP~^{L;it6*cn(M0;lvHM-01ID`Tq_Pj5o zlZdr;ELssN*K4tS5|}FcgI?(~dselsn7i=+q)Tv@%18`MK@SnD)+flE=Cyegmhn4=HJ zB410k$dM6p|`zNME7Rk?7w9BUJp2fBQVr<>`8Om+V{QL#k=%XCMmi3yl;NhW6QC1bYf+_Fd7udq3 z>C$={h{Ob1-{bh2%*xQ-ml#l7`|&)(>#v~ENLtAqR3`onUH3&%X0{&l z!`*s%Vu-{HM@rl5@Mhb|869KtGbnW)uGolq!wEV?I@yimOGU%U4dHkxxKDmjFQglDDyE5wmkJR zb}l&mGCAzZtsdjpSL7r2u2cn=B0uArdY-=~?}n#5`9+R|BYKW)1Gx00NsC$aU$*Z( zMdRbHt3?Z@2gRRWsES&vh^q8Nl8g?N``7vu8^_1Ic-&gp55k}?(cEvzN~eOPlTmZGWiYjLC<*RozKi+J zVXWrkl~&MrP@4qsH6~Iy2P7*(vrdeR?B0254uE|_reX^Ot(;Z$^12pqbdM~_K>>t) ziYuX$|FL--sRzn@hg}-+(vIB(mOUdBQ#nClAsLqf)cuJ8@n`1cGiDh=MpOozoWMEygd<<$HfHju4?u+Iwv&RXRSXhJO~SfVwsX@r|z3+%m) zD2h_gLj-X1+E$yHd~4EexlUsCk)t~XS~#i2l&c2s?a!__lK{Nna^7q{eKmXKL{w=CZM?1vJ~hY)JhR=OT26%)P;U}0gNW_d&nw|(v)YA{SuXRb&`koBV3|AO zLHtC++e@*4r8M9x+~qcV-k9+7BX44g0>s$i7X_O#VlAjOc`gj_Hv4)!q4s5?W&(3> zN}`m4p(@s!I{J6IB}vq9o`Xlf!U1HZ7`+DW`&A||QDt$4l@Rf?j=oi+ucY(S&X}ds z&2S)GYvcRtu5f6pagvBPu_!|rz;+&lxj0B-Q9XZ2xi?CHiCnI<#+Xb z2{!)5o)klV#L#u_88<~2HcrlUHGydl&U`8PgOz2uRf{(Vy0|~Upa$6PtBxbv`WPN> zS2W|Aj`Y6AyG?!4xFXvnn=mqU35=eY^oL1)6lT&|_@-MuXd}k-fbCt9;EFK^o^y*z zRXlmu9DO+b%(jcfGd9al2gX>*#4~ zL{hr?^i8%M{Ain!JxINqPy=rfA{UN3UpN@HKJh+p4sPhkPKO|hoZ5Hhb)xe>{shVo zG<8@WPR|{o@!>%mHg|lA6?`G5t1`8-c`lCA4d$LF9SC6kzD$`6leho2SzK=6&~z~b zxmZQJjGo4Ssf-V{oV(Juu&?Aq-O+jv)w*tFF@<@T4DX>x@y{p&8ci+lti2}!&EZ>c zuB{J%Ww{~^sPwo90>-DvL34268^^2`h;=qdsL_Ujoh=%#fKB}Pi?Jj;+{$8U1Z^-G zTfk*saRX{eF0oU_n`6y4V3g@o1yoSjk!@}$pcnjxFuBI3k`GluKFl(zFNC`58fhOX z;;?Ogv=pBcFu$6`;7T7D6(L6JDOq&WDh)@Ngn5R5GNdbJvC^D`u(}MHPTp9dpmA}0 zAyBxrqbPXp9@dt`r2zRw?6IXn zLx*I-L=R~5(COl9P57`2!@lZO%E}3b(_Ly*r~S?~Na}?>ac{{avc-J{nj2x+dzjK& z7Bno-vlSL~uA@Q43C7`X9|ssvi^7tO9^B=f(T&j6j}b1uigF%tqpArmxo+`=e?o1?Nd!Ics!AKW9DzFGqT)bRHp12tj6ITt5uWFA27~|y+9g7nkvG6-nz;Yp8ooE|EE1aEuC6>+j0dOMA5E6lH1DYroVoCR=?4< zhiC|8*I4?~aPz5|1jP@=~?Li E0&sH0lK=n! diff --git a/docker/dts/sample.jpg b/docker/dts/sample.jpg deleted file mode 100644 index b44fda912ac6b7fe154fe83e6e6d83900e3554f8..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 6920 zcmc(EXH!&B^CwB71VJSyxdVa(5tJm7VeSwGL}ds{lq^}YB58=jC4+##jAW23Sp`(0 zFyyQ-h9C0PyJAOIvTXpY}7;_>djp43pYp7N0o2^`zr<5W`8Yx~Yj0bG%RDpGhzS_a$FFwz z6*{UtNx!Yl;w}9f58?tD3 z9&&vrih6!?^vTx(My7z})Ef|q%*tiUaoU)@gVALlDtSaLDw-e za_}9?WVuuHjGLh|$Sq#a0Fj6$4~>3DN%1C)x|WVEDeJ$$zA&q-<4i(E`ta)&e<4cR z(MZ>aj5VriNp>dg!HrIB-Sr!0T=i zRbfe3u{(kg`n-NE+wc0Kq&_6&y!wc@pj3FAjR(z|n!>t|tYb>Q78*-@a``a$j8ns;XcAP+t zD|kjY&Nqw=bLzABbf3kz&v}{IaL^6apBk@VR@NYA`CG5kTg@c|i18*W$;FY_=SR%c zmLuWX!q)r1mLGUnH^7)Z}^W(g~7|nmXFL2%178526HGH z3u-;A?bCnofNhPSztdVv?o0`$LoSFAIZzk4gxdq?$M7W zmDr%yYqa?)2kx$4Wq_FMZ_E(I1o@FHVhG~F-^`v8u!H%c`DBQL=drXc^5g$v=j(C`i8u5{XC zT=_=K>-$1~E+o*HfvJc*(eP*a4GrT4zWsc(CY*y!W8`3Z^nyrA8vve1*>m164eb_v z|KK?q)5)RjHLjw{X7Y5cARpPtRU{d%1d&^2I(bazjr|ET4woNHuF`o*4%pot=QQ;; zPB=*`N&>yzvr4q8l%Ru3{s^h1`6j})fe-Fu2TMGlyWj#{Vu?Gvbe^lMTgDNOAP0@R zZ8pzp$F_!KH#Zq(m}c{LctJ}+4!95fcH5)JD_)$rG4+TvioC#M*T$hEN!9*_3iJuO|5m{{M)FZf-#dfj?i|L3n~-D@lk-gk z|2O1NeN?*Ibk8Q&i+6y?6K8G%sk|fZhFpEk*s zquusg^79Y4EE&DYAjyQPI`?wx9)u@%XF2sEXI9x;px4Ts<4}JlC2#X@`Tk$73qe90 zR!CD>d!`Mn-cS5Azxgdk6)Hhv;;%>Jjhr(L&3l*q9kPIPDmV@rJ)F^R|-T6*j3XTby#G+fhLeVAUo~m{iYdAzpnr5 zwf4pBqPod4Md-oKgYstNrH1(j^u#us zV6*?dSK>iDOB~-XXu%1%NZ*qog{=gG+kv5jdiPo=z;t+Q{Ak8rj&9#K*-SoEiz6Yz zhR$|PNU4S?Dcg$FwzR8v-U1@<6V#m*_Sp{|LOR(ueS~SCYV^JdDCh6axRxs|Fd#rm zV8EvGom-D0By{x;6?j0Jw7#Uhyf{G3U1v_0P(w66;dfU2WKfYaF<8btqxvl4i+ZMOL1P{nG%pQb=cpp78 z$=pr{8~(780CvjyQe}h;r#g9kXk>iWo{z@FM9+yX+w6V&0;QRNuCpr8gD%YTGc2)2 z@r=~-p_s`(Wf|1q1iCu3wcoOO_3%$)!k2xAd4dz_bnUw-E9M_}wnZd@>q9Mc2?Y*6 znLfLpC|C5bX5Rj3;J)nhw}2D=M6m^P6jOAQdfkt&RqKo#l7t{C--{Vu-D$nue^(B- zC+)tap)pCqSC?E=n9egJLm4w{|1hIADh^6*DvM(?oL)gs{7vGs+7+ZTZ!1DZ&^Rsz zkOgaMTz1UxTmL>uIIE;#GP$t%)9aP$b0ftbGUF4{QjzSL>IAmqpRKofg9Cm#MWibx zB2pUsaW*QDmmA;2^geJt^akCnk8BX6^;FtI@`5U?>zl!h_x_a}@?zy*z1EO^Sxf}7 z+23|@6#zA^o{RaM7Sv}3M>Mwan}-4-3lc(f&(0S|~Ap0s`3er^rhW}Dh6tR9bb9D1{Ps3Bgh2!GS2qCbfu`zyCUH0Q7c_Z~P~yp! zbCb6jakjYVm3cYV!Zw&dwO5_)ZlZL<#lZPMCC7@#Nhb`AsZYku=PC>O(=f3xjdfI1 z#11p?N^WOXe@~Zu@yza`A|2j!Ay;OxZ)GzrF=l52jXiTp^uQM{?*Ep?%um!nkv+PbFO&cI|d?w*g4RL zH4eECp&=gAw?NEN7azHXF!z)DdG~B*EDJVE`BlxguBFwj%B}j42-+tQ)Vp}(1G2F` z4hGG%PIeRD`_zpOnqnB)hR?J8@UgyfV^gGXB?Biko+8mmZl+Z&fM2x6sC5S(*V6Xs z@=vGqltC#z4we&8PKn0Tr(M6*vg(ogiupq&@#~Ux?kxYM(Api?{R_3XKB z$sllq(m+az_kfkB*7M91v$qG z170aNsCj~6bj&P-$u&|Tnjr;oQY+C{D2%E-7186lf(CNl7^fc5#ga*})DP-U#~F@b zt-24h0Od9A8iE_)i8EJWI#m$kNo%9;=(l+Refo>Dh1MU31qz)GP()7eG0}-9DG2F3 zY>XYGW(r{_k_y!bP3vO*_|Ysub#MBBhA?MWl$37SWU*~01Ipa8mhXYf-a4u?>)0k3 zDm9#9OpN^%6q1<5pW6a3?)G`YceXxPwfQ9eS~EkdzG{xSLINE7l?-Z^<+H@6SOsb} ztg*VN&R$3}@dy${VoNzq`_=^V2%}or;W%d!*V|Tm9t5MK-MqHw=HKg@wpjj)rWIe=sf_O&sssoD*9?>oOJTx)g4Ezb4q~QAxkXWfaXswEmU=gZ zJtq#Btn&8;t#Rvbx|Cp&plr=W=VjEFjeU>t(5U^T7V$;?@(YiO>#bY>ikknM&Hm@u z%EY@v_UI5rMiP(`;3a(jo4!1K8T_LA;mPjP;-_UM^==zKHd-^AoYh+C0H5^;Q{??h zPi22C)n`86^DG>O?FKG4oW_^=LxAIGx7gk-d(rG~y6x_#g7>{YG%QtmbG>^`y|Zy^ zKQa5pWNY3J(kaAxD;BK;mFqXXd=`)**UzZ@ojWzKqyFmd5lNE&qn-NI=p(~95!;{R z&GYr0_f1?Hk7!R_e8jIn{)wdW7Srqd@L*&Vh3LKRwD-(TT*h7N8tckP*`N<1Ba^K# zeCz!?FXLD@TGbQ=slgWbX49M1!2M$QshX4KK};-PqU4ERgT{P++5XJ{Yd$K_UCq{_ zWE{7#{`S?qyT=TWe>@Xku=RS04#h+Our{q~{J2C|Fc^Rf{r+}ZSkhs`{M7v5Ls9P- z6)F=+4WbQ4zBkAHGQOO6le5ZK8|oEB3^n-iQbA;Y-MD?w@a|P(VT0;)$^b}-5m=zb z-aGVq^QlM$6(BMZ9Tk$v>`gm%1tL*nJe4-w^pdD5cEPF3ICj*0-=rCg%aHsmTudjum6EA*f*-nf`$e&Bsj#x8Y|uV z3Z}h+USkWFXG-g6Ard6CvCs1}iG!)NC&9nC=Icc|^3&vSB;ZT9K;Mb1_}hlxLe(Cb znhDHvpBJgqq5JD1c?7@0&KcG|8yQWux%n0A#!wc&2}Z633_$1T>u;g4a5|}7R0ckd zq4TmRBTJv{(Ow-RF+^gDBc*G!kK1u{LPsG-0#g>?O7)mHUeEz@vuuav1|K#V>jxE> z;FRIGRG@Y`C&;x@Dy;WT${xxdO8qK&xziCWLo;&b0cuuRUo3TFP;<8gr{4FLMY^p+ z4wsjHkVggooNHPWq?I0 z4T#tcgF}JUrsBW*Hjd1M6iog}fwItRD(JdR^zt0pdGAH7bF>Wnmiq3i)&xu*9}6J3 zrsJ!=hh(>%Xih%-9A(}tuqy`wC(?jm~W(7mq;Fpu^k($_4 z!cDIYc3(#nMQG$9{P_iKs!Wl;8uXg3lbC+x>5PIFPirvcYC-!4bE{4yfFQV%HmEzr>%wIvGe{_1>AH_r%)=>c_9V5 zN#GJZb3;6go&56QN;L3N2JjK@aGkrTk01Fen2@XpF}L|fz^08j3TlwA!~wxZANOZ8 zKAbcxVD3#RlyVSM$#GNH;8v#;i8{_>;CM6?u&$6~)WrR!X8{vbmu5K#8BcBNSu^@c zx-jE}Sw7ne1wyqqf4}btg|_Rbhy)WFqWXduvXXE2#3$2*>i=4{5YzP4v znU~UV{nYQa1@i^R#B@=vcM?@+^)zsfYS5U{_7NsX5yClMbvL0>pLOo+)E7gr>8)}S z0tBpN*9pa2`5Ajq4*C*9*ZF5%m7FKc zN4EVnG}g9g)+No_>yF?K&1wCrT!&oz@bncha&pQKCiPXEMSJm=Ue$n=B+En2>_pL3 zV;(&3HnrOQq&+kA(aZ~*4ib;(OkdSAz*wI#=A5V8&5cTKJUJpK8l)pFS1GA(zQayz~n-XumY9&@pHIAnS1b(tHbS;U`{?C*t82 z=F7uq!>Q;3KD*C1pr)h}TNS(+)@&0-l{Q^K4TT)r#VJ(d#r@Rw)oOfevikDHcG;kKQSvo%$Cz~=!P{~o{$G-=^`qSo}~F|V3i&jo8;boZ4-7U(BgTv{6LQ?AZ5+dKD53a>Qzi@i{eIU zd9GqlUMhZRlZv100<9i7UjD2O9dc$mP@|`+m}ENJqd~RX?OubVU)vFPmrPkVxy?fJ z!|eNylDo?SheW!zLn1D8HK}>Q7(CaBzafnTEYaxUZNXW+Fg=4P@nU+EJ#qKRHI<@W zM3F>zbm?!HVW7cr3ee(#ZHjx*Yrg*Ict{8}2z!_g_$Q z&tGYjNaOh&ce1xK=<+8oj+V>2id^($GU3MTzZ56ya`Q3SvTLwv`)m%22JgRMj3ak} z3}}s985ea)9cu(bWX2rtrS*LK=Gho+?%l~hg!fVY8`~+*u1=BB8?Q~#JGE7sd5j1Q zsL~7`cYKi6%Uh>Vfp^=_4eFWZ9UEY!7YI#z|GxLzzOS}!jh#)oA}yk5&oI$-b!^Me zAU3ntXvbaR9aYD8+VfDe>DhRyVA$^70I7zKt;8a$EB{?&Uq^z#Z(C$qvS+|CvPp3Ml>;&iOBdLx6Msi|+hir04&sNOk{AVgE Date: Tue, 1 Jul 2025 20:07:57 +0000 Subject: [PATCH 05/10] Finnished run-script removal and in-out directories --- tesp_api/service/event_actions.py | 18 ++--- tesp_api/utils/docker.py | 128 +++++++++++++++++++++++------- tests/test_jsons/state_true.json | 4 +- 3 files changed, 110 insertions(+), 40 deletions(-) diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index 7aea126..fcb1abb 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -182,10 +182,10 @@ async def handle_run_task(event: Event) -> None: stage_in_command = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) # container_cmds.append(stage_in_command) - + print("Buidling commands") for i, executor in enumerate(task.executors): if CONTAINER_TYPE == "docker": - run_command, script_content = docker_run_command(executor, task_id, resource_conf, volume_confs, + run_command = docker_run_command(executor, task_id, resource_conf, volume_confs, input_confs, output_confs, stage_in_mount, i) elif CONTAINER_TYPE == "singularity": mount_job_dir = payload['task_config']['job_directory'] @@ -193,10 +193,10 @@ async def handle_run_task(event: Event) -> None: input_confs, output_confs, stage_in_mount, mount_job_dir, i) - await pulsar_operations.upload( - payload['task_id'], DataType.INPUT, - file_content=Just(script_content), - file_path=f'run_script_{i}.sh') + # await pulsar_operations.upload( + # payload['task_id'], DataType.INPUT, + # file_content=Just(script_content), + # file_path=f'run_script_{i}.sh') container_cmds.append(run_command) if CONTAINER_TYPE == "docker": @@ -209,9 +209,9 @@ async def handle_run_task(event: Event) -> None: # Join all commands with " && " run_commands = " && ".join(container_cmds) - - run_command = (f"""set -xe && {stage_in_command} && {run_commands} && {stage_out_command}""") - + parts = ["set -xe", stage_in_command, run_commands, stage_out_command] + non_empty_parts = [p.strip() for p in parts if p and p.strip()] + run_command = " && ".join(non_empty_parts) print(run_command) command_start_time = datetime.datetime.now(datetime.timezone.utc) diff --git a/tesp_api/utils/docker.py b/tesp_api/utils/docker.py index f7abe78..5dd6fb1 100644 --- a/tesp_api/utils/docker.py +++ b/tesp_api/utils/docker.py @@ -1,4 +1,5 @@ import os +import re import shlex from urllib.parse import urlparse from typing import Dict, List, Tuple @@ -8,6 +9,9 @@ from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput, TesTaskIOType from tesp_api.utils.functional import get_else_throw, maybe_of +SHELL_PATTERN = re.compile( + r"[|&;<>(){}$*?\"'\\`]" # shell metacharacters +) class DockerRunCommandBuilder: @@ -52,17 +56,52 @@ def with_env(self, name: str, value: str): self._envs[name] = value return self - def with_command(self, command: List[str], stdin: Maybe[str] = Nothing, - stdout: Maybe[str] = Nothing, stderr: Maybe[str] = Nothing): - command_str = " ".join(command) - self._command = Just(command_str) if command_str else Nothing - - # sh -c '' # there probably must be ' instead of " because of the passing unresolved envs into the container - self._command = self._command.map(lambda _command: - f'{_command}' - f'{stdin.maybe("", lambda x: " <" + x)}' - f'{stdout.maybe("", lambda x: " 1>" + x)}' - f'{stderr.maybe("", lambda x: " 2>" + x)}') + def requires_shell(self, command: List[str]) -> bool: + return any( + isinstance(arg, str) and SHELL_PATTERN.search(arg) + for arg in command + ) + + def escape_redirections(self, cmd: str, stdin=None, stdout=None, stderr=None) -> str: + redirections = [ + ('<', stdin), + ('>', stdout), + ('2>', stderr), + ] + for op, val in redirections: + if isinstance(val, Maybe): + val = val.maybe(None, lambda x: None if str(x) == "Nothing" else x) + elif str(val) == "Nothing": + val = None + if val: + cmd += f" {op} {shlex.quote(val)}" + return cmd + + def with_command(self, command: List[str], stdin=Nothing, stdout=Nothing, stderr=Nothing): + if not command: + self._command = Nothing + return self + + if self.requires_shell(command): + shell_cmd = " ".join(map(shlex.quote, map(str, command))) + shell_cmd = self.escape_redirections(shell_cmd, stdin, stdout, stderr) + self._command = Just(["sh", "-c", shell_cmd]) + else: + # Only for direct commands, sanitize the redirections + def maybe_path(val): + if isinstance(val, Maybe): + return val.maybe(None, lambda x: None if str(x) == "Nothing" else x) + return None if str(val) == "Nothing" else val + + stdin_val = maybe_path(stdin) + stdout_val = maybe_path(stdout) + stderr_val = maybe_path(stderr) + + cmd = list(map(str, command)) + redirs = [("<", stdin_val), (">", stdout_val), ("2>", stderr_val)] + cmd += [part for op, val in redirs if val for part in (op, val)] + self._command = Just(cmd) + return self def reset(self) -> None: @@ -75,18 +114,50 @@ def reset(self) -> None: return self def get_run_command(self) -> str: - resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' - f'{self._resource_mem.maybe("", lambda mem: " --memory="+str(mem)+"g")}') - bind_mounts_str = " ".join(map(lambda v_paths: f'-v \"{v_paths[1]}\":\"{v_paths[0]}\"', self._bind_mounts.items())) - volumes_str = " ".join(map(lambda v_paths: f'-v \"{v_paths[1]}\":\"{v_paths[0]}\"', self._volumes.items())) - docker_image = get_else_throw(self._docker_image, ValueError('Docker image is not set')) - workdir_str = self._workdir.maybe("", lambda workdir: f"-w=\"{str(workdir)}\"") - env_str = " ".join(map(lambda env: f'-e {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = self._command.maybe("", lambda x: x) + print("Constructing docker run command...") + + print("Before maybe(_resource_cpu)") + cpu_flag = self._resource_cpu.maybe("", lambda cpu: f"--cpus={cpu}") + print("After maybe(_resource_cpu)") + + print("Before maybe(_resource_mem)") + mem_flag = self._resource_mem.maybe("", lambda mem: f"--memory={mem}") + print("After maybe(_resource_mem)") + + print("Before maybe(_workdir)") + workdir_str = self._workdir.maybe("", lambda w: f'-w="{w}"') + print("After maybe(_workdir)") + + print("Before maybe(_docker_image)") + image = self._docker_image.maybe("", lambda i: i) + print("After maybe(_docker_image)") + + bind_mounts_str = " ".join( + f'-v "{host}":"{container}"' for container, host in self._bind_mounts.items() + ) + volumes_str = " ".join( + f'-v "{host}":"{container}"' for container, host in self._volumes.items() + ) + env_str = " ".join( + f'-e {shlex.quote(k)}={shlex.quote(v)}' for k, v in self._envs.items() + ) + + def quote_command(cmd): + print(f"quote_command called with: {cmd} (type: {type(cmd)})") + if isinstance(cmd, str): + return cmd + if isinstance(cmd, list): + return " ".join(shlex.quote(arg) for arg in cmd) + return "" + + print("Before maybe(_command)") + command_str = self._command.maybe("", quote_command) + print("After maybe(_command)") + + full_command = f"docker run {cpu_flag} {mem_flag} {workdir_str} {env_str} {volumes_str} {bind_mounts_str} {image} {command_str}".strip() - run_command = f'docker run {resources_str} {workdir_str} {env_str} {volumes_str} {bind_mounts_str} {docker_image} {command_str}' self.reset() - return run_command + return full_command def get_run_command_script(self, inputs_directory: str, i: int) -> Tuple[str, str]: resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' @@ -97,7 +168,7 @@ def get_run_command_script(self, inputs_directory: str, i: int) -> Tuple[str, st workdir_str = self._workdir.maybe("", lambda workdir: f"-w=\"{str(workdir)}\"") volumes_str += f' -v "{inputs_directory}/run_script_{i}.sh":"/tmp/{self._job_id}/run_script_{i}.sh"' env_str = " ".join(map(lambda env: f'-e {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = self._command.maybe("", lambda x: x) + command_str = maybe_of(self._command).maybe("", lambda x: " ".join(shlex.quote(arg) for arg in x)) chmod_commands = f"chmod +x /tmp/{self._job_id}/run_script_{i}.sh" if self._bind_mounts: @@ -140,7 +211,7 @@ def docker_run_command(executor: TesTaskExecutor, job_id: str, resource_conf: di [command_builder.with_bind_mount(input_conf['container_path'], input_conf['pulsar_path']) for input_conf in input_confs] - return command_builder.get_run_command_script(inputs_directory, i) + return command_builder.get_run_command() # _script(inputs_directory, i) def docker_stage_in_command( executor: TesTaskExecutor, @@ -211,17 +282,17 @@ def docker_stage_out_command( path = output['container_path'] url = output['url'] output_type = output.get('type', TesTaskIOType.FILE) + print(f"Staging out: {path} -> {url} (type: {output_type})") if output_type == TesTaskIOType.DIRECTORY: # bash find + curl for recursive upload inside container cmd = ( f"base={shlex.quote(path)}; " f"url={shlex.quote(url)}; " - f"find \"$base\" -type f -exec sh -c '" - f"for filepath do " + f"find \"$base\" -type f | while read -r filepath; do " f"relpath=\"${{filepath#$base/}}\"; " - f"curl -X POST -F \"file=@${{filepath}}\" -F \"path=${{relpath}}\" \"$url\"; " - f"done' sh {{}} +" + f"curl -X POST -F \"file=@${{filepath}};filename=${{relpath}}\" \"$url\"; " + f"done" ) stage_out_commands.append(cmd) else: @@ -264,7 +335,8 @@ def map_volumes(job_id: str, volumes: List[str], outputs: List[TesTaskOutput]): output_confs.append({ 'container_path': output.path, 'url': output.url, - 'volume_name': volume_name + 'volume_name': volume_name, + 'type': output.type }) for v in volumes: diff --git a/tests/test_jsons/state_true.json b/tests/test_jsons/state_true.json index e57b0f5..643bafa 100644 --- a/tests/test_jsons/state_true.json +++ b/tests/test_jsons/state_true.json @@ -2,9 +2,7 @@ "executors":[ { "image":"ubuntu", - "command":[ - "sleep", "20", "&&", "true" - ] + "command": ["sh", "-c", "sleep 5 && echo done"] } ] } From 4aac7343fddc15c0eaba4f124dafc7f429bd25fd Mon Sep 17 00:00:00 2001 From: Debian Date: Wed, 2 Jul 2025 09:51:03 +0000 Subject: [PATCH 06/10] Staging command removed for unsepcified tasks --- tesp_api/service/event_actions.py | 64 ++++++++++++++++--------------- tesp_api/utils/docker.py | 24 +++++++++++- tests/test_data/env_test_1 | 1 + tests/test_data/env_test_2 | 1 + tests/test_jsons/envs.json | 45 +++++++++++----------- tests/test_jsons/multi_true.json | 34 +++++++--------- tests/test_jsons/volumes.json | 51 ++++++++++++------------ 7 files changed, 121 insertions(+), 99 deletions(-) create mode 100644 tests/test_data/env_test_1 create mode 100644 tests/test_data/env_test_2 diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index fcb1abb..7e63f8f 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -165,49 +165,53 @@ async def handle_run_task(event: Event) -> None: start_time=Just(datetime.datetime.now(datetime.timezone.utc)) ) - # prepare docker commands container_cmds = list() - # stage-in - print("Payload:") - print(payload) - stage_in_mount = payload['task_config']['inputs_directory'] stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", - command=[], - workdir=Path("/downloads")) + command=[], + workdir=Path("/downloads")) + # stage-in + stage_in_command = "" + + print(stage_exec) + if input_confs: # Only generate stage-in command if inputs exist + print("Preparing stage-in command") + stage_in_mount = payload['task_config']['inputs_directory'] - if CONTAINER_TYPE == "docker": - stage_in_command = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) - elif CONTAINER_TYPE == "singularity": - stage_exec.image = "docker://" + stage_exec.image - stage_in_command = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) + if CONTAINER_TYPE == "docker": + stage_in_command = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) + elif CONTAINER_TYPE == "singularity": + stage_exec.image = "docker://" + stage_exec.image + stage_in_command = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) - # container_cmds.append(stage_in_command) - print("Buidling commands") + # main executors + print("Building commands") + print(stage_exec) for i, executor in enumerate(task.executors): if CONTAINER_TYPE == "docker": run_command = docker_run_command(executor, task_id, resource_conf, volume_confs, - input_confs, output_confs, stage_in_mount, i) + input_confs, output_confs, stage_in_mount if input_confs else "", i) elif CONTAINER_TYPE == "singularity": mount_job_dir = payload['task_config']['job_directory'] run_command, script_content = singularity_run_command(executor, task_id, resource_conf, volume_confs, - input_confs, output_confs, stage_in_mount, mount_job_dir, i) + input_confs, output_confs, + stage_in_mount if input_confs else "", + mount_job_dir, i) - - # await pulsar_operations.upload( - # payload['task_id'], DataType.INPUT, - # file_content=Just(script_content), - # file_path=f'run_script_{i}.sh') container_cmds.append(run_command) - if CONTAINER_TYPE == "docker": - stage_out_command = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) - elif CONTAINER_TYPE == "singularity": - mount_job_dir = payload['task_config']['job_directory'] - bind_mount = payload['task_config']['inputs_directory'] - stage_out_command = singularity_stage_out_command(stage_exec, resource_conf, bind_mount, - output_confs, volume_confs, mount_job_dir) - - # Join all commands with " && " + print(stage_exec) + # stage-out + stage_out_command = "" + if output_confs: # Only generate stage-out command if outputs exist + print("Preparing stage-out command") + if CONTAINER_TYPE == "docker": + stage_out_command = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) + elif CONTAINER_TYPE == "singularity": + mount_job_dir = payload['task_config']['job_directory'] + bind_mount = payload['task_config']['inputs_directory'] + stage_out_command = singularity_stage_out_command(stage_exec, resource_conf, bind_mount, + output_confs, volume_confs, mount_job_dir) + run_commands = " && ".join(container_cmds) parts = ["set -xe", stage_in_command, run_commands, stage_out_command] non_empty_parts = [p.strip() for p in parts if p and p.strip()] diff --git a/tesp_api/utils/docker.py b/tesp_api/utils/docker.py index 5dd6fb1..b524681 100644 --- a/tesp_api/utils/docker.py +++ b/tesp_api/utils/docker.py @@ -56,6 +56,14 @@ def with_env(self, name: str, value: str): self._envs[name] = value return self + def _is_shell_wrapped(self, command) -> bool: + if isinstance(command, list): + if len(command) >= 2 and all(isinstance(part, str) for part in command[:2]): + return command[0] == "sh" and command[1] == "-c" + elif isinstance(command, str): + return command.startswith("sh -c ") + return False + def requires_shell(self, command: List[str]) -> bool: return any( isinstance(arg, str) and SHELL_PATTERN.search(arg) @@ -77,17 +85,29 @@ def escape_redirections(self, cmd: str, stdin=None, stdout=None, stderr=None) -> cmd += f" {op} {shlex.quote(val)}" return cmd - def with_command(self, command: List[str], stdin=Nothing, stdout=Nothing, stderr=Nothing): + def with_command(self, command, stdin=Nothing, stdout=Nothing, stderr=Nothing): if not command: self._command = Nothing return self + if self._is_shell_wrapped(command): + # If command is a string, split into list form before assigning + if isinstance(command, str): + parts = command.split(" ", 2) + if len(parts) < 3: + raise ValueError(f"Malformed shell command string: {command}") + self._command = Just(parts) + else: + self._command = Just(command) + return self + + # If command needs shell (but not already wrapped), wrap it if self.requires_shell(command): shell_cmd = " ".join(map(shlex.quote, map(str, command))) shell_cmd = self.escape_redirections(shell_cmd, stdin, stdout, stderr) self._command = Just(["sh", "-c", shell_cmd]) else: - # Only for direct commands, sanitize the redirections + # Direct command with sanitized redirections def maybe_path(val): if isinstance(val, Maybe): return val.maybe(None, lambda x: None if str(x) == "Nothing" else x) diff --git a/tests/test_data/env_test_1 b/tests/test_data/env_test_1 new file mode 100644 index 0000000..b2c9cad --- /dev/null +++ b/tests/test_data/env_test_1 @@ -0,0 +1 @@ +first upload successful diff --git a/tests/test_data/env_test_2 b/tests/test_data/env_test_2 new file mode 100644 index 0000000..b89622e --- /dev/null +++ b/tests/test_data/env_test_2 @@ -0,0 +1 @@ +second upload successful diff --git a/tests/test_jsons/envs.json b/tests/test_jsons/envs.json index ff681ee..f1e64a3 100644 --- a/tests/test_jsons/envs.json +++ b/tests/test_jsons/envs.json @@ -1,29 +1,28 @@ { - "outputs": [ - + "outputs": [ { - "path": "/tesp-api/tests/test_data/env_test_1", - "url": "http://172.17.0.1:5000/upload", - "type": "FILE" + "path": "/tesp-api/tests/test_data/env_test_1", + "url": "http://172.17.0.1:5000/upload", + "type": "FILE" }, { - "path": "/tesp-api/tests/test_data/env_test_2", - "url": "http://172.17.0.1:5000/upload", - "type": "FILE" + "path": "/tesp-api/tests/test_data/env_test_2", + "url": "http://172.17.0.1:5000/upload", + "type": "FILE" } - -], - "executors":[ - { - "image":"ubuntu", - "command":[ - "echo", "$TEST_FILE", ">", "/tesp-api/tests/test_data/env_test_1", "&&", "echo", "$TEST_FILE_2", ">", "/tesp-api/tests/test_data/env_test_2" - ], - "env": { - "TEST_FILE": "first upload successful", - "TEST_FILE_2": "second upload successful" - } - } - ] + ], + "executors": [ + { + "image": "ubuntu", + "command": [ + "sh", + "-c", + "echo \"$TEST_FILE\" > /tesp-api/tests/test_data/env_test_1 && echo \"$TEST_FILE_2\" > /tesp-api/tests/test_data/env_test_2" + ], + "env": { + "TEST_FILE": "first upload successful", + "TEST_FILE_2": "second upload successful" + } + } + ] } - diff --git a/tests/test_jsons/multi_true.json b/tests/test_jsons/multi_true.json index 6b4f445..3a07a8f 100644 --- a/tests/test_jsons/multi_true.json +++ b/tests/test_jsons/multi_true.json @@ -1,22 +1,16 @@ { - "executors":[ - { - "image":"ubuntu", - "command":[ - "sleep", "5", "&&", "true" - ] - }, - { - "image":"ubuntu", - "command":[ - "sleep", "5", "&&", "true" - ] - }, - { - "image":"ubuntu", - "command":[ - "sleep", "5", "&&", "true" - ] - } - ] + "executors": [ + { + "image": "ubuntu", + "command": ["sh", "-c", "sleep 5 && true"] + }, + { + "image": "ubuntu", + "command": ["sh", "-c", "sleep 5 && true"] + }, + { + "image": "ubuntu", + "command": ["sh", "-c", "sleep 5 && true"] + } + ] } diff --git a/tests/test_jsons/volumes.json b/tests/test_jsons/volumes.json index a1d34a2..2e0b45c 100644 --- a/tests/test_jsons/volumes.json +++ b/tests/test_jsons/volumes.json @@ -1,26 +1,29 @@ { - "inputs": [ - { - "path": "/data/file_http", - "url": "http://172.17.0.1:5000/download/test.txt", - "type": "FILE" - } - ], - "volumes": [ - "/vol/" - ], - "executors":[ - { - "image":"ubuntu", - "command":[ - "cp", "/data/file_http", "/vol/", "&&", "cat", "/vol/file_http" - ] - }, - { - "image":"ubuntu", - "command":[ - "cat", "/vol/file_http" - ] - } - ] + "inputs": [ + { + "path": "/data/file_http", + "url": "http://172.17.0.1:5000/download/test.txt", + "type": "FILE" + } + ], + "volumes": [ + "/vol/" + ], + "executors": [ + { + "image": "ubuntu", + "command": [ + "sh", + "-c", + "cp /data/file_http /vol/ && cat /vol/file_http" + ] + }, + { + "image": "ubuntu", + "command": [ + "cat", + "/vol/file_http" + ] + } + ] } From ca2888fa071aeabc66b333d11d75dcbbcf0ab727 Mon Sep 17 00:00:00 2001 From: Debian Date: Wed, 2 Jul 2025 10:09:30 +0000 Subject: [PATCH 07/10] Staging command removed for unsepcified tasks --- tests/test_data/env_test_1 | 1 - tests/test_data/env_test_2 | 1 - 2 files changed, 2 deletions(-) delete mode 100644 tests/test_data/env_test_1 delete mode 100644 tests/test_data/env_test_2 diff --git a/tests/test_data/env_test_1 b/tests/test_data/env_test_1 deleted file mode 100644 index b2c9cad..0000000 --- a/tests/test_data/env_test_1 +++ /dev/null @@ -1 +0,0 @@ -first upload successful diff --git a/tests/test_data/env_test_2 b/tests/test_data/env_test_2 deleted file mode 100644 index b89622e..0000000 --- a/tests/test_data/env_test_2 +++ /dev/null @@ -1 +0,0 @@ -second upload successful From 3106ecc526c4bd705528dd8262ed06fa2e76b8d0 Mon Sep 17 00:00:00 2001 From: Debian Date: Thu, 3 Jul 2025 07:58:23 +0000 Subject: [PATCH 08/10] Singularity and Docker merged into Container --- tesp_api/service/event_actions.py | 109 ++++----- tesp_api/utils/container.py | 368 +++++++++++++++++++++++++++++ tesp_api/utils/docker.py | 370 ------------------------------ tesp_api/utils/singularity.py | 190 --------------- tests/smoke_tests.py | 2 +- tests/test_data/env_test_1 | 1 + tests/test_data/env_test_2 | 1 + 7 files changed, 426 insertions(+), 615 deletions(-) create mode 100644 tesp_api/utils/container.py delete mode 100644 tesp_api/utils/docker.py delete mode 100644 tesp_api/utils/singularity.py create mode 100644 tests/test_data/env_test_1 create mode 100644 tests/test_data/env_test_2 diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index 7e63f8f..cb7e933 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -7,17 +7,18 @@ from bson.objectid import ObjectId from pymonad.promise import Promise -from tesp_api.utils.docker import ( - docker_run_command, - docker_stage_in_command, - docker_stage_out_command, - map_volumes -) -from tesp_api.utils.singularity import ( - singularity_run_command, - singularity_stage_in_command, - singularity_stage_out_command -) +#from tesp_api.utils.docker import ( +# docker_run_command, +# docker_stage_in_command, +# docker_stage_out_command, +# map_volumes +#) +#from tesp_api.utils.singularity import ( +# singularity_run_command, +# singularity_stage_in_command, +# singularity_stage_out_command +#) +from tesp_api.utils.container import stage_in_command, run_command, stage_out_command, map_volumes from tesp_api.service.pulsar_service import pulsar_service from tesp_api.service.event_dispatcher import dispatch_event from tesp_api.utils.functional import get_else_throw, maybe_of @@ -36,7 +37,7 @@ ) from tesp_api.repository.task_repository_utils import append_task_executor_logs, update_last_task_log_time -CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "both") +CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "docker") @local_handler.register(event_name="queued_task") @@ -170,58 +171,58 @@ async def handle_run_task(event: Event) -> None: command=[], workdir=Path("/downloads")) # stage-in - stage_in_command = "" - - print(stage_exec) - if input_confs: # Only generate stage-in command if inputs exist - print("Preparing stage-in command") + stage_in_cmd = "" + if input_confs: stage_in_mount = payload['task_config']['inputs_directory'] + stage_in_cmd = stage_in_command( + stage_exec, + resource_conf, + stage_in_mount, + input_confs, + CONTAINER_TYPE + ) - if CONTAINER_TYPE == "docker": - stage_in_command = docker_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) - elif CONTAINER_TYPE == "singularity": - stage_exec.image = "docker://" + stage_exec.image - stage_in_command = singularity_stage_in_command(stage_exec, resource_conf, stage_in_mount, input_confs) - - # main executors - print("Building commands") - print(stage_exec) + # main execution + container_cmds = [] for i, executor in enumerate(task.executors): - if CONTAINER_TYPE == "docker": - run_command = docker_run_command(executor, task_id, resource_conf, volume_confs, - input_confs, output_confs, stage_in_mount if input_confs else "", i) - elif CONTAINER_TYPE == "singularity": - mount_job_dir = payload['task_config']['job_directory'] - run_command, script_content = singularity_run_command(executor, task_id, resource_conf, volume_confs, - input_confs, output_confs, - stage_in_mount if input_confs else "", - mount_job_dir, i) - - container_cmds.append(run_command) - - print(stage_exec) + run_cmd = run_command( + executor=executor, + job_id=task_id, + resource_conf=resource_conf, + volume_confs=volume_confs, + input_confs=input_confs, + output_confs=output_confs, + inputs_directory=stage_in_mount if input_confs else "", + container_type=CONTAINER_TYPE, + job_directory=payload['task_config']['job_directory'] if CONTAINER_TYPE == "singularity" else None, + executor_index=i + ) + container_cmds.append(run_cmd) + # stage-out - stage_out_command = "" - if output_confs: # Only generate stage-out command if outputs exist - print("Preparing stage-out command") - if CONTAINER_TYPE == "docker": - stage_out_command = docker_stage_out_command(stage_exec, resource_conf, output_confs, volume_confs) - elif CONTAINER_TYPE == "singularity": - mount_job_dir = payload['task_config']['job_directory'] - bind_mount = payload['task_config']['inputs_directory'] - stage_out_command = singularity_stage_out_command(stage_exec, resource_conf, bind_mount, - output_confs, volume_confs, mount_job_dir) - + stage_out_cmd = "" + if output_confs: + stage_out_cmd = stage_out_command( + stage_exec, + resource_conf, + output_confs, + volume_confs, + container_type=CONTAINER_TYPE, + bind_mount=payload['task_config']['inputs_directory'] if CONTAINER_TYPE == "singularity" else None, + job_directory=payload['task_config']['job_directory'] if CONTAINER_TYPE == "singularity" else None + ) + + # Combine commands run_commands = " && ".join(container_cmds) - parts = ["set -xe", stage_in_command, run_commands, stage_out_command] + parts = ["set -xe", stage_in_cmd, run_commands, stage_out_cmd] non_empty_parts = [p.strip() for p in parts if p and p.strip()] - run_command = " && ".join(non_empty_parts) - print(run_command) + full_command = " && ".join(non_empty_parts) + print(full_command) command_start_time = datetime.datetime.now(datetime.timezone.utc) # start the task (docker container/s) in the pulsar - await pulsar_operations.run_job(task_id, run_command) + await pulsar_operations.run_job(task_id, full_command) # wait for the task command_status = await pulsar_operations.job_status_complete(str(task_id)) diff --git a/tesp_api/utils/container.py b/tesp_api/utils/container.py new file mode 100644 index 0000000..9daea2d --- /dev/null +++ b/tesp_api/utils/container.py @@ -0,0 +1,368 @@ +import os +import re +import shlex +from urllib.parse import urlparse +from typing import Dict, List, Tuple, Optional, Union + +from pymonad.maybe import Nothing, Maybe, Just + +from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput, TesTaskIOType +from tesp_api.utils.functional import get_else_throw, maybe_of + +SHELL_PATTERN = re.compile(r"[|&;<>(){}$*?\"'\\]") + +class ContainerCommandBuilder: + def __init__(self, container_type: str) -> None: + self.container_type = container_type + self._job_id: str = "" + self._resource_cpu: Maybe[str] = Nothing + self._resource_mem: Maybe[str] = Nothing + self._image: Maybe[str] = Nothing + self._workdir: Maybe[str] = Nothing + self._envs: Dict[str, str] = {} + self._volumes: Dict[str, str] = {} + self._bind_mounts: Dict[str, str] = {} + self._command: Maybe[str] = Nothing + self._stdin: Maybe[str] = Nothing + self._stdout: Maybe[str] = Nothing + self._stderr: Maybe[str] = Nothing + + def with_job_id(self, job_id: str): + self._job_id = job_id + return self + + def with_resource(self, resources: dict): + if not resources: + return self + self._resource_cpu = maybe_of(resources.get("cpu_cores")) + self._resource_mem = maybe_of(resources.get("ram_gb")) + return self + + def with_bind_mount(self, container_path: str, host_path: str): + self._bind_mounts[container_path] = host_path + return self + + def with_volume(self, container_path: str, volume_name: str): + self._volumes[container_path] = volume_name + return self + + def with_image(self, image: str): + if self.container_type == "singularity" and not image.startswith("docker://"): + self._image = Just(f"docker://{image}") + else: + self._image = Just(image) + return self + + def with_workdir(self, workdir: str): + self._workdir = maybe_of(workdir) + return self + + def with_env(self, name: str, value: str): + self._envs[name] = value + return self + + def with_command(self, command: List[str], stdin: Maybe[str] = Nothing, + stdout: Maybe[str] = Nothing, stderr: Maybe[str] = Nothing): + self._stdin = stdin + self._stdout = stdout + self._stderr = stderr + + if not command: + self._command = Nothing + return self + + # Check if already wrapped in shell + if self._is_shell_wrapped(command): + self._command = Just(command) + return self + + # Wrap in shell if needed + if self.requires_shell(command) or self.container_type == "singularity": + shell_cmd = " ".join(shlex.quote(str(arg)) for arg in command) + shell_cmd = self._escape_redirections( + shell_cmd, + self._stdin, + self._stdout, + self._stderr + ) + self._command = Just(["sh", "-c", shell_cmd]) + else: + cmd = list(map(str, command)) + cmd = self._add_redirections(cmd) + self._command = Just(cmd) + + return self + + def _is_shell_wrapped(self, command) -> bool: + if isinstance(command, list): + return len(command) >= 2 and command[0] == "sh" and command[1] == "-c" + return False + + def requires_shell(self, command: List[str]) -> bool: + return any( + isinstance(arg, str) and SHELL_PATTERN.search(arg) + for arg in command + ) + + def _escape_redirections(self, cmd: str, stdin, stdout, stderr) -> str: + redirections = [ + ('<', stdin), + ('>', stdout), + ('2>', stderr), + ] + for op, val in redirections: + if val and val != Nothing: + path = val.maybe("", lambda x: x) + if path: + cmd += f" {op} {shlex.quote(path)}" + return cmd + + def _add_redirections(self, cmd: List[str]) -> List[str]: + redirections = [] + if self._stdin != Nothing: + path = self._stdin.maybe("", lambda x: x) + if path: + redirections.extend(["<", path]) + if self._stdout != Nothing: + path = self._stdout.maybe("", lambda x: x) + if path: + redirections.extend([">", path]) + if self._stderr != Nothing: + path = self._stderr.maybe("", lambda x: x) + if path: + redirections.extend(["2>", path]) + return cmd + redirections + + def reset(self) -> None: + self._resource_cpu = Nothing + self._resource_mem = Nothing + self._image = Nothing + self._workdir = Nothing + self._envs = {} + self._volumes = {} + self._bind_mounts = {} + self._command = Nothing + self._stdin = Nothing + self._stdout = Nothing + self._stderr = Nothing + return self + + def get_run_command(self) -> str: + # Common resource flags + cpu_flag = self._resource_cpu.maybe("", lambda cpu: + f"--cpus={cpu}" if self.container_type == "docker" else f"--cpu={cpu}") + + mem_flag = self._resource_mem.maybe("", lambda mem: + f"--memory={mem}g" if self.container_type == "docker" else f"--memory={mem}G") + + # Environment variables + env_flags = [] + for k, v in self._envs.items(): + if self.container_type == "docker": + env_flags.append(f'-e {shlex.quote(k)}={shlex.quote(v)}') + else: + env_flags.append(f'--env {k}="{v}"') + + # Work directory + workdir_flag = self._workdir.maybe("", lambda w: + f'-w "{w}"' if self.container_type == "docker" else f'--pwd "{w}"') + + # Image + image = self._image.maybe("", lambda i: i) + + # Mounts + mount_flags = [] + if self.container_type == "docker": + for container_path, host_path in self._bind_mounts.items(): + mount_flags.append(f'-v "{host_path}":"{container_path}"') + for container_path, volume_name in self._volumes.items(): + mount_flags.append(f'-v "{volume_name}":"{container_path}"') + else: # singularity + for container_path, host_path in self._bind_mounts.items(): + mount_flags.append(f'-B "{host_path}":"{container_path}"') + for container_path, volume_name in self._volumes.items(): + mount_flags.append(f'-B "{volume_name}":"{container_path}"') + + # Command + command_str = self._command.maybe("", lambda cmd: + " ".join(shlex.quote(arg) for arg in cmd) if isinstance(cmd, list) else cmd) + + # Build final command + if self.container_type == "docker": + return ( + f"docker run {cpu_flag} {mem_flag} {workdir_flag} " + f"{' '.join(env_flags)} {' '.join(mount_flags)} " + f"{image} {command_str}" + ).strip() + else: # singularity + return ( + f"singularity exec {cpu_flag} {mem_flag} {workdir_flag} " + f"{' '.join(env_flags)} {' '.join(mount_flags)} " + f"{image} {command_str}" + ).strip() + +# Unified command functions +def stage_in_command( + executor: TesTaskExecutor, + resource_conf: dict, + bind_mount: str, + input_confs: List[dict], + container_type: str +) -> str: + builder = ContainerCommandBuilder(container_type) \ + .with_image(executor.image) \ + .with_workdir(executor.workdir) \ + .with_resource(resource_conf) + + commands = [] + for input_conf in input_confs: + url = input_conf.get('url') + if not url: + continue + + path = input_conf['pulsar_path'] + input_type = input_conf.get('type', TesTaskIOType.FILE) + filename = os.path.basename(path) + + if input_type == TesTaskIOType.DIRECTORY: + # Recursive download + commands.append( + f"wget --mirror --no-parent --no-host-directories " + f"--directory-prefix={shlex.quote(filename)} {shlex.quote(url)}" + ) + else: + # Single file download + commands.append(f"curl -o {shlex.quote(filename)} {shlex.quote(url)}") + + if commands: + builder.with_command(["sh", "-c", " && ".join(commands)]) + + builder.with_bind_mount(executor.workdir, bind_mount) + + if executor.env: + for env_name, env_value in executor.env.items(): + builder.with_env(env_name, env_value) + + return builder.get_run_command() + +def run_command( + executor: TesTaskExecutor, + job_id: str, + resource_conf: dict, + volume_confs: List[dict], + input_confs: List[dict], + output_confs: List[dict], + inputs_directory: str, + container_type: str, + job_directory: Optional[str] = None, + executor_index: int = 0 +) -> str: + builder = ContainerCommandBuilder(container_type) \ + .with_job_id(job_id) \ + .with_image(executor.image) \ + .with_command( + list(map(str, executor.command)), + maybe_of(executor.stdin).map(str), + maybe_of(executor.stdout).map(str), + maybe_of(executor.stderr).map(str) + ) \ + .with_workdir(executor.workdir) \ + .with_resource(resource_conf) + + if executor.env: + for env_name, env_value in executor.env.items(): + builder.with_env(env_name, env_value) + + # Handle volumes and bind mounts + for volume_conf in volume_confs: + builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) + + for input_conf in input_confs: + builder.with_bind_mount(input_conf['container_path'], input_conf['pulsar_path']) + + return builder.get_run_command() + +def stage_out_command( + executor: TesTaskExecutor, + resource_conf: dict, + output_confs: List[dict], + volume_confs: List[dict], + container_type: str, + bind_mount: Optional[str] = None, + job_directory: Optional[str] = None +) -> str: + builder = ContainerCommandBuilder(container_type) \ + .with_image(executor.image) \ + .with_workdir(executor.workdir) \ + .with_resource(resource_conf) + + commands = [] + for output in output_confs: + path = output['container_path'] + url = output['url'] + output_type = output.get('type', TesTaskIOType.FILE) + + if output_type == TesTaskIOType.DIRECTORY: + # Recursive upload + cmd = ( + f"find {shlex.quote(path)} -type f -exec " + f"curl -X POST -F 'file=@{{}}' {shlex.quote(url)} \\;" + ) + else: + # Single file upload + cmd = f"curl -X POST -F 'file=@{shlex.quote(path)}' {shlex.quote(url)}" + + commands.append(cmd) + + if commands: + builder.with_command(["sh", "-c", " && ".join(commands)]) + + # Mount required directories + if container_type == "singularity" and bind_mount: + builder.with_bind_mount(executor.workdir, bind_mount) + + for volume_conf in volume_confs: + if container_type == "docker": + builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) + elif container_type == "singularity" and job_directory: + builder.with_bind_mount(volume_conf['container_path'], job_directory) + + if executor.env: + for env_name, env_value in executor.env.items(): + builder.with_env(env_name, env_value) + + return builder.get_run_command() + +# Volume mapping function remains the same +def map_volumes(job_id: str, volumes: List[str], outputs: List[TesTaskOutput]): + output_confs: List[dict] = [] + volume_confs: List[dict] = [] + existing_volume_paths = [] + + # Process outputs + for output in outputs: + output_dirname = os.path.dirname(output.path) + volume_name = f"vol-{job_id}-{output_dirname.replace('/', '')}" + + if output_dirname not in existing_volume_paths: + volume_confs.append({ + 'volume_name': volume_name, + 'container_path': output_dirname + }) + existing_volume_paths.append(output_dirname) + + output_confs.append({ + 'container_path': output.path, + 'url': output.url, + 'volume_name': volume_name, + 'type': output.type + }) + + for v in volumes: + if str(v) not in existing_volume_paths: + volume_confs.append({ + 'volume_name': f"vol-{job_id}-{str(v).replace('/', '')}", + 'container_path': str(v) + }) + + return output_confs, volume_confs diff --git a/tesp_api/utils/docker.py b/tesp_api/utils/docker.py deleted file mode 100644 index b524681..0000000 --- a/tesp_api/utils/docker.py +++ /dev/null @@ -1,370 +0,0 @@ -import os -import re -import shlex -from urllib.parse import urlparse -from typing import Dict, List, Tuple - -from pymonad.maybe import Nothing, Maybe, Just - -from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput, TesTaskIOType -from tesp_api.utils.functional import get_else_throw, maybe_of - -SHELL_PATTERN = re.compile( - r"[|&;<>(){}$*?\"'\\`]" # shell metacharacters -) - -class DockerRunCommandBuilder: - - def __init__(self) -> None: - self._job_id: str = "" - self._resource_cpu: Maybe[str] = Nothing - self._resource_mem: Maybe[str] = Nothing - self._docker_image: Maybe[str] = Nothing - self._workdir: Maybe[str] = Nothing - self._envs: Dict[str, str] = {} - self._volumes: Dict[str, str] = {} - self._bind_mounts: Dict[str, str] = {} - self._command: Maybe[str] = Nothing - - def with_job_id(self, job_id: str): - self._job_id = job_id - return self - - def with_resource(self, resources: dict): - if not resources: return self - self._resource_cpu = maybe_of(resources["cpu_cores"]) - self._resource_mem = maybe_of(resources["ram_gb"]) - return self - - def with_bind_mount(self, container_path: str, host_path: str): - self._bind_mounts[container_path] = host_path - return self - - def with_volume(self, container_path: str, volume_name: str): - self._volumes[container_path] = volume_name - return self - - def with_image(self, image: str): - self._docker_image = Just(image) - return self - - def with_workdir(self, workdir: str): - self._workdir = maybe_of(workdir) - return self - - def with_env(self, name: str, value: str): - self._envs[name] = value - return self - - def _is_shell_wrapped(self, command) -> bool: - if isinstance(command, list): - if len(command) >= 2 and all(isinstance(part, str) for part in command[:2]): - return command[0] == "sh" and command[1] == "-c" - elif isinstance(command, str): - return command.startswith("sh -c ") - return False - - def requires_shell(self, command: List[str]) -> bool: - return any( - isinstance(arg, str) and SHELL_PATTERN.search(arg) - for arg in command - ) - - def escape_redirections(self, cmd: str, stdin=None, stdout=None, stderr=None) -> str: - redirections = [ - ('<', stdin), - ('>', stdout), - ('2>', stderr), - ] - for op, val in redirections: - if isinstance(val, Maybe): - val = val.maybe(None, lambda x: None if str(x) == "Nothing" else x) - elif str(val) == "Nothing": - val = None - if val: - cmd += f" {op} {shlex.quote(val)}" - return cmd - - def with_command(self, command, stdin=Nothing, stdout=Nothing, stderr=Nothing): - if not command: - self._command = Nothing - return self - - if self._is_shell_wrapped(command): - # If command is a string, split into list form before assigning - if isinstance(command, str): - parts = command.split(" ", 2) - if len(parts) < 3: - raise ValueError(f"Malformed shell command string: {command}") - self._command = Just(parts) - else: - self._command = Just(command) - return self - - # If command needs shell (but not already wrapped), wrap it - if self.requires_shell(command): - shell_cmd = " ".join(map(shlex.quote, map(str, command))) - shell_cmd = self.escape_redirections(shell_cmd, stdin, stdout, stderr) - self._command = Just(["sh", "-c", shell_cmd]) - else: - # Direct command with sanitized redirections - def maybe_path(val): - if isinstance(val, Maybe): - return val.maybe(None, lambda x: None if str(x) == "Nothing" else x) - return None if str(val) == "Nothing" else val - - stdin_val = maybe_path(stdin) - stdout_val = maybe_path(stdout) - stderr_val = maybe_path(stderr) - - cmd = list(map(str, command)) - redirs = [("<", stdin_val), (">", stdout_val), ("2>", stderr_val)] - cmd += [part for op, val in redirs if val for part in (op, val)] - self._command = Just(cmd) - - return self - - def reset(self) -> None: - self._resource_cpu = Nothing - self._resource_mem = Nothing - self._docker_image = Nothing - self._workdir = Nothing - self._volumes = {} - self._bind_mounts = {} - return self - - def get_run_command(self) -> str: - print("Constructing docker run command...") - - print("Before maybe(_resource_cpu)") - cpu_flag = self._resource_cpu.maybe("", lambda cpu: f"--cpus={cpu}") - print("After maybe(_resource_cpu)") - - print("Before maybe(_resource_mem)") - mem_flag = self._resource_mem.maybe("", lambda mem: f"--memory={mem}") - print("After maybe(_resource_mem)") - - print("Before maybe(_workdir)") - workdir_str = self._workdir.maybe("", lambda w: f'-w="{w}"') - print("After maybe(_workdir)") - - print("Before maybe(_docker_image)") - image = self._docker_image.maybe("", lambda i: i) - print("After maybe(_docker_image)") - - bind_mounts_str = " ".join( - f'-v "{host}":"{container}"' for container, host in self._bind_mounts.items() - ) - volumes_str = " ".join( - f'-v "{host}":"{container}"' for container, host in self._volumes.items() - ) - env_str = " ".join( - f'-e {shlex.quote(k)}={shlex.quote(v)}' for k, v in self._envs.items() - ) - - def quote_command(cmd): - print(f"quote_command called with: {cmd} (type: {type(cmd)})") - if isinstance(cmd, str): - return cmd - if isinstance(cmd, list): - return " ".join(shlex.quote(arg) for arg in cmd) - return "" - - print("Before maybe(_command)") - command_str = self._command.maybe("", quote_command) - print("After maybe(_command)") - - full_command = f"docker run {cpu_flag} {mem_flag} {workdir_str} {env_str} {volumes_str} {bind_mounts_str} {image} {command_str}".strip() - - self.reset() - return full_command - - def get_run_command_script(self, inputs_directory: str, i: int) -> Tuple[str, str]: - resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' - f'{self._resource_mem.maybe("", lambda mem: " --memory="+str(mem)+"g")}') - bind_mounts_str = " ".join(map(lambda v_paths: f'-v \"{v_paths[1]}\":\"{v_paths[0]}\"', self._bind_mounts.items())) - volumes_str = " ".join(map(lambda v_paths: f'-v \"{v_paths[1]}\":\"{v_paths[0]}\"', self._volumes.items())) - docker_image = get_else_throw(self._docker_image, ValueError('Docker image is not set')) - workdir_str = self._workdir.maybe("", lambda workdir: f"-w=\"{str(workdir)}\"") - volumes_str += f' -v "{inputs_directory}/run_script_{i}.sh":"/tmp/{self._job_id}/run_script_{i}.sh"' - env_str = " ".join(map(lambda env: f'-e {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = maybe_of(self._command).maybe("", lambda x: " ".join(shlex.quote(arg) for arg in x)) - - chmod_commands = f"chmod +x /tmp/{self._job_id}/run_script_{i}.sh" - if self._bind_mounts: - chmod_commands += ' && ' + ' && '.join(f"chmod +x {key}" for key in self._bind_mounts) - if self._volumes: - chmod_commands += ' && ' + ' && '.join(f"chmod +x {key}" for key in self._volumes) - - # Define the content of the script - script_content = f'''\ - #!/bin/bash - {command_str} - ''' - - run_command = (f'docker run {resources_str} {workdir_str} {env_str} ' - f'{volumes_str} {bind_mounts_str} {docker_image} ' - f'sh -c "{chmod_commands} && /tmp/{self._job_id}/run_script_{i}.sh"') - - self.reset() - return run_command, script_content - -def docker_run_command(executor: TesTaskExecutor, job_id: str, resource_conf: dict, volume_confs: List[dict], - input_confs: List[dict], output_confs: List[dict], inputs_directory: str, i: int) -> Tuple[str, str]: - command_builder = DockerRunCommandBuilder()\ - .with_job_id(job_id) \ - .with_image(executor.image) \ - .with_command( - list(map(lambda x: str(x), executor.command)), - maybe_of(executor.stdin).map(lambda x: str(x)), - maybe_of(executor.stdout).map(lambda x: str(x)), - maybe_of(executor.stderr).map(lambda x: str(x))) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - [command_builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) - for volume_conf in volume_confs] - [command_builder.with_bind_mount(input_conf['container_path'], input_conf['pulsar_path']) - for input_conf in input_confs] - - return command_builder.get_run_command() # _script(inputs_directory, i) - -def docker_stage_in_command( - executor: TesTaskExecutor, - resource_conf: dict, - bind_mount: str, - input_confs: List[dict] -) -> str: - command_builder = ( - DockerRunCommandBuilder() - .with_image(executor.image) - .with_workdir(executor.workdir) - .with_resource(resource_conf) - ) - - stage_in_commands = [] - - for input_conf in input_confs: - url = input_conf.get('url') - input_type = input_conf.get('type', TesTaskIOType.FILE) - pulsar_path = os.path.basename(input_conf['pulsar_path']) - - if not url: - continue - - scheme = urlparse(url).scheme - - if input_type == TesTaskIOType.DIRECTORY: - if scheme in ('http', 'https', 'ftp'): - url_quoted = shlex.quote(url) - cmd = ( - f"wget --mirror --no-parent --no-host-directories " - f"--directory-prefix={pulsar_path} '{url_quoted}'" - ) - else: - raise ValueError(f"Unsupported scheme for directory input: {scheme}") - else: - cmd = f"curl -o {pulsar_path} '{url}'" - - stage_in_commands.append(cmd) - - if stage_in_commands: - full_command = " && ".join(stage_in_commands) - command_builder._command = Just(f'sh -c "{full_command}"') - - command_builder.with_bind_mount(executor.workdir, bind_mount) - - if executor.env: - for env_name, env_value in executor.env.items(): - command_builder.with_env(env_name, env_value) - - return command_builder.get_run_command() - -def docker_stage_out_command( - executor: TesTaskExecutor, - resource_conf: dict, - output_confs: List[dict], - volume_confs: List[dict] -) -> str: - command_builder = ( - DockerRunCommandBuilder() - .with_image(executor.image) - .with_workdir(executor.workdir) - .with_resource(resource_conf) - ) - stage_out_commands = [] - - for output in output_confs: - path = output['container_path'] - url = output['url'] - output_type = output.get('type', TesTaskIOType.FILE) - print(f"Staging out: {path} -> {url} (type: {output_type})") - - if output_type == TesTaskIOType.DIRECTORY: - # bash find + curl for recursive upload inside container - cmd = ( - f"base={shlex.quote(path)}; " - f"url={shlex.quote(url)}; " - f"find \"$base\" -type f | while read -r filepath; do " - f"relpath=\"${{filepath#$base/}}\"; " - f"curl -X POST -F \"file=@${{filepath}};filename=${{relpath}}\" \"$url\"; " - f"done" - ) - stage_out_commands.append(cmd) - else: - safe_path = shlex.quote(path) - safe_url = shlex.quote(url) - cmd = f"curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@{safe_path}' {safe_url}" - stage_out_commands.append(cmd) - - if stage_out_commands: - full_command = " && ".join(stage_out_commands) - command_builder._command = Just(f"sh -c {shlex.quote(full_command)}") - - for volume_conf in volume_confs: - command_builder.with_volume(volume_conf['container_path'], volume_conf['volume_name']) - - if executor.env: - for env_name, env_value in executor.env.items(): - command_builder.with_env(env_name, env_value) - - return command_builder.get_run_command() - -def map_volumes(job_id: str, volumes: List[str], outputs: List[TesTaskOutput]): - output_confs: List[dict] = [] - volume_confs: List[dict] = [] - - existing_volume_paths = [] - - # Process outputs - for output in outputs: - output_dirname = os.path.dirname(output.path) - volume_name = f"vol-{job_id}-{output_dirname.replace('/', '')}" - - if output_dirname not in existing_volume_paths: - volume_confs.append({ - 'volume_name': volume_name, - 'container_path': output_dirname - }) - existing_volume_paths.append(output_dirname) - - output_confs.append({ - 'container_path': output.path, - 'url': output.url, - 'volume_name': volume_name, - 'type': output.type - }) - - for v in volumes: - if str(v) not in existing_volume_paths: - volume_confs.append({ - 'volume_name': f"vol-{job_id}-{str(v).replace('/', '')}", - 'container_path': str(v) - }) - - return output_confs, volume_confs - diff --git a/tesp_api/utils/singularity.py b/tesp_api/utils/singularity.py deleted file mode 100644 index 4fc2967..0000000 --- a/tesp_api/utils/singularity.py +++ /dev/null @@ -1,190 +0,0 @@ -import os -from typing import Dict, List, Tuple - -from pymonad.maybe import Nothing, Maybe, Just - -from tesp_api.repository.model.task import TesTaskExecutor, TesTaskOutput -from tesp_api.utils.functional import get_else_throw, maybe_of - - -class SingularityCommandBuilder: - - def __init__(self) -> None: - self._job_id: str = "" - self._resource_cpu: Maybe[str] = Nothing - self._resource_mem: Maybe[str] = Nothing - self._singularity_image: Maybe[str] = Nothing - self._workdir: Maybe[str] = Nothing - self._envs: Dict[str, str] = {} - self._volumes: Dict[str, str] = {} - self._bind_mounts: Dict[str, str] = {} - self._command: Maybe[str] = Nothing - - def with_job_id(self, job_id: str): - self._job_id = job_id - return self - - def with_resource(self, resources: dict): - if not resources: return self - self._resource_cpu = maybe_of(resources["cpu_cores"]) - self._resource_mem = maybe_of(resources["ram_gb"]) - return self - - def with_bind_mount(self, container_path: str, host_path: str): - self._bind_mounts[container_path] = host_path - return self - - def with_volume(self, container_path: str, volume_name: str): - self._volumes[container_path] = volume_name - return self - - def with_image(self, image: str): - self._singularity_image = Just(image) - return self - - def with_workdir(self, workdir: str): - self._workdir = maybe_of(workdir) - return self - - def with_env(self, name: str, value: str): - self._envs[name] = value - return self - - def with_command(self, command: List[str], stdin: Maybe[str] = Nothing, - stdout: Maybe[str] = Nothing, stderr: Maybe[str] = Nothing): - command_str = " ".join(command) - self._command = Just(command_str) if command_str else Nothing - - # sh -c '' # there probably must be ' instead of " because of the passing unresolved envs into the container - self._command = self._command.map(lambda _command: - f'/bin/bash -c \'{_command}' # f'"{_command}' - f'{stdin.maybe("", lambda x: " <" + x)}' - f'{stdout.maybe("", lambda x: " 1>" + x)}' - f'{stderr.maybe("", lambda x: " 2>" + x)}\'') - return self - - def reset(self) -> None: - self._resource_cpu = Nothing - self._resource_mem = Nothing - self._singularity_image = Nothing - self._workdir = Nothing - self._volumes = {} - self._bind_mounts = {} - return self - - def get_singularity_run_command(self) -> str: - resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' - f'{self._resource_mem.maybe("", lambda mem: " --memory="+str(mem)+"g")}') - first_key, first_value = next(iter(self._bind_mounts.items())) - bind_mounts_str = f'-B "{first_value}":"{first_key}"' - volumes_str = " ".join(map(lambda v_paths: f'-B \"{v_paths[1]}\":\"{v_paths[0]}\"', self._volumes.items())) - singularity_image = get_else_throw(self._singularity_image, ValueError('Singularity image is not set')) - workdir_str = self._workdir.maybe("", lambda workdir: f"--pwd \"{str(workdir)}\"") - env_str = " ".join(map(lambda env: f'--env {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = self._command.maybe("", lambda x: x) - - run_command = f"""singularity exec {resources_str} {workdir_str} {env_str} {volumes_str} {bind_mounts_str} {singularity_image} {command_str}""" - # run_command = f'singularity exec {resources_str} {workdir_str} {env_str} {volumes_str} {bind_mounts_str} {singularity_image} {command_str}' - self.reset() - return run_command - - def get_singularity_run_command_script(self, inputs_directory: str, i: int) -> Tuple[str, str]: - resources_str = (f'{self._resource_cpu.maybe("", lambda cpu: " --cpus="+str(cpu))}' - f'{self._resource_mem.maybe("", lambda mem: " --memory="+str(mem)+"g")}') - bind_mounts_str = " ".join(map(lambda v_paths: f'-B \"{v_paths[1]}\":\"{v_paths[0]}\"', self._bind_mounts.items())) - volumes_str = " ".join(map(lambda v_paths: f'-B \"{v_paths[1]}\":\"{v_paths[0]}\"', self._volumes.items())) - singularity_image = get_else_throw(self._singularity_image, ValueError('Singularity image is not set')) - workdir_str = self._workdir.maybe("", lambda workdir: f"--pwd \"{str(workdir)}\"") - volumes_str += f' -B "{inputs_directory}/run_script_{i}.sh":"/tmp/{self._job_id}/run_script_{i}.sh"' - env_str = " ".join(map(lambda env: f'--env {env[0]}=\"{env[1]}\"', self._envs.items())) - command_str = self._command.maybe("", lambda x: x) - - # Define the content of the script - script_content = f'''\ - #!/bin/bash - {command_str} - ''' - - run_command = (f'singularity exec {resources_str} {workdir_str} {env_str} ' - f'{volumes_str} {bind_mounts_str} {singularity_image} ' - f'/bin/bash /tmp/{self._job_id}/run_script_{i}.sh') - self.reset() - return run_command, script_content - -def singularity_run_command(executor: TesTaskExecutor, job_id: str, resource_conf: dict, - volume_confs: List[dict], input_confs: List[dict], - output_confs: List[dict], inputs_directory: str, job_directory: str, i: int) -> Tuple[str, str]: - command_builder = SingularityCommandBuilder() \ - .with_job_id(job_id) \ - .with_image(executor.image) \ - .with_command( - list(map(lambda x: str(x), executor.command)), - maybe_of(executor.stdin).map(lambda x: str(x)), - maybe_of(executor.stdout).map(lambda x: str(x)), - maybe_of(executor.stderr).map(lambda x: str(x))) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - # This is made only for Galaxy and wil likely not work with different structure of a job - command_builder.with_volume(volume_confs[0]['container_path'], job_directory) - [command_builder.with_bind_mount(input_conf['container_path'], input_conf['pulsar_path']) - for input_conf in input_confs] - - return command_builder.get_singularity_run_command_script(inputs_directory, i) - -def singularity_stage_in_command(executor: TesTaskExecutor, resource_conf: dict, bind_mount: str, - input_confs: List[dict]) -> str: - command_builder = SingularityCommandBuilder() \ - .with_image(executor.image) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - command = "" - - for input_conf in input_confs: - if input_conf['url']: - filename = os.path.basename(input_conf['pulsar_path']) - command += f"""curl -o {filename} '{input_conf['url']}' && """ - # command += "curl -o " + os.path.basename(input_conf['pulsar_path']) + " '" + input_conf['url'] + "' && " - command = command[:-3] - - command_builder._command = Just('sh -c "' + command + '"') - - command_builder.with_bind_mount(executor.workdir, bind_mount) - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - return command_builder.get_singularity_run_command() - -def singularity_stage_out_command(executor: TesTaskExecutor, resource_conf: dict, bind_mount: str, - output_confs: List[dict], volume_confs: List[dict], job_directory: str) -> str: - command_builder = SingularityCommandBuilder() \ - .with_image(executor.image) \ - .with_workdir(executor.workdir) \ - .with_resource(resource_conf) - - command = "" - - for output in output_confs: - command += f"""curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@{output['container_path']}' '{output['url']}' && """ - - # command += "curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@" \ - # + output['container_path'] + "' '" + output['url'] + "' && " - command = command[:-3] - - command_builder._command = Just('sh -c "' + command + '"') - - # This is made only for Galaxy and wil likely not work with different structure of a job - command_builder.with_bind_mount(volume_confs[0]['container_path'], job_directory) - #command_builder.with_volume(volume_confs[0]['container_path'], job_directory) - - if executor.env: - [command_builder.with_env(env_name, env_value) - for env_name, env_value in executor.env.items()] - - return command_builder.get_singularity_run_command() diff --git a/tests/smoke_tests.py b/tests/smoke_tests.py index f60c5f9..4177273 100644 --- a/tests/smoke_tests.py +++ b/tests/smoke_tests.py @@ -163,7 +163,7 @@ def test_inputs(): # Downloads and copies file to the shared volume and displays its content with two separate executors. def test_volumes(): - assert _test_simple("volumes.json", 60) + assert _test_simple("volumes.json", 100) # Verifies that environment variables from envs.json are correctly echoed to output files with expected content. def test_envs(): diff --git a/tests/test_data/env_test_1 b/tests/test_data/env_test_1 new file mode 100644 index 0000000..b2c9cad --- /dev/null +++ b/tests/test_data/env_test_1 @@ -0,0 +1 @@ +first upload successful diff --git a/tests/test_data/env_test_2 b/tests/test_data/env_test_2 new file mode 100644 index 0000000..b89622e --- /dev/null +++ b/tests/test_data/env_test_2 @@ -0,0 +1 @@ +second upload successful From 8b47b6d6128af90da96ce0afd3fa6498051add14 Mon Sep 17 00:00:00 2001 From: Debian Date: Thu, 3 Jul 2025 07:58:48 +0000 Subject: [PATCH 09/10] Singularity and Docker merged into Container --- tests/test_data/env_test_1 | 1 - tests/test_data/env_test_2 | 1 - 2 files changed, 2 deletions(-) delete mode 100644 tests/test_data/env_test_1 delete mode 100644 tests/test_data/env_test_2 diff --git a/tests/test_data/env_test_1 b/tests/test_data/env_test_1 deleted file mode 100644 index b2c9cad..0000000 --- a/tests/test_data/env_test_1 +++ /dev/null @@ -1 +0,0 @@ -first upload successful diff --git a/tests/test_data/env_test_2 b/tests/test_data/env_test_2 deleted file mode 100644 index b89622e..0000000 --- a/tests/test_data/env_test_2 +++ /dev/null @@ -1 +0,0 @@ -second upload successful From 47c40d4251961deba502b02608173d802929db00 Mon Sep 17 00:00:00 2001 From: Debian Date: Mon, 7 Jul 2025 09:36:22 +0000 Subject: [PATCH 10/10] Minor changes --- tesp_api/service/event_actions.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/tesp_api/service/event_actions.py b/tesp_api/service/event_actions.py index cb7e933..7c235c5 100644 --- a/tesp_api/service/event_actions.py +++ b/tesp_api/service/event_actions.py @@ -2,22 +2,9 @@ import datetime from typing import List from pathlib import Path - from pymonad.maybe import Just from bson.objectid import ObjectId from pymonad.promise import Promise - -#from tesp_api.utils.docker import ( -# docker_run_command, -# docker_stage_in_command, -# docker_stage_out_command, -# map_volumes -#) -#from tesp_api.utils.singularity import ( -# singularity_run_command, -# singularity_stage_in_command, -# singularity_stage_out_command -#) from tesp_api.utils.container import stage_in_command, run_command, stage_out_command, map_volumes from tesp_api.service.pulsar_service import pulsar_service from tesp_api.service.event_dispatcher import dispatch_event @@ -136,7 +123,7 @@ async def setup_data(job_id: ObjectId, 'input_confs': res_input_output_confs[2], 'output_confs': res_input_output_confs[3] })).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\ - .then(lambda x: x) # invokes promise returned by error handler, otherwise acts as identity function + .then(lambda x: x) @local_handler.register(event_name="run_task")