Fix multithreading issues in feeds2disk. This should make news fetching much more robust, though a little slower.

Kovid Goyal 2008-11-06 12:23:54 -08:00
parent dbe52fd1b3
commit 6b9b37215d


@@ -9,8 +9,8 @@ UTF-8 encoding with any charset declarations removed.
 '''
 import sys, socket, os, urlparse, logging, re, time, copy, urllib2, threading, traceback
 from urllib import url2pathname
+from threading import RLock
 from httplib import responses
-from contextlib import closing
 
 from calibre import setup_cli_handlers, browser, sanitize_file_name, \
                     relpath, LoggingInterface
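
Note on the import swap above: contextlib.closing is dropped in favour of a local, error-tolerant closing class (added in the next hunk), and RLock comes in for the new browser lock. As a side illustration, not part of this commit: unlike a plain Lock, an RLock can be re-acquired by the thread that already holds it, so a re-entrant call through the locked section cannot deadlock itself.

# Illustration only (not from this commit): why RLock rather than Lock.
import threading

rlock = threading.RLock()

def inner():
    with rlock:                # second acquisition by the same thread
        print('re-entered while already holding the lock')

def outer():
    with rlock:                # first acquisition
        inner()                # fine with RLock; a plain Lock would deadlock here

outer()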
@@ -21,6 +21,23 @@ from calibre.utils.config import OptionParser
 class FetchError(Exception):
     pass
 
+class closing(object):
+    'Context to automatically close something at the end of a block.'
+
+    def __init__(self, thing):
+        self.thing = thing
+
+    def __enter__(self):
+        return self.thing
+
+    def __exit__(self, *exc_info):
+        try:
+            self.thing.close()
+        except Exception:
+            pass
+
+_browser_lock = RLock()
+
 def basename(url):
     parts = urlparse.urlsplit(url)
     path = url2pathname(parts.path)
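
The closing class above replaces contextlib.closing with one behavioural difference: __exit__ swallows anything raised by close(). A response object whose close() fails mid-teardown (a common symptom of flaky connections shared across threads) can therefore no longer mask data that was already read or abort the fetch. A minimal demonstration of the difference, not part of the commit (FlakyResponse is a made-up stand-in):

# Illustration only: contextlib.closing propagates exceptions raised by
# close(); the closing class added above suppresses them.
from contextlib import closing as stdlib_closing

class closing(object):           # copy of the class added in the hunk above
    def __init__(self, thing):
        self.thing = thing
    def __enter__(self):
        return self.thing
    def __exit__(self, *exc_info):
        try:
            self.thing.close()
        except Exception:
            pass

class FlakyResponse(object):     # hypothetical stand-in for a response object
    def close(self):
        raise IOError('socket already torn down')

try:
    with stdlib_closing(FlakyResponse()):
        pass
except IOError:
    print('contextlib.closing re-raised the error from close()')

with closing(FlakyResponse()):
    pass                         # the IOError from close() is swallowed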
@@ -138,42 +155,32 @@ class RecursiveFetcher(object, LoggingInterface):
                 tag.extract()
         return self.preprocess_html_ext(soup)
 
     def fetch_url(self, url):
         data = None
         self.log_debug('Fetching %s', url)
         delta = time.time() - self.last_fetch_at
         if delta < self.delay:
             time.sleep(delta)
-        try:
-            try:
-                with closing(self.browser.open(url)) as f:
-                    data = response(f.read())
-                    data.newurl = f.geturl()
-            except AttributeError:
-                time.sleep(2)
-                try:
-                    with closing(self.browser.open(url)) as f:
-                        data = response(f.read())
-                        data.newurl = f.geturl()
-                except AttributeError:
-                    data = response(urllib2.urlopen(url).read())
-                    data.newurl = f.geturl()
-        except urllib2.URLError, err:
-            if hasattr(err, 'code') and responses.has_key(err.code):
-                raise FetchError, responses[err.code]
-            if getattr(err, 'reason', [0])[0] == 104: # Connection reset by peer
-                self.log_debug('Connection reset by peer retrying in 1 second.')
-                time.sleep(1)
-                if hasattr(f, 'close'):
-                    f.close()
-                with closing(self.browser.open(url)) as f:
-                    data = f.read()
-            else:
-                raise err
-        finally:
-            self.last_fetch_at = time.time()
+        with _browser_lock:
+            try:
+                with closing(self.browser.open(url)) as f:
+                    data = response(f.read()+f.read())
+                    data.newurl = f.geturl()
+            except urllib2.URLError, err:
+                if hasattr(err, 'code') and responses.has_key(err.code):
+                    raise FetchError, responses[err.code]
+                if getattr(err, 'reason', [0])[0] == 104: # Connection reset by peer
+                    self.log_debug('Connection reset by peer retrying in 1 second.')
+                    time.sleep(1)
+                    with closing(self.browser.open(url)) as f:
+                        data = response(f.read()+f.read())
+                        data.newurl = f.geturl()
+                else:
+                    raise err
+            finally:
+                self.last_fetch_at = time.time()
         return data
 
     def start_fetch(self, url):
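
The heart of the fix is the with _browser_lock: block: every fetch_url call across all worker threads now serializes access to the shared mechanize browser, which is stateful (cookies, history) and not thread safe; that is the robustness-for-speed trade described in the commit message. The error handling is also flattened: HTTP error codes are mapped to FetchError via httplib.responses, and a connection reset by peer (errno 104) gets a single retry after a one-second pause. The doubled f.read()+f.read() appears to be a workaround for responses that deliver their body across more than one read; for a well-behaved file-like response the second read simply returns the empty string. A minimal sketch of the locking pattern, not from the commit (urllib2.urlopen stands in for the shared browser, and fetch is a hypothetical name):

# Sketch of the pattern: many threads, one lock, one shared client.
import threading, urllib2

_browser_lock = threading.RLock()

def fetch(url):
    with _browser_lock:                   # one thread in the client at a time
        f = urllib2.urlopen(url)
        try:
            return f.read() + f.read()    # second read picks up any remainder
        finally:
            f.close()

threads = [threading.Thread(target=fetch, args=('http://example.com',))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()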