Add OAuth 2.0 support for email sending

This commit is contained in:
Pavel Ferencz 2026-02-03 22:09:37 +02:00
parent c3b73ce6b1
commit 3052302e56
6 changed files with 701 additions and 26 deletions

View File

@ -0,0 +1,187 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2026, Kovid Goyal <kovid at kovidgoyal.net>
from qt.core import (
QApplication, QComboBox, QDialog, QEvent, QFormLayout, QGroupBox,
QHBoxLayout, QLabel, QProgressDialog, QPushButton, Qt, QTextBrowser, QVBoxLayout,
)
from calibre.gui2 import error_dialog, info_dialog
class OAuth2SetupDialog(QDialog):
GMAIL_DOMAINS = ('@gmail.com', '@googlemail.com')
OUTLOOK_DOMAINS = ('@outlook.com', '@hotmail.com', '@live.com', '@msn.com')
def __init__(self, parent, provider='gmail', email=''):
super().__init__(parent)
self.setWindowTitle(_('Setup OAuth 2.0 Authentication'))
self.resize(550, 400)
self.tokens = None
self.email_address = email
from calibre.utils.oauth2 import get_available_providers
self.available_providers = get_available_providers()
self.provider, self.provider_detected = self._detect_provider(email, provider)
self.setup_ui()
def _detect_provider(self, email, fallback='gmail'):
from calibre.utils.oauth2 import is_provider_available
available_names = [p[0] for p in self.available_providers]
email_lower = email.lower()
if any(email_lower.endswith(d) for d in self.GMAIL_DOMAINS):
if is_provider_available('gmail'):
return 'gmail', True
elif any(email_lower.endswith(d) for d in self.OUTLOOK_DOMAINS):
if is_provider_available('outlook'):
return 'outlook', True
if fallback in available_names:
return fallback, False
if available_names:
return available_names[0], False
return fallback, False
def setup_ui(self):
layout = QVBoxLayout(self)
header = QLabel(_('<h3>Setup OAuth 2.0 Authentication</h3>'
'<p>OAuth 2.0 is the recommended authentication method for Gmail and Outlook. '
'It is more secure than using passwords and does not require app-specific passwords.</p>'))
header.setWordWrap(True)
layout.addWidget(header)
provider_group = QGroupBox(_('Email Provider'))
provider_layout = QFormLayout(provider_group)
self.provider_combo = QComboBox()
for provider_id, provider_display in self.available_providers:
self.provider_combo.addItem(_(provider_display), provider_id)
idx = self.provider_combo.findData(self.provider)
if idx >= 0:
self.provider_combo.setCurrentIndex(idx)
if self.provider_detected:
provider_name = {'gmail': 'Gmail', 'outlook': 'Outlook'}.get(self.provider, self.provider)
self.provider_label = QLabel(f'<b>{provider_name}</b>')
provider_layout.addRow(_('Provider:'), self.provider_label)
self.provider_combo.hide()
elif len(self.available_providers) > 1:
self.provider_note = QLabel(_('<i>Could not detect provider from email domain.</i>'))
self.provider_note.setWordWrap(True)
provider_layout.addRow(self.provider_note)
provider_layout.addRow(_('Provider:'), self.provider_combo)
elif len(self.available_providers) == 1:
provider_name = self.available_providers[0][1]
self.provider_label = QLabel(f'<b>{provider_name}</b>')
provider_layout.addRow(_('Provider:'), self.provider_label)
self.provider_combo.hide()
layout.addWidget(provider_group)
instructions_group = QGroupBox(_('Instructions'))
instructions_layout = QVBoxLayout(instructions_group)
instructions = QTextBrowser()
instructions.setOpenExternalLinks(False)
instructions.setMaximumHeight(120)
instructions.setHtml(_('<ol>'
'<li>Click "Authorize" below</li>'
'<li>Your web browser will open to the provider\'s login page</li>'
'<li>Sign in and grant calibre permission to send email</li>'
'<li>The browser will redirect back automatically</li>'
'</ol>'
'<p><b>Note:</b> Calibre never sees your password.</p>'))
instructions_layout.addWidget(instructions)
layout.addWidget(instructions_group)
self.status_label = QLabel()
self.status_label.setWordWrap(True)
layout.addWidget(self.status_label)
button_layout = QHBoxLayout()
button_layout.addStretch()
self.authorize_button = QPushButton(_('Authorize'))
self.authorize_button.setDefault(True)
self.authorize_button.clicked.connect(self.start_oauth_flow)
button_layout.addWidget(self.authorize_button)
self.cancel_button = QPushButton(_('Cancel'))
self.cancel_button.clicked.connect(self.reject)
button_layout.addWidget(self.cancel_button)
layout.addLayout(button_layout)
def get_selected_provider(self):
return self.provider if self.provider_detected else self.provider_combo.currentData()
def start_oauth_flow(self):
provider = self.get_selected_provider()
self.authorize_button.setEnabled(False)
self.cancel_button.setEnabled(False)
self.status_label.setText(_('Starting authorization...'))
progress = QProgressDialog(
_('Waiting for authorization in browser...\n\nThis dialog will close automatically when done.'),
_('Cancel'), 0, 0, self)
progress.setWindowTitle(_('OAuth Authorization'))
progress.setWindowModality(Qt.WindowModality.WindowModal)
progress.setMinimumDuration(0)
progress.setValue(0)
from threading import Thread
class FlowCompleteEvent(QEvent):
EVENT_TYPE = QEvent.Type(QEvent.registerEventType())
def __init__(self, success, error_msg):
super().__init__(self.EVENT_TYPE)
self.success = success
self.error_msg = error_msg
self.FlowCompleteEvent = FlowCompleteEvent
def run_flow():
try:
from calibre.utils.oauth2 import OAuth2Error, start_oauth_flow as do_oauth_flow
self.tokens = do_oauth_flow(provider)
QApplication.instance().postEvent(self, FlowCompleteEvent(True, None))
except OAuth2Error as e:
QApplication.instance().postEvent(self, FlowCompleteEvent(False, str(e)))
except Exception as e:
QApplication.instance().postEvent(self, FlowCompleteEvent(False, str(e)))
def handle_flow_complete(event):
progress.close()
if event.success:
self.status_label.setText(_('<b style="color: green;">Authorization successful!</b>'))
info_dialog(self, _('Success'), _('OAuth 2.0 authorization was successful!'), show=True)
self.accept()
else:
self.status_label.setText(_('<b style="color: red;">Authorization failed</b>'))
self.authorize_button.setEnabled(True)
self.cancel_button.setEnabled(True)
error_dialog(self, _('Authorization Failed'),
_('OAuth 2.0 authorization failed:\n\n{error}').format(error=event.error_msg or _('Unknown error')), show=True)
self.handle_flow_complete = handle_flow_complete
def cancel_flow():
progress.close()
self.status_label.setText(_('Authorization cancelled'))
self.authorize_button.setEnabled(True)
self.cancel_button.setEnabled(True)
progress.canceled.connect(cancel_flow)
Thread(target=run_flow, daemon=True).start()
def event(self, e):
if hasattr(self, 'FlowCompleteEvent') and isinstance(e, self.FlowCompleteEvent):
self.handle_flow_complete(e)
return True
return super().event(e)
def get_tokens(self):
return self.tokens
if __name__ == '__main__':
from calibre.gui2 import Application
app = Application([])
d = OAuth2SetupDialog(None, provider='gmail', email='test@gmail.com')
d.exec()
print('Got tokens:', d.tokens.keys() if d.tokens else 'None')

