# Copyright 2011-2013 Colin Scott
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sts.util.io_master import IOMaster
from pox.lib.ioworker.io_worker import JSONIOWorker, IOWorker
import select
import socket
import logging
import errno
import base64
import threading
log = logging.getLogger("sock_mux")
# TODO(cs): what if the controller doesn't use a select loop? The demuxing can
# still be achieved, it's just that all socket calls will be blocking. We
# would also need to make sure that our code is thread-safe.
# The wire protocol is fairly simple:
# - all messages are wrapped in a json hash
# - each hash has two fields: `id', and `type'
# - `id' identifies a channel. The value of `id' is shared between the client
# socket and the corresponding socket in the server.
# - Upon connect(), tell the server that we've connected. `type' is set to
# "SYN", and an additional `address' field tells the server the proper
# address to return from accept().
# - Upon seeing the SYN for an id it has not observed before, the server
# creates a MockSocket and stores it to be accept()'ed by the mock listener
# socket.
# - All data messages are of type `data', and include a `data' field
[docs]class SocketDemultiplexer(object):
[docs] def __init__(self, true_io_worker):
self.true_io_worker = true_io_worker
self.client_info = true_io_worker.socket.getsockname()
self.json_worker = JSONIOWorker(true_io_worker,
on_json_received=self._on_receive)
self.id2socket = {}
self.log = logging.getLogger("sockdemux")
def _on_receive(self, _, json_hash):
if 'id' not in json_hash or 'type' not in json_hash:
raise ValueError("Invalid json_hash %s" % str(json_hash))
[docs]class MockSocket(object):
[docs] def __init__(self, protocol, sock_type, sock_id=-1, json_worker=None):
self.protocol = protocol
self.sock_type = sock_type
self.sock_id = sock_id
self.json_worker = json_worker
self.pending_reads = []
[docs] def ready_to_read(self):
return self.pending_reads != []
[docs] def send(self, data):
# base 64 occasionally adds extraneous newlines: bit.ly/aRTmNu
json_safe_data = base64.b64encode(data).replace("\n", "")
wrapped = {'id' : self.sock_id, 'type' : 'data', 'data' : json_safe_data}
self.json_worker.send(wrapped)
# that just put it on a buffer. Now, actually send...
# TODO(cs): this is hacky. Should really define our own IOWorker class
buf = self.json_worker.io_worker.send_buf
try:
l = self.json_worker.io_worker.socket.send(buf)
except socket.error as (s_errno, strerror):
if s_errno != errno.EAGAIN:
raise
l = 0
# Note that if l != len(buf), the rest of the data will be sent on the
# next select() [since true_io_worker._ready_to_send will still be True.
# In this case our return value will be a lie, but there won't be any
# negative consequences of this, since the client is a MockSocket, and we
# filter them out of the select call anyway.
self.json_worker.io_worker._consume_send_buf(l)
return len(data)
[docs] def recv(self, bufsize):
if self.pending_reads == []:
log.warn("recv() called with an empty buffer")
# Never block
return None
# TODO(cs): don't ignore bufsize
data = self.pending_reads.pop(0)
return data
[docs] def append_read(self, data):
self.pending_reads.append(data)
[docs] def fileno(self):
return self.sock_id
[docs] def setsockopt(self, *args, **kwargs):
# TODO(cs): implement me
pass
[docs] def setblocking(self, _):
# We never block anyway
pass
[docs] def getpeername(self):
pass
[docs] def close(self):
# TODO(cs): implement me
pass
[docs]def is_mocked(sock_or_io_worker):
return sock_or_io_worker.fileno() < 0
[docs]def sort_sockets(rl, wl, xl):
for l in [rl, wl, xl]:
l.sort(key=lambda s: s.fileno())
return (rl, wl, xl)
[docs]class MultiplexedSelect(IOMaster):
# Note that there will be *two* IOMasters running in the process. This one
# runs below the normal IOMaster. MultiplexedSelect subclasses IOMaster only to
# wrap its true socket(s) in an internal IOWorker. Also note that the normal
# IOMaster's pinger sockets will in fact be MockSockets. We have the only
# real pinger socket (MultiplexedSelect must be instantiated before
# socket.socket is overridden).
# The caller may pass in classes that wrap our MockSockets. select() can
# only rely on the fileno() to tell whether the socket is ready to read.
# Therefore we keep a map fileno() -> MockSocket.ready_to_read.
# TODO(cs): perhaps this shouldn't be a class variable
fileno2ready_to_read = {}
[docs] def __init__(self, *args, **kwargs):
super(MultiplexedSelect, self).__init__(*args, **kwargs)
self.true_io_workers = []
self.log = logging.getLogger("mux_select")
[docs] def set_true_io_worker(self, true_io_worker):
self.true_io_workers.append(true_io_worker)
[docs] def ready_to_read(self, sock_or_io_worker):
fileno = sock_or_io_worker.fileno()
if fileno >= 0:
raise ValueError("Not a MockSocket!")
if fileno not in self.fileno2ready_to_read:
raise RuntimeError("Unknown mock fileno %d" % fileno)
return self.fileno2ready_to_read[fileno]()
[docs] def select(self, rl, wl, xl, timeout=0):
''' Note that this layer is *below* IOMaster's Select loop '''
# If this isn't the main thread, use normal select
if (threading.current_thread().name != "MainThread" and
threading.current_thread().name != "BackgroundIOThread"):
if hasattr(select, "_old_select"):
return select._old_select(rl, wl, xl, timeout)
else:
return select.select(rl, wl, xl, timeout)
# Always remove MockSockets or wrappers of MockSockets
# (don't mess with other non-socket fds)
mock_read_socks = [ s for s in rl if is_mocked(s) ]
mock_write_workers = [ w for w in wl if is_mocked(w) ]
(rl, wl, xl) = [ [s for s in l if not is_mocked(s) ]
for l in [rl, wl, xl] ]
# Grab the sock lists for our internal socket. These lists will contain
# our true_io_worker(s), along with our pinger.
(our_rl, our_wl, our_xl) = self.grab_workers_rwe()
# If any of our mock sockets are ready to read, and our true_socket
# doesn't have pending writes, return immediately
ready_to_read_mock = [ s for s in mock_read_socks if self.ready_to_read(s) ]
if (ready_to_read_mock != [] or mock_write_workers != []) and our_wl == []:
return sort_sockets(ready_to_read_mock, mock_write_workers, [])
if hasattr(select, "_old_select"):
(rl, wl, xl) = select._old_select(rl+our_rl, wl+our_wl, xl+our_xl, timeout)
else:
(rl, wl, xl) = select.select(rl+our_rl, wl+our_wl, xl+our_xl, timeout)
(rl, wl, xl) = self.handle_socks_rwe(rl, wl, xl, mock_read_socks,
mock_write_workers)
return (rl, wl, xl)
[docs] def handle_socks_rwe(self, rl, wl, xl, mock_read_socks, mock_write_workers):
if self.pinger in rl:
self.pinger.pongAll()
rl.remove(self.pinger)
for true_io_worker in self.true_io_workers:
if true_io_worker in xl:
raise RuntimeError("Error in true socket")
if true_io_worker in rl:
rl.remove(true_io_worker)
# Trigger self.true_io_worker.on_received
try:
data = true_io_worker.socket.recv(self._BUF_SIZE)
if data:
true_io_worker._push_receive_data(data)
else:
print "Closing true_io_worker after empty read"
true_io_worker.close()
self._workers.discard(true_io_worker)
except socket.error as (s_errno, strerror):
if s_errno != errno.EWOULDBLOCK:
print ("Socket read error: " + strerror)
true_io_worker.close()
self._workers.discard(true_io_worker)
if true_io_worker in wl:
wl.remove(true_io_worker)
try:
l = true_io_worker.socket.send(true_io_worker.send_buf)
if l > 0:
true_io_worker._consume_send_buf(l)
except socket.error as (s_errno, strerror):
if s_errno != errno.EAGAIN and s_errno != errno.EWOULDBLOCK:
print "Socket error: " + strerror
true_io_worker.close()
self._workers.discard(true_io_worker)
# Now add MockSockets that are ready to read
rl += [ s for s in mock_read_socks if self.ready_to_read(s) ]
# As well as MockSockets that are ready to write.
# This will cause the IOMaster above to flush the
# io_worker's buffers into our true_io_worker.
wl += mock_write_workers
# Sort all sockets to ensure determinism
return sort_sockets(rl, wl, xl)