其他
python实用脚本函数:邮件发送
示例
完整代码
#!/usr/bin/env python3
# coding: utf-8
__author__ = 'ChenyangGao <https://chenyanggao.github.io/>'
__version__ = (0, 0, 1)
__all__ = [
'make_message', 'make_attachment', 'ctx_smtp', 'sendmail',
'read_file_as_attachment', 'read_textfile_as_attachment',
'make_image', 'attch_image_cites',
]
# Reference:
# - [smtplib](https://docs.python.org/3/library/smtplib.html)
# - [email](https://docs.python.org/3/library/email.html)
# - [email: Examples](https://docs.python.org/3/library/email.examples.html)
from contextlib import contextmanager
from email.header import Header
from email.message import Message
from email.mime.application import MIMEApplication
from email.mime.image import MIMEImage
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import formataddr
from functools import partial, update_wrapper
from os import PathLike
from os.path import basename
from re import compile as re_compile, Pattern
from sys import getfilesystemencoding
from smtplib import SMTP, SMTP_SSL
from typing import (
cast, Any, Dict, Final, Iterable, Optional, Sequence,
SupportsBytes, Tuple, Union,
)
# 下面是 邮件地址 的几种可能的模式
MAIL_ADDR_CRE: Final[Tuple[Pattern, ...]] = (
re_compile(r'\s*<(?P<addr>(?P<name>[^@]+)@[^> ]+)'),
re_compile(r'\s*(?P<name>[^<]+)\s*<(?P<addr>[^> ]+)'),
re_compile(r'\s*(?P<addr>(?P<name>[^@]+)@\S+)'),
)
DEAULT_ATTACHMENT_FILENAME_CHARSET: Final[str] = 'gb18030'
DEAULT_ATTACHMENT_FILENAME_LANGUAGE: Final[str] = ''
def extract_addr(mail_addr: str, /) -> str:
'尝试从一个邮件地址中提取并返回形如 x@y 的部分,如果失败则原样返回'
for cre in MAIL_ADDR_CRE:
match = cre.match(mail_addr)
if match is not None:
return match['addr']
return mail_addr
def format_address(mail_addr: str, /) -> str:
'''尝试把一个邮件地址转换成形如'x <y@z>'的格式,如果失败则原样返回'''
for cre in MAIL_ADDR_CRE:
match = cre.match(mail_addr)
if match is not None:
break
else:
return mail_addr
return formataddr((match['name'], match['addr']))
def make_addresses(addrs: Union[str, Sequence[str]], /) -> str:
'''尝试把一批邮件地址进行格式化,并用';'连接成一个字符串'''
if isinstance(addrs, str):
return format_address(addrs)
return ';'.join(map(format_address, addrs))
def make_message(
message: Dict[str, Any],
content_type: str = 'plain',
encoding: str = 'utf-8',
/,
**params,
) -> MIMEMultipart:
'''创建一个邮件内容对象。
:param message: 邮件的具体内容。
`message`可以有以下这些键名(区分大小写):
From: 发送者
# type: Union[email.header.Header, str]
To: 接收者(1个或多个)
# type: Union[email.header.Header, Sequence[str]]
Cc: 抄送者(1个或多个)(可缺省)
# type: Union[email.header.Header, Sequence[str]]
Subject: 主题(可缺省)
# type: Union[email.header.Header, str]
Content: 正文(可缺省)
# type: Union[email.message.Message, str]
Attachments: 附件(1个或多个)(可缺省)
# type: Sequence[Union[
email.message.Message,
Tuple[Union[str, bytes], Union[str, Tuple[str, str, str]]]
]]
:param content_type: 文本 MIME 类型,会构造为'text/{content_type}'。
:param encoding: 文本编码。
:params params: 其它可以传给`email.mime.multipart.MIMEMultipart`构造方法的参数。
:return: 根据`message`创建的一个`email.mime.multipart.MIMEMultipart`实例。
'''
msg = MIMEMultipart(**params)
msg_from = message['From']
if not isinstance(msg_from, Header):
msg['From'] = Header(make_addresses(msg_from), encoding)
msg_to = message['To']
if not isinstance(msg_to, Header):
msg['To'] = Header(make_addresses(msg_to), encoding)
msg_cc = message.get('Cc')
if msg_cc is not None and not isinstance(msg_cc, Header):
msg['Cc'] = Header(make_addresses(msg_cc), encoding)
msg_subject = message.get('Subject')
if msg_subject is not None and not isinstance(msg_subject, Header):
msg['Subject'] = Header(msg_subject, encoding)
msg_content = message.get('Content')
if msg_content is not None:
if not isinstance(msg_content, Message):
msg_content = MIMEText(msg_content, content_type, encoding)
msg.attach(msg_content)
msg_attachments = message.get('Attachments')
if msg_attachments is not None:
for arg in msg_attachments:
if not isinstance(arg, Message):
attachment = make_attachment(*arg)
msg.attach(attachment)
return msg
def make_attachment(
filedata: Union[bytes, str, SupportsBytes],
filename: Union[str, Tuple[str, str, str]],
/,
filename_charset: str = DEAULT_ATTACHMENT_FILENAME_CHARSET,
filename_language: str = DEAULT_ATTACHMENT_FILENAME_LANGUAGE,
**params,
) -> MIMEApplication:
'''创建一个附件内容对象。
:params filedata: 文件的数据。
:params filename: 文件名。如果是`str`,那么会被视为(`filename_charset`, `filename_language`, `filename`)。
:params filename_charset: 文件名的编码。
:params filename_language: 文件名的语言。
:params params: 其它可以传给`email.mime.application.MIMEApplication`构造方法的参数。
:return: 根据参数信息创建的一个`email.mime.application.MIMEApplication`实例。
'''
if isinstance(filedata, str):
filedata = filedata.encode()
elif not isinstance(filedata, bytes):
filedata = bytes(filedata)
attachment = MIMEApplication(filedata, **params)
if isinstance(filename, str):
filename = (filename_charset, filename_language, filename)
attachment.add_header('Content-Type', 'application/octet-stream')
attachment.add_header('Content-Disposition', 'attachment',
filename=filename)
return attachment
@contextmanager
def ctx_smtp(
sender: Optional[str] = None,
password: str = '',
host = None,
port: int = None,
with_ssl: bool = False,
):
'''上下文管理器。
__enter__:创建一个 SMTP 客户端,登录(如果需要),上下文管理器返回`smtplib.SMTP`实例;
__exit__: 退出登录(如果在`__enter__`里登录过的话),关闭 SMTP 客户端;
:param sender: 邮件发送者的地址或者名称,也是登录 SMTP 服务器的账号,如果为 None,那么不登录,
并且使用缺省值:'anonymous'。
:param password: 邮件发送者登录SMTP 服务的服务器的口令。
:param host: SMTP 服务的服务器地址。
:param port: SMTP 服务的服务器端口,如果是 None,那么自动指定:
如果`with_ssl`为 False,端口是 25;
如果`with_ssl`为 True,端口是 465;
:param with_ssl: 如果为 True,SMTP 客户端是`smtplib.SMTP_SSL`对象,否则是`smtplib.SMTP`对象。
'''
cls = SMTP_SSL if with_ssl else SMTP
if port is None:
port = 465 if with_ssl else 25
if host is None:
smtp = cls()
else:
smtp = cls(host, port)
try:
if sender is None:
yield (smtp, partial(sendmail, from_addr='anonymous', smtp=smtp))
else:
smtp.login(extract_addr(sender), password)
try:
yield (smtp, partial(sendmail, from_addr=sender, smtp=smtp))
finally:
smtp.quit()
finally:
smtp.close()
def _ensure_smtp(func):
def wrapper(*args, smtp=None, **kwargs):
if smtp is None:
with ctx_smtp(host='localhost', port=0) as (smtp, _):
return func(*args, smtp=smtp, **kwargs)
else:
func(*args, smtp=smtp, **kwargs)
return update_wrapper(wrapper, func)
@_ensure_smtp
def sendmail(
to_addrs: Union[str, Sequence[str]],
*,
from_addr: str = 'anonymous',
message: Union[bytes, str, Message, Dict[str, Any]] = {'Subject': 'test'},
content_type: str = 'plain',
attachments: Optional[Sequence[Any]] = None,
smtp: Optional[SMTP] = None,
**kwargs,
) -> None:
'''发送邮件到一批指定的邮件地址。
:param to_addrs: 接收邮件的地址或地址的序列。
:param from_addr: 发送邮件的地址或者发送者名称。
:param message: 邮件内容。
如果是`bytes`或`str`,那么会作为邮件内容发送出去,即使有`attachments`也不会添加。
如果是`email.message.Message`,那么如果有`attachments`就会添加上去,
然后作为最终邮件内容发送出去。如果是`dict`,那么会补全缺失信息,并添加`attachments`,
再转换成`email.message.Message`,然后作为最终邮件内容发送出去。
作为字典可以有以下这些键名(区分大小写):
From: 发送者(缺省时设为`from_addr`)
To: 接收者(1个或多个)(缺省时设为`to_addrs`)
Cc: 抄送者(1个或多个)(可缺省)
Subject: 主题(可缺省)
Content: 正文(可缺省)
Attachments: 附件(1个或多个)(可缺省)
:param content_type: 正文的文本类型,最终会视为 text/{content_type} 的 MIME 类型。
:param attachments: 附件的序列,只要能添加,就会添加到 message 中。
附件可能是以下格式之一:
如果是`email.message.Message`实例,则作为附件对象直接使用;
如果模式是 (filedata, filename),则传给`make_attachment`,创建附件对象;
如果模式是 filedata,则会分配一个 filename,然后传给`make_attachment`,创建附件对象;
:param smtp: 预期是一个`smtplib.SMTP`实例,如果未提供,则会尝试使用系统中邮件服务,
此时系统中可能启动了诸如 sendmail、postfix、dovecot 这类程序。
'''
smtp = cast(SMTP, smtp)
if not to_addrs:
raise ValueError('`to_addrs`不能为空')
from_addr_: str = extract_addr(from_addr)
to_addrs_: Tuple[str, ...]
if isinstance(to_addrs, str):
to_addrs_ = (extract_addr(to_addrs),)
else:
to_addrs_ = tuple(map(extract_addr, to_addrs))
if isinstance(message, (bytes, str)):
smtp.sendmail(from_addr_, to_addrs_, message, **kwargs)
return
elif isinstance(message, Message):
msg = message
elif isinstance(message, dict):
message.setdefault('From', from_addr)
message.setdefault('To', to_addrs)
msg = make_message(message, content_type)
else:
raise TypeError(f'`message`不支持的邮件类型:{type(message)}')
if attachments is not None:
for i, attachment in enumerate(attachments, 1):
if type(attachment) is tuple:
args = attachment
attachment = make_attachment(*args)
elif not isinstance(attachment, Message):
data = attachment
filename = '%d.%s' % (i, ('data', 'txt')[isinstance(data, str)])
attachment = make_attachment(data, filename)
msg.attach(attachment)
smtp.sendmail(from_addr_, to_addrs_, msg.as_string(), **kwargs)
def read_file_as_attachment(
path: Union[str, PathLike],
filename: Union[None, str, Tuple[str, str, str]] = None,
filename_charset: str = DEAULT_ATTACHMENT_FILENAME_CHARSET,
filename_language: str = DEAULT_ATTACHMENT_FILENAME_LANGUAGE,
) -> MIMEApplication:
'读取一个文件作为附件'
if filename is None:
filename = basename(path)
return make_attachment(
open(path, 'rb').read(),
filename,
filename_charset=filename_charset,
filename_language=filename_language,
)
def read_textfile_as_attachment(
path: Union[str, PathLike],
filename: Union[None, str, Tuple[str, str, str]] = None,
filename_charset: str = DEAULT_ATTACHMENT_FILENAME_CHARSET,
filename_language: str = DEAULT_ATTACHMENT_FILENAME_LANGUAGE,
) -> MIMEText:
'读取一个文本文件作为附件'
if filename is None:
filename = basename(path)
attachment = MIMEText(
open(path, 'rb').read(), # type: ignore
'base64',
getfilesystemencoding(),
)
if isinstance(filename, str):
filename = (filename_charset, filename_language, filename)
attachment.add_header('Content-Type', 'application/octet-stream')
attachment.add_header('Content-Disposition', 'attachment',
filename=filename)
return attachment
def make_image(
image: Union[bytes, str, PathLike],
cid: Any = None,
) -> MIMEImage:
'创建一个图片对象,或可作为引用(`cid`不是`None`)'
if not isinstance(image, bytes):
image = open(image, 'rb').read()
mime_image = MIMEImage(image)
if cid is None:
mime_image.add_header('Content-ID', '<%s>' % cid)
return mime_image
def attch_image_cites(
cid_image_pairs: Iterable[Tuple[Any, Union[bytes, str, PathLike]]],
content_root: Optional[MIMEMultipart] = None,
):
'使用一批图片作为引用,并附加到一个内容对象上'
if content_root is None:
content_root = MIMEMultipart('alternative')
for cid, image in cid_image_pairs:
mime_image = make_image(image, cid)
content_root.attach(mime_image)
return content_root
往期推荐