Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions skyflow/utils/_skyflow_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ class Error(Enum):
EMPTY_CREDENTIAL_FILE_PATH_IN_CONFIG = f"{error_prefix} Initialization failed. Invalid credentials for {{}} with id {{}}. Specify a valid file path."
EMPTY_CREDENTIAL_FILE_PATH = f"{error_prefix} Initialization failed. Invalid credentials. Specify a valid file path."
INVALID_CREDENTIAL_FILE_PATH_IN_CONFIG = f"{error_prefix} Initialization failed. Invalid credentials for {{}} with id {{}}. Expected file path to be a string."
INVALID_CREDENTIAL_FILE_PATH = f"{error_prefix} Initialization failed. Invalid credentials. Expected file path to be a string."
INVALID_CREDENTIAL_FILE_PATH = f"{error_prefix} Initialization failed. Invalid credentials. Expected file path to be a valid file path."
EMPTY_CREDENTIALS_TOKEN_IN_CONFIG = f"{error_prefix} Initialization failed. Invalid token for {{}} with id {{}}.Specify a valid credentials token."
EMPTY_CREDENTIALS_TOKEN = f"{error_prefix} Initialization failed. Invalid token.Specify a valid credentials token."
INVALID_CREDENTIALS_TOKEN_IN_CONFIG = f"{error_prefix} Initialization failed. Invalid credentials token for {{}} with id {{}}. Expected token to be a string."
INVALID_CREDENTIALS_TOKEN = f"{error_prefix} Initialization failed. Invalid credentials token. Expected token to be a string."
EXPIRED_TOKEN = f"${error_prefix} Initialization failed. Given token is expired. Specify a valid credentials token."
EXPIRED_BEARER_TOKEN = f"{error_prefix} Initialization failed. Bearer token is invalid or expired."
EXPIRED_TOKEN = f"{error_prefix} Initialization failed. Given token is expired. Specify a valid credentials token."
EMPTY_API_KEY_IN_CONFIG = f"{error_prefix} Initialization failed. Invalid api key for {{}} with id {{}}.Specify a valid api key."
EMPTY_API_KEY= f"{error_prefix} Initialization failed. Invalid api key.Specify a valid api key."
INVALID_API_KEY_IN_CONFIG = f"{error_prefix} Initialization failed. Invalid api key for {{}} with id {{}}. Expected api key to be a string."
Expand Down Expand Up @@ -118,10 +119,11 @@ class Error(Enum):

INVALID_IDS_TYPE = f"{error_prefix} Validation error. 'ids' has a value of type {{}}. Specify 'ids' as list."
INVALID_REDACTION_TYPE = f"{error_prefix} Validation error. 'redaction' has a value of type {{}}. Specify 'redaction' as type Skyflow.RedactionType."
INVALID_COLUMN_NAME = f"{error_prefix} Validation error. 'column' has a value of type {{}}. Specify 'column' as a string."
INVALID_COLUMN_VALUE = f"{error_prefix} Validation error. columnValues key has a value of type {{}}. Specify columnValues key as list."
INVALID_COLUMN_NAME = f"{error_prefix} Validation error. column_name has a value of type {{}}. Specify 'column' as a string."
INVALID_COLUMN_VALUE = f"{error_prefix} Validation error. column_values key has a value of type {{}}. Specify column_values key as list."
INVALID_COLUMN_VALUES = f"{error_prefix} Validation error. column_values key is an empty list. Specify at least one column value when column_name is passed."
INVALID_FIELDS_VALUE = f"{error_prefix} Validation error. fields key has a value of type{{}}. Specify fields key as list."
BOTH_OFFSET_AND_LIMIT_SPECIFIED = f"${error_prefix} Validation error. Both offset and limit cannot be present at the same time"
BOTH_OFFSET_AND_LIMIT_SPECIFIED = f"{error_prefix} Validation error. Both offset and limit cannot be present at the same time"
INVALID_OFF_SET_VALUE = f"{error_prefix} Validation error. offset key has a value of type {{}}. Specify offset key as integer."
INVALID_LIMIT_VALUE = f"{error_prefix} Validation error. limit key has a value of type {{}}. Specify limit key as integer."
INVALID_DOWNLOAD_URL_VALUE = f"{error_prefix} Validation error. download_url key has a value of type {{}}. Specify download_url key as boolean."
Expand Down Expand Up @@ -366,7 +368,7 @@ class ErrorLogs(Enum):
SKYFLOW_ID_IS_REQUIRED = f"{ERROR}: [{error_prefix}] Invalid {{}} request. Skyflow Id is required."
EMPTY_SKYFLOW_ID = f"{ERROR}: [{error_prefix}] Invalid {{}} request. Skyflow Id can not be empty."