View File

@ -128,13 +128,30 @@ class Sendmail:
# Microsoft changed the SMTP server
relay = 'smtp-mail.outlook.com'
oauth_token = None
password = from_hex_unicode(opts.relay_password) if opts.relay_password else None
if getattr(opts, 'auth_method', 'password') == 'oauth2':
from calibre.utils.oauth2 import get_token_manager, OAuth2Error
token_mgr = get_token_manager(opts.oauth_provider, opts.oauth_tokens)
try:
token_data = token_mgr.get_valid_token()
oauth_token = token_data['access_token']
# Save if refreshed
if token_data != opts.oauth_tokens:
email_config().set('oauth_tokens', token_data)
except OAuth2Error as e:
log.error(f'OAuth token refresh failed: {e}')
raise Exception(f'OAuth authentication failed: {e}. Please re-authorize in Preferences->Sharing->Sharing email')
sendmail(msg, efrom, eto, localhost=None,
verbose=1,
relay=relay,
username=opts.relay_username,
password=from_hex_unicode(opts.relay_password), port=opts.relay_port,
password=password, port=opts.relay_port,
encryption=opts.encryption,
debug_output=safe_debug)
debug_output=safe_debug,
oauth_token=oauth_token)
finally:
self.last_send_time = time.time()

View File

