# Source code for sts.util.io_master
# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import sys
import logging
import select
import socket
import time
import threading
from pox.lib.util import makePinger
from pox.lib.ioworker.io_worker import IOWorker
# Module-level logger; IOMaster uses it to report socket errors and
# failures while closing workers.
log = logging.getLogger("io_master")
class STSIOWorker(IOWorker):
    """ An IOWorker that works with our IOMaster """

    def __init__(self, socket, on_close):
        """
        Wrap `socket` in an IOWorker.

        socket   -- the socket object this worker reads from / writes to
        on_close -- callable invoked as on_close(worker) from close()
        """
        IOWorker.__init__(self)
        self.socket = socket
        # (on_close factory method hides details of the Select loop)
        self.on_close = on_close

    def fileno(self):
        """ Return the wrapped sockets' fileno """
        return self.socket.fileno()

    def send(self, data):
        """
        send data from the client side. fire and forget.

        Raises RuntimeError when called from a thread whose name is neither
        "MainThread" nor "BackgroundIOThread".
        """
        # Look the current thread up once; it is needed for both the check
        # and the error message.
        current = threading.current_thread()
        if current.name not in ("MainThread", "BackgroundIOThread"):
            raise RuntimeError("Wrong thread: %s" % current)
        return IOWorker.send(self, data)

    def close(self):
        """ Register this socket to be closed. fire and forget """
        # (don't close until Select loop is ready)
        IOWorker.close(self)
        # on_close is a function not a method, so the worker is passed
        # explicitly.
        self.on_close(self)