COLUMN_VALUES_IS_REQUIRED_TOKENIZE = f"{ERROR}: [{error_prefix}] Invalid {{}} request. ColumnValues are required."
COLUMN_VALUES_IS_REQUIRED_TOKENIZE = f"{ERROR}: [{error_prefix}] Invalid {{}} request. column_values are required."
EMPTY_COLUMN_GROUP_IN_COLUMN_VALUES = f"{ERROR}: [{error_prefix}] Invalid {{}} request. Column group can not be null or empty in column values at index %s2."

EMPTY_QUERY= f"{ERROR}: [{error_prefix}] Invalid {{}} request. Query can not be empty."
Expand Down
154 changes: 110 additions & 44 deletions skyflow/utils/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,41 @@ def convert_detected_entity_to_entity_info(detected_entity):
def construct_invoke_connection_request(request, connection_url, logger) -> PreparedRequest:
url = parse_path_params(connection_url.rstrip('/'), request.path_params)

try:
if isinstance(request.headers, dict):
header = to_lowercase_keys(json.loads(
json.dumps(request.headers)))
else:
raise SkyflowError(SkyflowMessages.Error.INVALID_REQUEST_HEADERS.value, invalid_input_error_code)
except Exception:
raise SkyflowError(SkyflowMessages.Error.INVALID_REQUEST_HEADERS.value, invalid_input_error_code)
header = None
content_type = None

if not HttpHeader.CONTENT_TYPE.lower() in header:
header[HttpHeader.CONTENT_TYPE_LOWERCASE] = ContentType.JSON.value
if request.headers is not None:
try:
if isinstance(request.headers, dict):
header = to_lowercase_keys(json.loads(
json.dumps(request.headers)))

content_type = header.get(HttpHeader.CONTENT_TYPE_LOWERCASE)
else:
raise SkyflowError(SkyflowMessages.Error.INVALID_REQUEST_HEADERS.value, invalid_input_error_code)
except SkyflowError:
raise
except Exception:
raise SkyflowError(SkyflowMessages.Error.INVALID_REQUEST_HEADERS.value, invalid_input_error_code)

try:
if isinstance(request.body, dict):
json_data, files = get_data_from_content_type(
request.body, header[HttpHeader.CONTENT_TYPE_LOWERCASE]
)
else:
json_data = None
files = {}

if request.body is not None:
try:
if isinstance(request.body, dict):
json_data, files = get_data_from_content_type(
request.body, content_type
)
else:
raise SkyflowError(SkyflowMessages.Error.INVALID_REQUEST_BODY.value, invalid_input_error_code)
except SkyflowError:
raise
except Exception as e:
raise SkyflowError(SkyflowMessages.Error.INVALID_REQUEST_BODY.value, invalid_input_error_code)
except Exception as e:
raise SkyflowError( SkyflowMessages.Error.INVALID_REQUEST_BODY.value, invalid_input_error_code)

if files and header and content_type == ContentType.FORMDATA.value:
header.pop(HttpHeader.CONTENT_TYPE_LOWERCASE, None)

validate_invoke_connection_params(logger, request.query_params, request.path_params)

Expand Down Expand Up @@ -176,16 +190,54 @@ def render_key(parents):
def get_data_from_content_type(data, content_type):
converted_data = data
files = {}

