From ae28c0a1640d59393b0097a117d8e04580517c2e Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Tue, 11 Mar 2008 17:06:54 +0000 Subject: [PATCH] Initial implementation of feeds2disk --- src/libprs500/manual/templates/basic.html | 29 -- src/libprs500/terminfo.py | 5 +- src/libprs500/threadpool.py | 331 ++++++++++++++++++++++ src/libprs500/web/feeds/__init__.py | 109 +++++++ src/libprs500/web/feeds/main.py | 44 ++- src/libprs500/web/feeds/news.py | 259 +++++++++++++++-- src/libprs500/web/feeds/templates.py | 162 +++++++++++ src/libprs500/web/fetch/simple.py | 20 +- 8 files changed, 893 insertions(+), 66 deletions(-) delete mode 100644 src/libprs500/manual/templates/basic.html create mode 100644 src/libprs500/threadpool.py create mode 100644 src/libprs500/web/feeds/templates.py diff --git a/src/libprs500/manual/templates/basic.html b/src/libprs500/manual/templates/basic.html deleted file mode 100644 index d472c9e298..0000000000 --- a/src/libprs500/manual/templates/basic.html +++ /dev/null @@ -1,29 +0,0 @@ - - - - - - - - - - - - - - - %body - -
- - - - \ No newline at end of file diff --git a/src/libprs500/terminfo.py b/src/libprs500/terminfo.py index 839fe5b370..1114f39642 100644 --- a/src/libprs500/terminfo.py +++ b/src/libprs500/terminfo.py @@ -190,14 +190,17 @@ class ProgressBar: self.cleared = 1 #: true if we haven't drawn the bar yet. def update(self, percent, message=''): + if isinstance(message, unicode): + message = message.encode('utf-8', 'ignore') if self.cleared: sys.stdout.write(self.header) self.cleared = 0 n = int((self.width-10)*percent) + msg = message.center(self.width) sys.stdout.write( self.term.BOL + self.term.UP + self.term.CLEAR_EOL + (self.bar % (100*percent, '='*n, '-'*(self.width-10-n))) + - self.term.CLEAR_EOL + message.center(self.width)) + self.term.CLEAR_EOL + msg) def clear(self): if not self.cleared: diff --git a/src/libprs500/threadpool.py b/src/libprs500/threadpool.py new file mode 100644 index 0000000000..917665bf92 --- /dev/null +++ b/src/libprs500/threadpool.py @@ -0,0 +1,331 @@ +"""Easy to use object-oriented thread pool framework. + +A thread pool is an object that maintains a pool of worker threads to perform +time consuming operations in parallel. It assigns jobs to the threads +by putting them in a work request queue, where they are picked up by the +next available thread. This then performs the requested operation in the +background and puts the results in a another queue. + +The thread pool object can then collect the results from all threads from +this queue as soon as they become available or after all threads have +finished their work. It's also possible, to define callbacks to handle +each result as it comes in. + +The basic concept and some code was taken from the book "Python in a Nutshell" +by Alex Martelli, copyright 2003, ISBN 0-596-00188-6, from section 14.5 +"Threaded Program Architecture". I wrapped the main program logic in the +ThreadPool class, added the WorkRequest class and the callback system and +tweaked the code here and there. Kudos also to Florent Aide for the exception +handling mechanism. + +Basic usage: + +>>> pool = TreadPool(poolsize) +>>> requests = makeRequests(some_callable, list_of_args, callback) +>>> [pool.putRequest(req) for req in requests] +>>> pool.wait() + +See the end of the module code for a brief, annotated usage example. + +Website : http://chrisarndt.de/en/software/python/threadpool/ +""" + +__all__ = [ + 'makeRequests', + 'NoResultsPending', + 'NoWorkersAvailable', + 'ThreadPool', + 'WorkRequest', + 'WorkerThread' +] + +__author__ = "Christopher Arndt" +__version__ = "1.2.3" +__revision__ = "$Revision: 1.5 $" +__date__ = "$Date: 2006/06/23 12:32:25 $" +__license__ = 'Python license' + +# standard library modules +import sys +import threading +import Queue + +# exceptions +class NoResultsPending(Exception): + """All work requests have been processed.""" + pass + +class NoWorkersAvailable(Exception): + """No worker threads available to process remaining requests.""" + pass + +# classes +class WorkerThread(threading.Thread): + """Background thread connected to the requests/results queues. + + A worker thread sits in the background and picks up work requests from + one queue and puts the results in another until it is dismissed. + """ + + def __init__(self, requestsQueue, resultsQueue, **kwds): + """Set up thread in daemonic mode and start it immediatedly. + + requestsQueue and resultQueue are instances of Queue.Queue passed + by the ThreadPool class when it creates a new worker thread. 
+ """ + + threading.Thread.__init__(self, **kwds) + self.setDaemon(1) + self.workRequestQueue = requestsQueue + self.resultQueue = resultsQueue + self._dismissed = threading.Event() + self.start() + + def run(self): + """Repeatedly process the job queue until told to exit.""" + + while not self._dismissed.isSet(): + # thread blocks here, if queue empty + request = self.workRequestQueue.get() + if self._dismissed.isSet(): + # if told to exit, return the work request we just picked up + self.workRequestQueue.put(request) + break # and exit + try: + self.resultQueue.put( + (request, request.callable(*request.args, **request.kwds)) + ) + except: + request.exception = True + self.resultQueue.put((request, sys.exc_info())) + + def dismiss(self): + """Sets a flag to tell the thread to exit when done with current job. + """ + + self._dismissed.set() + + +class WorkRequest: + """A request to execute a callable for putting in the request queue later. + + See the module function makeRequests() for the common case + where you want to build several WorkRequests for the same callable + but with different arguments for each call. + """ + + def __init__(self, callable, args=None, kwds=None, requestID=None, + callback=None, exc_callback=None): + """Create a work request for a callable and attach callbacks. + + A work request consists of the a callable to be executed by a + worker thread, a list of positional arguments, a dictionary + of keyword arguments. + + A callback function can be specified, that is called when the results + of the request are picked up from the result queue. It must accept + two arguments, the request object and the results of the callable, + in that order. If you want to pass additional information to the + callback, just stick it on the request object. + + You can also give a callback for when an exception occurs. It should + also accept two arguments, the work request and a tuple with the + exception details as returned by sys.exc_info(). + + requestID, if given, must be hashable since it is used by the + ThreadPool object to store the results of that work request in a + dictionary. It defaults to the return value of id(self). + """ + + if requestID is None: + self.requestID = id(self) + else: + try: + hash(requestID) + except TypeError: + raise TypeError("requestID must be hashable.") + self.requestID = requestID + self.exception = False + self.callback = callback + self.exc_callback = exc_callback + self.callable = callable + self.args = args or [] + self.kwds = kwds or {} + + +class ThreadPool: + """A thread pool, distributing work requests and collecting results. + + See the module doctring for more information. + """ + + def __init__(self, num_workers, q_size=0): + """Set up the thread pool and start num_workers worker threads. + + num_workers is the number of worker threads to start initialy. + If q_size > 0 the size of the work request queue is limited and + the thread pool blocks when the queue is full and it tries to put + more work requests in it (see putRequest method). + """ + + self.requestsQueue = Queue.Queue(q_size) + self.resultsQueue = Queue.Queue() + self.workers = [] + self.workRequests = {} + self.createWorkers(num_workers) + + def createWorkers(self, num_workers): + """Add num_workers worker threads to the pool.""" + + for i in range(num_workers): + self.workers.append(WorkerThread(self.requestsQueue, + self.resultsQueue)) + + def dismissWorkers(self, num_workers): + """Tell num_workers worker threads to quit after their current task. 
+ """ + + for i in range(min(num_workers, len(self.workers))): + worker = self.workers.pop() + worker.dismiss() + + def putRequest(self, request, block=True, timeout=0): + """Put work request into work queue and save its id for later.""" + + assert isinstance(request, WorkRequest) + self.requestsQueue.put(request, block, timeout) + self.workRequests[request.requestID] = request + + def poll(self, block=False): + """Process any new results in the queue.""" + + while True: + # still results pending? + if not self.workRequests: + raise NoResultsPending + # are there still workers to process remaining requests? + elif block and not self.workers: + raise NoWorkersAvailable + try: + # get back next results + request, result = self.resultsQueue.get(block=block) + # has an exception occured? + if request.exception and request.exc_callback: + request.exc_callback(request, result) + # hand results to callback, if any + if request.callback and not \ + (request.exception and request.exc_callback): + request.callback(request, result) + del self.workRequests[request.requestID] + except Queue.Empty: + break + + def wait(self, sleep=0): + """Wait for results, blocking until all have arrived.""" + + while 1: + try: + self.poll(True) + time.sleep(sleep) + except NoResultsPending: + break + +# helper functions +def makeRequests(callable, args_list, callback=None, exc_callback=None): + """Create several work requests for same callable with different arguments. + + Convenience function for creating several work requests for the same + callable where each invocation of the callable receives different values + for its arguments. + + args_list contains the parameters for each invocation of callable. + Each item in 'args_list' should be either a 2-item tuple of the list of + positional arguments and a dictionary of keyword arguments or a single, + non-tuple argument. + + See docstring for WorkRequest for info on callback and exc_callback. + """ + + requests = [] + for item in args_list: + if isinstance(item, tuple): + requests.append( + WorkRequest(callable, item[0], item[1], callback=callback, + exc_callback=exc_callback) + ) + else: + requests.append( + WorkRequest(callable, [item], None, callback=callback, + exc_callback=exc_callback) + ) + return requests + +################ +# USAGE EXAMPLE +################ + +if __name__ == '__main__': + import random + import time + + # the work the threads will have to do (rather trivial in our example) + def do_something(data): + time.sleep(random.randint(1,5)) + result = round(random.random() * data, 5) + # just to show off, we throw an exception once in a while + if result > 3: + raise RuntimeError("Something extraordinary happened!") + return result + + # this will be called each time a result is available + def print_result(request, result): + print "**Result: %s from request #%s" % (result, request.requestID) + + # this will be called when an exception occurs within a thread + def handle_exception(request, exc_info): + print "Exception occured in request #%s: %s" % \ + (request.requestID, exc_info[1]) + + # assemble the arguments for each job to a list... + data = [random.randint(1,10) for i in range(20)] + # ... 
and build a WorkRequest object for each item in data + requests = makeRequests(do_something, data, print_result, handle_exception) + + # or the other form of args_lists accepted by makeRequests: ((,), {}) + data = [((random.randint(1,10),), {}) for i in range(20)] + requests.extend( + makeRequests(do_something, data, print_result, handle_exception) + ) + + # we create a pool of 3 worker threads + main = ThreadPool(3) + + # then we put the work requests in the queue... + for req in requests: + main.putRequest(req) + print "Work request #%s added." % req.requestID + # or shorter: + # [main.putRequest(req) for req in requests] + + # ...and wait for the results to arrive in the result queue + # by using ThreadPool.wait(). This would block until results for + # all work requests have arrived: + # main.wait() + + # instead we can poll for results while doing something else: + i = 0 + while 1: + try: + main.poll() + print "Main thread working..." + time.sleep(0.5) + if i == 10: + print "Adding 3 more worker threads..." + main.createWorkers(3) + i += 1 + except KeyboardInterrupt: + print "Interrupted!" + break + except NoResultsPending: + print "All results collected." + break diff --git a/src/libprs500/web/feeds/__init__.py b/src/libprs500/web/feeds/__init__.py index f0bae9a89c..f54f3c254a 100644 --- a/src/libprs500/web/feeds/__init__.py +++ b/src/libprs500/web/feeds/__init__.py @@ -17,4 +17,113 @@ ''' Contains the logic for parsing feeds. ''' +import time, logging +from datetime import datetime +from libprs500.web.feeds.feedparser import parse + +class Article(object): + + time_offset = datetime.now() - datetime.utcnow() + + def __init__(self, id, title, url, summary, published, content): + self.id = id + self.title = title + self.url = url + self.summary = summary + self.content = content + self.date = published + self.utctime = datetime(*self.date[:6]) + self.localtime = self.utctime + self.time_offset + + def __repr__(self): + return \ +(u'''\ +Title : %s +URL : %s +Summary : %s +Date : %s +Has content : %s +'''%(self.title, self.url, self.summary[:20]+'...', self.localtime.strftime('%a, %d %b, %Y %H:%M'), + bool(self.content))).encode('utf-8') + + def __str__(self): + return repr(self) + + +class Feed(object): + + def __init__(self): + ''' + Parse a feed into articles. 
+ ''' + self.logger = logging.getLogger('feeds2disk') + + def populate_from_feed(self, feed, title=None, oldest_article=7, + max_articles_per_feed=100): + entries = feed.entries + feed = feed.feed + self.title = feed.get('title', 'Unknown feed') if not title else title + self.description = feed.get('description', '') + image = feed.get('image', {}) + self.image_url = image.get('href', None) + self.image_width = image.get('width', 88) + self.image_height = image.get('height', 31) + self.image_alt = image.get('title', '') + + self.articles = [] + self.id_counter = 0 + self.added_articles = [] + + self.oldest_article = oldest_article + + for item in entries: + if len(self.articles) > max_articles_per_feed: + break + self.parse_article(item) + + def parse_article(self, item): + id = item.get('id', 'internal id#'+str(self.id_counter)) + if id in self.added_articles: + return + published = item.get('date_parsed', time.gmtime()) + self.id_counter += 1 + self.added_articles.append(id) + + title = item.get('title', 'Untitled article') + link = item.get('link', None) + description = item.get('summary', None) + + content = '\n'.join(i.value for i in item.get('content', [])) + if not content.strip(): + content = None + + article = Article(id, title, link, description, published, content) + delta = datetime.utcnow() - article.utctime + if delta.days*24*3600 + delta.seconds <= 24*3600*self.oldest_article: + self.articles.append(article) + else: + self.logger.debug('Skipping article %s as it is too old.'%title) + + def __iter__(self): + return iter(self.articles) + + def __len__(self): + return len(self.articles) + + def __repr__(self): + res = [('%20s\n'%'').replace(' ', '_')+repr(art) for art in self] + + return '\n'+'\n'.join(res)+'\n' + + def __str__(self): + return repr(self) + + +def feed_from_xml(raw_xml, title=None, oldest_article=7, max_articles_per_feed=100): + feed = parse(raw_xml) + pfeed = Feed() + pfeed.populate_from_feed(feed, title=title, + oldest_article=oldest_article, + max_articles_per_feed=max_articles_per_feed) + return pfeed diff --git a/src/libprs500/web/feeds/main.py b/src/libprs500/web/feeds/main.py index cd2e11db64..df5ef3c36b 100644 --- a/src/libprs500/web/feeds/main.py +++ b/src/libprs500/web/feeds/main.py @@ -17,7 +17,7 @@ from libprs500.web.feeds.news import BasicNewsRecipe ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. '''''' -import sys, os +import sys, os, logging from libprs500.web.recipes import get_feed, compile_recipe from libprs500.web.fetch.simple import option_parser as _option_parser @@ -53,26 +53,38 @@ If you specify this option, any argument to %prog is ignored and a default recip p.add_option('--lrf', default=False, action='store_true', help='Optimize fetching for subsequent conversion to LRF.') p.add_option('--recursions', default=0, type='int', help=_('Number of levels of links to follow on webpages that are linked to from feeds. Defaul %default')) + p.add_option('--output-dir', default=os.getcwd(), + help=_('The directory in which to store the downloaded feeds. 
Defaults to the current directory.')) + p.add_option('--no-progress-bar', dest='progress_bar', default=True, action='store_false', + help=_('Dont show the progress bar')) + p.add_option('--debug', action='store_true', default=False, + help='Very verbose output, useful for debugging.') return p -def simple_progress_bar(*args): - print '%d%%'%(args[0]*100), +def simple_progress_bar(percent, msg): + print '%d%%'%(percent*100), sys.stdout.flush() + +def no_progress_bar(percent, msg): + print msg -def main(args=sys.argv, notification=None): +def main(args=sys.argv, notification=None, handler=None): p = option_parser() opts, args = p.parse_args(args) if notification is None: from libprs500.terminfo import TerminalController, ProgressBar term = TerminalController(sys.stdout) - try: - pb = ProgressBar(term, _('Fetching feeds...')) - notification = pb.update - except ValueError: - notification = simple_progress_bar - print _('Fetching feeds...') + if opts.progress_bar: + try: + pb = ProgressBar(term, _('Fetching feeds...')) + notification = pb.update + except ValueError: + notification = simple_progress_bar + print _('Fetching feeds...') + else: + notification = no_progress_bar if len(args) != 2: p.print_help() @@ -98,11 +110,15 @@ def main(args=sys.argv, notification=None): print args[1], 'is an invalid recipe' return 1 - recipe = recipe(opts, p, notification) - index = recipe.download() - - + if handler is None: + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(logging.DEBUG if opts.debug else logging.INFO if opts.verbose else logging.WARN) + handler.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) + logging.getLogger('feeds2disk').addHandler(handler) + recipe = recipe(opts, p, notification) + recipe.download() + return 0 if __name__ == '__main__': diff --git a/src/libprs500/web/feeds/news.py b/src/libprs500/web/feeds/news.py index 549425e6fc..debc6d8a92 100644 --- a/src/libprs500/web/feeds/news.py +++ b/src/libprs500/web/feeds/news.py @@ -1,5 +1,4 @@ #!/usr/bin/env python - ## Copyright (C) 2008 Kovid Goyal kovid@kovidgoyal.net ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -18,9 +17,16 @@ The backend to parse feeds and create HTML that can then be converted to an ebook. ''' -import logging +import logging, os, cStringIO, traceback, time +import urlparse from libprs500 import browser +from libprs500.ebooks.BeautifulSoup import BeautifulSoup +from libprs500.web.feeds import feed_from_xml, templates +from libprs500.web.fetch.simple import option_parser as web2disk_option_parser +from libprs500.web.fetch.simple import RecursiveFetcher +from libprs500.threadpool import WorkRequest, ThreadPool, NoResultsPending + class BasicNewsRecipe(object): ''' @@ -48,6 +54,10 @@ class BasicNewsRecipe(object): #: @type: integer delay = 0 + #: Number of simultaneous downloads. Set to 1 if the server is picky. + #: @type: integer + simultaneous_downloads = 5 + #: Timeout for fetching files from server in seconds #: @type: integer timeout = 10 @@ -55,7 +65,7 @@ class BasicNewsRecipe(object): #: The format string for the date shown on the first page #: By default: Day Name Day Number Month Name Year #: @type: string - timefmt = ' [%a %d %b %Y]' + timefmt = ' %a, %d %b %Y' #: Max number of characters in the short description. #: @type: integer @@ -94,6 +104,19 @@ class BasicNewsRecipe(object): #: @type: list of strings html2lrf_options = [] + #: List of tags to be removed. 
Specified tags are removed from downloaded HTML. + #: A tag is specified as a dictionary of the form:: + #: { + #: name : 'tag name', #e.g. 'div' + #: attrs : a dictionary, #e.g. {class: 'advertisement'} + #: } + #: All keys are optional. For a full explanation of the search criteria, see + #: U{http://www.crummy.com/software/BeautifulSoup/documentation.html#The basic find method: findAll(name, attrs, recursive, text, limit, **kwargs)} + #: A common example:: + #: remove_tags = [dict(name='div', attrs={'class':'advert'})] + #: This will remove all