First try at a threaded listener that can tell CC if calibre is busy serving another device.

This commit is contained in:
Charles Haley 2012-09-16 19:12:38 +02:00
parent 839b68d4b6
commit a094235344

View File

@ -10,9 +10,12 @@ Created on 29 Jun 2012
import socket, select, json, inspect, os, traceback, time, sys, random import socket, select, json, inspect, os, traceback, time, sys, random
import posixpath import posixpath
import hashlib, threading import hashlib, threading
import Queue
from base64 import b64encode, b64decode from base64 import b64encode, b64decode
from functools import wraps from functools import wraps
from errno import EAGAIN, EINTR from errno import EAGAIN, EINTR
from threading import Thread
from calibre import prints from calibre import prints
from calibre.constants import numeric_version, DEBUG from calibre.constants import numeric_version, DEBUG
@ -48,6 +51,109 @@ def synchronous(tlockname):
return _synched return _synched
class ConnectionListener (Thread):
def __init__(self, driver):
Thread.__init__(self)
self.daemon = True
self.driver = driver
self.keep_running = True
def stop(self):
self.keep_running = False
def run(self):
queue_not_serviced_count = 0
device_socket = None
while self.keep_running:
try:
time.sleep(1) # Limit to one book per two seconds
except:
# Happens during interpreter shutdown
break
if not self.keep_running:
break
if not self.driver.connection_queue.empty():
self.driver._debug('queue not empty')
queue_not_serviced_count += 1
if queue_not_serviced_count >= 3:
self.driver._debug('queue not serviced')
try:
sock = self.driver.connection_queue.get_nowait()
s = self.driver._json_encode(
self.driver.opcodes['CALIBRE_BUSY'], {})
self.driver._send_byte_string(device_socket, (b'%d' % len(s)) + s)
sock.close()
except Queue.Empty:
pass
queue_not_serviced_count = 0
if getattr(self.driver, 'broadcast_socket', None) is not None:
while True:
ans = select.select((self.driver.broadcast_socket,), (), (), 0)
if len(ans[0]) > 0:
try:
packet = self.driver.broadcast_socket.recvfrom(100)
remote = packet[1]
message = str(self.driver.ZEROCONF_CLIENT_STRING + b' (on ' +
str(socket.gethostname().partition('.')[0]) +
b'),' + str(self.driver.port))
self.driver._debug('received broadcast', packet, message)
self.driver.broadcast_socket.sendto(message, remote)
except:
pass
else:
break
if self.driver.connection_queue.empty() and \
getattr(self.driver, 'listen_socket', None) is not None:
ans = select.select((self.driver.listen_socket,), (), (), 0)
if len(ans[0]) > 0:
# timeout in 10 ms to detect rare case where the socket went
# way between the select and the accept
try:
self.driver._debug('attempt to open device socket')
device_socket = None
self.driver.listen_socket.settimeout(0.010)
device_socket, ign = eintr_retry_call(
self.driver.listen_socket.accept)
self.driver.listen_socket.settimeout(None)
device_socket.settimeout(None)
try:
peer = self.driver.device_socket.getpeername()[0]
attempts = self.drjver.connection_attempts.get(peer, 0)
if attempts >= self.MAX_UNSUCCESSFUL_CONNECTS:
self.driver._debug('too many connection attempts from', peer)
device_socket.close()
device_socket = None
# raise InitialConnectionError(_('Too many connection attempts from %s') % peer)
else:
self.driver.connection_attempts[peer] = attempts + 1
except InitialConnectionError:
raise
except:
pass
try:
self.driver.connection_queue.put_nowait(device_socket)
except Queue.Full:
device_socket.close();
device_socket = None
self.driver._debug('driver is not answering')
except socket.timeout:
pass
except socket.error:
x = sys.exc_info()[1]
self.driver._debug('unexpected socket exception', x.args[0])
device_socket.close()
device_socket = None
# raise
class SDBook(Book): class SDBook(Book):
def __init__(self, prefix, lpath, size=None, other=None): def __init__(self, prefix, lpath, size=None, other=None):
Book.__init__(self, prefix, lpath, size=size, other=other) Book.__init__(self, prefix, lpath, size=size, other=other)
@ -112,6 +218,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
'OK' : 0, 'OK' : 0,
'BOOK_DATA' : 10, 'BOOK_DATA' : 10,
'BOOK_DONE' : 11, 'BOOK_DONE' : 11,
'CALIBRE_BUSY' : 18,
'DELETE_BOOK' : 13, 'DELETE_BOOK' : 13,
'DISPLAY_MESSAGE' : 17, 'DISPLAY_MESSAGE' : 17,
'FREE_SPACE' : 5, 'FREE_SPACE' : 5,
@ -373,7 +480,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
pos += len(v) pos += len(v)
return data return data
def _send_byte_string(self, s): def _send_byte_string(self, sock, s):
if not isinstance(s, bytes): if not isinstance(s, bytes):
self._debug('given a non-byte string!') self._debug('given a non-byte string!')
raise PacketError("Internal error: found a string that isn't bytes") raise PacketError("Internal error: found a string that isn't bytes")
@ -382,11 +489,11 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
while sent_len < total_len: while sent_len < total_len:
try: try:
if sent_len == 0: if sent_len == 0:
amt_sent = self.device_socket.send(s) amt_sent = sock.send(s)
else: else:
amt_sent = self.device_socket.send(s[sent_len:]) amt_sent = sock.send(s[sent_len:])
if amt_sent <= 0: if amt_sent <= 0:
raise IOError('Bad write on device socket') raise IOError('Bad write on socket')
sent_len += amt_sent sent_len += amt_sent
except socket.error as e: except socket.error as e:
self._debug('socket error', e, e.errno) self._debug('socket error', e, e.errno)
@ -410,7 +517,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
if print_debug_info and extra_debug: if print_debug_info and extra_debug:
self._debug('send string', s) self._debug('send string', s)
self.device_socket.settimeout(self.MAX_CLIENT_COMM_TIMEOUT) self.device_socket.settimeout(self.MAX_CLIENT_COMM_TIMEOUT)
self._send_byte_string((b'%d' % len(s)) + s) self._send_byte_string(self.device_socket, (b'%d' % len(s)) + s)
if not wait_for_response: if not wait_for_response:
return None, None return None, None
return self._receive_from_client(print_debug_info=print_debug_info) return self._receive_from_client(print_debug_info=print_debug_info)
@ -620,17 +727,9 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
break break
if getattr(self, 'listen_socket', None) is not None: if getattr(self, 'listen_socket', None) is not None:
ans = select.select((self.listen_socket,), (), (), 0)
if len(ans[0]) > 0:
# timeout in 10 ms to detect rare case where the socket went
# way between the select and the accept
try: try:
self.device_socket = None ans = self.connection_queue.get_nowait()
self.listen_socket.settimeout(0.010) self.device_socket = ans
self.device_socket, ign = eintr_retry_call(
self.listen_socket.accept)
self.listen_socket.settimeout(None)
self.device_socket.settimeout(None)
self.is_connected = True self.is_connected = True
try: try:
peer = self.device_socket.getpeername()[0] peer = self.device_socket.getpeername()[0]
@ -645,13 +744,8 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
raise raise
except: except:
pass pass
except socket.timeout: except Queue.Empty:
self._close_device_socket() self.is_connected = False
except socket.error:
x = sys.exc_info()[1]
self._debug('unexpected socket exception', x.args[0])
self._close_device_socket()
raise
return (self.is_connected, self) return (self.is_connected, self)
return (False, None) return (False, None)
@ -1127,17 +1221,22 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
self._debug('broadcast socket listening on port', port) self._debug('broadcast socket listening on port', port)
break break
message = None
if port == 0: if port == 0:
self.broadcast_socket.close() self.broadcast_socket.close()
self.broadcast_socket = None self.broadcast_socket = None
message = 'attaching port to broadcast socket failed. This is not fatal.' message = 'attaching port to broadcast socket failed. This is not fatal.'
self._debug(message) self._debug(message)
return message
self.connection_queue = Queue.Queue(1)
self.connection_listener = ConnectionListener(self)
self.connection_listener.start()
return message
@synchronous('sync_lock') @synchronous('sync_lock')
def shutdown(self): def shutdown(self):
if getattr(self, 'listen_socket', None) is not None: if getattr(self, 'listen_socket', None) is not None:
self.connection_listener.stop()
unpublish_zeroconf('calibre smart device client', unpublish_zeroconf('calibre smart device client',
'_calibresmartdeviceapp._tcp', self.port, {}) '_calibresmartdeviceapp._tcp', self.port, {})
self._close_listen_socket() self._close_listen_socket()