Source code for tests.unit.sts.socket_multiplexer_test

# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import sys
import os
import itertools
import time
from threading import Thread
import logging
log = logging.getLogger()
from sts.util.socket_mux.server_socket_multiplexer import *
from sts.util.socket_mux.sts_socket_multiplexer import *

sys.path.append(os.path.dirname(__file__) + "/../../..")

[docs]class MultiplexerTest(unittest.TestCase): client_messages = [ "foo", "bar", "baz" ]
[docs] def setup_server(self, address): import socket import os mux_select = ServerMultiplexedSelect() ServerMockSocket.bind_called = False listener = ServerMockSocket(socket.AF_UNIX, socket.SOCK_STREAM, set_true_listen_socket=mux_select.set_true_listen_socket) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind(address) listener.listen(16) return (mux_select, listener)
[docs] def setup_client(self, num_socks, address): from pox.lib.util import connect_socket_with_backoff io_master = MultiplexedSelect() socket = connect_socket_with_backoff(address=address) io_worker = io_master.create_worker_for_socket(socket) io_master.set_true_io_worker(io_worker) demux = STSSocketDemultiplexer(io_worker, address) mock_socks = [] for i in xrange(num_socks): mock_socket = STSMockSocket(None, None) mock_socket.connect(address) mock_socket.send(self.client_messages[i]) mock_socks.append(mock_socket) io_master.select(mock_socks, mock_socks, [])
[docs] def wait_for_next_accept(self, listener, mux_select): log.info("waiting for next accept") rl = [] while listener not in rl: (rl, _, _) = mux_select.select([listener], [], [], 0.1)
[docs] def test_basic(self): address = "basic_pipe" try: t = Thread(target=self.setup_client, args=(1,address,), name="MainThread") t.start() (mux_select, listener) = self.setup_server(address) self.wait_for_next_accept(listener, mux_select) mock_sock = listener.accept()[0] (rl, _, _) = mux_select.select([mock_sock], [], []) start = last = time.time() while mock_sock not in rl: time.sleep(0.05) if time.time() - start > 5: self.fail("Did not find socket in rl in 5 seconds") elif time.time() - last > 1: log.debug("waiting for socket %s in rl %s..." % ( str(mock_sock), repr(rl))) last = time.time() (rl, _, _) = mux_select.select(rl, [], []) d = mock_sock.recv(2048) self.assertEqual(self.client_messages[0], d) finally: try: os.unlink(address) except OSError: if os.path.exists(address): raise RuntimeError("can't remove PIPE socket %s" % str(address))
[docs] def test_three_incoming(self): address = "three_pipe" try: t = Thread(target=self.setup_client, args=(3,address,), name="MainThread") t.start() (mux_select, listener) = self.setup_server(address) mock_socks = [] for i in xrange(len(self.client_messages)): self.wait_for_next_accept(listener, mux_select) mock_sock = listener.accept()[0] (rl, _, _) = mux_select.select([mock_sock], [], []) start = last = time.time() while mock_sock not in rl: if time.time() - start > 5: self.fail("Did not find socket in rl in 5 seconds") elif time.time() - last > 1: log.debug("waiting for socket %s in rl %s..." % ( str(mock_sock), repr(rl))) last = time.time() (rl, _, _) = mux_select.select(rl, [], []) time.sleep(0.05) d = mock_sock.recv(2048) # order should be deterministic self.assertEqual(self.client_messages[i], d) finally: try: os.unlink(address) except OSError: if os.path.exists(address): raise RuntimeError("can't remove PIPE socket %s" % str(address))