@ -179,6 +179,8 @@ class SendEmail(QWidget, Ui_Form):
def __init__(self, parent=None):
QWidget.__init__(self, parent)
self.setupUi(self)
self.oauth_button = None
self.oauth_status_label = None
def initialize(self, preferred_to_address):
self.preferred_to_address = preferred_to_address
@ -209,6 +211,7 @@ class SendEmail(QWidget, Ui_Form):
QLineEdit.EchoMode.Password if
state == 0 else QLineEdit.EchoMode.Normal))
self.test_email_button.clicked.connect(self.test_email)
self.setup_oauth_ui()
def changed(self, *args):
self.changed_signal.emit()
@ -232,13 +235,26 @@ class SendEmail(QWidget, Ui_Form):
sys.stdout = sys.stderr = buf
tb = None
try:
oauth_token = None
password = from_hex_unicode(opts.relay_password) if opts.relay_password else None
if getattr(opts, 'auth_method', 'password') == 'oauth2':
from calibre.utils.oauth2 import get_token_manager, OAuth2Error
token_mgr = get_token_manager(opts.oauth_provider, opts.oauth_tokens)
try:
token_data = token_mgr.get_valid_token()
oauth_token = token_data['access_token']
except OAuth2Error as e:
raise Exception(f'OAuth token refresh failed: {e}')
msg = create_mail(opts.from_, to, 'Test mail from calibre',
'Test mail from calibre')
sendmail(msg, from_=opts.from_, to=[to],
verbose=3, timeout=30, relay=opts.relay_host,
username=opts.relay_username, debug_output=debug_out,
password=from_hex_unicode(opts.relay_password),
encryption=opts.encryption, port=opts.relay_port)
password=password,
encryption=opts.encryption, port=opts.relay_port,
oauth_token=oauth_token)
except Exception:
import traceback
tb = traceback.format_exc()
@ -298,6 +314,92 @@ class SendEmail(QWidget, Ui_Form):
self.relay_port.setValue(service['port'])
self.relay_tls.setChecked(True)
def setup_oauth_ui(self):
from calibre.utils.oauth2 import get_available_providers
if not get_available_providers():
return
self.oauth_button = QPushButton(_('Setup OAuth 2.0...'))
self.oauth_button.clicked.connect(self.oauth_button_clicked)
self.oauth_status_label = QLabel()
self.oauth_status_label.setWordWrap(True)
self.update_oauth_status()
self.verticalLayout_9.addWidget(self.oauth_button)
self.verticalLayout_9.addWidget(self.oauth_status_label)
def oauth_button_clicked(self):
opts = smtp_prefs().parse()
if getattr(opts, 'auth_method', 'password') == 'oauth2' and opts.oauth_provider:
self.clear_oauth()
else:
self.setup_oauth()
def update_oauth_status(self, *args):
if not self.oauth_status_label:
return
opts = smtp_prefs().parse()
if getattr(opts, 'auth_method', 'password') == 'oauth2' and opts.oauth_provider:
provider_name = {'gmail': 'Gmail', 'outlook': 'Outlook'}.get(opts.oauth_provider, opts.oauth_provider)
self.oauth_status_label.setText(_('Authorized for {}').format(provider_name))
self.oauth_status_label.setStyleSheet('color: green; font-weight: bold;')
self.oauth_button.setText(_('Clear Authorization'))
else:
self.oauth_status_label.setText('')
self.oauth_status_label.setStyleSheet('')
self.oauth_button.setText(_('Setup OAuth 2.0...'))
def clear_oauth(self):
if not question_dialog(self, _('Clear OAuth Authorization?'),
_('Remove stored OAuth tokens and switch to password authentication?')):
return
conf = smtp_prefs()
conf.set('auth_method', 'password')
conf.set('oauth_provider', None)
conf.set('oauth_tokens', None)
self.update_oauth_status()
self.changed()
def detect_oauth_provider(self, email):
from calibre.utils.oauth2 import get_available_providers, is_provider_available
email_lower = email.lower()
if '@gmail.com' in email_lower or '@googlemail.com' in email_lower:
if is_provider_available('gmail'):
return 'gmail'
elif any(d in email_lower for d in ('@outlook.com', '@hotmail.com', '@live.com', '@msn.com')):
if is_provider_available('outlook'):
return 'outlook'
available = get_available_providers()
return available[0][0] if available else None
def setup_oauth(self):
from calibre.gui2.dialogs.oauth_setup import OAuth2SetupDialog
email = str(self.email_from.text()).strip()
if not email:
error_dialog(self, _('Email Required'),
_('Please enter your email address first.')).exec()
return
d = OAuth2SetupDialog(self, provider=self.detect_oauth_provider(email), email=email)
if d.exec() != QDialog.DialogCode.Accepted:
return
provider = d.get_selected_provider()
conf = smtp_prefs()
conf.set('auth_method', 'oauth2')
conf.set('oauth_provider', provider)
conf.set('oauth_tokens', d.get_tokens())
if provider == 'gmail':
conf.set('relay_host', 'smtp.gmail.com')
self.relay_host.setText('smtp.gmail.com')
elif provider == 'outlook':
conf.set('relay_host', 'smtp-mail.outlook.com')
self.relay_host.setText('smtp-mail.outlook.com')
conf.set('relay_port', 587)
conf.set('encryption', 'TLS')
conf.set('relay_username', email)
self.relay_port.setValue(587)
self.relay_tls.setChecked(True)
self.relay_username.setText(email)
self.update_oauth_status()
self.changed()
def set_email_settings(self, to_set):
from_ = str(self.email_from.text()).strip()
if to_set and not from_:
@ -309,18 +411,23 @@ class SendEmail(QWidget, Ui_Form):
host = str(self.relay_host.text()).strip()
enc_method = ('TLS' if self.relay_tls.isChecked() else 'SSL'
if self.relay_ssl.isChecked() else 'NONE')
opts = smtp_prefs().parse()
auth_method = getattr(opts, 'auth_method', 'password')
if host:
# Validate input
if ((username and not password) or (not username and password)):
error_dialog(self, _('Bad configuration'),
_('You must either set both the username <b>and</b> password for '
'the mail server or no username and no password at all.')).exec()
return False
if not (username and password) and not question_dialog(
self, _('Are you sure?'),
_('No username and password set for mailserver. Most '
' mailservers need a username and password. Are you sure?')):
return False
# Validate input - skip password validation for OAuth
if auth_method != 'oauth2':
if ((username and not password) or (not username and password)):
error_dialog(self, _('Bad configuration'),
_('You must either set both the username <b>and</b> password for '
'the mail server or no username and no password at all.')).exec()
return False
if not (username and password) and not question_dialog(
self, _('Are you sure?'),
_('No username and password set for mailserver. Most '
' mailservers need a username and password. Are you sure?')):
return False
conf = smtp_prefs()
conf.set('from_', from_)
conf.set('relay_host', host if host else None)

