From 3c24953e471c5531e1ce4e6d65d72d02f8115e88 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Fri, 12 Sep 2025 13:17:47 -0700 Subject: [PATCH 1/4] python(feat): Add ParquetUploadService --- .../parquet/flat_dataset/.env-example | 3 + .../data_import/parquet/flat_dataset/main.py | 44 ++ .../parquet/flat_dataset/sample_data.parquet | Bin 0 -> 24829 bytes python/lib/sift_py/data_import/_config.py | 67 +++ .../lib/sift_py/data_import/_parquet_test.py | 408 ++++++++++++++++++ python/lib/sift_py/data_import/_utils.py | 79 ++++ python/lib/sift_py/data_import/config.py | 18 +- python/lib/sift_py/data_import/parquet.py | 229 ++++++++++ .../data_import/parquet_complex_types.py | 20 + python/lib/sift_py/data_import/status.py | 3 +- 10 files changed, 869 insertions(+), 2 deletions(-) create mode 100644 python/examples/data_import/parquet/flat_dataset/.env-example create mode 100644 python/examples/data_import/parquet/flat_dataset/main.py create mode 100644 python/examples/data_import/parquet/flat_dataset/sample_data.parquet create mode 100644 python/lib/sift_py/data_import/_parquet_test.py create mode 100644 python/lib/sift_py/data_import/_utils.py create mode 100644 python/lib/sift_py/data_import/parquet.py create mode 100644 python/lib/sift_py/data_import/parquet_complex_types.py diff --git a/python/examples/data_import/parquet/flat_dataset/.env-example b/python/examples/data_import/parquet/flat_dataset/.env-example new file mode 100644 index 000000000..cdef5f890 --- /dev/null +++ b/python/examples/data_import/parquet/flat_dataset/.env-example @@ -0,0 +1,3 @@ +SIFT_API_URI="" +SIFT_API_KEY="" +ASSET_NAME="" \ No newline at end of file diff --git a/python/examples/data_import/parquet/flat_dataset/main.py b/python/examples/data_import/parquet/flat_dataset/main.py new file mode 100644 index 000000000..7c8bcdf9b --- /dev/null +++ b/python/examples/data_import/parquet/flat_dataset/main.py @@ -0,0 +1,44 @@ +import os + +from dotenv import load_dotenv +from sift_py.data_import.parquet import ParquetUploadService +from sift_py.data_import.status import DataImportService +from sift_py.data_import.time_format import TimeFormatType +from sift_py.rest import SiftRestConfig + +if __name__ == "__main__": + """ + Example usage for uploading a Parquet (flat dataset). + """ + load_dotenv() + + sift_uri = os.getenv("SIFT_API_URI") + assert sift_uri, "expected 'SIFT_API_URI' environment variable to be set" + + apikey = os.getenv("SIFT_API_KEY") + assert apikey, "expected 'SIFT_API_KEY' environment variable to be set" + + asset_name = os.getenv("ASSET_NAME") + assert asset_name, "expected 'ASSET_NAME' environment variable to be set" + + rest_config: SiftRestConfig = { + "uri": sift_uri, + "apikey": apikey, + } + + parquet_upload_service = ParquetUploadService(rest_config) + + import_service: DataImportService = parquet_upload_service.flat_dataset_upload( + asset_name=asset_name, + run_name="Example Parquet Upload", + path="sample_data.parquet", + time_path="timestamp", + time_format=TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS, + ) + + data_import = import_service.get_data_import() + print(data_import.model_dump_json(indent=1)) + + print("Waiting for upload to complete...") + import_service.wait_until_complete() + print("Upload example complete!") diff --git a/python/examples/data_import/parquet/flat_dataset/sample_data.parquet b/python/examples/data_import/parquet/flat_dataset/sample_data.parquet new file mode 100644 index 0000000000000000000000000000000000000000..cb2b874022e5e302c0740b0e97bfe7026e852e49 GIT binary patch literal 24829 zcmeI54_u9R|Nqa)DIH>=ld@P;$BCvoo$5rhPLhx$Nwt>#t0XN+tXWH1%o-M>nS~_Q z+|A6InY)n|Gt993nT5nQ+-Bxxx7)tY&nb*{?jfw~}Pg-lUxr?}SLUr{epNV-cT4bVj@k#;eb7(fS*fgu<{M=%CCbb`)c0$rdh zJOZZB4Z1@QFargcLr<^(OXvlUf)(_JKJXY=Ltp3zHed^OpoIRQ0(%$$1Hl0vhe6;7 zPT&k`Z~+atf*ZJl7Cc}u3;|Dg0*1nq-~~GHhGF0X!@(CufFF#6Q7{_(VGIPoSO|o1 z5Cr2P7$!gnOoUJfgK&5XBH(F=gh>zu(GUZ%5C`#)0FxmRk{}tTKnhHSRG0>7@C>BG zbjW}ikO?y(3ueJ=cowqZIhX^_Lk_$ExiA;@Ydi%)^*1%dQg>_H{>!BPrz(#lrD&TF{1b=}_cn3Da zyRZe`gRSsBRKW+Z4L*eJ@Dc2Qzd|*93_D>L)WB}2g-@Uk_P}2F6!yVqupbVc=68+lXTBNiCZ|Y@I`51|Tv4vMj&I=ukyQ@O0 zOJ11U(X^l6a_77a+x59hff~5mPHzyw3x$dRc9HVv? zSPm;&xWXmzPM*T`d8k3GRHvP%cp$N>R|=l?Za!z z7q1<$sNcw9pAAdajb5%!I$pEk)%9abyyy2CzOiV-`0}6);k!33eQRQ6%%0-mZ!LTK zsjBpEj_-c!wZBZN&XqsvTe19|*t#Y5Pt{hecz5!_H6vg5eY^O*DfOF^PSn2r`uo!w zcFuov#HN)WOuukw!&9GZdgH^HSI+EteZ*f@ee~>&tKXdXE_>dx>-3yvBbRf>Slj$RYy)knb9dT;wL#-c9uAU z8eBjFuHXjlptbBQ^DtJV%AB2gn@F5N4KAPoS8xM&(DpXzokg^5xKOSwXeiefTq)NU z+$h%;+$q-gStp0&O+2eT!d(ZxC(I-;x0ss49XQn|EQ=% zjXxwVqQ)N*ji~X5#8uSzL*gcC{2_4{HU5xjMU6is9->AYWssR66VWdJ6Jy}1_Rxw8 zfaU|Q+QZ`k7i(@@m^H^eJS;l49HOan*L7L@iSpliU3UGx+kggqKbL_K7(*v8fv#W* z-N6jZ!2)`L74!jX=m)l-1QiSb2N(oSpau=NfffdXCkzEI@CG071wR-C{ty6x5Cp*x z0-+EN5fBMc5Cd_L0Ev(cDUb?jkPaD;30W{3#Px}|HWAk)xsV6>Pyh>HAr!(QSOP_` z439}dDHI1ERj z9*)CFI0X%G1{$FWF2J{N5iY|OxC+*g9Y>gE9e8(&<|`u2`U%>4loFuKn)sj11$^&PZ$bb;0-?D3w|&P{2>4WAqavY z1VSMkA|Mi?AO_+f0TLk@QXm!5ARRIw6S817WWyZDfn3Ogd?j_Q9Z(HBp$2N94)(%6*bfKc5FCc1P!Gr9 zB%Fc}<{r>M6Y`{r^S4fL}z^TQ; z_)hTN`+)a*Y3Hu_ z?sr(>FHyJiXBYzw7c}Bdk~m=f=H-l$h;lJws6VW=pIZJ?e`0E|Q{?sVSDb9lXt>gz z(cr~5Ivse!Fz|ul;0q(b4@SZ$7!Cd~1_EF#1j0B7g7FXx6Cea8LMVhmI6MUr@H9li zB#44&h=Ev$gLp`Q$&d(1kPK5G1*SqOOoKFd2GU_VWWWr_gqe^9vtTwn3)#^AC#Lp{ z2CicMi!vJSBs6(Cv}80q{!3=VwQh@?+T!(_L9pulG>tJjhm6Ict8JVg?(u09fR|{=UgCti*_JdzZ)C;+W zCgx6wA`QZ*YaeZv*(A(xtTJsTqH(xG9MILm2=u4--HZ_x{~O*`CY8s z(k_TpiLQNyX5EmS=}=%w&V+H_n7o2+w!Vv0B6*^_GGMv(g)_1RW@J(Lt&W+uK;aNl znJCgJ%m+nQWl7H(F0>%8qM&bV{z6Ml!ohNpVA0DhwZ7_wvqpthWLng%j(xGPw`b;+ z3nJyB&(JwHZb%zDF0v-~Lf$W~U{PP6d{cXogwfBp(843P(Rhh1*%%|&#LZt~=fBJ; zNu*^c16FBg8Jv?BsmRZm-!Fbak$rGk;0BSXF(719=m)vyIxTY`V`I;n_?MPF9V5*MBhgeAiW;WPa>fJNcDWPh?-e z@{LFV89L|ojhh`VJW}FCE=VWa#6=~#Jfm&{L{f-%epk!Ec^6FA_>dhksx)!&n&Au0 zos&hHh;LyZkJ++sx|RBoFS5WkX-VnGBIUS^B5`EYvO!_n^1kU_=1&I6-qNI3%f_s9 zd*)k_N)oVY=&YY)-}WdEB&Vd4U2;+RxKiK614MF3P+7o=!SlW~+ZY^An#rhj$xAm* zs0eu{S>&69Y>NC~w&8b*ick_y7T8T$RuQ%(;q#3m10{TG>alI}zBAtxK`P4LbyHs3 z^z`=3@4po}DUmzo-2BP#V$aGbl2bY>QD)hcWz{o{wTlOetK9Ua{VRsNc-d-O25B#& z*QdU*ZN|l;?@ST-FPWE5eei6@zxCdpMIy{g{im(kKI>Z3=WmIOnAz7aAN%mdzxCOX zO=`@i>!-cBWZn5o%B@QC{W{GlavFH4& zwtI_6!|~sczHaYQhl!h0?&RZGD{TEq%MrPsrk*km2~*-{Ik#9f_fqy5D7+jLmGYJY z4|nNhlu%m;b*6Zk)NG76xNN*GmB=hN_Lf?S<4UF6Kze(EL_I;I{M-msT(yCBDYOz|ZN4Kt49^(Fw zvy)Mmm$@@wXv&=dz^buotP9o!tHElpu2@&B8`cf$j&;Xsv0AK$!ql*Y+VID$OzkGk z`OS(hH0VNu56fz3PeXeT%evB@EA2fj>qdKSwAV%}b8{PaiFJjiIcoLw^#tH#;UO{SQo4YtHHWrU9oOhH>^9>9jnD^ zu^yJ)v#o|^U^mUW>$7utJRRzrIl+Iv{mmG)d|?_pUt+H<45hh^Pq&z<%j zmetapmi8W&^`Jcu+ItB08~%cH6aG7`OsyVn_20Y#|E&Z6|J!oQ)E?*V+Y`7&>Cuwx z)^dw-Nf*U(lY76*8HsOT+@h2^NhFdzM!$TOlUNRVQgZF@2F+jQI&^woBD1cR$jqXu zrN*`;)dnV-hUyNcKGq-0%z~pnHnfN<`Pj%Rqv7L@)_K-DjctpfcFI+yB|AGgY-!lp z*{Q~Qmx<w3@!3SY;fJ{-k$S`|`FIJ2jJNnXFbs?wsE zz4qm$YxWMP*m7p?z{;9_pE_(g9P{bpRgG&t9kl)0nNJ<7Wj6bqYRqEyIoH{)-KXBC zIlIs0ppVUGn!~}dpSjk@t^LgHWX9Rg+#B+2_G=r9V)uJoC|$dM@Wm}>_Yb*JV{^ds z+Tqv(Puyr+dtm6TYiAEUDKWG?=p|Fc9n=}yl^*mqacw*}%yhWz=RRf=;yxd45nuYb zuhopk&qr9#vpwWzyEN|5NY%R1L!%ttYdkdCX}9ec{+c6kUyRY7EBzwC^T)<7#(Ejr z9S-zS#2+5#XSeQfkiYA>!{Y;o+Z_oGo)CXzLTLQDBOwtp&K;Q;HP7y7Xx!5HqhX2b z)*TH`dGFlOr_y%YeHoE)B>u~%v(ByiGBW$eb6-x%F;vz^SGqVHr2;2 z8m>GRS2Q8vSp4$%vSSG=XEYs~Try92Jh60X!ttc?b!EqsE8c55KBaQE@p1rcW|JT{qj!gdgIm;X8)_*(zv`cWFZeR?Xcy6g?;lAh-XL~ORUqMmY01gK-JGr;>s<47WH?1A2mOCm2gAypnmkU(J&cTM#(yK z=#bL{jE`E`Q14J$D6W;Ai_nC!0|cCOooxyHbix-`O=d1y*lhG)A!5<0lPj+^o!R3Zx8e>YzQHLRo1g;->>~ZQgw~M;kDs27t2FyBXlhR}joQsh zC+ZoDiNUq+{2S=9++zfydMlg>45$jkbuDNiy5rI(1PTf|s0jqwh-tLiF&k|P-HKNJ z@IB$#g^#~u zpeomo(5nY-5Z<1waihU}LotnkC2ygzkQrqR3X2mupaH!Xpq zC;V}hWEP;w-IzvWO~ucIZ~G4MpuxJ+nYfl@Z$=|}ifL4CwjWHrEB^7gww*6Q^KwrT z2C;L5gR8KLDW}6*+=;@SyBChiDYti)FuL*eeesnnXjH*I#9SmB6 z7GL^|KwN>`m%tz!F^xAq%0YLAZbujX@OQ!?g)Sp#(As_)u5Wj3M7KmX5twtcuOIdN zik`;R-Ejqaa?fD`WpSn>38eiz8rR_B7g3q(E_DBa+k}@V4<1E>F@|Cq9ZKFouMfUN zp!$evH1!sw#o-F>ET*wf*$Dy{yn6Z**nA=gS9WG0>f23BqiKcV7y`$Ab+}4TXQQdv zRj9V7m_~!mjsf(v&wmQ8W#`M#sN6FI2K4STmU`FT55u)1XenB7=^%li0+T=jRyJZ9 zn?9P0R)+o+o&Cd4gnbI##?fF``*d9OyWU3EM1Dgc<)&>A_1uc0a7i3jqI>szNuVgM z$9MvfKL_Aa6)!-qYHA5=KOhYzke~bnu0TUEjlD{?ppApC5Z-iT-~@WgN}G(!yR(=^ zld@9;j(YV9po+Dt-Mr-e$se^a>E*S5pHxcfsI_rlug|29 z$oAQEQJE;cG%k2mU7Yy)n2O|4tz?i~CChQ@V!>T3C)vB|E`HoD(#o1NxVe#{l3BVV zE?UIjL#{GO80;mLu9oEt!Lh=P2^BZ>-{J{*zwA`69BjzG$UxcJFSWYvDW^B;*E6Ou3r`9Vm|XvD-HSs_F@ z262uTt0u8^BT zyv8H>LiP#i5sZWhDG|~n#9;z5UC0MQBq4~ekcC3N5Mnt|enl2Fk#}W6E(&o8MP>=9 z7Ge~J_zQVe$T1<-;Yfmz3L!rT@puZ!5mGC}Bmx;PWQC9hAP+(z# z{UY*$RXjI9MNTd=GMSL4kf_PT=L#u9cmRQlL|&E5G?AE#kPsoW5-G3}A$wTG(*u~W zB;58w{Dq_?QQ%dClw%c-AyAR-%M2zH8YUzmnfQDm6$m-TDxM*rl9^8-<{>0PNX`@r ztVMnq_7vPsLdFZpOrgLEz-U6j+4ZJM5{1yoAK263-X1A$4jFOQ9e1 zX++$FgbB%>Mz{nKgDz{DCSNX-rEw%MPaBydQ>C$Gc%*TJ7@fwp6G0QHAeveXYheqB z&88mMba)s7o6j@yELk5RPC|4-#v|mcsK_@_kp-^e=@lxTJfYIB+kPyY6;E7X(@JN7 z%+jf%6rvR}5+Nx@MN*53v~m?`<0>9+q0+B+4OVP&7eH)wou^a7O2{A~UP8tpWX-6^ zdQp*wt|HG|#q%#zJi9`r-yBb2#pp;g=)Fe9!&q$gYKRqLQYFf&|?pC5_o_;4GU=lNJTFRYXB#KVBx+J7bi*8-i z<`EF1l$1L8PQg1(i^1v2wj7;4>rO~wUN5t>vOKAjSSe(dt`Z|tOEJ_61N`PC4(6vU zm8*IvjsM8Q5G@@I<0OC`@mWF7A6MfIzhY?;BkJHF+hY_j}+?b%H{ zlCh<0ljaqCaM$YQt`F(V4_PU1j8xXRSdp&$fQO3Qw|?f^RhCAUcX!k>lG+EZqf6`d z=G?lr4VKDHvC3tEtnK|jD;M`s?)OUWTMowAmo@$M>i*tRw5)is)_kj_a@ljr>q%n8 zfA#aj`#LjU_!ujFPk(pT`ksA9zrWnsPgviAuIAUZ=ErT7hn7i|6?3Jm@%=wL5o>(E z_W^6Htd~{F^-EFE_1;O=-n~EmvJZ-VkEN)ToA2*nUMp4BmP?iS!aP@s^mnl>w`@?q(O2_+-Q3&NNb}2T<;eq5<-QI2-tNksy1%okfi0bh@onwx z&i?t!(V=zwS!Sy&_(`hVc2bPz{eSBc<9WaL0poeko+`1lGm==|d(3I>ah23OMtLyD zz|-R1Kjv6kvCoTL|5y8brFVRNU=8+*e$2FmJH2F;uMJr|PqPl2%{jyhiJEGj-mR zb-p^?WZkgIdY`wqPB%up8>Z8Z6qe6-ET5(bj}z5&V}(0FglUf!)x6P-7EOCk7jMT3 z(Rbyea}0EJ2 z+_FBp0DYrOGOy6VF}h$;KP!1)OaH$5O3nQ>*B6!a zRlU_=6SJJd_3INX%8%1|j}zMyuh`~sV)B2fG(kTe&0Xu?>Pw1kOgADUC4StnX^xqW zj&Y)Y{rJz+FW)GADSdhUrsSvpEDCr}j}8rRx_jM4$fbGx?=1hY2QR;9&pRz{L`a6H zqZ>85rMy@JUIDG^AE57VnCP!%Rm5BUmfY-YULFlj@7au@bC}D$+tzsY3CFD9miZX| z;Q1J?@3nb8#Qcm1PYa)zHDwHa#f$n*t@T^x`l9%_k@`-WmskH*RMa(Zj{(kMQ(O8Jp%JayPxE+;{=NRgsN{WD-)ssk_nV`ZF`)GaipkOM5&zJ$>3t>&*D{=}FU4t<}zhoCY~rp5!{A QE&n7dO(c>){^NuH1MHS})c^nh literal 0 HcmV?d00001 diff --git a/python/lib/sift_py/data_import/_config.py b/python/lib/sift_py/data_import/_config.py index bd19c5fe5..a410146b4 100644 --- a/python/lib/sift_py/data_import/_config.py +++ b/python/lib/sift_py/data_import/_config.py @@ -7,6 +7,7 @@ from typing_extensions import Self from sift_py._internal.channel import channel_fqn +from sift_py.data_import.parquet_complex_types import ParquetComplexTypesImportModeType from sift_py.data_import.time_format import TimeFormatType from sift_py.error import _component_deprecation_warning from sift_py.ingestion.channel import ChannelBitFieldElement, ChannelDataType, ChannelEnumType @@ -239,3 +240,69 @@ class Hdf5DataCfg(ConfigDataModel): time_column: int = 1 value_dataset: str value_column: int = 1 + + +class ParquetTimeColumn(ConfigTimeModel): + """ + Defines a time column entry in the Parquet config. + """ + + path: str + + +class ParquetDataColumn(ConfigBaseModel): + """ + Defines a data column entry in the Parquet config. + """ + + path: str + channel_config: ConfigDataModel + + +class ParquetFlatDatasetConfig(ConfigBaseModel): + """ + Defines the flat dataset config for Parquet files. + """ + + time_column: ParquetTimeColumn + data_columns: List[ParquetDataColumn] + + +class ParquetConfigImpl(ConfigBaseModel): + """ + Defines the Parquet config spec. + """ + + asset_name: str + run_name: str = "" + run_id: str = "" + flat_dataset: Optional[ParquetFlatDatasetConfig] = None + footer_offset: int + footer_length: int + complex_types_import_mode: Union[str, ParquetComplexTypesImportModeType] + + @model_validator(mode="after") + def validate_config(self) -> Self: + if self.run_name and self.run_id: + raise PydanticCustomError( + "invalid_config_error", "Only specify run_name or run_id, not both." + ) + return self + + @field_validator("complex_types_import_mode", mode="before") + @classmethod + def convert_complex_types_import_mode(cls, raw: Optional[str]) -> Optional[str]: + """ + Converts the provided complex_types_import_mode value to a string. + """ + if raw is None: + return None + if isinstance(raw, ParquetComplexTypesImportModeType): + return raw.as_human_str() + elif isinstance(raw, str): + value = ParquetComplexTypesImportModeType.from_str(raw) + if value is not None: + return value.as_human_str() + raise PydanticCustomError( + "invalid_config_error", f"Invalid complex_types_import_mode: {raw}." + ) diff --git a/python/lib/sift_py/data_import/_parquet_test.py b/python/lib/sift_py/data_import/_parquet_test.py new file mode 100644 index 000000000..4786c5e21 --- /dev/null +++ b/python/lib/sift_py/data_import/_parquet_test.py @@ -0,0 +1,408 @@ +import json + +import pytest +from pytest_mock import MockFixture + +from sift_py.data_import.config import ParquetConfig +from sift_py.data_import.parquet import ParquetUploadService, _extract_parquet_footer +from sift_py.data_import.status import DataImportService +from sift_py.rest import SiftRestConfig + + +class MockResponse: + def __init__(self, status_code=200, text="", json_data=None): + self.status_code = status_code + self.text = text + self._json_data = json_data + + def json(self): + if self._json_data is not None: + return self._json_data + raise Exception("Invalid response") + + +rest_config: SiftRestConfig = { + "uri": "http://some_uri.com", + "apikey": "123123123", +} + + +@pytest.fixture +def parquet_config(): + return ParquetConfig( + { + "asset_name": "test_asset", + "flat_dataset": { + "time_column": {"path": "time", "format": "TIME_FORMAT_ABSOLUTE_UNIX_NANOSECONDS"}, + "data_columns": [ + { + "path": "col1", + "channel_config": {"name": "col1", "data_type": "CHANNEL_DATA_TYPE_INT_64"}, + }, + { + "path": "col2", + "channel_config": { + "name": "col2", + "data_type": "CHANNEL_DATA_TYPE_FLOAT", + "units": "m", + "description": "second column", + }, + }, + { + "path": "col3", + "channel_config": { + "name": "col3", + "data_type": "CHANNEL_DATA_TYPE_STRING", + "description": "third column", + }, + }, + ], + }, + "complex_types_import_mode": "PARQUET_COMPLEX_TYPES_IMPORT_MODE_BOTH", + "footer_offset": 0, + "footer_length": 0, + } + ) + + +def test_upload_invalid_extension(mocker: MockFixture, parquet_config): + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="Must use an uncompressed parquet file"): + svc.upload("file.txt", parquet_config) + + +def test_upload_config_request_failed(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.return_value = MockResponse(status_code=400, text="Invalid request") + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="Config file upload request failed"): + svc.upload("file.parquet", parquet_config) + + +def test_upload_invalid_config_response(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.return_value = MockResponse(status_code=200, text="not json") + mock_requests_post.return_value.json = lambda: (_ for _ in ()).throw( + Exception("Invalid response") + ) + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="Invalid response"): + svc.upload("file.parquet", parquet_config) + + +def test_upload_missing_keys(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.return_value = MockResponse(status_code=200, json_data={}) + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="Response missing required keys"): + svc.upload("file.parquet", parquet_config) + + +def test_upload_data_file_failed(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.side_effect = [ + MockResponse( + status_code=200, json_data={"uploadUrl": "http://upload.com", "dataImportId": "id123"} + ), + MockResponse(status_code=400, text="Upload failed"), + ] + mocker.patch("sift_py.data_import.parquet.ProgressFile", mocker.mock_open()) + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception): + svc.upload("file.parquet", parquet_config) + + +def test_upload_success(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.side_effect = [ + MockResponse( + status_code=200, json_data={"uploadUrl": "http://upload.com", "dataImportId": "id123"} + ), + MockResponse(status_code=200, text=""), + ] + mocker.patch("sift_py.data_import.parquet.ProgressFile", mocker.mock_open()) + svc = ParquetUploadService(rest_config) + result = svc.upload("file.parquet", parquet_config) + assert isinstance(result, DataImportService) + + +def test_upload_from_url_invalid_scheme(mocker: MockFixture, parquet_config): + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="Invalid URL scheme"): + svc.upload_from_url("ftp://file.parquet", parquet_config) + + +def test_upload_from_url_failed(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.return_value = MockResponse(status_code=400, text="Invalid request") + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="URL upload request failed"): + svc.upload_from_url("http://file.parquet", parquet_config) + + +def test_upload_from_url_invalid_response(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.return_value = MockResponse(status_code=200, text="not json") + mock_requests_post.return_value.json = lambda: (_ for _ in ()).throw( + Exception("Invalid response") + ) + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="Invalid response"): + svc.upload_from_url("http://file.parquet", parquet_config) + + +def test_upload_from_url_missing_keys(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.return_value = MockResponse(status_code=200, json_data={}) + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="Response missing required keys"): + svc.upload_from_url("http://file.parquet", parquet_config) + + +def test_upload_from_url_success(mocker: MockFixture, parquet_config): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.return_value = MockResponse( + status_code=200, json_data={"dataImportId": "id123"} + ) + svc = ParquetUploadService(rest_config) + result = svc.upload_from_url("http://file.parquet", parquet_config) + assert isinstance(result, DataImportService) + + +def test_flat_dataset_upload_invalid_extension(mocker: MockFixture, parquet_config): + svc = ParquetUploadService(rest_config) + with pytest.raises(Exception, match="Must use an uncompressed parquet file"): + svc.flat_dataset_upload("asset", "file.txt", "time") + + +def test_flat_dataset_upload_success(mocker: MockFixture, parquet_config): + svc = ParquetUploadService(rest_config) + mock_detect = mocker.patch.object(svc, "_detect_config_flat_dataset") + mock_detect.return_value = parquet_config.to_dict() + mock_post = mocker.patch("sift_py.rest.requests.Session.post") + + # First call: config upload, second call: data upload + mock_post.side_effect = [ + MockResponse( + status_code=200, json_data={"uploadUrl": "http://upload.com", "dataImportId": "id123"} + ), + MockResponse(status_code=200, text=""), + ] + mocker.patch("sift_py.data_import.parquet.ProgressFile", mocker.mock_open()) + result = svc.flat_dataset_upload("asset", "file.parquet", "time") + assert isinstance(result, DataImportService) + + # Check config upload call + config_call = mock_post.call_args_list[0] + assert config_call[1]["url"] == svc._upload_uri + + expected_config = { + "parquet_config": { + "asset_name": "asset", + "run_name": "", + "run_id": "", + "flat_dataset": { + "time_column": { + "format": "TIME_FORMAT_ABSOLUTE_UNIX_NANOSECONDS", + "relative_start_time": None, + "path": "time", + }, + "data_columns": [ + { + "path": "col1", + "channel_config": { + "name": "col1", + "data_type": "CHANNEL_DATA_TYPE_INT_64", + "units": "", + "description": "", + "enum_types": [], + "bit_field_elements": [], + }, + }, + { + "path": "col2", + "channel_config": { + "name": "col2", + "data_type": "CHANNEL_DATA_TYPE_FLOAT", + "units": "m", + "description": "second column", + "enum_types": [], + "bit_field_elements": [], + }, + }, + { + "path": "col3", + "channel_config": { + "name": "col3", + "data_type": "CHANNEL_DATA_TYPE_STRING", + "units": "", + "description": "third column", + "enum_types": [], + "bit_field_elements": [], + }, + }, + ], + }, + "footer_offset": 0, + "footer_length": 0, + "complex_types_import_mode": "PARQUET_COMPLEX_TYPES_IMPORT_MODE_BOTH", + } + } + assert expected_config == json.loads(config_call[1]["data"]) + + +def test_flat_dataset_upload_overrides_success(mocker: MockFixture, parquet_config): + svc = ParquetUploadService(rest_config) + mock_detect = mocker.patch.object(svc, "_detect_config_flat_dataset") + mock_detect.return_value = parquet_config.to_dict() + mock_post = mocker.patch("sift_py.rest.requests.Session.post") + + # First call: config upload, second call: data upload + mock_post.side_effect = [ + MockResponse( + status_code=200, json_data={"uploadUrl": "http://upload.com", "dataImportId": "id123"} + ), + MockResponse(status_code=200, text=""), + ] + mocker.patch("sift_py.data_import.parquet.ProgressFile", mocker.mock_open()) + result = svc.flat_dataset_upload( + "asset", + "file.parquet", + "time", + time_format="TIME_FORMAT_RELATIVE_SECONDS", + complex_types_import_mode="PARQUET_COMPLEX_TYPES_IMPORT_MODE_BYTES", + run_id="run_42", + relative_start_time="2024-01-01T00:00:00Z", + ) + assert isinstance(result, DataImportService) + + # Check config upload call + config_call = mock_post.call_args_list[0] + assert config_call[1]["url"] == svc._upload_uri + + config_call = mock_post.call_args_list[0] + expected_config = { + "parquet_config": { + "asset_name": "asset", + "run_name": "", + "run_id": "run_42", + "flat_dataset": { + "time_column": { + "format": "TIME_FORMAT_RELATIVE_SECONDS", + "relative_start_time": "2024-01-01T00:00:00Z", + "path": "time", + }, + "data_columns": [ + { + "path": "col1", + "channel_config": { + "name": "col1", + "data_type": "CHANNEL_DATA_TYPE_INT_64", + "units": "", + "description": "", + "enum_types": [], + "bit_field_elements": [], + }, + }, + { + "path": "col2", + "channel_config": { + "name": "col2", + "data_type": "CHANNEL_DATA_TYPE_FLOAT", + "units": "m", + "description": "second column", + "enum_types": [], + "bit_field_elements": [], + }, + }, + { + "path": "col3", + "channel_config": { + "name": "col3", + "data_type": "CHANNEL_DATA_TYPE_STRING", + "units": "", + "description": "third column", + "enum_types": [], + "bit_field_elements": [], + }, + }, + ], + }, + "footer_offset": 0, + "footer_length": 0, + "complex_types_import_mode": "PARQUET_COMPLEX_TYPES_IMPORT_MODE_BYTES", + } + } + assert expected_config == json.loads(config_call[1]["data"]) + + +def test_detect_config_flat_dataset_failed(mocker: MockFixture): + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_requests_post.return_value = MockResponse(status_code=400, text="Failed") + svc = ParquetUploadService(rest_config) + svc._detect_config_uri = "http://detect.com" + mocker.patch("sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"bytes", 8)) + with pytest.raises(Exception): + svc._detect_config_flat_dataset("file.parquet") + + +def test_extract_parquet_footer_invalid_magic(tmp_path): + file_path = tmp_path / "test.parquet" + with open(file_path, "wb") as f: + f.write(b"\x08\x00\x00\x00BADMAGIC") + with pytest.raises(Exception): + _extract_parquet_footer(str(file_path)) + + +def test_detect_config_flat_dataset_success(mocker: MockFixture): + # Mock the _extract_parquet_footer to return known bytes and offset + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 123) + ) + # Prepare a mock response for the requests post + mock_response_data = { + "parquet_config": { + "asset_name": "asset", + "flat_dataset": { + "time_column": {"path": "time", "format": "TIME_FORMAT_ABSOLUTE_UNIX_NANOSECONDS"}, + "data_columns": [], + }, + "footer_offset": 0, + "footer_length": 0, + } + } + mock_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_post.return_value = MockResponse(status_code=200, json_data=mock_response_data) + svc = ParquetUploadService(rest_config) + svc._detect_config_uri = "http://detect.com" + result = svc._detect_config_flat_dataset("file.parquet") + + # Should return the dict inside "parquet_config" and add the correct footer_offset + assert isinstance(result, dict) + assert result["asset_name"] == "asset" + assert result["footer_offset"] == 123 + + +def test_detect_config_flat_dataset_invalid_json(mocker: MockFixture): + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 123) + ) + mock_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_post.return_value = MockResponse(status_code=200, text="not json") + mock_post.return_value.json = lambda: (_ for _ in ()).throw(Exception("Invalid response")) + svc = ParquetUploadService(rest_config) + svc._detect_config_uri = "http://detect.com" + with pytest.raises(Exception, match="Invalid response"): + svc._detect_config_flat_dataset("file.parquet") + + +def test_detect_config_flat_dataset_missing_parquet_config(mocker: MockFixture): + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 123) + ) + mock_post = mocker.patch("sift_py.rest.requests.Session.post") + mock_post.return_value = MockResponse(status_code=200, json_data={}) + svc = ParquetUploadService(rest_config) + svc._detect_config_uri = "http://detect.com" + with pytest.raises(Exception, match="Parquet config missing from detect config response"): + svc._detect_config_flat_dataset("file.parquet") diff --git a/python/lib/sift_py/data_import/_utils.py b/python/lib/sift_py/data_import/_utils.py new file mode 100644 index 000000000..bc82942c9 --- /dev/null +++ b/python/lib/sift_py/data_import/_utils.py @@ -0,0 +1,79 @@ +import mimetypes +import os +import re +from pathlib import Path +from typing import Callable, List, Optional, Tuple, Union + +from alive_progress import alive_bar + + +def mime_and_content_type_from_path(path: Path) -> Tuple[str, Optional[str], Optional[str]]: + file_name = path.name + mime, encoding = mimetypes.guess_type(path) + return file_name, mime, encoding + + +def validate_file_type(path: Union[str, Path], valid_file_types: List[str]) -> Optional[str]: + posix_path = Path(path) if isinstance(path, str) else path + + if not posix_path.is_file(): + raise Exception(f"Provided path, '{path}', does not point to a regular file.") + + _, mimetype, content_encoding = mime_and_content_type_from_path(posix_path) + + if not mimetype: + raise Exception(f"The MIME-type of '{posix_path}' could not be computed.") + + if mimetype not in valid_file_types: + raise Exception( + f"{path} is not a valid file type ({mimetype}). Must be {', '.join(valid_file_types)}." + ) + + return content_encoding + + +def convert_keys_to_snake_case(obj): + """Recursively convert all dict keys from camelCase to snake_case.""" + + def camel_to_snake(name: str) -> str: + """Convert camelCase or PascalCase to snake_case.""" + s1 = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name) + return re.sub("([a-z0-9])([A-Z])", r"\1_\2", s1).lower() + + if isinstance(obj, dict): + return {camel_to_snake(k): convert_keys_to_snake_case(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [convert_keys_to_snake_case(item) for item in obj] + else: + return obj + + +class ProgressFile: + """Displays the status with alive_bar while reading the file.""" + + # alive_bar only supports context managers, so we have to make the + # context manager calls manually. + _bar_context: Callable + + def __init__(self, path: Union[str, Path], disable=False): + self.path = path + + self.file_size = os.path.getsize(self.path) + if self.file_size == 0: + raise Exception(f"{path} is 0 bytes") + + self._file = open(self.path, mode="rb") + self._bar = alive_bar(self.file_size, unit=" bytes", disable=disable, scale="SI") + + def read(self, *args, **kwargs): + chunk = self._file.read(*args, **kwargs) + self._bar_context(len(chunk)) + return chunk + + def __enter__(self): + self._bar_context = self._bar.__enter__() + return self + + def __exit__(self, *args, **kwargs): + self._bar.__exit__(None, None, None) + return diff --git a/python/lib/sift_py/data_import/config.py b/python/lib/sift_py/data_import/config.py index 1ffc55d20..3e5c12a43 100644 --- a/python/lib/sift_py/data_import/config.py +++ b/python/lib/sift_py/data_import/config.py @@ -1,6 +1,6 @@ from typing import Any, Dict -from sift_py.data_import._config import CsvConfigImpl, Hdf5ConfigImpl +from sift_py.data_import._config import CsvConfigImpl, Hdf5ConfigImpl, ParquetConfigImpl class CsvConfig: @@ -33,3 +33,19 @@ def to_json(self) -> str: def to_dict(self) -> Dict[str, Any]: return self._hdf5_config.model_dump() + + +class ParquetConfig: + """ + Defines the Parquet config for data imports. + """ + + def __init__(self, config_info: Dict[str, Any]): + self._config_info = config_info + self._parquet_config = ParquetConfigImpl(**self._config_info) + + def to_json(self) -> str: + return self._parquet_config.model_dump_json() + + def to_dict(self) -> Dict[str, Any]: + return self._parquet_config.model_dump() diff --git a/python/lib/sift_py/data_import/parquet.py b/python/lib/sift_py/data_import/parquet.py new file mode 100644 index 000000000..31af027b2 --- /dev/null +++ b/python/lib/sift_py/data_import/parquet.py @@ -0,0 +1,229 @@ +import base64 +import gzip +import json +import os +import struct +from pathlib import Path +from typing import Optional, Tuple, Union +from urllib.parse import urljoin, urlparse + +from sift_py.data_import._utils import ProgressFile, convert_keys_to_snake_case +from sift_py.data_import.config import ParquetConfig +from sift_py.data_import.parquet_complex_types import ParquetComplexTypesImportModeType +from sift_py.data_import.status import DataImportService +from sift_py.data_import.time_format import TimeFormatType +from sift_py.rest import SiftRestConfig, _RestService + + +class ParquetUploadService(_RestService): + UPLOAD_PATH = "/api/v1/data-imports:upload" + URL_PATH = "/api/v1/data-imports:url" + DETECT_CONFIG_PATH = "/api/v0/data-imports:detect-config" + + _rest_conf: SiftRestConfig + _upload_uri: str + _url_uri: str + _apikey: str + + def __init__(self, rest_conf: SiftRestConfig): + super().__init__(rest_conf=rest_conf) + self._upload_uri = urljoin(self._base_uri, self.UPLOAD_PATH) + self._url_uri = urljoin(self._base_uri, self.URL_PATH) + self._detect_config_uri = urljoin(self._base_uri, self.DETECT_CONFIG_PATH) + + def upload( + self, + path: Union[str, Path], + parquet_config: ParquetConfig, + show_progress: bool = True, + ) -> DataImportService: + """ + Uploads the Parquet file pointed to by `path` using a custom Parquet config. + + Args: + path: The path to the Parquet file. + parquet_config: The Parquet config. + show_progress: Whether to show the status bar or not. + """ + if not path.endswith(".parquet"): + raise Exception("Must use an uncompressed parquet file") + + response = self._session.post( + url=self._upload_uri, + headers={ + "Content-Encoding": "application/octet-stream", + }, + data=json.dumps({"parquet_config": parquet_config.to_dict()}), + ) + + if response.status_code != 200: + raise Exception( + f"Config file upload request failed with status code {response.status_code}. {response.text}" + ) + + try: + upload_info = response.json() + except (json.decoder.JSONDecodeError, KeyError): + raise Exception(f"Invalid response: {response.text}") + + try: + upload_url: str = upload_info["uploadUrl"] + data_import_id: str = upload_info["dataImportId"] + except KeyError as e: + raise Exception(f"Response missing required keys: {e}") + + with ProgressFile(path, disable=not show_progress) as f: + headers = { + "Content-Encoding": "application/octet-stream", + } + + response = self._session.post( + url=upload_url, + headers=headers, + data=f, + ) + + if response.status_code != 200: + raise Exception( + f"Data file upload request failed with status code {response.status_code}. {response.text}" + ) + + return DataImportService(self._rest_conf, data_import_id) + + def upload_from_url( + self, + url: str, + parquet_config: ParquetConfig, + ) -> DataImportService: + """ + Uploads the Parquet file pointed to by `url` using a custom Parquet config. + """ + parsed_url = urlparse(url) + if parsed_url.scheme not in ["s3", "http", "https"]: + raise Exception( + f"Invalid URL scheme: '{parsed_url.scheme}'. Only S3 and HTTP(S) URLs are supported." + ) + + response = self._session.post( + url=self._url_uri, + data=json.dumps( + { + "url": url, + "parquet_config": parquet_config.to_dict(), + } + ), + ) + + if response.status_code != 200: + raise Exception( + f"URL upload request failed with status code {response.status_code}. {response.text}" + ) + + try: + upload_info = response.json() + except (json.decoder.JSONDecodeError, KeyError) as e: + raise Exception(f"Invalid response: {e}") + + try: + data_import_id: str = upload_info["dataImportId"] + except KeyError as e: + raise Exception(f"Response missing required keys: {e}") + + return DataImportService(self._rest_conf, data_import_id) + + def flat_dataset_upload( + self, + asset_name: str, + path: Union[str, Path], + time_path: str, + time_format: TimeFormatType = TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS, + complex_types_import_mode: ParquetComplexTypesImportModeType = ParquetComplexTypesImportModeType.BOTH, + run_name: Optional[str] = None, + run_id: Optional[str] = None, + relative_start_time: Optional[str] = None, + ) -> DataImportService: + """ + Uploads the Parquet file pointed to by `path` to the specified asset. This function will + automatically generate the Parquet Config using the footer. See the options + below for what parameters can be overridden. Use `upload` if you need to specify a custom Parquet config. + + Override `time_path` to specify which column contains timestamp information. Default is 1. + Override `time_format` to specify the time data format. Default is `TimeFormatType.ABSOLUTE_DATETIME`. + Override `complex_types_import_mode` to specify how to import complex types (maps and list). Default is both strings and bytes. + Override `run_name` to specify the name of the run to create for this data. Default is None. + Override `run_id` to specify the id of the run to add this data to. Default is None. + Override `relative_start_time` if a relative time format is used. Default is None. + """ + if not path.endswith(".parquet"): + raise Exception("Must use an uncompressed parquet file") + + config_info = self._detect_config_flat_dataset(path) + + config_info["asset_name"] = asset_name + + config_info["flat_dataset"]["time_column"]["path"] = time_path + config_info["flat_dataset"]["time_column"]["format"] = time_format + if relative_start_time is not None: + config_info["flat_dataset"]["time_column"]["relative_start_time"] = relative_start_time + + config_info["complex_types_import_mode"] = complex_types_import_mode + + if run_name is not None: + config_info["run_name"] = run_name + + if run_id is not None: + config_info["run_id"] = run_id + + parquet_config = ParquetConfig(config_info) + + return self.upload(path, parquet_config) + + def _detect_config_flat_dataset(self, path: Union[str, Path]) -> dict: + """Returns a dictionary representing the flat dataset Parquet config detected + from the file. + """ + footer_bytes, footer_offset = _extract_parquet_footer(path) + encoded_data = base64.b64encode(footer_bytes).decode("utf-8") + request_data = json.dumps( + { + "data": encoded_data, + "type": "DATA_TYPE_KEY_PARQUET_FLATDATASET", + } + ) + compressed_data = gzip.compress(request_data.encode()) + response = self._session.post( + url=self._detect_config_uri, data=compressed_data, headers={"Content-Encoding": "gzip"} + ) + + if response.status_code != 200: + raise Exception( + f"Detect config request failed with status code {response.status_code}. {response.text}" + ) + + try: + config_info = convert_keys_to_snake_case(response.json()) + except (json.decoder.JSONDecodeError, KeyError) as e: + raise Exception(f"Invalid response: {e}") + + if "parquet_config" not in config_info: + raise Exception(f"Parquet config missing from detect config response: {config_info}") + + # Add the footer_offset which includes the 8 byte footer tail. + config_info["parquet_config"]["footer_offset"] = footer_offset + + return config_info["parquet_config"] + + +def _extract_parquet_footer(filename: Union[str, Path]) -> Tuple[bytes, int]: + """Return the Parquet footer bytes and footer offset""" + # Footer length is 8 bytes long at the end of the file. + with open(filename, "rb") as f: + f.seek(-8, 2) + footer_tail_bytes = f.read(8) + footer_len = struct.unpack(" Optional["ParquetComplexTypesImportModeType"]: + try: + return cls(val) + except ValueError: + return None + + def as_human_str(self) -> str: + return self.value diff --git a/python/lib/sift_py/data_import/status.py b/python/lib/sift_py/data_import/status.py index 065f899d2..47a41b94b 100644 --- a/python/lib/sift_py/data_import/status.py +++ b/python/lib/sift_py/data_import/status.py @@ -42,7 +42,8 @@ class DataImport(BaseModel): source_url: str = "" status: Union[str, DataImportStatusType] error_message: str = "" - csv_config: dict + csv_config: Optional[dict] + parquet_config: Optional[dict] @field_validator("status", mode="before") @classmethod From c47b00ec420e4085ae2d9646cdca9d98f85c2819 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Fri, 12 Sep 2025 14:29:09 -0700 Subject: [PATCH 2/4] Appease mypy --- .../lib/sift_py/data_import/_parquet_test.py | 19 +++++++------------ python/lib/sift_py/data_import/_utils.py | 4 ++-- python/lib/sift_py/data_import/parquet.py | 4 ++-- python/lib/sift_py/data_import/status.py | 4 ++-- .../file_attachment/_internal/upload.py | 14 +++----------- 5 files changed, 16 insertions(+), 29 deletions(-) diff --git a/python/lib/sift_py/data_import/_parquet_test.py b/python/lib/sift_py/data_import/_parquet_test.py index 4786c5e21..0968b1c43 100644 --- a/python/lib/sift_py/data_import/_parquet_test.py +++ b/python/lib/sift_py/data_import/_parquet_test.py @@ -5,7 +5,9 @@ from sift_py.data_import.config import ParquetConfig from sift_py.data_import.parquet import ParquetUploadService, _extract_parquet_footer +from sift_py.data_import.parquet_complex_types import ParquetComplexTypesImportModeType from sift_py.data_import.status import DataImportService +from sift_py.data_import.time_format import TimeFormatType from sift_py.rest import SiftRestConfig @@ -81,10 +83,7 @@ def test_upload_config_request_failed(mocker: MockFixture, parquet_config): def test_upload_invalid_config_response(mocker: MockFixture, parquet_config): mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") - mock_requests_post.return_value = MockResponse(status_code=200, text="not json") - mock_requests_post.return_value.json = lambda: (_ for _ in ()).throw( - Exception("Invalid response") - ) + mock_requests_post.return_value = MockResponse(status_code=200, json_data=None) svc = ParquetUploadService(rest_config) with pytest.raises(Exception, match="Invalid response"): svc.upload("file.parquet", parquet_config) @@ -142,10 +141,7 @@ def test_upload_from_url_failed(mocker: MockFixture, parquet_config): def test_upload_from_url_invalid_response(mocker: MockFixture, parquet_config): mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") - mock_requests_post.return_value = MockResponse(status_code=200, text="not json") - mock_requests_post.return_value.json = lambda: (_ for _ in ()).throw( - Exception("Invalid response") - ) + mock_requests_post.return_value = MockResponse(status_code=200, json_data=None) svc = ParquetUploadService(rest_config) with pytest.raises(Exception, match="Invalid response"): svc.upload_from_url("http://file.parquet", parquet_config) @@ -269,8 +265,8 @@ def test_flat_dataset_upload_overrides_success(mocker: MockFixture, parquet_conf "asset", "file.parquet", "time", - time_format="TIME_FORMAT_RELATIVE_SECONDS", - complex_types_import_mode="PARQUET_COMPLEX_TYPES_IMPORT_MODE_BYTES", + time_format=TimeFormatType.RELATIVE_SECONDS, + complex_types_import_mode=ParquetComplexTypesImportModeType.BYTES, run_id="run_42", relative_start_time="2024-01-01T00:00:00Z", ) @@ -388,8 +384,7 @@ def test_detect_config_flat_dataset_invalid_json(mocker: MockFixture): "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 123) ) mock_post = mocker.patch("sift_py.rest.requests.Session.post") - mock_post.return_value = MockResponse(status_code=200, text="not json") - mock_post.return_value.json = lambda: (_ for _ in ()).throw(Exception("Invalid response")) + mock_post.return_value = MockResponse(status_code=200, json_data=None) svc = ParquetUploadService(rest_config) svc._detect_config_uri = "http://detect.com" with pytest.raises(Exception, match="Invalid response"): diff --git a/python/lib/sift_py/data_import/_utils.py b/python/lib/sift_py/data_import/_utils.py index bc82942c9..b2802c6ef 100644 --- a/python/lib/sift_py/data_import/_utils.py +++ b/python/lib/sift_py/data_import/_utils.py @@ -4,7 +4,7 @@ from pathlib import Path from typing import Callable, List, Optional, Tuple, Union -from alive_progress import alive_bar +from alive_progress import alive_bar # type: ignore def mime_and_content_type_from_path(path: Path) -> Tuple[str, Optional[str], Optional[str]]: @@ -32,7 +32,7 @@ def validate_file_type(path: Union[str, Path], valid_file_types: List[str]) -> O return content_encoding -def convert_keys_to_snake_case(obj): +def convert_keys_to_snake_case(obj: dict) -> dict: """Recursively convert all dict keys from camelCase to snake_case.""" def camel_to_snake(name: str) -> str: diff --git a/python/lib/sift_py/data_import/parquet.py b/python/lib/sift_py/data_import/parquet.py index 31af027b2..40f484cbd 100644 --- a/python/lib/sift_py/data_import/parquet.py +++ b/python/lib/sift_py/data_import/parquet.py @@ -45,7 +45,7 @@ def upload( parquet_config: The Parquet config. show_progress: Whether to show the status bar or not. """ - if not path.endswith(".parquet"): + if not str(path).endswith(".parquet"): raise Exception("Must use an uncompressed parquet file") response = self._session.post( @@ -154,7 +154,7 @@ def flat_dataset_upload( Override `run_id` to specify the id of the run to add this data to. Default is None. Override `relative_start_time` if a relative time format is used. Default is None. """ - if not path.endswith(".parquet"): + if not str(path).endswith(".parquet"): raise Exception("Must use an uncompressed parquet file") config_info = self._detect_config_flat_dataset(path) diff --git a/python/lib/sift_py/data_import/status.py b/python/lib/sift_py/data_import/status.py index 47a41b94b..f1e1fec5d 100644 --- a/python/lib/sift_py/data_import/status.py +++ b/python/lib/sift_py/data_import/status.py @@ -42,8 +42,8 @@ class DataImport(BaseModel): source_url: str = "" status: Union[str, DataImportStatusType] error_message: str = "" - csv_config: Optional[dict] - parquet_config: Optional[dict] + csv_config: Optional[dict] = None + parquet_config: Optional[dict] = None @field_validator("status", mode="before") @classmethod diff --git a/python/lib/sift_py/file_attachment/_internal/upload.py b/python/lib/sift_py/file_attachment/_internal/upload.py index e1aa59aa2..16a62d925 100644 --- a/python/lib/sift_py/file_attachment/_internal/upload.py +++ b/python/lib/sift_py/file_attachment/_internal/upload.py @@ -1,11 +1,11 @@ -import mimetypes from pathlib import Path -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, Optional, Union from urllib.parse import urljoin from requests_toolbelt import MultipartEncoder from sift_py._internal.convert.json import to_json +from sift_py.data_import._utils import mime_and_content_type_from_path from sift_py.file_attachment.entity import Entity from sift_py.file_attachment.metadata import Metadata from sift_py.rest import SiftRestConfig, _RestService @@ -41,9 +41,7 @@ def upload_attachment( if not posix_path.is_file(): raise Exception(f"Provided path, '{path}', does not point to a regular file.") - file_name, mimetype, content_encoding = self.__class__._mime_and_content_type_from_path( - posix_path - ) + file_name, mimetype, content_encoding = mime_and_content_type_from_path(posix_path) if not mimetype: raise Exception(f"The MIME-type of '{posix_path}' could not be computed.") @@ -93,9 +91,3 @@ def upload_attachment( ) return response.json().get("remoteFile").get("remoteFileId") - - @staticmethod - def _mime_and_content_type_from_path(path: Path) -> Tuple[str, Optional[str], Optional[str]]: - file_name = path.name - mime, encoding = mimetypes.guess_type(path) - return file_name, mime, encoding From bc1c787aa829c08dd60f711c7ca2ffc16dcb1a83 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Mon, 15 Sep 2025 17:13:40 -0700 Subject: [PATCH 3/4] Implement PR feedback --- python/lib/sift_py/data_import/_utils_test.py | 27 +++++++++++++++++++ python/lib/sift_py/data_import/parquet.py | 12 ++++----- 2 files changed, 32 insertions(+), 7 deletions(-) create mode 100644 python/lib/sift_py/data_import/_utils_test.py diff --git a/python/lib/sift_py/data_import/_utils_test.py b/python/lib/sift_py/data_import/_utils_test.py new file mode 100644 index 000000000..56e3fbd5f --- /dev/null +++ b/python/lib/sift_py/data_import/_utils_test.py @@ -0,0 +1,27 @@ +import pytest + +from sift_py.data_import import _utils + + +def test_convert_keys_to_snake_case_simple(): + obj = {"camelCase": 1, "PascalCase": 2} + result = _utils.convert_keys_to_snake_case(obj) + assert "camel_case" in result + assert "pascal_case" in result + + +def test_convert_keys_to_snake_case_nested(): + obj = {"outerKey": {"innerKey": [{"deepKey": 1}, {"already_snake_case": 13}]}} + result = _utils.convert_keys_to_snake_case(obj) + assert "outer_key" in result + assert "inner_key" in result["outer_key"] + assert "deep_key" in result["outer_key"]["inner_key"][0] + assert "already_snake_case" in result["outer_key"]["inner_key"][1] + + +def test_progress_file_zero_bytes(tmp_path): + file = tmp_path / "empty.txt" + file.write_text("") + with pytest.raises(Exception) as excinfo: + _utils.ProgressFile(file) + assert "is 0 bytes" in str(excinfo.value) diff --git a/python/lib/sift_py/data_import/parquet.py b/python/lib/sift_py/data_import/parquet.py index 40f484cbd..512815af5 100644 --- a/python/lib/sift_py/data_import/parquet.py +++ b/python/lib/sift_py/data_import/parquet.py @@ -45,8 +45,8 @@ def upload( parquet_config: The Parquet config. show_progress: Whether to show the status bar or not. """ - if not str(path).endswith(".parquet"): - raise Exception("Must use an uncompressed parquet file") + # Verify this is a valid Parquet file. + _extract_parquet_footer(path) response = self._session.post( url=self._upload_uri, @@ -147,16 +147,14 @@ def flat_dataset_upload( automatically generate the Parquet Config using the footer. See the options below for what parameters can be overridden. Use `upload` if you need to specify a custom Parquet config. - Override `time_path` to specify which column contains timestamp information. Default is 1. - Override `time_format` to specify the time data format. Default is `TimeFormatType.ABSOLUTE_DATETIME`. + Set `time_path` to specify which column contains timestamp information and `time_format` + to specify the time data format. Default is `TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS`. + Override `complex_types_import_mode` to specify how to import complex types (maps and list). Default is both strings and bytes. Override `run_name` to specify the name of the run to create for this data. Default is None. Override `run_id` to specify the id of the run to add this data to. Default is None. Override `relative_start_time` if a relative time format is used. Default is None. """ - if not str(path).endswith(".parquet"): - raise Exception("Must use an uncompressed parquet file") - config_info = self._detect_config_flat_dataset(path) config_info["asset_name"] = asset_name From d73ce1dac594b442082d356490aaf11ba4bafa99 Mon Sep 17 00:00:00 2001 From: Marc Julien Date: Mon, 15 Sep 2025 18:00:01 -0700 Subject: [PATCH 4/4] Fix unit tests --- .../lib/sift_py/data_import/_parquet_test.py | 55 ++++++++++++++++--- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/python/lib/sift_py/data_import/_parquet_test.py b/python/lib/sift_py/data_import/_parquet_test.py index 0968b1c43..b2da4324d 100644 --- a/python/lib/sift_py/data_import/_parquet_test.py +++ b/python/lib/sift_py/data_import/_parquet_test.py @@ -69,11 +69,15 @@ def parquet_config(): def test_upload_invalid_extension(mocker: MockFixture, parquet_config): svc = ParquetUploadService(rest_config) - with pytest.raises(Exception, match="Must use an uncompressed parquet file"): + mocker.patch("sift_py.data_import.parquet.open", mocker.mock_open(read_data=b"a" * 200)) + with pytest.raises(Exception, match="Invalid Parquet file: missing magic bytes"): svc.upload("file.txt", parquet_config) def test_upload_config_request_failed(mocker: MockFixture, parquet_config): + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 8) + ) mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") mock_requests_post.return_value = MockResponse(status_code=400, text="Invalid request") svc = ParquetUploadService(rest_config) @@ -82,6 +86,9 @@ def test_upload_config_request_failed(mocker: MockFixture, parquet_config): def test_upload_invalid_config_response(mocker: MockFixture, parquet_config): + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 8) + ) mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") mock_requests_post.return_value = MockResponse(status_code=200, json_data=None) svc = ParquetUploadService(rest_config) @@ -90,6 +97,9 @@ def test_upload_invalid_config_response(mocker: MockFixture, parquet_config): def test_upload_missing_keys(mocker: MockFixture, parquet_config): + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 8) + ) mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") mock_requests_post.return_value = MockResponse(status_code=200, json_data={}) svc = ParquetUploadService(rest_config) @@ -112,6 +122,9 @@ def test_upload_data_file_failed(mocker: MockFixture, parquet_config): def test_upload_success(mocker: MockFixture, parquet_config): + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 8) + ) mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") mock_requests_post.side_effect = [ MockResponse( @@ -166,12 +179,16 @@ def test_upload_from_url_success(mocker: MockFixture, parquet_config): def test_flat_dataset_upload_invalid_extension(mocker: MockFixture, parquet_config): + mocker.patch("sift_py.data_import.parquet.open", mocker.mock_open(read_data=b"a" * 200)) svc = ParquetUploadService(rest_config) - with pytest.raises(Exception, match="Must use an uncompressed parquet file"): + with pytest.raises(Exception, match="Invalid Parquet file: missing magic bytes"): svc.flat_dataset_upload("asset", "file.txt", "time") def test_flat_dataset_upload_success(mocker: MockFixture, parquet_config): + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", return_value=(b"footerbytes", 8) + ) svc = ParquetUploadService(rest_config) mock_detect = mocker.patch.object(svc, "_detect_config_flat_dataset") mock_detect.return_value = parquet_config.to_dict() @@ -248,6 +265,10 @@ def test_flat_dataset_upload_success(mocker: MockFixture, parquet_config): def test_flat_dataset_upload_overrides_success(mocker: MockFixture, parquet_config): + mocker.patch( + "sift_py.data_import.parquet._extract_parquet_footer", + return_value=(b"parquetfooterbytes", 8), + ) svc = ParquetUploadService(rest_config) mock_detect = mocker.patch.object(svc, "_detect_config_flat_dataset") mock_detect.return_value = parquet_config.to_dict() @@ -342,12 +363,12 @@ def test_detect_config_flat_dataset_failed(mocker: MockFixture): svc._detect_config_flat_dataset("file.parquet") -def test_extract_parquet_footer_invalid_magic(tmp_path): - file_path = tmp_path / "test.parquet" - with open(file_path, "wb") as f: - f.write(b"\x08\x00\x00\x00BADMAGIC") - with pytest.raises(Exception): - _extract_parquet_footer(str(file_path)) +def test_extract_parquet_footer_invalid_magic(mocker): + mocker.patch( + "sift_py.data_import.parquet.open", mocker.mock_open(read_data=b"\x08\x00\x00\x00BADMAGIC") + ) + with pytest.raises(Exception, match="Invalid Parquet file: missing magic bytes"): + _extract_parquet_footer("test.parquet") def test_detect_config_flat_dataset_success(mocker: MockFixture): @@ -401,3 +422,21 @@ def test_detect_config_flat_dataset_missing_parquet_config(mocker: MockFixture): svc._detect_config_uri = "http://detect.com" with pytest.raises(Exception, match="Parquet config missing from detect config response"): svc._detect_config_flat_dataset("file.parquet") + + +def test_extract_parquet_footer_valid_parquet(mocker): + # Parquet files end with "PAR1" magic bytes + # Footer length is stored in the last 8 bytes before the final magic bytes + # We'll construct a valid parquet tailer file in memory + file_bytes = b"\x08\x00\x00\x00" + b"PAR1" + mocker.patch("sift_py.data_import.parquet.open", mocker.mock_open(read_data=file_bytes)) + mocker.patch("sift_py.data_import.parquet.os.path.getsize", return_value=24) + result_footer, result_offset = _extract_parquet_footer("valid.parquet") + assert result_offset == 16 + + +def test_extract_parquet_footer_too_short(mocker): + # File too short to be a valid parquet file + mocker.patch("sift_py.data_import.parquet.open", mocker.mock_open(read_data=b"PAR1")) + with pytest.raises(Exception, match="Invalid Parquet file: missing magic bytes"): + _extract_parquet_footer("short.parquet")