# Note that IOMaster is used as the main select loop in POX (debugger branch)
class IOMaster(object):
    """
    an IO handler that handles the select work for our IO worker

    Workers are created with create_worker_for_socket() and serviced whenever
    select(), poll() or sleep() is called. close_all() closes every worker and
    the pinger; if a select() call is in progress the close is deferred until
    that call unwinds.
    """
    # Not referenced anywhere inside this class.
    _select_timeout = 5
    # Maximum number of bytes requested from a socket per recv() call.
    _BUF_SIZE = 8192

    def __init__ (self):
        # Workers currently registered with the select loop.
        self._workers = set()
        # Object from pox's makePinger(); it is put in the read list of
        # select() and pinged to interrupt a blocking select.
        self.pinger = makePinger()
        # True once _do_close_all() has completed.
        self.closed = False
        # Set by close_all() when the close has to wait for select() to exit.
        self._close_requested = False
        # Nesting depth of select() calls currently executing.
        self._in_select = 0

    def create_worker_for_socket(self, socket):
        '''
        Return an IOWorker wrapping the given socket.

        The worker is added to the set of selected workers immediately.
        '''
        # Called from external threads (per the original author's note).

        # Our callback for io_worker.close():
        def on_close(worker):
            worker.socket.close()
            self._workers.discard(worker)

        worker = STSIOWorker(socket, on_close=on_close)
        self._workers.add(worker)
        return worker

    def monkey_time_sleep(self):
        """monkey patches time.sleep to use this io_masters's time.sleep"""
        # Bound methods must be compared with ==, not `is`: every attribute
        # access creates a fresh bound-method object.
        if time.sleep == self.sleep:
            # Already patched by this instance; patching again would record
            # our own sleep() as the "original".
            return
        self.original_time_sleep = time.sleep
        # keep time._orig_sleep around for interrupt handler (procutils)
        time._orig_sleep = time.sleep
        time.sleep = self.sleep

    def _ping(self):
        """Ping the pinger, if there still is one."""
        if self.pinger:
            self.pinger.ping()

    def close_all(self):
        """
        Close all workers and the pinger.

        When called while select() is running, only a flag is set and the
        pinger is pinged; the actual close then happens when the outermost
        select() call finishes.
        """
        if self._in_select > 0:
            self._close_requested = True
            self._ping()
        else:
            self._do_close_all()

    def _do_close_all(self):
        """Close every worker, undo the time.sleep patch, drop the pinger."""
        # Iterate over a copy: closing a worker removes it from the set.
        for w in list(self._workers):
            try:
                w.close()
            except Exception as e:
                # Not every exception carries strerror/errno, so read them
                # defensively instead of failing inside the handler.
                log.warning("Error closing IOWorker %s: %s (%s)", w,
                            getattr(e, "strerror", None) or e,
                            getattr(e, "errno", None))

        # `==` rather than `is`: two accesses of self.sleep never yield the
        # same bound-method object, so an identity test would never match
        # and time.sleep would stay patched.
        if time.sleep == self.sleep:
            time.sleep = self.original_time_sleep

        if self.pinger:
            self.pinger.ping()
            if hasattr(self.pinger, "close"):
                self.pinger.close()
            self.pinger = None
        self.closed = True

    def poll(self):
        """Run one select pass with a zero timeout."""
        self.select(0)

    def sleep(self, timeout):
        """
        Service IO for roughly `timeout` seconds.

        Returns early once this IOMaster is closed, and stops when less than
        10ms of the timeout remain.
        """
        start = time.time()
        while not self.closed:
            elapsed = time.time() - start
            remaining = timeout - elapsed
            if remaining < 0.01:
                break
            self.select(remaining)

    def grab_workers_rwe(self):
        """
        Return the (read, write, exception) lists to hand to select.select().

        All workers plus the pinger are read candidates, workers with
        `_ready_to_send` set are write candidates, and all workers are
        watched for exceptional conditions.
        """
        read_sockets = list(self._workers) + [ self.pinger ]
        write_sockets = [ worker for worker in self._workers if worker._ready_to_send ]
        exception_sockets = list(self._workers)
        return (read_sockets, write_sockets, exception_sockets)

    def select(self, timeout=0):
        """
        Run one select.select() pass over all workers and handle the results.

        timeout -- passed through to select.select(), in seconds.

        A close requested through close_all() during this call is carried out
        once the outermost select() call is done.
        """
        self._in_select += 1
        try:
            read_sockets, write_sockets, exception_sockets = self.grab_workers_rwe()
            rlist, wlist, elist = select.select(read_sockets, write_sockets, exception_sockets, timeout)
            self.handle_workers_rwe(rlist, wlist, elist)
        except select.error:
            # TODO(cs): this is a hack: file descriptor is closed upon shut
            # down, and select throws up.
            sys.stderr.write("File Descriptor Closed\n")
        except TypeError:
            # Same behavior, error message is:
            # TypeError: argument must be an int, or have a fileno() method.
            sys.stderr.write("File Descriptor Closed\n")
        finally:
            self._in_select -= 1
        if self._in_select == 0 and self._close_requested and not self.closed:
            self._do_close_all()

    def _close_worker(self, worker):
        """Close `worker` and make sure it is no longer selected on."""
        worker.close()
        self._workers.discard(worker)

    def handle_workers_rwe(self, rlist, wlist, elist):
        """
        Process the ready lists returned by select.select().

        rlist -- readable objects (workers and possibly the pinger); the
                 pinger is removed from this list in place
        wlist -- workers whose sockets are writable
        elist -- workers with an exceptional condition; these are closed

        A worker closed during this call is skipped in the later passes.
        """
        if self.pinger in rlist:
            self.pinger.pongAll()
            rlist.remove(self.pinger)

        # Workers closed in this pass; their sockets must not be touched again.
        closed = set()

        for worker in elist:
            self._close_worker(worker)
            closed.add(worker)

        for worker in rlist:
            if worker in closed:
                continue
            try:
                data = worker.socket.recv(self._BUF_SIZE)
                if data:
                    worker._push_receive_data(data)
                else:
                    # An empty read means the peer closed the connection.
                    log.warning("Closing socket due to empty read")
                    closed.add(worker)
                    self._close_worker(worker)
            except socket.error as e:
                # strerror is None for single-argument socket errors.
                log.error("Socket error: %s", e.strerror or e)
                closed.add(worker)
                self._close_worker(worker)

        for worker in wlist:
            if worker in closed:
                continue
            try:
                l = worker.socket.send(worker.send_buf)
                if l > 0:
                    worker._consume_send_buf(l)
            except socket.error as e:
                # EAGAIN only means "try again later"; anything else is fatal
                # for this worker.
                if e.errno != errno.EAGAIN:
                    log.error("Socket error: %s", e.strerror or e)
                    closed.add(worker)
                    self._close_worker(worker)