332
src/calibre/utils/oauth2.py Normal file
View File

@ -0,0 +1,332 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2026, Kovid Goyal <kovid at kovidgoyal.net>
'''
OAuth 2.0 authentication for SMTP email sending (PKCE flow).
'''
import base64
import hashlib
import json
import os
import socket
import threading
import time
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlencode, urlparse
from calibre import browser as get_browser
class OAuth2Error(Exception):
pass
def generate_pkce_pair():
code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode('utf-8').rstrip('=')
code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode('utf-8')).digest()).decode('utf-8').rstrip('=')
return code_verifier, code_challenge
def generate_xoauth2_string(email, access_token):
# Format: user={email}\x01auth=Bearer {token}\x01\x01
auth_string = f'user={email}\x01auth=Bearer {access_token}\x01\x01'
return base64.b64encode(auth_string.encode('utf-8')).decode('utf-8')
def find_available_port(start_port=8080, max_attempts=10):
for port in range(start_port, start_port + max_attempts):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(('localhost', port))
return port
except OSError:
continue
return None
class OAuth2Provider:
name = None
client_id = None
client_secret = None
auth_url = None
token_url = None
scopes = []
def __init__(self, redirect_uri=None):
self.redirect_uri = redirect_uri or 'http://localhost:8080/oauth2callback'
def get_authorization_url(self, state, code_challenge):
params = {
'client_id': self.client_id,
'redirect_uri': self.redirect_uri,
'response_type': 'code',
'scope': ' '.join(self.scopes),
'state': state,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
'access_type': 'offline',
'prompt': 'consent',
}
return f'{self.auth_url}?{urlencode(params)}'
def exchange_code_for_tokens(self, code, code_verifier):
br = get_browser()
data = {
'client_id': self.client_id,
'code': code,
'code_verifier': code_verifier,
'grant_type': 'authorization_code',
'redirect_uri': self.redirect_uri,
}
if self.client_secret is not None:
data['client_secret'] = self.client_secret
try:
response = br.open_novisit(self.token_url, data=urlencode(data).encode('utf-8'), timeout=30)
result = json.loads(response.read())
if 'error' in result:
error_msg = result.get('error_description', result['error'])
if 'redirect_uri_mismatch' in result.get('error', ''):
error_msg += f'\nRedirect URI: {self.redirect_uri}'
elif 'invalid_client' in result.get('error', ''):
error_msg += f'\nClient ID: {self.client_id}'
raise OAuth2Error(f'Token exchange failed: {error_msg}')
result['expires_at'] = int(time.time()) + result.get('expires_in', 3600)
return result
except Exception as e:
if isinstance(e, OAuth2Error):
raise
error_body = ''
if hasattr(e, 'read'):
try:
error_body = e.read()
if isinstance(error_body, bytes):
error_body = error_body.decode('utf-8', 'replace')
except Exception:
pass
if error_body:
try:
err_json = json.loads(error_body)
error_details = err_json.get('error_description') or err_json.get('error') or error_body
except json.JSONDecodeError:
error_details = error_body
else:
error_details = str(e)
raise OAuth2Error(f'Token exchange failed: {error_details}')
def refresh_access_token(self, refresh_token):
br = get_browser()
data = {
'client_id': self.client_id,
'refresh_token': refresh_token,
'grant_type': 'refresh_token',
}
if self.client_secret is not None:
data['client_secret'] = self.client_secret
try:
response = br.open_novisit(self.token_url, data=urlencode(data).encode('utf-8'), timeout=30)
result = json.loads(response.read())
if 'error' in result:
raise OAuth2Error(f'Token refresh failed: {result.get("error_description", result["error"])}')
result['expires_at'] = int(time.time()) + result.get('expires_in', 3600)
if 'refresh_token' not in result:
result['refresh_token'] = refresh_token
return result
except OAuth2Error:
raise
except Exception as e:
raise OAuth2Error(f'Token refresh failed: {e}')
class GmailOAuth2Provider(OAuth2Provider):
name = 'gmail'
client_id = os.environ.get('CALIBRE_GMAIL_CLIENT_ID', '410090137009-6f4jgjvsvcmtr1kqbtcro2j76rb7pio4.apps.googleusercontent.com')
client_secret = os.environ.get('CALIBRE_GMAIL_CLIENT_SECRET', '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')
auth_url = 'https://accounts.google.com/o/oauth2/v2/auth'
token_url = 'https://oauth2.googleapis.com/token'
scopes = ['https://mail.google.com/']
class OutlookOAuth2Provider(OAuth2Provider):
name = 'outlook'
client_id = os.environ.get('CALIBRE_OUTLOOK_CLIENT_ID', '') # register at portal.azure.com
client_secret = os.environ.get('CALIBRE_OUTLOOK_CLIENT_SECRET')
auth_url = 'https://login.microsoftonline.com/common/oauth2/v2.0/authorize'
token_url = 'https://login.microsoftonline.com/common/oauth2/v2.0/token'
scopes = ['https://outlook.office.com/SMTP.Send', 'offline_access']
PROVIDERS = {
'gmail': GmailOAuth2Provider,
'outlook': OutlookOAuth2Provider,
}
PROVIDER_DISPLAY_NAMES = {'gmail': 'Gmail', 'outlook': 'Outlook/Hotmail/Office 365'}
def get_provider(provider_name):
if provider_name not in PROVIDERS:
raise OAuth2Error(f'Unknown OAuth provider: {provider_name}')
return PROVIDERS[provider_name]()
def get_available_providers():
return [(name, PROVIDER_DISPLAY_NAMES.get(name, name))
for name, cls in PROVIDERS.items() if cls.client_id]
def is_provider_available(provider_name):
return provider_name in PROVIDERS and bool(PROVIDERS[provider_name].client_id)
class OAuth2CallbackHandler(BaseHTTPRequestHandler):
def log_message(self, format, *args):
pass
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path != '/oauth2callback':
self.send_error(404)
return
query = parse_qs(parsed.query)
if 'error' in query:
self.server.oauth_error = query['error'][0]
error_desc = query.get('error_description', [''])[0]
html = f'''
<html>
<head>
<meta charset="UTF-8">
<title>Authorization Failed</title>
<style>
body {{ font-family: sans-serif; text-align: center; padding: 50px; }}
h1 {{ color: #c62828; }}
</style>
</head>
<body>
<h1>Authorization Failed</h1>
<p>Error: {self.server.oauth_error}</p>
<p>{error_desc}</p>
<p>You can close this window.</p>
</body>
</html>
'''
elif 'code' in query and 'state' in query:
self.server.authorization_code = query['code'][0]
self.server.state = query['state'][0]
html = '''
<html>
<head>
<meta charset="UTF-8">
<title>Authorization Successful</title>
<style>
body { font-family: sans-serif; text-align: center; padding: 50px; }
h1 { color: #2e7d32; }
</style>
</head>
<body>
<h1>Authorization Successful!</h1>
<p>You can close this window and return to calibre.</p>
</body>
</html>
'''
else:
self.send_error(400, 'Missing required parameters')
return
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(html.encode('utf-8'))
threading.Thread(target=self.server.shutdown).start()
class OAuth2BrowserFlow:
def __init__(self, provider, start_port=8080):
self.provider = provider
self.start_port = start_port
def start_authorization_flow(self, timeout=300):
port = find_available_port(start_port=self.start_port)
if port is None:
raise OAuth2Error(f'No available port for OAuth callback (tried {self.start_port}-{self.start_port + 9})')
self.provider.redirect_uri = f'http://localhost:{port}/oauth2callback'
code_verifier, code_challenge = generate_pkce_pair()
state = base64.urlsafe_b64encode(os.urandom(32)).decode('utf-8').rstrip('=')
auth_url = self.provider.get_authorization_url(state, code_challenge)
server = HTTPServer(('localhost', port), OAuth2CallbackHandler)
server.oauth_error = None
server.authorization_code = None
server.state = None
if not webbrowser.open(auth_url):
raise OAuth2Error(f'Failed to open browser. Please open: {auth_url}')
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
start_time = time.time()
while server_thread.is_alive() and (time.time() - start_time) < timeout:
time.sleep(0.5)
if server_thread.is_alive():
server.shutdown()
raise OAuth2Error(f'Authorization timeout after {timeout} seconds')
if server.oauth_error:
raise OAuth2Error(f'Authorization failed: {server.oauth_error}')
if not server.authorization_code:
raise OAuth2Error('No authorization code received')
if server.state != state:
raise OAuth2Error('State mismatch - possible CSRF attack')
return self.provider.exchange_code_for_tokens(server.authorization_code, code_verifier)
class OAuth2TokenManager:
def __init__(self, provider, tokens=None):
self.provider = provider
self.tokens = tokens or {}
def get_valid_token(self):
if not self.tokens:
raise OAuth2Error('No tokens available. Please authorize first.')
expires_at = self.tokens.get('expires_at', 0)
if time.time() >= (expires_at - 300): # 5 min buffer
if 'refresh_token' not in self.tokens:
raise OAuth2Error('No refresh token available. Please re-authorize.')
self.tokens = self.provider.refresh_access_token(self.tokens['refresh_token'])
return self.tokens
def is_valid(self):
if not self.tokens or 'access_token' not in self.tokens:
return False
return time.time() < (self.tokens.get('expires_at', 0) - 300)
def start_oauth_flow(provider_name, start_port=8080):
flow = OAuth2BrowserFlow(get_provider(provider_name), start_port=start_port)
return flow.start_authorization_flow()
def get_token_manager(provider_name, tokens=None):
return OAuth2TokenManager(get_provider(provider_name), tokens)
def refresh_token_if_needed(provider_name, tokens):
token_mgr = get_token_manager(provider_name, tokens)
new_tokens = token_mgr.get_valid_token()
return new_tokens, new_tokens != tokens

