"""CGI-savvy HTTP Server.
This module builds on SimpleHTTPServer by implementing GET and POST
requests to cgi-bin scripts.
If the os.fork() function is not present (e.g. on Windows),
os.popen2() is used as a fallback, with slightly altered semantics; if
that function is not present either (e.g. on Macintosh), only Python
scripts are supported, and they are executed by the current process.
In all cases, the implementation is intentionally naive -- all
requests are executed sychronously.
-- it may execute arbitrary Python code or external programs.
Note that status code 200 is sent prior to execution of a CGI script, so
scripts cannot send other status codes such as 302 (redirect).
__all__ = ["CGIHTTPRequestHandler"]
class CGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
"""Complete HTTP server with GET, HEAD and POST commands.
GET and HEAD also support running CGI scripts.
The POST command is *only* implemented for CGI scripts.
# Determine platform specifics
have_fork = hasattr(os, 'fork')
have_popen2 = hasattr(os, 'popen2')
have_popen3 = hasattr(os, 'popen3')
# Make rfile unbuffered -- we need to read one line and then pass
# the rest to a subprocess, so we can't use buffered input.
This is only implemented for CGI scripts.
self.send_error(501, "Can only POST to CGI scripts")
"""Version of send_head that support CGI scripts"""
return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)
"""Test whether self.path corresponds to a CGI script.
Returns True and updates the cgi_info attribute to the tuple
(dir, rest) if self.path requires running a CGI script.
If any exception is raised, the caller should assume that
self.path was rejected as invalid and act accordingly.
The default implementation tests whether the normalized url
path begins with one of the strings in self.cgi_directories
(and the next character is a '/' or the end of the string).
collapsed_path = _url_collapse_path(self.path)
dir_sep = collapsed_path.find('/', 1)
head, tail = collapsed_path[:dir_sep], collapsed_path[dir_sep+1:]
if head in self.cgi_directories:
self.cgi_info = head, tail
cgi_directories = ['/cgi-bin', '/htbin']
def is_executable(self, path):
"""Test whether argument path is an executable file."""
def is_python(self, path):
"""Test whether argument path is a Python script."""
head, tail = os.path.splitext(path)
return tail.lower() in (".py", ".pyw")
"""Execute a CGI script."""
dir, rest = self.cgi_info
i = path.find('/', len(dir)+1)
scriptdir = self.translate_path(nextdir)
if os.path.isdir(scriptdir):
dir, rest = nextdir, nextrest
i = path.find('/', len(dir)+1)
# find an explicit query string, if present.
rest, _, query = rest.partition('?')
# dissect the part after the directory name into a script name &
# a possible additional path, to be stored in PATH_INFO.
script, rest = rest[:i], rest[i:]
scriptname = dir + '/' + script
scriptfile = self.translate_path(scriptname)
if not os.path.exists(scriptfile):
self.send_error(404, "No such CGI script (%r)" % scriptname)
if not os.path.isfile(scriptfile):
self.send_error(403, "CGI script is not a plain file (%r)" %
ispy = self.is_python(scriptname)
if not (self.have_fork or self.have_popen2 or self.have_popen3):
self.send_error(403, "CGI script is not a Python script (%r)" %
if not self.is_executable(scriptfile):
self.send_error(403, "CGI script is not executable (%r)" %
# Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
# XXX Much of the following could be prepared ahead of time!
env = copy.deepcopy(os.environ)
env['SERVER_SOFTWARE'] = self.version_string()
env['SERVER_NAME'] = self.server.server_name
env['SERVER_PROTOCOL'] = self.protocol_version
env['SERVER_PORT'] = str(self.server.server_port)
env['REQUEST_METHOD'] = self.command
uqrest = urllib.unquote(rest)
env['PATH_INFO'] = uqrest
env['PATH_TRANSLATED'] = self.translate_path(uqrest)
env['SCRIPT_NAME'] = scriptname
env['QUERY_STRING'] = query
host = self.address_string()
if host != self.client_address[0]:
env['REMOTE_HOST'] = host
env['REMOTE_ADDR'] = self.client_address[0]
authorization = self.headers.getheader("authorization")
authorization = authorization.split()
if len(authorization) == 2:
env['AUTH_TYPE'] = authorization[0]
if authorization[0].lower() == "basic":
authorization = base64.decodestring(authorization[1])
authorization = authorization.split(':')
if len(authorization) == 2:
env['REMOTE_USER'] = authorization[0]
if self.headers.typeheader is None:
env['CONTENT_TYPE'] = self.headers.type
env['CONTENT_TYPE'] = self.headers.typeheader
length = self.headers.getheader('content-length')
env['CONTENT_LENGTH'] = length
referer = self.headers.getheader('referer')
env['HTTP_REFERER'] = referer
for line in self.headers.getallmatchingheaders('accept'):
if line[:1] in "\t\n\r ":
accept = accept + line[7:].split(',')
env['HTTP_ACCEPT'] = ','.join(accept)
ua = self.headers.getheader('user-agent')
env['HTTP_USER_AGENT'] = ua
co = filter(None, self.headers.getheaders('cookie'))
env['HTTP_COOKIE'] = ', '.join(co)
# XXX Other HTTP_* headers
# Since we're setting the env in the parent, provide empty
# values to override previously set values
self.send_response(200, "Script output follows")
decoded_query = query.replace('+', ' ')
# Unix -- fork as we should
if '=' not in decoded_query:
self.wfile.flush() # Always flush before forking
pid, sts = os.waitpid(pid, 0)
# throw away additional data [see bug #427345]
while select.select([self.rfile], [], [], 0)[0]:
if not self.rfile.read(1):
self.log_error("CGI script exit status %#x", sts)
os.dup2(self.rfile.fileno(), 0)
os.dup2(self.wfile.fileno(), 1)
os.execve(scriptfile, args, env)
self.server.handle_error(self.request, self.client_address)
# Non Unix - use subprocess
if self.is_python(scriptfile):
if interp.lower().endswith("w.exe"):
# On Windows, use python.exe, not pythonw.exe
interp = interp[:-5] + interp[-4:]
cmdline = [interp, '-u'] + cmdline
self.log_message("command: %s", subprocess.list2cmdline(cmdline))
except (TypeError, ValueError):
p = subprocess.Popen(cmdline,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
if self.command.lower() == "post" and nbytes > 0:
data = self.rfile.read(nbytes)
# throw away additional data [see bug #427345]
while select.select([self.rfile._sock], [], [], 0)[0]:
if not self.rfile._sock.recv(1):
stdout, stderr = p.communicate(data)
self.log_error('%s', stderr)
self.log_error("CGI script exit status %#x", status)
self.log_message("CGI script exited OK")
def _url_collapse_path(path):
Given a URL path, remove extra '/'s and '.' path elements and collapse
any '..' references and returns a colllapsed path.
Implements something akin to RFC-2396 5.2 step 6 to parse relative paths.
The utility of this function is limited to is_cgi method and helps
preventing some security attacks.
Returns: The reconstituted URL, which will always start with a '/'.
Raises: IndexError if too many '..' occur within the path.
# Query component should not be involved.
path, _, query = path.partition('?')
path = urllib.unquote(path)
# Similar to os.path.split(os.path.normpath(path)) but specific to URL
# path semantics rather than local operating system semantics.
path_parts = path.split('/')
for part in path_parts[:-1]:
head_parts.pop() # IndexError if more '..' than prior parts
elif part and part != '.':
head_parts.append( part )
tail_part = path_parts.pop()
tail_part = '?'.join((tail_part, query))
splitpath = ('/' + '/'.join(head_parts), tail_part)
collapsed_path = "/".join(splitpath)
"""Internal routine to get nobody's uid"""
nobody = pwd.getpwnam('nobody')[2]
nobody = 1 + max(map(lambda x: x[2], pwd.getpwall()))
"""Test for executable file."""
return st.st_mode & 0111 != 0
def test(HandlerClass = CGIHTTPRequestHandler,
ServerClass = BaseHTTPServer.HTTPServer):
SimpleHTTPServer.test(HandlerClass, ServerClass)
if __name__ == '__main__':