if _debug: print 'parse connection closed'
def writeMessage(self, message):
- self.writeQueue.put(message)
+ try:
+ yield self.stream.write(message)
+ except ConnectionClosed:
+ yield self.connectionClosed()
+ except:
+ print traceback.print_exc()
+ #self.writeQueue.put(message)
def parseHandshake(self):
'''Parses the rtmp handshake'''
def parseRequests(self):
'''Parses complete messages until connection closed. Raises ConnectionLost exception.'''
+
+ print "parseRequests start", repr(self)
self.hr = MultitaskHTTPRequestHandler(self.stream, self.remote, None)
self.hr.close_connection = 1
self.cookies = CookieJar()
# over to "standard" HTTPRequestHandler, the data's already
# there.
print "parseRequests"
+ readok = (yield multitask.readable(self.stream.sock, 5000))
+ print "readok", readok
+ print
raw_requestline = (yield self.stream.readline())
if _debug: print "parseRequests, line", raw_requestline
if not raw_requestline:
raise ConnectionClosed
self.hr.raw_requestline = raw_requestline
- pos = self.hr.rfile.tell()
- #self.hr.rfile.truncate(0)
+ #pos = self.hr.rfile.tell()
+ pos = 0
+ self.hr.rfile.truncate(0)
self.hr.rfile.write(data)
print "parseRequests write after"
self.hr.rfile.seek(pos)
def write(self):
'''Writes messages to stream'''
+ print "starting protocol write loop", repr(self)
while True:
while self.writeQueue.empty(): (yield multitask.sleep(0.01))
data = self.writeQueue.get() # TODO this should be used using multitask.Queue and remove previous wait.
yield self.connectionClosed()
except:
print traceback.print_exc()
+ print "ending protocol write loop", repr(self)
class Command(object):
''' Class for command / data messages'''
self.objectEncoding = 0.0
self.queue = multitask.Queue() # receive queue used by application
multitask.add(self.parse())
- multitask.add(self.write())
+ #multitask.add(self.write())
def recv(self):
'''Generator to receive new Message (msg, arg) on this stream, or (None,None) if stream is closed.'''
def connectionClosed(self):
'''Called when the client drops the connection'''
if _debug: 'Client.connectionClosed'
- self.writeMessage(None)
- yield self.queue.put((None,None))
+ if self.stream.sock:
+ yield self.stream.close()
+ else:
+ yield None
+ #self.writeMessage(None)
+ #yield self.queue.put((None,None))
def messageReceived(self, msg):
if _debug: print 'messageReceived cmd=', msg.command, msg.path
inst = self.clients[session][0]
else:
inst = app()
+ self.clients[session] = [inst]; inst._clients=self.clients[session]
msg.server = inst # whew! just in time!
try:
methodname = "on%s" % msg.command
yield client.rejectConnection(reason='Exception on %s' % methodname)
continue
if result is True or result is None:
- if session not in self.clients:
- self.clients[session] = [inst]; inst._clients=self.clients[session]
if result is None:
msg.wfile.seek(0)
data = msg.wfile.read()
print 'close_connection requested'
try:
yield client.connectionClosed()
+ raise ConnectionClosed
except ConnectionClosed:
if _debug:
print 'close_connection done'