
utils.py

##############################################################################
#
# Copyright (c) 2003-2005 struktur AG and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""
$Id: utils.py 24614 2006-06-09 00:03:28Z alecm $
"""

import sys
import types
import socket
import httplib
import urlparse
import threading
import Queue
import logging
from thread import get_ident
from config import USE_HTTP_1_1_PURGE

T_TRANSIENT = (socket.timeout, httplib.HTTPException)
T_PERMANENT = (socket.herror, socket.gaierror, socket.error)
T_CLASS_TYPES = (type, types.ClassType)

def check_transient(err):
    # 'err' may be an exception class (as from sys.exc_info()[0]) or an
    # instance; issubclass() raises TypeError for instances, so guard it.
    return isinstance(err, T_TRANSIENT) or (
        isinstance(err, T_CLASS_TYPES) and issubclass(err, T_TRANSIENT))

def check_permanent(err):
    return isinstance(err, T_PERMANENT) or (
        isinstance(err, T_CLASS_TYPES) and issubclass(err, T_PERMANENT))
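
# Illustrative sketch (not part of the original module): the checkers accept
# either an exception class, as delivered by sys.exc_info()[0], or an
# exception instance:
#
#   check_transient(socket.timeout)             # True: worth retrying
#   check_transient(httplib.BadStatusLine(''))  # True: worth retrying
#   check_permanent(socket.gaierror)            # True: give up on the host
#   check_transient(ValueError('x'))            # False: unrelated error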

queue_lock = threading.Lock()
worker_lock = threading.Lock()

class Logger(object):

    def __init__(self, name):
        self.logger = logging.getLogger(name)

    def __getattribute__(self, name):
        # Fetch 'logger' itself normally, so the wrapper below can reach
        # the real logger without being wrapped in turn.
        if name in ('logger',):
            return object.__getattribute__(self, name)
        def wrapper(*args, **kw):
            # When this file is run as a script, also echo to stdout.
            if __name__ == '__main__' and args:
                print args[0]
            return getattr(self.logger, name)(*args, **kw)
        return wrapper

logger = Logger('CMFSquidTool')
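
# Usage (illustrative): any logging method name is forwarded to the wrapped
# stdlib logger; when the module runs as a script, the first positional
# argument is additionally printed to stdout:
#
#   logger.debug('Queued %s.' % url)
#   logger.exception('Purge failed.')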

class Connection(httplib.HTTPConnection):

    def __init__(self, host, port=None, strict=None, scheme="http", timeout=5):
        self.scheme = scheme
        # default_port must be set before HTTPConnection.__init__ runs,
        # since it is consulted when 'host' carries no explicit port.
        if scheme == "http":
            self.default_port = httplib.HTTP_PORT
        elif scheme == "https":
            self.default_port = httplib.HTTPS_PORT
        else:
            raise ValueError, "Invalid scheme '%s'" % (scheme,)
        httplib.HTTPConnection.__init__(self, host, port, strict)
        self.timeout = timeout

    def connect(self):
        # We want to connect in normal blocking mode, else Python on Windows
        # will timeout even for 'connection refused' style errors (fixed in
        # Python 2.4 on Windows)
        if self.scheme == "http":
            httplib.HTTPConnection.connect(self)
        elif self.scheme == "https":
            # Clone of httplib.HTTPSConnection.connect
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.connect((self.host, self.port))
            key_file = cert_file = None
            ssl = socket.ssl(sock, key_file, cert_file)
            self.sock = httplib.FakeSocket(sock, ssl)
        else:
            raise ValueError, "Invalid scheme '%s'" % (self.scheme,)
        # Once we have connected, set the timeout.
        self.sock.settimeout(self.timeout)
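
# A minimal sketch of driving a purge by hand (illustrative; the proxy
# address 127.0.0.1:3128 is an assumption, adjust it for your cache):
#
#   conn = Connection('127.0.0.1:3128', scheme='http', timeout=5)
#   # HTTP/1.0-style purge: pass the full URL so virtual hosting resolves.
#   conn.putrequest('PURGE', 'http://www.example.com/front-page')
#   conn.endheaders()
#   resp = conn.getresponse()
#   print resp.status, resp.getheader('x-cache', '')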

XERROR = (
    'x-squid-error',
    'x-eep-error',
    )

class Worker(threading.Thread):

    def __init__(self, host, scheme, producer):
        self.host = host
        self.scheme = scheme
        self.producer = producer
        super(Worker, self).__init__()

    def run(self):
        try:
            while True:
                # Queue should always exist!
                q = self.producer.queues[(self.host, self.scheme)]
                item = q.get()
                if item is None:
                    logger.debug('Stopping worker thread for '
                                 '(%s, %s).' % (self.host, self.scheme))
                    # Shut down thread signal
                    return
                url, purge_type = item
                # Got an item, prune it!
                (status, xcache, xerror) = self.producer.pruneUrl(url, purge_type)
                if status == -1:
                    # It's an error we can retry.
                    logger.debug('Transient failure on %s for %s, '
                                 'putting back in queue.' % (purge_type, url))
                    queue_put(q, item)
                elif status == 0:
                    # It's a permanent error, give up.
                    logger.debug('Permanent failure on %s for %s, '
                                 'giving up.' % (purge_type, url))
        except:
            logger.exception('Exception in worker thread '
                             'for (%s, %s)' % (self.host, self.scheme))

def queue_put(q, item):
    for i in range(3):
        # Give the item three chances to get into the queue; if the queue
        # is still full after that, the item is silently dropped.
        try:
            q.put(item, block=False)
            break
        except Queue.Full:
            logger.debug('Queue was full (%s items), dropping an item '
                         'from it to make room.' % q.qsize())
            # If queue is full, cull an item from the queue.
            try:
                q.get_nowait()
            except Queue.Empty:
                # If emptied between Full and now (!), ignore.
                pass
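
# Illustrative behaviour (not part of the module): with a bounded queue,
# queue_put drops the oldest pending purge rather than blocking the
# publishing thread:
#
#   q = Queue.Queue(2)
#   queue_put(q, ('http://host/a', 'PURGE'))
#   queue_put(q, ('http://host/b', 'PURGE'))
#   queue_put(q, ('http://host/c', 'PURGE'))  # full: the '/a' item is culled
#   # q now holds the purge items for /b and /c.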

class Producer(object):

    def __init__(self, factory=Connection, timeout=1, backlog=200):
        self.factory = factory
        self.timeout = timeout
        self.queues = {}
        self.connections = {}
        self.workers = {}
        self.backlog = backlog

    def getConnection(self, host, scheme):
        try:
            conn = self.connections[(get_ident(), host, scheme)]
        except KeyError:
            conn = self.factory(host, scheme=scheme, timeout=self.timeout)
            self.connections[(get_ident(), host, scheme)] = conn
        return conn

    def dropConnection(self, host, scheme):
        try:
            del self.connections[(get_ident(), host, scheme)]
        except KeyError:
            pass

    def stopThreads(self):
        for q in self.queues.values():
            queue_put(q, None)

    def pruneAsync(self, url, purge_type='PURGE', daemon=True):
        (scheme, host, path, params, query, fragment) = urlparse.urlparse(url)
        __traceback_info__ = (url, purge_type, scheme, host,
                              path, params, query, fragment)

        key = (host, scheme)

        try:
            q = self.queues[key]
        except KeyError:
            # If no queue for this (host, scheme) combo exists, create one.
            queue_lock.acquire()
            try:
                # Double check, another thread might have beaten us; in that
                # case pick up its queue rather than leaving q unbound.
                if key not in self.queues:
                    self.queues[key] = Queue.Queue(self.backlog)
                q = self.queues[key]
            finally:
                queue_lock.release()

        queue_put(q, (url, purge_type))
        logger.debug('Queued %s for %s.' % (url, purge_type))

        if key not in self.workers:
            # If no worker thread has been created yet, create one.
            worker_lock.acquire()
            try:
                # Double check, another thread might have beaten us.
                if key not in self.workers:
                    self.workers[key] = w = Worker(host, scheme, self)
                    logger.debug('Starting worker thread for (%s, %s).' %
                                 (host, scheme))
                    w.setDaemon(daemon)
                    w.start()
            finally:
                worker_lock.release()

        return 'Queued %s' % url

    def pruneUrl(self, url, purge_type='PURGE'):
        (scheme, host, path, params, query, fragment) = urlparse.urlparse(url)
        __traceback_info__ = (url, purge_type, scheme, host,
                              path, params, query, fragment)
        logger.debug('Started %s for %s.' % (purge_type, url))
        conn = self.getConnection(host, scheme)
        try:
            if USE_HTTP_1_1_PURGE:
                conn._http_vsn = 11
                conn._http_vsn_str = 'HTTP/1.1'
            else:
                conn._http_vsn = 10
                conn._http_vsn_str = 'HTTP/1.0'
                # When using HTTP 1.0 we use the full url as the purge path,
                # to allow for virtual hosting in squid
                path = url

            conn.putrequest(purge_type, path)
            conn.endheaders()

            resp = conn.getresponse()
            status = resp.status
            xcache = resp.getheader('x-cache', '')
            for header in XERROR:
                xerror = resp.getheader(header, '')
                if xerror:
                    # Break on first found.
                    break
            resp.read()
            if resp.isclosed():
                conn.close()
                self.dropConnection(host, scheme)
        except:
            status = 0
            err, msg, tb = sys.exc_info()
            if check_transient(err):
                status = -1
            if check_transient(err) or check_permanent(err):
                conn.close()
                self.dropConnection(host, scheme)
            try:
                from zExceptions.ExceptionFormatter import format_exception
            except ImportError:
                from traceback import format_exception
            xerror = '\n'.join(format_exception(err, msg, tb))
            # Avoid leaking a ref to traceback.
            del err, msg, tb
            xcache = ''
        logger.debug('Finished %s for %s: %s %s' % (
            purge_type, url, status, xcache))
        if xerror:
            logger.debug('Error while purging %s:\n%s' % (url, xerror))
        return (status, xcache, xerror)

_producer = Producer()
pruneUrl = _producer.pruneUrl
pruneAsync = _producer.pruneAsync
stopThreads = _producer.stopThreads
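
# Typical use from other code (a minimal sketch; the URL is hypothetical):
#
#   pruneAsync('http://www.example.com/news')  # queue a purge, returns fast
#   status, xcache, xerror = pruneUrl('http://www.example.com/news')  # blocking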

if __name__ == '__main__':
    for url in sys.argv[1:]:
        pruneUrl(url)
        # Prune async as well, starting the worker threads in non-daemonic
        # mode so the process only exits once the queue has been processed.
        for i in range(3):
            pruneAsync(url, daemon=False)
    stopThreads()
