"""RPC Implementation, originally written for the Python Idle IDE
For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection. Since Idle has
only one client per server, this was not a limitation.
+---------------------------------+ +-------------+
| SocketServer.BaseRequestHandler | | SocketIO    |
+---------------------------------+ +-------------+
                ^                      ^  ^
                |                      |  |
                | + -------------------+  |
                | |                       |
+-------------------------+        +-----------------+
| RPCHandler              |        | RPCClient       |
| [attribute of RPCServer]|        |                 |
+-------------------------+        +-----------------+
The RPCServer handler class is expected to provide register/unregister methods.
RPCHandler inherits the mix-in class SocketIO, which provides these methods.
See the Idle run.main() docstring for further information on how this was
# NOTE(review): the `def` headers of the code-object pickling helpers are not
# visible in this view; the three statements below appear to be what remains of
# unpickle_code() / pickle_code() -- confirm against the full file.
assert isinstance(co, types.CodeType)
assert isinstance(co, types.CodeType)
# Reduce-style result: a callable plus the argument tuple used to rebuild the
# object on the receiving side.
return unpickle_code, (ms,)
# XXX KBK 24Aug02 function pickling capability not used in Idle
# def unpickle_function(ms):
# def pickle_function(fn):
# assert isinstance(fn, types.FunctionType)
# Register pickle_code as the reducer for code objects, with unpickle_code as
# the matching constructor, so types.CodeType instances can be pickled.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
class RPCServer(SocketServer.TCPServer):
def __init__(self, addr, handlerclass=None):
    """Initialise the TCP server for addr with the chosen request handler.

    addr         -- address passed straight to SocketServer.TCPServer.__init__
    handlerclass -- request handler class; RPCHandler is used when None

    """
    # Fall back to RPCHandler only when the caller did not choose a handler.
    # Assigning it unconditionally would silently discard the argument.
    if handlerclass is None:
        handlerclass = RPCHandler
    SocketServer.TCPServer.__init__(self, addr, handlerclass)
# NOTE(review): the `def` line that owns the next docstring is not visible in
# this view (presumably server_bind -- confirm against the full file).
"Override TCPServer method, no bind() phase for connecting entity"
# Connects self.socket to self.server_address instead of listening on it.
# NOTE(review): the closing quotes of the docstring below are not visible in
# this view.
def server_activate(self):
"""Override TCPServer method, connect() instead of listen()
Due to the reversed connection, self.server_address is actually the
address of the Idle Client to which we are connecting.
self.socket.connect(self.server_address)
# NOTE(review): the `def` line that owns the next docstring is not visible in
# this view (presumably get_request -- confirm).  The visible statement returns
# the already connected socket paired with self.server_address.
"Override TCPServer method, return already connected socket"
return self.socket, self.server_address
# Reports an unhandled server exception to the stream `erf`: a separator
# banner, the current thread's name, the client address, repr(request), the
# active traceback and a final "server exiting" line.
# NOTE(review): the binding of `erf`, any try/except that selects which
# exceptions are reported, the docstring's closing quotes and the exit call
# that the docstring mentions are not visible in this view -- confirm against
# the full file.
def handle_error(self, request, client_address):
"""Override TCPServer method
Error message goes to __stderr__. No error message if exiting
normally or socket raised EOF. Other exceptions not handled in
server code will cause os._exit.
print>>erf, '\n' + '-'*40
print>>erf, 'Unhandled server exception!'
print>>erf, 'Thread: %s' % threading.currentThread().getName()
print>>erf, 'Client Address: ', client_address
print>>erf, 'Request: ', repr(request)
traceback.print_exc(file=erf)
print>>erf, '\n*** Unrecoverable, server exiting!'
#----------------- end class RPCServer --------------------
# Shared FIFO queues (maxsize 0 means unbounded).  localcall() puts
# (seq, (method, args, kwargs)) entries on request_queue; pollresponse() takes
# finished entries from response_queue.
request_queue = Queue.Queue(0)
response_queue = Queue.Queue(0)
# NOTE(review): the `class SocketIO` header is not visible in this view; the
# end-of-class marker further down suggests the methods below belong to it.
# NOTE(review): the use of `sock` and `objtable` is not visible in this view.
def __init__(self, sock, objtable=None, debugging=None):
# Remember the constructing thread; other methods compare the current thread
# against self.sockthread.
self.sockthread = threading.currentThread()
# Only override self.debugging when the caller supplied a value.
if debugging is not None:
self.debugging = debugging
# NOTE(review): the `def` lines that own the next docstring and the `s = ...`
# statement are not visible in this view.
"override for specific exit action"
s = self.location + " " + str(threading.currentThread().getName())
# Map an object id to a local object; localcall() looks requests up by oid in
# self.objtable.
def register(self, oid, object):
self.objtable[oid] = object
# NOTE(review): the body of unregister() is not visible in this view.
def unregister(self, oid):
# Execute a request received from the peer against a locally registered object.
# `request` unpacks as (how, (oid, methodname, args, kwargs)).  The visible
# results are tuples tagged "OK", "ERROR" or "EXCEPTION"; the request_queue.put()
# line hands (seq, (method, args, kwargs)) over for later processing instead.
# NOTE(review): the try/except lines, the branching on `how`, the binding of
# `obj`, `methods` and `attributes`, and several return statements are not
# visible in this view -- confirm against the full file.
def localcall(self, seq, request):
self.debug("localcall:", request)
how, (oid, methodname, args, kwargs) = request
return ("ERROR", "Bad request format")
# Requests naming an oid that was never register()ed are rejected.
if oid not in self.objtable:
return ("ERROR", "Unknown object id: %r" % (oid,))
# "__methods__" and "__attributes__" are pseudo method names handled through
# the _getmethods() / _getattributes() helpers rather than by a real call.
if methodname == "__methods__":
_getmethods(obj, methods)
if methodname == "__attributes__":
_getattributes(obj, attributes)
return ("OK", attributes)
if not hasattr(obj, methodname):
return ("ERROR", "Unsupported method name: %r" % (methodname,))
method = getattr(obj, methodname)
ret = method(*args, **kwargs)
if isinstance(ret, RemoteObject):
request_queue.put((seq, (method, args, kwargs)))
return ("ERROR", "Unsupported message type: %s" % how)
# NOTE(review): `method` is only bound by the getattr() above; if this report
# can run for a failure that happens earlier, the formatting below would raise
# NameError -- confirm against the full file.
msg = "*** Internal Error: rpc.py:SocketIO.localcall()\n\n"\
" Object: %s \n Method: %s \n Args: %s\n"
print>>sys.__stderr__, msg % (oid, method, args)
traceback.print_exc(file=sys.__stderr__)
return ("EXCEPTION", None)
def remotecall(self, oid, methodname, args, kwargs):
    """Send a call request via asynccall() and return its decoded result.

    The sequence number returned by asynccall() is handed directly to
    asyncreturn(), whose return value becomes the result of this method.

    """
    self.debug("remotecall:asynccall: ", oid, methodname)
    return self.asyncreturn(self.asynccall(oid, methodname, args, kwargs))
def remotequeue(self, oid, methodname, args, kwargs):
    """Send a queue request via asyncqueue() and return its decoded result.

    The sequence number returned by asyncqueue() is handed directly to
    asyncreturn(), whose return value becomes the result of this method.

    """
    self.debug("remotequeue:asyncqueue: ", oid, methodname)
    return self.asyncreturn(self.asyncqueue(oid, methodname, args, kwargs))
# Build a ("CALL", (oid, methodname, args, kwargs)) request and send it to the
# peer as (seq, request) through putmessage().  A threading.Condition is
# created when the caller is not the socket thread (self.sockthread).
# NOTE(review): the binding of `seq`, what is done with `cvar`, and the return
# statement are not visible in this view; remotecall() treats the result of
# this method as the sequence number -- confirm against the full file.
def asynccall(self, oid, methodname, args, kwargs):
request = ("CALL", (oid, methodname, args, kwargs))
if threading.currentThread() != self.sockthread:
cvar = threading.Condition()
self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
self.putmessage((seq, request))
# Build a ("QUEUE", (oid, methodname, args, kwargs)) request and send it to the
# peer as (seq, request) through putmessage().  A threading.Condition is
# created when the caller is not the socket thread (self.sockthread).
# NOTE(review): the binding of `seq`, what is done with `cvar`, and the return
# statement are not visible in this view; remotequeue() treats the result of
# this method as the sequence number -- confirm against the full file.
def asyncqueue(self, oid, methodname, args, kwargs):
request = ("QUEUE", (oid, methodname, args, kwargs))
if threading.currentThread() != self.sockthread:
cvar = threading.Condition()
self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
self.putmessage((seq, request))
def asyncreturn(self, seq):
    """Fetch the response for sequence number seq and decode it.

    getresponse() is asked for the reply with a wait of 0.05; the reply is
    then passed to decoderesponse(), whose result is returned.

    """
    self.debug("asyncreturn:%d:call getresponse(): " % seq)
    reply = self.getresponse(seq, wait=0.05)
    self.debug(("asyncreturn:%d:response: " % seq), reply)
    decoded = self.decoderesponse(reply)
    return decoded
# Convert a response received from the peer into a result for the caller.
# Visible pieces: debug traces for the "EXCEPTION" and "EOF" cases, a call to
# decode_interrupthook(), and a SystemError carrying (how, what) for the
# internal-error case (Python 2 `raise Class, value` syntax).
# NOTE(review): the unpacking of `response` into (how, what), the branching on
# `how` and the return statements are not visible in this view -- confirm
# against the full file.
def decoderesponse(self, response):
self.debug("decoderesponse: EXCEPTION")
self.debug("decoderesponse: EOF")
self.decode_interrupthook()
self.debug("decoderesponse: Internal ERROR:", what)
raise SystemError, (how, what)
# Hook invoked by decoderesponse().
# NOTE(review): the debug tag "mainloop:return" suggests that the docstring and
# the two calls below belong to a separate main-loop method whose `def` line
# (and the hook's own body) are not visible in this view -- confirm against the
# full file.  The visible call waits on the link with myseq=None and a wait of
# 0.05, then logs the return.
def decode_interrupthook(self):
"""Listen on socket until I/O not ready or EOF
pollresponse() will loop looking for seq number None, which
never comes, and exit on EOFError.
self.getresponse(myseq=None, wait=0.05)
self.debug("mainloop:return")
# Fetch the response for sequence number `myseq` through _getresponse(); the
# visible rebinding passes the payload through self._proxify().
# NOTE(review): the unpacking of `response` into (how, what), any condition
# guarding the rebinding, and the return statement are not visible in this
# view -- confirm against the full file.
def getresponse(self, myseq, wait):
response = self._getresponse(myseq, wait)
response = how, self._proxify(what)
# NOTE(review): the `def` line for the conversion below is not visible; the
# calls in this section name it self._proxify(obj) -- confirm.
# A RemoteProxy placeholder becomes an RPCProxy built from this connection and
# the placeholder's oid; a list is converted element by element.
if isinstance(obj, RemoteProxy):
return RPCProxy(self, obj.oid)
if isinstance(obj, types.ListType):
return map(self._proxify, obj)
# XXX Check for other types -- not currently needed
# Obtain the response for `myseq`.  When running on the socket thread
# (self.sockthread) the link is polled directly through pollresponse(); the
# remaining lines loop until an entry for myseq appears in self.responses,
# read it and delete it.
# NOTE(review): the else branch, whatever wait happens inside the loop, the
# continuation of the debug call and the return statements are not visible in
# this view -- confirm against the full file.
def _getresponse(self, myseq, wait):
self.debug("_getresponse:myseq:", myseq)
if threading.currentThread() is self.sockthread:
# this thread does all reading of requests or responses
response = self.pollresponse(myseq, wait)
# wait for notification from socket handling thread
while myseq not in self.responses:
response = self.responses[myseq]
self.debug("_getresponse:%s: thread woke up: response: %s" %
del self.responses[myseq]
# NOTE(review): the `def` line that owns the next statement is not visible in
# this view.  The statement advances self.nextseq by 2 and binds the new value
# to `seq` as well.
self.nextseq = seq = self.nextseq + 2
# Serialise `message` with pickle, prefix it with its length packed as "<i"
# (little-endian 4-byte int), and write it to self.sock, sending at most
# BUFSIZE bytes per send() after select() is asked for writability.
# message[0] is formatted with %d, so it is expected to be a number.
# NOTE(review): the try: lines, the loop around select()/send(), the use of `n`
# and what follows the "Cannot pickle" report are not visible in this view --
# confirm against the full file.
def putmessage(self, message):
self.debug("putmessage:%d:" % message[0])
s = pickle.dumps(message)
except pickle.PicklingError:
print >>sys.__stderr__, "Cannot pickle:", repr(message)
s = struct.pack("<i", len(s)) + s
r, w, x = select.select([], [self.sock], [])
n = self.sock.send(s[:BUFSIZE])
except (AttributeError, TypeError):
raise IOError, "socket no longer exists"
# Reader state shared with pollpacket(); see the inline meaning of the values.
bufstate = 0 # meaning: 0 => reading count; 1 => reading data
# Read from self.sock when self.buffer holds fewer than self.bufneed bytes,
# waiting up to `wait` in select().  In state 0 a 4-byte "<i" length header is
# consumed and sets self.bufneed; in state 1 one packet of self.bufneed bytes
# is sliced off the front of self.buffer.
# NOTE(review): the appending of received data to self.buffer, the state
# transitions, the binding of `s` that is unpacked as the header (in this view
# `s` is only bound by recv()), the EOF handling and the return statements are
# not visible here -- confirm against the full file.
def pollpacket(self, wait):
if len(self.buffer) < self.bufneed:
r, w, x = select.select([self.sock.fileno()], [], [], wait)
s = self.sock.recv(BUFSIZE)
if self.bufstate == 0 and len(self.buffer) >= 4:
self.buffer = self.buffer[4:]
self.bufneed = struct.unpack("<i", s)[0]
if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
packet = self.buffer[:self.bufneed]
self.buffer = self.buffer[self.bufneed:]
# Read one packet through pollpacket(wait) and unpickle it into a message.
# When unpickling fails, repr(packet) and the current stack are written to
# sys.__stderr__ between separator lines.
# NOTE(review): pickle.loads() runs on bytes read from the socket, which is
# only safe when the peer is trusted -- confirm the trust model.
# NOTE(review): the try: line, the handling of an empty packet, what follows
# the diagnostic output and the return statement are not visible in this view.
def pollmessage(self, wait):
packet = self.pollpacket(wait)
message = pickle.loads(packet)
except pickle.UnpicklingError:
print >>sys.__stderr__, "-----------------------"
print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
traceback.print_stack(file=sys.__stderr__)
print >>sys.__stderr__, "-----------------------"
# Service the link on behalf of the caller waiting for `myseq`: forward a
# finished entry from response_queue as (seq, ('OK', response)), read the next
# message with pollmessage(wait), run "CALL"/"QUEUE" requests through
# localcall(), and file responses addressed to other sequence numbers in
# self.responses.
# NOTE(review): the loop, the try/except around the queue access, the unpacking
# of `qmsg` and `message` (seq, response, resq, how), the branches that return,
# the notification through `cv` and the docstring's closing quotes are not
# visible in this view -- confirm against the full file.
def pollresponse(self, myseq, wait):
"""Handle messages received on the socket.
Some messages received may be asynchronous 'call' or 'queue' requests,
and some may be responses for other threads.
'call' requests are passed to self.localcall() with the expectation of
immediate execution, during which time the socket is not serviced.
'queue' requests are used for tasks (which may block or hang) to be
processed in a different thread. These requests are fed into
request_queue by self.localcall(). Responses to queued requests are
taken from response_queue and sent across the link with the associated
sequence numbers. Messages in the queues are (sequence_number,
request/response) tuples and code using this module removing messages
from the request_queue is responsible for returning the correct
sequence number in the response_queue.
pollresponse() will loop until a response message with the myseq
sequence number is received, and will save other responses in
self.responses and notify the owning thread.
# send queued response if there is one available
qmsg = response_queue.get(0)
message = (seq, ('OK', response))
# poll for message on link
message = self.pollmessage(wait)
if message is None: # socket not ready
self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
# process or queue a request
if how in ("CALL", "QUEUE"):
self.debug("pollresponse:%d:localcall:call:" % seq)
response = self.localcall(seq, resq)
self.debug("pollresponse:%d:localcall:response:%s"
self.putmessage((seq, response))
# don't acknowledge the 'queue' request!
# return if completed message transaction
# must be a response for a different thread:
cv = self.cvars.get(seq, None)
# response involving unknown sequence number is discarded,
# probably intended for prior incarnation of server
self.responses[seq] = resq
# NOTE(review): the `def` lines that own the two docstrings below, the loop
# that binds `key`, and the call to the exit function are not visible in this
# view -- confirm against the full file.  The visible assignment stores an
# ('EOF', None) response under `key` in self.responses.
"action taken upon link being closed by peer"
self.responses[key] = ('EOF', None)
# call our (possibly overridden) exit function
"Classes using rpc client/server can override to augment EOF action"
#----------------- end class SocketIO --------------------
# NOTE(review): the bodies of RemoteObject and RemoteProxy are not visible in
# this view.  Elsewhere in this section, localcall() tests return values
# against RemoteObject, and RemoteProxy instances (read through their `oid`
# attribute) are converted to RPCProxy objects.
class RemoteObject(object):
class RemoteProxy(object):
# Request handler that combines SocketServer.BaseRequestHandler with the
# SocketIO mix-in.
# NOTE(review): class attributes and any further methods are not visible in
# this view.
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
def __init__(self, sock, addr, svr):
# Publish this handler on the server object before the base initialisers run.
svr.current_handler = self ## cgt xxx
# SocketIO is initialised first, then BaseRequestHandler.
# NOTE(review): the order looks deliberate (the base initialiser may start
# handling the request) -- confirm before reordering.
SocketIO.__init__(self, sock)
SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)