0001"""
0002HTTP Utilities
0003(from web.py)
0004"""
0005
0006__all__ = [
0007 "expires", "lastmodified",
0008 "prefixurl", "modified",
0009 "redirect", "found", "seeother", "tempredirect",
0010 "write",
0011 "changequery",
0012 "background", "backgrounder",
0013 "Reloader", "reloader", "profiler",
0014]
0015
0016import sys, os, threading, urllib, urlparse
0017try: import datetime
0018except ImportError: pass
0019import net, utils, webapi as web
0020
0021def prefixurl(base=''):
0022 """
0023 Sorry, this function is really difficult to explain.
0024 Maybe some other time.
0025 """
0026 url = web.ctx.path.lstrip('/')
0027 for i in xrange(url.count('/')):
0028 base += '../'
0029 if not base:
0030 base = './'
0031 return base
0032
0033def expires(delta):
0034 """
0035 Outputs an `Expires` header for `delta` from now.
0036 `delta` is a `timedelta` object or a number of seconds.
0037 """
0038 if isinstance(delta, (int, long)):
0039 delta = datetime.timedelta(seconds=delta)
0040 date_obj = datetime.datetime.utcnow() + delta
0041 web.header('Expires', net.httpdate(date_obj))
0042
0043def lastmodified(date_obj):
0044 """Outputs a `Last-Modified` header for `datetime`."""
0045 web.header('Last-Modified', net.httpdate(date_obj))
0046
0047def modified(date=None, etag=None):
0048 n = web.ctx.env.get('HTTP_IF_NONE_MATCH')
0049 m = net.parsehttpdate(web.ctx.env.get('HTTP_IF_MODIFIED_SINCE', '').split(';')[0])
0050 validate = False
0051 if etag:
0052 raise NotImplementedError, "no etag support yet"
0053
0054 if date and m:
0055
0056
0057 if date-datetime.timedelta(seconds=1) <= m:
0058 validate = True
0059
0060 if validate: web.ctx.status = '304 Not Modified'
0061 return not validate
0062
0063"""
0064By default, these all return simple error messages that send very short messages
0065(like "bad request") to the user. They can and should be overridden
0066to return nicer ones.
0067"""
0068def redirect(url, status='301 Moved Permanently'):
0069 """
0070 Returns a `status` redirect to the new URL.
0071 `url` is joined with the base URL so that things like
0072 `redirect("about") will work properly.
0073 """
0074 newloc = urlparse.urljoin(web.ctx.home + web.ctx.path, url)
0075 web.ctx.status = status
0076 web.ctx.output = ''
0077 web.header('Content-Type', 'text/html')
0078 web.header('Location', newloc)
0079
0080
0081
0082def found(url):
0083 """A `302 Found` redirect."""
0084 return redirect(url, '302 Found')
0085
0086def seeother(url):
0087 """A `303 See Other` redirect."""
0088 return redirect(url, '303 See Other')
0089
0090def tempredirect(url):
0091 """A `307 Temporary Redirect` redirect."""
0092 return redirect(url, '307 Temporary Redirect')
0093
0094def write(cgi_response):
0095 """
0096 Converts a standard CGI-style string response into `header` and
0097 `output` calls.
0098 """
0099 cgi_response = str(cgi_response)
0100 cgi_response.replace('
', '
')
0101 head, body = cgi_response.split('
', 1)
0102 lines = head.split('
')
0103
0104 for line in lines:
0105 if line.isspace():
0106 continue
0107 hdr, value = line.split(":", 1)
0108 value = value.strip()
0109 if hdr.lower() == "status":
0110 web.ctx.status = value
0111 else:
0112 web.header(hdr, value)
0113
0114 web.output(body)
0115
0116def changequery(**kw):
0117 """
0118 Imagine you're at `/foo?a=1&b=2`. Then `changequery(a=3)` will return
0119 `/foo?a=3&b=2` -- the same URL but with the arguments you requested
0120 changed.
0121 """
0122 query = web.input(_method='get')
0123 for k, v in kw.iteritems():
0124 if v is None:
0125 query.pop(k, None)
0126 else:
0127 query[k] = v
0128 out = web.ctx.homepath + web.ctx.path
0129 if query:
0130 out += '?' + urllib.urlencode(query)
0131 return out
0132
0133def background(func):
0134 """A function decorator to run a long-running function as a background thread."""
0135 def internal(*a, **kw):
0136 web.data()
0137
0138 tmpctx = web._context[threading.currentThread()]
0139 web._context[threading.currentThread()] = utils.storage(web.ctx.copy())
0140
0141 def newfunc():
0142 web._context[threading.currentThread()] = tmpctx
0143 func(*a, **kw)
0144 myctx = web._context[threading.currentThread()]
0145 for k in myctx.keys():
0146 if k not in ['status', 'headers', 'output']:
0147 try: del myctx[k]
0148 except KeyError: pass
0149
0150 t = threading.Thread(target=newfunc)
0151 background.threaddb[id(t)] = t
0152 t.start()
0153 web.ctx.headers = []
0154 return seeother(changequery(_t=id(t)))
0155 return internal
0156background.threaddb = {}
0157
0158def backgrounder(func):
0159 def internal(*a, **kw):
0160 i = web.input(_method='get')
0161 if '_t' in i:
0162 try:
0163 t = background.threaddb[int(i._t)]
0164 except KeyError:
0165 return web.notfound()
0166 web._context[threading.currentThread()] = web._context[t]
0167 return
0168 else:
0169 return func(*a, **kw)
0170 return internal
0171
0172class Reloader:
0173 """
0174 Before every request, checks to see if any loaded modules have changed on
0175 disk and, if so, reloads them.
0176 """
0177 def __init__(self, func):
0178 self.func = func
0179 self.mtimes = {}
0180
0181
0182
0183
0184
0185 web.loadhooks['reloader'] = self.check
0186
0187
0188
0189
0190
0191
0192
0193
0194
0195
0196
0197
0198
0199
0200
0201 def check(self):
0202 for mod in sys.modules.values():
0203 try:
0204 mtime = os.stat(mod.__file__).st_mtime
0205 except (AttributeError, OSError, IOError):
0206 continue
0207 if mod.__file__.endswith('.pyc') and os.path.exists(mod.__file__[:-1]):
0209 mtime = max(os.stat(mod.__file__[:-1]).st_mtime, mtime)
0210 if mod not in self.mtimes:
0211 self.mtimes[mod] = mtime
0212 elif self.mtimes[mod] < mtime:
0213 try:
0214 reload(mod)
0215 except ImportError:
0216 pass
0217 return True
0218
0219 def __call__(self, e, o):
0220 self.check()
0221 return self.func(e, o)
0222
0223reloader = Reloader
0224
0225def profiler(app):
0226 """Outputs basic profiling information at the bottom of each response."""
0227 import profile
0228 def profile_internal(e, o):
0229 out, result = profile(app)(e, o)
0230 return out + ['<pre>' + net.websafe(result) + '</pre>']
0231 return profile_internal
0232
0233if __name__ == "__main__":
0234 import doctest
0235 doctest.testmod()