if content_type == ContentType.URLENCODED.value:
converted_data = http_build_query(data)
elif content_type == ContentType.FORMDATA.value:
converted_data = r_urlencode(list(), dict(), data)
files = {(None, None)}
print("Hello")
converted_data = None
files = {}
for key, value in data.items():
files[key] = (None, str(value))
elif content_type == ContentType.JSON.value:
converted_data = json.dumps(data)
elif content_type == ContentType.XML.value or content_type == 'application/xml' or content_type == 'text/xml':
if isinstance(data, dict):
converted_data = dict_to_xml(data)
else:
converted_data = str(data)
elif content_type == ContentType.HTML.value or content_type == 'text/html':
if isinstance(data, dict):
converted_data = json.dumps(data)
else:
converted_data = str(data)
else:
if isinstance(data, dict):
converted_data = json.dumps(data)
else:
converted_data = str(data)

return converted_data, files

def dict_to_xml(data, root_tag='root'):
def build_xml(d, tag='item'):
if isinstance(d, dict):
xml_parts = [f'<{tag}>']
for key, value in d.items():
xml_parts.append(build_xml(value, key))
xml_parts.append(f'</{tag}>')
return ''.join(xml_parts)
elif isinstance(d, list):
return ''.join([build_xml(item, tag) for item in d])
else:
return f'<{tag}>{d}</{tag}>'

xml_parts = [f'<{root_tag}>']
for key, value in data.items():
xml_parts.append(build_xml(value, key))
xml_parts.append(f'</{root_tag}>')
return ''.join(xml_parts)


def get_metrics():
sdk_name_version = SdkPrefix.SKYFLOW_PYTHON + SDK_VERSION
Expand Down Expand Up @@ -347,39 +399,50 @@ def parse_invoke_connection_response(api_response: requests.Response):
content = api_response.content
if isinstance(content, bytes):
content = content.decode(EncodingType.UTF_8)

try:
api_response.raise_for_status()
try:
data = json.loads(content)
metadata = {}
if HttpHeader.X_REQUEST_ID in api_response.headers:
metadata[ResponseField.REQUEST_ID] = api_response.headers[HttpHeader.X_REQUEST_ID]

content_type = api_response.headers.get(HttpHeader.CONTENT_TYPE_LOWERCASE, '').lower()

if ContentTypeConstants.APPLICATION_JSON in content_type or not content_type:
try:
data = json.loads(content)
except json.JSONDecodeError:
data = content
else:
data = content

metadata = {}
if HttpHeader.X_REQUEST_ID in api_response.headers:
metadata[ResponseField.REQUEST_ID] = api_response.headers[HttpHeader.X_REQUEST_ID]

return InvokeConnectionResponse(data=data, metadata=metadata, errors=None)
except Exception as e:
raise SkyflowError(SkyflowMessages.Error.RESPONSE_NOT_JSON.value.format(content), status_code)
return InvokeConnectionResponse(data=data, metadata=metadata, errors=None)

except HTTPError:
message = SkyflowMessages.Error.API_ERROR.value.format(status_code)
request_id = api_response.headers.get(HttpHeader.X_REQUEST_ID)

try:
error_response = json.loads(content)
request_id = api_response.headers[HttpHeader.X_REQUEST_ID]
error_response = json.loads(content)
error_from_client = api_response.headers.get(HttpHeader.ERROR_FROM_CLIENT)

status_code = error_response.get(ResponseField.ERROR, {}).get(ResponseField.HTTP_CODE, 500) # Default to 500 if not found
status_code = error_response.get(ResponseField.ERROR, {}).get(ResponseField.HTTP_CODE, status_code)
http_status = error_response.get(ResponseField.ERROR, {}).get(ResponseField.HTTP_STATUS)
grpc_code = error_response.get(ResponseField.ERROR, {}).get(ResponseField.GRPC_CODE)
details = error_response.get(ResponseField.ERROR, {}).get(ResponseField.DETAILS)
message = error_response.get(ResponseField.ERROR, {}).get(ResponseField.MESSAGE, SkyflowMessages.Error.UNKNOWN_ERROR_DEFAULT_MESSAGE.value)

if error_from_client is not None:
if details is None: details = []
if details is None:
details = []
error_from_client_bool = error_from_client.lower() == BooleanString.TRUE
details.append({ResponseField.ERROR_FROM_CLIENT: error_from_client_bool})

raise SkyflowError(message, status_code, request_id, grpc_code, http_status, details)

