代码拉取完成,页面将自动刷新
# socketlibevent.py - MIT License
# phoenix@burninglabs.com
#
# Non-blocking socket I/O for Stackless Python using libevent, via pyevent.
#
# Usage:
# import sys, socketlibevent; sys.modules['socket'] = socketlibevent
#
# Based on Richard Tew's stacklesssocket module.
# Uses Dug Song's pyevent.
#
# Thanks a Heap !
import stackless, sys, time, traceback
from weakref import WeakValueDictionary
import socket as stdsocket
from socket import _fileobject
try:
import event
except:
try:
import rel; rel.override()
import event
except:
print "please install libevent and pyevent"
# http://code.google.com/p/pyevent/
print "(or 'stackless ez_setup.py rel' for quick testing)"
# http://code.google.com/p/registeredeventlistener/
sys.exit()
# For SSL support, this module uses the 'ssl' module (built in from 2.6 up):
# ('back-port' for Python < 2.6: http://pypi.python.org/pypi/ssl/)
try:
import ssl as ssl_
ssl_enabled = True
except:
ssl_enabled = False
# Nice socket globals import ripped from Minor Gordon's Yield.
if "__all__" in stdsocket.__dict__:
__all__ = stdsocket.__dict__["__all__"]
globals().update((key, value) for key, value in\
stdsocket.__dict__.iteritems() if key in __all__ or key == "EBADF")
else:
other_keys = ("error", "timeout", "getaddrinfo")
globals().update((key, value) for key, value in\
stdsocket.__dict__.iteritems() if key.upper() == key or key in\
other_keys)
_GLOBAL_DEFAULT_TIMEOUT = 0.1
# simple decorator to run a function in a tasklet
def tasklet(task):
def run(*args, **kwargs):
stackless.tasklet(task)(*args, **kwargs)
return run
# Event Loop Management
loop_running = False
sockets = WeakValueDictionary()
def die():
global sockets
sockets = {}
sys.exit()
@tasklet
def eventLoop():
global loop_running
global event_errors
while sockets.values():
# If there are other tasklets scheduled:
# use the nonblocking loop
# else: use the blocking loop
if stackless.getruncount() > 2: # main tasklet + this one
event.loop(True)
else:
event.loop(False)
stackless.schedule()
loop_running = False
def runEventLoop():
global loop_running
if not loop_running:
event.init()
event.signal(2, die)
event.signal(3, die)
eventLoop()
loop_running = True
# Replacement Socket Module Functions
def socket(family=AF_INET, type=SOCK_STREAM, proto=0):
return evsocket(stdsocket.socket(family, type, proto))
def create_connection(address, timeout=0.1):
s = socket()
s.connect(address, timeout)
return s
def ssl(sock, keyfile=None, certfile=None, server_side=False, ssl_version=ssl_.PROTOCOL_TLSv1):
if ssl_enabled:
return evsocketssl(sock, keyfile, certfile, server_side, ssl_version)
else:
raise RuntimeError(\
"SSL requires the 'ssl' module: 'http://pypi.python.org/pypi/ssl/'")
# Socket Proxy Class
class evsocket():
# XXX Not all socketobject methods are implemented!
# XXX Currently, the sockets are using the default, blocking mode.
def __init__(self, sock):
self.sock = sock
self.accepting = False
self.connected = False
self.remote_addr = None
self.fileobject = None
self.read_channel = stackless.channel()
self.write_channel = stackless.channel()
self.accept_channel = None
global sockets
sockets[id(self)] = self
runEventLoop()
def __getattr__(self, attr):
return getattr(self.sock, attr)
def listen(self, backlog=255):
self.accepting = True
return self.sock.listen(backlog)
def accept(self):
if not self.accept_channel:
self.accept_channel = stackless.channel()
event.event(self.handle_accept, handle=self.sock,
evtype=event.EV_READ | event.EV_PERSIST).add()
return self.accept_channel.receive()
@tasklet
def handle_accept(self, ev, sock, event_type, *arg):
s, a = self.sock.accept()
s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
s = evsocket(s)
self.accept_channel.send((s,a))
def connect(self, address, timeout=0.1):
endtime = time.time() + timeout
while time.time() < endtime:
if self.sock.connect_ex(address) == 0:
self.connected = True
self.remote_addr = address
return
if not self.connected:
# One last try, just to raise an error
return self.sock.connect(address)
def send(self, data, *args):
event.write(self.sock, self.handle_send, data)
return self.write_channel.receive()
@tasklet
def handle_send(self, data):
self.write_channel.send(self.sock.send(data))
def sendall(self, data, *args):
while data:
try:
sent = self.send(data)
data = data[sent + 1:]
except:
raise
return None
def recv(self, bytes, *args):
event.read(self.sock, self.handle_recv, bytes)
return self.read_channel.receive()
@tasklet
def handle_recv(self, bytes):
self.read_channel.send(self.sock.recv(bytes))
def recvfrom(self, bytes, *args):
event.read(self.sock, self.handle_recvfrom, bytes)
return self.read_channel.receive()
@tasklet
def handle_recvfrom(self, bytes):
self.read_channel.send(self.sock.recvfrom(bytes))
def makefile(self, mode='r', bufsize=-1):
self.fileobject = stdsocket._fileobject(self, mode, bufsize)
return self.fileobject
def close(self):
# XXX Stupid workaround
# Don't close while the fileobject is still using the fakesocket
def _close():
while self.fileobject._sock == self:
stackless.schedule()
self._sock.close()
del sockets[id(self)]
if self.fileobject:
stackless.tasklet(_close)()
else:
self._sock.close()
del sockets[id(self)]
# SSL Proxy Class
class evsocketssl(evsocket):
def __init__(self, sock, keyfile=None, certfile=None, server_side=False, ssl_version=ssl_.PROTOCOL_TLSv1):
evsocket.__init__(self, sock)
# XXX This currently performs a BLOCKING handshake operation
# TODO Implement a non-blocking handshake
self.sock = ssl_.wrap_socket(sock, keyfile=keyfile, certfile=certfile, server_side=server_side, ssl_version=ssl_version)
@tasklet
def handle_accept(self, ev, sock, event_type, *arg):
s, a = self.sock.accept()
s.setsockopt(stdsocket.SOL_SOCKET, stdsocket.SO_REUSEADDR, 1)
s.setsockopt(stdsocket.IPPROTO_TCP, stdsocket.TCP_NODELAY, 1)
s = evsocketssl(s)
self.accept_channel.send((s,a))
if __name__ == "__main__":
sys.modules["socket"] = __import__(__name__)
# Minimal Client Test
# TODO: Add a Minimal Server Test
import urllib2
@tasklet
def test(i):
print "url read", i
print urllib2.urlopen("http://www.google.com").read(12)
for i in range(5):
test(i)
stackless.run()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。