#!/usr/local/bin/python
"""CGI script to check the elements in the hotlist.
It prints the list, sorted by page status, and lets me delete and change
items.
This version even runs the requests in parallel (cool). What it
*doesn't* do is handle running out of some resource - threads, files,
and so on. What we really need is a semaphore (initial value of 0)
that finishing threads release, and that failing threads can wait on
until it's released so that one of them can run. Or something like that.
"""
import urllib, sgmllib, string, pg
from hotlist import handler # The request handler...
from UserDict import UserDict
from threading import * # The threaded version, right?
# Configurable format goodies
#
# NOTE(review): none of these templates contains any HTML markup even though
# they are served as text/html - it looks as if the tags were lost at some
# point. Confirm against the deployed copy before relying on the layout.

# One list entry; filled in with an item dictionary that must provide the
# 'status' (integer), 'description' and 'title' keys.
item_format = """
%(status)d:
%(description)s
%(title)s
"""

# Start of the full listing page. The empty line after the Content-type
# header is required: it is what ends the CGI header block.
prelude_format = """Content-type: text/html

Mike's checked hotlist
Mike's checked hotlist
"""

# Start of the page shown after an item has been deleted.
deleted_format = """Content-type: text/html

Hotlist item removed
Hotlist item removed
"""

# Page shown after an item has been changed; takes a two-element tuple of
# (rendering before the change, rendering after the change).
changed_format = """Content-type: text/html

Hotlist item changed
Hotlist item changed
From
%s
To
%s
"""

# Text appended to the end of every page.
postlude_format = "\n"
class document(sgmllib.SGMLParser):
"A class to find the last TITLE element in an SGML document."
title = ""
store_title = 0
def start_title(my, attributes):
"Start saving up a title."
my.title = ""
my.store_title = 1
def end_title(my):
"Got a title, so stop saving for it!"
my.store_title = 0
def handle_data(my,data):
"Save the title data - if it is, that is."
if my.store_title: my.title = my.title + data
class checkedurl(Thread):
    """Worker thread that fetches one hotlist URL and records the outcome.

    The dictionary handed to the constructor is updated in place by run()
    with a 'status' and a 'title' entry; callers read the result from the
    `data` attribute once the thread has been joined.
    """

    def __init__(my, dict):
        """Remember the item to check.

        dict -- mapping describing one hotlist entry. run() reads 'url';
            the result builders also read 'id', 'description' and
            'script_name'.
        """
        Thread.__init__(my)
        my.data = dict

    def run(my):
        "Fetch the URL and merge the resulting status/title into my.data."
        try:
            (file, info) = urllib.urlretrieve(my.data['url'])
        except IOError as msg:
            my.data.update(my.bad_document(msg.args))
        else:
            my.data.update(my.good_document(file, info))

    def good_document(my, file, headers):
        """Build the result for a document that was fetched successfully.

        file -- name of the local file holding the fetched document.
        headers -- response headers (currently unused).

        Returns {'status': 200, 'title': ...}. The title is empty when the
        document's own title equals the stored description; otherwise it is
        a link offering to relabel the entry with the document's title.
        """
        result = {'status': 200}
        title = my.fetch_title(file)
        if title == my.data['description']:
            result['title'] = ''
        else:
            # NOTE(review): the anchor markup is reconstructed - the format
            # string used to have one placeholder for four values, which
            # raises TypeError. The <script>/relabel/<id>/<text> layout is
            # assumed from the way do_relabel() splits its argument; confirm
            # against the request handler's PATH_INFO dispatch.
            result['title'] = '(<A HREF="%s/relabel/%s/%s">%s</A>)' % (
                my.data['script_name'], my.data['id'],
                urllib.quote(title), my._escape(title))
        return result

    def bad_document(my, args):
        """Build the result for a document that could not be fetched.

        args -- the arguments of the IOError raised by the fetch. An HTTP
            failure is expected to look like
            ('http error', code, message, headers).

        Returns a dictionary with 'status' and 'title'. Anything that is
        not an HTTP error gets the pseudo-status 600 and the repr of args
        as its title. A 3xx response carrying a Location header gets a
        link offering to move the entry to the new address; every other
        HTTP error gets the remaining arguments as its title.
        """
        if not args or args[0] != 'http error':
            return {'status': 600, 'title': repr(args)}
        result = {'status': args[1]}
        if result['status'] // 100 == 3 and 'location' in args[3]:
            # A relocation!
            # NOTE(review): the anchor markup is reconstructed - the format
            # string used to have two placeholders for five values, which
            # raises TypeError. The <script>/moved/<id>/<url> layout is
            # assumed from the way do_moved() splits its argument. As
            # before, the new URL goes into the link unquoted, so an
            # address containing '?' or '#' may not round-trip - confirm.
            location = my._escape(args[3]['location'])
            result['title'] = '(%s, <A HREF="%s/moved/%s/%s">%s</A>)' % (
                args[2], my.data['script_name'], my.data['id'],
                location, location)
        else:
            result['title'] = args[2:]
        return result

    def fetch_title(my, file):
        """Return the stripped title of the HTML document stored in file.

        file -- name of a local file containing the document.
        """
        parser = document()
        data = open(file, 'r')
        try:
            parser.feed(data.read())
        finally:
            # Release the handle even if the parser chokes on the input.
            data.close()
        # Flush whatever the parser is still buffering so that a title at
        # the very end of the input is not lost.
        parser.close()
        return parser.title.strip()

    def _escape(my, text):
        "Return text with the HTML-special characters turned into entities."
        # '&' has to be handled first so the entities added for the other
        # characters are not escaped a second time.
        for char, entity in (('&', '&amp;'), ('<', '&lt;'),
                             ('>', '&gt;'), ('"', '&quot;')):
            text = text.replace(char, entity)
        return text