except json.JSONDecodeError:
message = SkyflowMessages.Error.RESPONSE_NOT_JSON.value.format(content)
raise SkyflowError(message, status_code)
raise SkyflowError(content if content else message, status_code, request_id)

def parse_deidentify_text_response(api_response: DeidentifyStringResponse):
entities = [convert_detected_entity_to_entity_info(entity) for entity in api_response.entities]
Expand All @@ -397,9 +460,15 @@ def log_and_reject_error(description, status_code, request_id, http_status=None,
raise SkyflowError(description, status_code, request_id, grpc_code, http_status, details)

def handle_exception(error, logger):
# handle invalid cluster ID error scenario
if (isinstance(error, httpx.ConnectError)):
handle_generic_error(error, None, SkyflowMessages.ErrorCodes.INVALID_INPUT.value, logger)
if isinstance(error, httpx.ConnectError):
description = SkyflowMessages.Error.GENERIC_API_ERROR.value
log_and_reject_error(description, SkyflowMessages.ErrorCodes.INVALID_INPUT.value, None, logger=logger)
return

if not hasattr(error, 'headers') or not hasattr(error, 'body') or error.headers is None or error.body is None:
description = str(error) if error else SkyflowMessages.Error.GENERIC_API_ERROR.value
log_and_reject_error(description, SkyflowMessages.ErrorCodes.SERVER_ERROR.value, None, logger=logger)
return

request_id = error.headers.get(HttpHeader.X_REQUEST_ID, ErrorDefaults.UNKNOWN_REQUEST_ID)
content_type = error.headers.get(HttpHeader.CONTENT_TYPE_LOWERCASE)
Expand All @@ -411,9 +480,9 @@ def handle_exception(error, logger):
elif ContentTypeConstants.TEXT_PLAIN in content_type:
handle_text_error(error, data, request_id, logger)
else:
handle_generic_error(error, request_id, logger)
handle_generic_error_with_status(error, request_id, error.status, logger)
else:
handle_generic_error(error, request_id, logger)
handle_generic_error_with_status(error, request_id, error.status, logger)

def handle_json_error(err, data, request_id, logger):
try:
Expand All @@ -436,12 +505,9 @@ def handle_json_error(err, data, request_id, logger):
def handle_text_error(err, data, request_id, logger):
log_and_reject_error(data, err.status, request_id, logger = logger)

def handle_generic_error(err, request_id, logger):
handle_generic_error(err, request_id, err.status, logger = logger)

def handle_generic_error(err, request_id, status, logger):
def handle_generic_error_with_status(err, request_id, status, logger):
description = SkyflowMessages.Error.GENERIC_API_ERROR.value
log_and_reject_error(description, status, request_id, logger = logger)
log_and_reject_error(description, status, request_id, logger=logger)

def encode_column_values(get_request):
encoded_column_values = list()
Expand Down
2 changes: 2 additions & 0 deletions skyflow/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class DetectStatus:
FAILED = 'FAILED'
UNKNOWN = 'UNKNOWN'

class Detect:
WAIT_TIME = 64

class FileExtension:
JSON = 'json'
Expand Down
3 changes: 2 additions & 1 deletion skyflow/utils/enums/content_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ class ContentType(Enum):
PLAINTEXT = 'text/plain'
XML = 'text/xml'
URLENCODED = 'application/x-www-form-urlencoded'
FORMDATA = 'multipart/form-data'
FORMDATA = 'multipart/form-data'
HTML = 'text/html'
21 changes: 8 additions & 13 deletions skyflow/utils/validations/_validations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from skyflow.utils.constants import (
ApiKey, ResponseField, RequestParameter,
FileUploadField,
DeidentifyFileRequestField, RequestOperation, ConfigType, SqlCommand, ConfigField, OptionField, CredentialField
DeidentifyFileRequestField, RequestOperation, ConfigType, SqlCommand, ConfigField, OptionField, CredentialField, Detect
)
from skyflow.utils.logger import log_info, log_error_log
from skyflow.vault.detect import DeidentifyTextRequest, ReidentifyTextRequest, TokenFormat, Transformations, \
Expand Down Expand Up @@ -142,8 +142,8 @@ def validate_credentials(logger, credentials, config_id_type=None, config_id=Non
)
if is_expired(credentials.get(CredentialField.TOKEN), logger):
raise SkyflowError(
SkyflowMessages.Error.INVALID_CREDENTIALS_TOKEN.value.format(config_id_type, config_id)
if config_id_type and config_id else SkyflowMessages.Error.INVALID_CREDENTIALS_TOKEN.value,
SkyflowMessages.Error.EXPIRED_BEARER_TOKEN.value
if config_id_type and config_id else SkyflowMessages.Error.EXPIRED_BEARER_TOKEN.value,
invalid_input_error_code
)
elif CredentialField.API_KEY in credentials:
Expand Down Expand Up @@ -247,10 +247,8 @@ def validate_connection_config(logger, config):
SkyflowMessages.Error.INVALID_CONNECTION_URL.value.format(connection_id)
)

if ConfigField.CREDENTIALS not in config:
raise SkyflowError(SkyflowMessages.Error.EMPTY_CREDENTIALS.value.format(ConfigType.CONNECTION, connection_id), invalid_input_error_code)

validate_credentials(logger, config.get(ConfigField.CREDENTIALS), ConfigType.CONNECTION, connection_id)
if "credentials" in config:
validate_credentials(logger, config.get("credentials"), "connection", connection_id)

return True

Expand Down Expand Up @@ -408,7 +406,7 @@ def validate_deidentify_file_request(logger, request: DeidentifyFileRequest):
if hasattr(request, DeidentifyFileRequestField.WAIT_TIME) and request.wait_time is not None:
if not isinstance(request.wait_time, (int, float)):
raise SkyflowError(SkyflowMessages.Error.INVALID_WAIT_TIME.value, invalid_input_error_code)
if request.wait_time < 0 and request.wait_time > 64: # noqa: PLR2004
if request.wait_time < 0 or request.wait_time > Detect.WAIT_TIME:
raise SkyflowError(SkyflowMessages.Error.WAIT_TIME_GREATER_THEN_64.value, invalid_input_error_code)

def validate_insert_request(logger, request):
Expand All @@ -432,9 +430,6 @@ def validate_insert_request(logger, request):
if key is None or key == "":
log_error_log(SkyflowMessages.ErrorLogs.EMPTY_OR_NULL_KEY_IN_VALUES.value.format(RequestOperation.INSERT), logger = logger)

if value is None or value == "":
log_error_log(SkyflowMessages.ErrorLogs.EMPTY_OR_NULL_VALUE_IN_VALUES.value.format(RequestOperation.INSERT, key), logger = logger)

if request.upsert is not None and (not isinstance(request.upsert, str) or not request.upsert.strip()):
log_error_log(SkyflowMessages.ErrorLogs.EMPTY_UPSERT.value(RequestOperation.INSERT), logger = logger)
raise SkyflowError(SkyflowMessages.Error.INVALID_UPSERT_OPTIONS_TYPE.value, invalid_input_error_code)
Expand Down Expand Up @@ -592,8 +587,8 @@ def validate_get_request(logger, request):
raise SkyflowError(SkyflowMessages.Error.INVALID_COLUMN_VALUE.value.format(type(column_values)), invalid_input_error_code)

if column_name and not column_values:
log_error_log(SkyflowMessages.ErrorLogs.COLUMN_NAME_IS_REQUIRED.value.format(RequestOperation.GET), logger = logger)
SkyflowError(SkyflowMessages.Error.INVALID_COLUMN_NAME.value.format(type(column_name)), invalid_input_error_code)
log_error_log(SkyflowMessages.ErrorLogs.COLUMN_NAME_IS_REQUIRED.value.format("GET"), logger = logger)
raise SkyflowError(SkyflowMessages.Error.INVALID_COLUMN_VALUES.value, invalid_input_error_code)

if (column_name or column_values) and skyflow_ids:
log_error_log(SkyflowMessages.ErrorLogs.BOTH_IDS_AND_COLUMN_NAME_PASSED.value.format(RequestOperation.GET), logger = logger)
Expand Down
Loading
Loading