diff --git a/src/calibre/devices/smart_device_app/driver.py b/src/calibre/devices/smart_device_app/driver.py index 676cdf145c..1f44b40b66 100644 --- a/src/calibre/devices/smart_device_app/driver.py +++ b/src/calibre/devices/smart_device_app/driver.py @@ -10,9 +10,12 @@ Created on 29 Jun 2012 import socket, select, json, inspect, os, traceback, time, sys, random import posixpath import hashlib, threading +import Queue + from base64 import b64encode, b64decode from functools import wraps from errno import EAGAIN, EINTR +from threading import Thread from calibre import prints from calibre.constants import numeric_version, DEBUG @@ -48,6 +51,109 @@ def synchronous(tlockname): return _synched +class ConnectionListener (Thread): + + def __init__(self, driver): + Thread.__init__(self) + self.daemon = True + self.driver = driver + self.keep_running = True + + def stop(self): + self.keep_running = False + + def run(self): + queue_not_serviced_count = 0 + device_socket = None + while self.keep_running: + try: + time.sleep(1) # Limit to one book per two seconds + except: + # Happens during interpreter shutdown + break + + if not self.keep_running: + break + + if not self.driver.connection_queue.empty(): + self.driver._debug('queue not empty') + queue_not_serviced_count += 1 + if queue_not_serviced_count >= 3: + self.driver._debug('queue not serviced') + try: + sock = self.driver.connection_queue.get_nowait() + s = self.driver._json_encode( + self.driver.opcodes['CALIBRE_BUSY'], {}) + self.driver._send_byte_string(device_socket, (b'%d' % len(s)) + s) + sock.close() + except Queue.Empty: + pass + queue_not_serviced_count = 0 + + if getattr(self.driver, 'broadcast_socket', None) is not None: + while True: + ans = select.select((self.driver.broadcast_socket,), (), (), 0) + if len(ans[0]) > 0: + try: + packet = self.driver.broadcast_socket.recvfrom(100) + remote = packet[1] + message = str(self.driver.ZEROCONF_CLIENT_STRING + b' (on ' + + str(socket.gethostname().partition('.')[0]) + + b'),' + str(self.driver.port)) + self.driver._debug('received broadcast', packet, message) + self.driver.broadcast_socket.sendto(message, remote) + except: + pass + else: + break + + if self.driver.connection_queue.empty() and \ + getattr(self.driver, 'listen_socket', None) is not None: + ans = select.select((self.driver.listen_socket,), (), (), 0) + if len(ans[0]) > 0: + # timeout in 10 ms to detect rare case where the socket went + # way between the select and the accept + try: + self.driver._debug('attempt to open device socket') + device_socket = None + self.driver.listen_socket.settimeout(0.010) + device_socket, ign = eintr_retry_call( + self.driver.listen_socket.accept) + self.driver.listen_socket.settimeout(None) + device_socket.settimeout(None) + + try: + peer = self.driver.device_socket.getpeername()[0] + attempts = self.drjver.connection_attempts.get(peer, 0) + if attempts >= self.MAX_UNSUCCESSFUL_CONNECTS: + self.driver._debug('too many connection attempts from', peer) + device_socket.close() + device_socket = None +# raise InitialConnectionError(_('Too many connection attempts from %s') % peer) + else: + self.driver.connection_attempts[peer] = attempts + 1 + except InitialConnectionError: + raise + except: + pass + + try: + self.driver.connection_queue.put_nowait(device_socket) + except Queue.Full: + device_socket.close(); + device_socket = None + self.driver._debug('driver is not answering') + + except socket.timeout: + pass + except socket.error: + x = sys.exc_info()[1] + self.driver._debug('unexpected socket exception', x.args[0]) + device_socket.close() + device_socket = None +# raise + + class SDBook(Book): def __init__(self, prefix, lpath, size=None, other=None): Book.__init__(self, prefix, lpath, size=size, other=other) @@ -112,6 +218,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin): 'OK' : 0, 'BOOK_DATA' : 10, 'BOOK_DONE' : 11, + 'CALIBRE_BUSY' : 18, 'DELETE_BOOK' : 13, 'DISPLAY_MESSAGE' : 17, 'FREE_SPACE' : 5, @@ -373,7 +480,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin): pos += len(v) return data - def _send_byte_string(self, s): + def _send_byte_string(self, sock, s): if not isinstance(s, bytes): self._debug('given a non-byte string!') raise PacketError("Internal error: found a string that isn't bytes") @@ -382,11 +489,11 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin): while sent_len < total_len: try: if sent_len == 0: - amt_sent = self.device_socket.send(s) + amt_sent = sock.send(s) else: - amt_sent = self.device_socket.send(s[sent_len:]) + amt_sent = sock.send(s[sent_len:]) if amt_sent <= 0: - raise IOError('Bad write on device socket') + raise IOError('Bad write on socket') sent_len += amt_sent except socket.error as e: self._debug('socket error', e, e.errno) @@ -410,7 +517,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin): if print_debug_info and extra_debug: self._debug('send string', s) self.device_socket.settimeout(self.MAX_CLIENT_COMM_TIMEOUT) - self._send_byte_string((b'%d' % len(s)) + s) + self._send_byte_string(self.device_socket, (b'%d' % len(s)) + s) if not wait_for_response: return None, None return self._receive_from_client(print_debug_info=print_debug_info) @@ -620,39 +727,26 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin): break if getattr(self, 'listen_socket', None) is not None: - ans = select.select((self.listen_socket,), (), (), 0) - if len(ans[0]) > 0: - # timeout in 10 ms to detect rare case where the socket went - # way between the select and the accept + try: + ans = self.connection_queue.get_nowait() + self.device_socket = ans + self.is_connected = True try: - self.device_socket = None - self.listen_socket.settimeout(0.010) - self.device_socket, ign = eintr_retry_call( - self.listen_socket.accept) - self.listen_socket.settimeout(None) - self.device_socket.settimeout(None) - self.is_connected = True - try: - peer = self.device_socket.getpeername()[0] - attempts = self.connection_attempts.get(peer, 0) - if attempts >= self.MAX_UNSUCCESSFUL_CONNECTS: - self._debug('too many connection attempts from', peer) - self._close_device_socket() - raise InitialConnectionError(_('Too many connection attempts from %s') % peer) - else: - self.connection_attempts[peer] = attempts + 1 - except InitialConnectionError: - raise - except: - pass - except socket.timeout: - self._close_device_socket() - except socket.error: - x = sys.exc_info()[1] - self._debug('unexpected socket exception', x.args[0]) - self._close_device_socket() + peer = self.device_socket.getpeername()[0] + attempts = self.connection_attempts.get(peer, 0) + if attempts >= self.MAX_UNSUCCESSFUL_CONNECTS: + self._debug('too many connection attempts from', peer) + self._close_device_socket() + raise InitialConnectionError(_('Too many connection attempts from %s') % peer) + else: + self.connection_attempts[peer] = attempts + 1 + except InitialConnectionError: raise - return (self.is_connected, self) + except: + pass + except Queue.Empty: + self.is_connected = False + return (self.is_connected, self) return (False, None) @synchronous('sync_lock') @@ -1127,17 +1221,22 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin): self._debug('broadcast socket listening on port', port) break + message = None if port == 0: self.broadcast_socket.close() self.broadcast_socket = None message = 'attaching port to broadcast socket failed. This is not fatal.' self._debug(message) - return message + self.connection_queue = Queue.Queue(1) + self.connection_listener = ConnectionListener(self) + self.connection_listener.start() + return message @synchronous('sync_lock') def shutdown(self): if getattr(self, 'listen_socket', None) is not None: + self.connection_listener.stop() unpublish_zeroconf('calibre smart device client', '_calibresmartdeviceapp._tcp', self.port, {}) self._close_listen_socket()