# For the retriever: replace the opener that urllib's module-level helpers
# (such as urlretrieve) share with a plain URLopener.
# NOTE(review): presumably this is done so that redirects and other non-200
# responses surface as 'http error' IOErrors that checkedurl.bad_document
# can report, instead of being handled silently by the default opener -
# confirm against the urllib documentation for the Python version in use.
urllib._urlopener = urllib.URLopener()
class checker(handler):
    """CGI request handler that checks the hotlist entries and reports on them.

    Every entry is fetched by its own checkedurl thread. The do_XXXX
    methods implement the commands that can be given in PATH_INFO.
    """

    def __init__(my):
        "Set up the base handler, then install this script's page templates."
        handler.__init__(my)
        my.item = item_format
        my.prelude = prelude_format
        my.postlude = postlude_format
        my.deleted = deleted_format
        my.changed = changed_format

    def display_page(my, type):
        """Return the completed page listing every checked entry.

        type -- accepted for compatibility with the caller; not used here.
        """
        return my.prelude + my.display_list(my.get_list()) + my.postlude

    def _start_check(my, row):
        "Start and return a checkedurl thread for one database row."
        # Work on a private copy of the row, extended with the request
        # values in my.vals. NOTE(review): my.vals is presumably where the
        # 'script_name' used for building links comes from - confirm.
        item = {}
        item.update(row)
        item.update(my.vals)
        worker = checkedurl(item)
        worker.start()
        return worker

    def get_list(my):
        """Check every hotlist entry and return the resulting item dictionaries.

        All fetches are started before any of them is waited for, so they
        run in parallel. Entries that come back with status 200 get their
        'checked' column set to the current time. The result is ordered by
        descending (status, title).
        """
        my.db.query("set datestyle = 'iso'")
        query = my.db.query('select id, description, URL from hotlist')
        checkers = []
        for res in query.dictresult():
            checkers.append(my._start_check(res))
        sortable = []
        for check in checkers:
            check.join()
            if check.data['status'] == 200:
                my.db.query("update hotlist set checked = 'now' where id = %s"
                            % check.data['id'])
            sortable.append((check.data['status'], check.data['title'],
                             check.data))
        sortable.sort()
        sortable.reverse()
        return [entry[-1] for entry in sortable]

    def get_item(my, id):
        """Check a single hotlist entry and return its item dictionary.

        id -- key of the entry; anything int() accepts.

        Returns the same kind of dictionary that get_list() produces.
        Raises ValueError when id is not an integer and IndexError when no
        entry has that id.
        """
        # The id can arrive straight from PATH_INFO: force it to an integer
        # so that it cannot smuggle SQL into the query.
        item_id = int(id)
        my.db.query("set datestyle = 'iso'")
        query = my.db.query('select id, description, URL from hotlist '
                            'where id = %d' % item_id)
        worker = my._start_check(query.dictresult()[0])
        # Thread.start() returns None, so hold on to the thread object
        # itself for the join and hand back the data it filled in.
        worker.join()
        return worker.data

    # do_XXXX files implement the PATH_INFO commands for these babies!
    def do_delete(my, id):
        """Delete an item from the hotlist and return the confirmation page.

        id -- key of the entry to delete. Raises ValueError when it is not
            an integer.
        """
        item_id = int(id)
        # Render the item first; once it is deleted there is nothing to show.
        item = my.display_item(my.get_item(item_id))
        my.db.query('delete from hotlist where id = %d' % item_id)
        return my.deleted + item + my.postlude

    def _update_column(my, column, extra):
        "Set one column of an entry from an 'id/value' string; return the page."
        (id, new) = extra.split('/', 1)
        # The id comes from PATH_INFO, so pin it down to an integer.
        item_id = int(id)
        pre = my.display_item(my.get_item(item_id))
        # NOTE(review): pg._quote is a private helper of the pg module;
        # confirm that it still exists in the installed version.
        my.db.query("update hotlist set %s = %s where id = %d" %
                    (column, pg._quote(new, None), item_id))
        post = my.display_item(my.get_item(item_id))
        return (my.changed % (pre, post)) + my.postlude

    def do_relabel(my, extra):
        """Change the description of an item and return the before/after page.

        extra -- 'id/new description'; everything after the first slash is
            the new description.
        """
        return my._update_column('description', extra)

    def do_moved(my, extra):
        """Change the URL of an item that has moved; return the before/after page.

        extra -- 'id/new URL'; everything after the first slash is the new
            URL.
        """
        return my._update_column('URL', extra)
if __name__ == '__main__':
    # Run as a CGI script: build the whole response, then write it out.
    response = checker().run()
    print(response)