View File

@ -123,7 +123,7 @@ def sendmail_direct(from_, to, msg, timeout, localhost, verbose,
debug_output=None):
from email.message import Message
from polyglot import smtplib
from calibre.utils import smtplib
hosts = get_mx(to.split('@')[-1].strip(), verbose)
timeout=None # Non blocking sockets sometimes don't work
kwargs = dict(timeout=timeout, local_hostname=sanitize_hostname(localhost or safe_localhost()))
@ -154,7 +154,7 @@ def get_smtp_class(use_ssl=False, debuglevel=0):
# in the constructor, because of https://bugs.python.org/issue36094
# which means the constructor calls connect(),
# but there is no way to set debuglevel before connect() is called
from polyglot import smtplib
from calibre.utils import smtplib
cls = smtplib.SMTP_SSL if use_ssl else smtplib.SMTP
bases = (cls,)
return type('SMTP', bases, {'debuglevel': debuglevel})
@ -162,7 +162,7 @@ def get_smtp_class(use_ssl=False, debuglevel=0):
def sendmail(msg, from_, to, localhost=None, verbose=0, timeout=None,
relay=None, username=None, password=None, encryption='TLS',
port=-1, debug_output=None, verify_server_cert=False, cafile=None):
port=-1, debug_output=None, verify_server_cert=False, cafile=None, oauth_token=None):
from email.message import Message
if relay is None:
for x in to:
@ -183,7 +183,9 @@ def sendmail(msg, from_, to, localhost=None, verbose=0, timeout=None,
context = ssl.create_default_context(cafile=cafile)
s.starttls(context=context)
s.ehlo()
if username is not None and password is not None:
if oauth_token is not None:
s.login(username, oauth_token, preferred_auths=['XOAUTH2'])
elif username is not None and password is not None:
s.login(username, password)
ret = None
try:
@ -352,6 +354,9 @@ def config(defaults=None):
c.add_opt('relay_username')
c.add_opt('relay_password')
c.add_opt('encryption', default='TLS', choices=['TLS', 'SSL'])
c.add_opt('auth_method', default='password', choices=['password', 'oauth2'])
c.add_opt('oauth_provider', default=None)
c.add_opt('oauth_tokens', default=None)
return c

View File

@ -316,14 +316,16 @@ class SMTP:
self.debug('connect:', msg)
return code, msg
def send(self, str):
'''Send `str' to the server.'''
def send(self, s):
'''Send `s' to the server.'''
if self.debuglevel > 0:
raw = repr(str)
raw = repr(s)
self.debug('send:', raw)
if hasattr(self, 'sock') and self.sock:
try:
self.sock.sendall(str)
if isinstance(s, str):
s = s.encode('utf-8')
self.sock.sendall(s)
except OSError:
self.close()
raise SMTPServerDisconnected('Server not connected')
@ -353,7 +355,7 @@ class SMTP:
'''
resp = []
if self.file is None:
self.file = self.sock.makefile('rb')
self.file = self.sock.makefile('r', encoding='utf-8', errors='replace')
while True:
try:
line = self.file.readline(_MAXLINE + 1)
@ -541,7 +543,7 @@ class SMTP:
if not (200 <= code <= 299):
raise SMTPHeloError(code, resp)
def login(self, user, password):
def login(self, user, password, preferred_auths=None):
'''Log in on an SMTP server that requires authentication.
The arguments are:
@ -576,6 +578,7 @@ class SMTP:
AUTH_PLAIN = 'PLAIN'
AUTH_CRAM_MD5 = 'CRAM-MD5'
AUTH_LOGIN = 'LOGIN'
AUTH_XOAUTH2 = 'XOAUTH2'
self.ehlo_or_helo_if_needed()
@ -588,7 +591,8 @@ class SMTP:
# List of authentication methods we support: from preferred to
# less preferred methods. Except for the purpose of testing the weaker
# ones, we prefer stronger methods like CRAM-MD5:
preferred_auths = [AUTH_CRAM_MD5, AUTH_PLAIN, AUTH_LOGIN]
if preferred_auths is None:
preferred_auths = [AUTH_CRAM_MD5, AUTH_PLAIN, AUTH_LOGIN, AUTH_XOAUTH2]
# Determine the authentication method we'll use
authmethod = None
@ -612,6 +616,12 @@ class SMTP:
if code != 334:
raise SMTPAuthenticationError(code, resp)
code, resp = self.docmd(encode_base64(password, eol=''))
elif authmethod == AUTH_XOAUTH2:
# For XOAUTH2, password is the access token
# Generate XOAUTH2 string: base64(user={email}\x01auth=Bearer {token}\x01\x01)
from calibre.utils.oauth2 import generate_xoauth2_string
auth_string = generate_xoauth2_string(user, password)
code, resp = self.docmd('AUTH', f'{AUTH_XOAUTH2} {auth_string}')
elif authmethod is None:
raise SMTPException('No suitable authentication method found.')
if code not in (235, 503):
@ -750,6 +760,23 @@ class SMTP:
# if we got here then somebody got our mail
return senderrs
def send_message(self, msg, from_addr=None, to_addrs=None,
mail_options=[], rcpt_options=[]):
'''Converts message to a string and passes it to sendmail.'''
if from_addr is None:
from_addr = msg['From']
if to_addrs is None:
raise ValueError('to_addrs must be specified')
from email.generator import Generator
from io import StringIO
f = StringIO()
g = Generator(f)
g.flatten(msg, linesep='\r\n')
flatmsg = f.getvalue()
return self.sendmail(from_addr, to_addrs, flatmsg, mail_options, rcpt_options)
def close(self):
'''Close the connection to the SMTP server.'''
try: