Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions emails/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import urllib.parse as urlparse

from ..utils import to_unicode, to_native
from ..message import Message
from ..utils import fetch_url
from .local_store import (FileSystemLoader, ZipLoader, MsgLoader, FileNotFound)
Expand Down Expand Up @@ -77,7 +76,7 @@ def _extract_base_url(url):
# Load html page
r = fetch_url(url, requests_args=requests_params)
html = r.content
html = to_unicode(html, charset=guess_charset(r.headers, html))
html = html.decode(guess_charset(r.headers, html) or 'utf-8')
html = html.replace('\r\n', '\n') # Remove \r

return from_html(html,
Expand Down
16 changes: 8 additions & 8 deletions emails/loader/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
except ImportError:
import chardet

from ..utils import to_native, to_unicode

# HTML page charset stuff

Expand All @@ -29,24 +28,25 @@ def __init__(self, conv=None):
if k.startswith('re_'):
setattr(self, k, re.compile(conv(getattr(self, k)), re.I + re.S + re.M))

RULES_U = ReRules(conv=to_unicode)
RULES_U = ReRules(conv=lambda x: x.decode())
RULES_B = ReRules()


def guess_text_charset(text, is_html=False):
if is_html:
rules = isinstance(text, bytes) and RULES_B or RULES_U
is_bytes = isinstance(text, bytes)
rules = RULES_B if is_bytes else RULES_U
for meta in rules.re_meta.findall(text):
if rules.re_is_http_equiv.findall(meta):
for content in rules.re_parse_http_equiv.findall(meta):
for charset in rules.re_charset.findall(content):
return to_native(charset)
return charset.decode() if is_bytes else charset
else:
for charset in rules.re_charset.findall(meta):
return to_native(charset)
return charset.decode() if is_bytes else charset
# guess by chardet
if isinstance(text, bytes):
return to_native(chardet.detect(text)['encoding'])
return chardet.detect(text)['encoding']


def guess_html_charset(html):
Expand All @@ -68,7 +68,7 @@ def guess_charset(headers, html):
# guess by html content
charset = guess_html_charset(html)
if charset:
return to_unicode(charset)
return charset

COMMON_CHARSETS = ('ascii', 'utf-8', 'utf-16', 'windows-1251', 'windows-1252', 'cp850')

Expand Down Expand Up @@ -100,7 +100,7 @@ def decode_text(text,
_last_exc = None
for enc in _charsets:
try:
return to_unicode(text, charset=enc), enc
return text.decode(enc), enc
except UnicodeDecodeError as exc:
_last_exc = exc

Expand Down
9 changes: 3 additions & 6 deletions emails/loader/local_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from zipfile import ZipFile
import email

from ..utils import to_unicode, to_native, formataddr, decode_header
from ..utils import formataddr, decode_header
from ..loader.helpers import decode_text
from ..message import Message

Expand Down Expand Up @@ -183,7 +183,7 @@ def __init__(self, file, encoding='utf-8', base_path=None):
def _decode_filename(self, name):
for enc in self.common_filename_charsets:
try:
return to_unicode(name, enc)
return name.decode(enc) if isinstance(name, bytes) else name
except UnicodeDecodeError:
pass
return name
Expand All @@ -202,9 +202,6 @@ def get_file(self, name):

self._unpack()

if isinstance(name, str):
name = to_unicode(name, 'utf-8')

if name not in self._original_filenames:
name = self._decoded_filenames.get(name)

Expand All @@ -229,7 +226,7 @@ def __init__(self, msg, base_path=None):
if isinstance(msg, str):
self.msg = email.message_from_string(msg)
elif isinstance(msg, bytes):
self.msg = email.message_from_string(to_native(msg))
self.msg = email.message_from_string(msg.decode())
else:
self.msg = msg
self.base_path = base_path
Expand Down
10 changes: 8 additions & 2 deletions emails/store/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import urllib.parse as urlparse

from ..utils import fetch_url, encode_header, to_bytes
from ..utils import fetch_url, encode_header


MIMETYPE_UNKNOWN = 'application/unknown'
Expand Down Expand Up @@ -143,7 +143,13 @@ def mime(self) -> MIMEBase | None:
if p is None:
filename_header = encode_header(self.filename)
p = MIMEBase(*self.mime_type.split('/', 1), name=filename_header)
payload = to_bytes(self.data) or b''
data = self.data
if isinstance(data, str):
payload = data.encode()
elif data is not None:
payload = bytes(data)
else:
payload = b''
p.set_payload(payload)
encode_base64(p)
if 'content-disposition' not in self._headers:
Expand Down
3 changes: 1 addition & 2 deletions emails/testsuite/loader/test_rfc822_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import email
import datetime
import os.path
from emails.utils import to_native
import emails.loader
from emails.loader.local_store import MsgLoader

Expand Down Expand Up @@ -79,7 +78,7 @@ def test_msgloader():
def _try_decode(s, charsets=('utf-8', 'koi8-r', 'cp1251')):
for charset in charsets:
try:
return to_native(s, charset), charset
return s.decode(charset), charset
except UnicodeDecodeError:
pass
return None, None
Expand Down
9 changes: 4 additions & 5 deletions emails/testsuite/message/test_dkim.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from emails import Message
from io import StringIO

from emails.utils import to_bytes, to_native
from emails.exc import DKIMException
from emails.utils import load_email_charsets
import emails.packages.dkim
Expand Down Expand Up @@ -40,7 +39,7 @@ def _generate_key(length=1024):
try:
from Crypto.PublicKey import RSA
key = RSA.generate(length)
return to_bytes(key.exportKey()), to_bytes(key.publickey().exportKey())
return key.exportKey(), key.publickey().exportKey()
except ImportError:
return PRIV_KEY, PUB_KEY

Expand All @@ -49,15 +48,15 @@ def _check_dkim(message, pub_key=PUB_KEY):
def _plain_public_key(s):
return b"".join([l for l in s.split(b'\n') if not l.startswith(b'---')])
message = message.as_string()
o = emails.packages.dkim.DKIM(message=to_bytes(message))
o = emails.packages.dkim.DKIM(message=message.encode())
return o.verify(dnsfunc=lambda name: b"".join([b"v=DKIM1; p=", _plain_public_key(pub_key)]))


def test_dkim():

priv_key, pub_key = _generate_key(length=1024)

DKIM_PARAMS = [dict(key=StringIO(to_native(priv_key)),
DKIM_PARAMS = [dict(key=StringIO(priv_key.decode()),
selector='_dkim',
domain='somewhere1.net'),

Expand Down Expand Up @@ -150,7 +149,7 @@ def test_dkim_sign_twice():

priv_key, pub_key = _generate_key(length=1024)
message = Message(**common_email_data())
message.dkim(key=StringIO(to_native(priv_key)), selector='_dkim', domain='somewhere.net')
message.dkim(key=StringIO(priv_key.decode()), selector='_dkim', domain='somewhere.net')
for n in range(2):
message.subject = 'Test %s' % n
assert _check_dkim(message, pub_key)
3 changes: 1 addition & 2 deletions emails/testsuite/message/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import emails.exc
from io import StringIO

from emails.utils import to_unicode
from emails.utils import decode_header, MessageID
from emails.backend.inmemory import InMemoryBackend

Expand Down Expand Up @@ -73,7 +72,7 @@ def my_after_build(original_message, built_message):

s = m.as_string()
print("type of message.as_string() is {0}".format(type(s)))
assert AFTER_BUILD_HEADER in to_unicode(s, 'utf-8')
assert AFTER_BUILD_HEADER in s


def test_before_build():
Expand Down
3 changes: 1 addition & 2 deletions emails/testsuite/smtp_servers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import datetime
import random
import time
from emails.utils import to_unicode

DEFAULT_FROM = os.environ.get('SMTP_TEST_FROM_EMAIL') or 'python-emails@lavr.me'
SUBJECT_SUFFIX = os.environ.get('SMTP_TEST_SUBJECT_SUFFIX')
Expand Down Expand Up @@ -88,7 +87,7 @@ def patch_message(self, message):
message.mail_to = self.to_email

# TODO: this code breaks template in subject; fix it
if not to_unicode(message.subject).startswith(self.subject_prefix) :
if not message.subject.startswith(self.subject_prefix):
message.subject = " ".join([self.subject_prefix, message.subject,
'// %s' % SUBJECT_SUFFIX])

Expand Down
4 changes: 2 additions & 2 deletions emails/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

import urllib.parse as urlparse

from .utils import to_unicode
from .loader.local_store import FileNotFound
from .store import MemoryFileStore, LazyHTTPFile
from .template.base import BaseTemplate
Expand Down Expand Up @@ -125,7 +124,8 @@ def _apply_to_style_uri(style_text, func):
dirty = True
value.uri = new_uri
if dirty:
return to_unicode(parser.cssText, 'utf-8')
css_text = parser.cssText
return css_text.decode('utf-8') if isinstance(css_text, bytes) else css_text
else:
return style_text

Expand Down
38 changes: 1 addition & 37 deletions emails/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import sys
import os
import socket
from time import mktime
Expand All @@ -9,7 +8,7 @@
from functools import wraps
from io import StringIO, BytesIO
from collections.abc import Callable
from typing import Any, TypeVar, cast, overload
from typing import Any, TypeVar, cast

import email.charset
from email import generator
Expand All @@ -25,41 +24,6 @@
F = TypeVar('F', bound=Callable[..., Any])


def to_native(x: str | bytes | None, charset: str = sys.getdefaultencoding(),
errors: str = 'strict') -> str | None:
if x is None or isinstance(x, str):
return x
return x.decode(charset, errors)


@overload
def to_unicode(x: None, charset: str = ..., errors: str = ...) -> None: ...
@overload
def to_unicode(x: str | bytes, charset: str = ..., errors: str = ...) -> str: ...
@overload
def to_unicode(x: Any, charset: str = ..., errors: str = ...) -> str | None: ...

def to_unicode(x: Any, charset: str = sys.getdefaultencoding(),
errors: str = 'strict') -> str | None:
if x is None:
return None
if not isinstance(x, bytes):
return str(x)
return x.decode(charset, errors)


def to_bytes(x: str | bytes | bytearray | memoryview | None,
charset: str = sys.getdefaultencoding(),
errors: str = 'strict') -> bytes | None:
if x is None:
return None
if isinstance(x, (bytes, bytearray, memoryview)):
return bytes(x)
if isinstance(x, str):
return x.encode(charset, errors)
raise TypeError('Expected bytes')


def formataddr(pair: tuple[str | None, str]) -> str:
"""
Takes a 2-tuple of the form (realname, email_address) and returns RFC2822-like string.
Expand Down
Loading