Source code for tests.unit.sts.topology_test

# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
# Copyright 2012-2013 Sam Whitlock
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import sys
import os.path
import itertools
from copy import copy
import types

sys.path.append(os.path.dirname(__file__) + "/../../..")

from sts.topology import *
from sts.traffic_generator import *
from pox.lib.ioworker.io_worker import RecocoIOLoop
from pox.openflow.software_switch import SoftwareSwitch
from pox.lib.graph.graph import Graph
from sts.entities import Host, HostInterface

[docs]class topology_test(unittest.TestCase): _io_loop = RecocoIOLoop() _io_ctor = _io_loop.create_worker_for_socket
[docs] def test_create_switch(self): s = create_switch(1, 3) self.assertEqual(len(s.ports), 3) self.assertEqual(s.dpid, 1) s2 = create_switch(2, 3) self.assertNotEqual(s2.ports[1].hw_addr, s.ports[1].hw_addr)
[docs] def test_create_meshes(self): """ Create meshes of several sizes and ensure they are fully connected """ for i in (2,3, 5, 12): self._test_create_mesh(i)
def _test_create_mesh(self, size): mesh = MeshTopology(size) self.assertEqual(len(mesh.switches), size) self.assertEqual([sw for sw in mesh.switches if len(sw.ports) == size ], mesh.switches) # check that all pairs of switches are connected to each other non_connected_sw_pairs = [pair for pair in itertools.permutations(mesh.switches, 2) if pair[0] != pair[1] ] # create a list of all (switch, port) pairs sw_port_pairs = [ (sw, p) for sw in mesh.switches for _, p in sw.ports.iteritems() ] # check the all hosts are connected once non_connected_hosts = set(mesh.hosts) # create a list of all (host, interface) pairs host_interface_pairs = [ (host, interface) for host in mesh.hosts for interface in host.interfaces ] # collect the 'other' (switch, port) pairs. This + (hosts, interface) pairs should # end up the same as sw_port_pairs + host_interface_pairs other_sw_port_pairs = [] # List of (host, interface) edge tail pairs other_host_interface_pairs = [] # list of (switch, port) access ports access_sw_port_pairs = [] for switch in mesh.switches: for port_no, port in switch.ports.iteritems(): # TODO: abuse of dynamic types... get_connected_port is a field (other_node, other_port) = mesh.get_connected_port(switch, port) if type(other_node) == Host: self.assertTrue( other_node in non_connected_hosts, "%s" % str(other_node) ) non_connected_hosts.remove(other_node) access_sw_port_pairs.append( (switch, port) ) other_host_interface_pairs.append( (other_node, other_port) ) else: self.assertTrue( (switch, other_node) in non_connected_sw_pairs, "Switches %s, %s connected twice" % (switch, other_node)) non_connected_sw_pairs.remove( (switch, other_node) ) other_sw_port_pairs.append( (other_node, other_port) ) self.assertEqual(len(non_connected_sw_pairs), 0, "Non-connected switches: %s" % non_connected_sw_pairs) self.assertEqual(len(non_connected_hosts), 0, "Non-connected hosts: %s" % non_connected_hosts) # Ensure the list was already unique self.assertEqual(len(set(sw_port_pairs)), len(sw_port_pairs)) self.assertEqual(len(set(other_sw_port_pairs)), len(other_sw_port_pairs)) self.assertEqual(len(set(access_sw_port_pairs)), len(access_sw_port_pairs)) # Test all switches connected self.assertEqual(set(sw_port_pairs), set(other_sw_port_pairs).union(set(access_sw_port_pairs))) # Ensure the list was already unique self.assertEqual(len(set(host_interface_pairs)), len(host_interface_pairs)) self.assertEqual(len(set(other_host_interface_pairs)), len(other_host_interface_pairs)) self.assertEqual(set(host_interface_pairs), set(other_host_interface_pairs)) # Make sure that all access links are accounted for self.assertTrue(len(mesh.access_links) == len(mesh.hosts)) for access_link in mesh.access_links: self.assertTrue((access_link.host, access_link.interface) in host_interface_pairs) self.assertTrue((access_link.switch, access_link.switch_port) in access_sw_port_pairs) # Make sure that all network links are accounted for self.assertTrue(len(mesh.network_links) == len(mesh.switches) * (len(mesh.switches) - 1)) for link in mesh.network_links: self.assertTrue((link.start_software_switch, link.start_port) in sw_port_pairs) self.assertTrue((link.end_software_switch, link.end_port) in sw_port_pairs)
[docs]class FullyMeshedLinkTest(unittest.TestCase): _io_loop = RecocoIOLoop() _io_ctor = _io_loop.create_worker_for_socket
[docs] def setUp(self): self.dpid2switch = { switch_id: create_switch(switch_id, 2) for switch_id in xrange(1, 4) } self.switches = self.dpid2switch.values() self.links = MeshTopology.FullyMeshedLinks(self.dpid2switch) self.get_connected_port = self.links
[docs] def test_connected_ports(self): # this is the sum of i from i=1 to i=n-1, *2 because links are unidirectional expected_link_length = 2*reduce(lambda x,y: x+y, xrange(1,len(self.switches)), 0) self.assertEqual(expected_link_length, len(set(self.links.network_links)))
[docs]class TopologyUnitTest(unittest.TestCase): _io_loop = RecocoIOLoop() _io_ctor = _io_loop.create_worker_for_socket
[docs] def setUp(self): class MockSwitch(SoftwareSwitch): _eventMixin_events = set([DpPacketOut]) def __init__(self, dpid, ports): self.has_forwarded = False self.dpid = dpid self.ports = {} for port in ports: self.ports[port.port_no] = port def process_packet(self, packet, in_port): self.has_forwarded = True def create_mock_switch(num_ports, switch_id): ports = [] for port_no in range(1, num_ports+1): port = ofp_phy_port( port_no=port_no, hw_addr=EthAddr("00:00:00:00:%02x:%02x" % (switch_id, port_no)) ) # monkey patch an IP address onto the port for anteater purposes port.ip_addr = "1.1.%d.%d" % (switch_id, port_no) ports.append(port) return MockSwitch(switch_id, ports) self.g = Graph() self.switches = [create_mock_switch(2, 1), create_mock_switch(2, 2)] self.hosts = [Host([HostInterface(EthAddr("00:00:00:00:00:01"))], name="host1"), Host([HostInterface(EthAddr("00:00:00:00:00:02"))], name="host2")] for switch in self.switches: self.g.add(switch) for host in self.hosts: self.g.add(host) self.g.link((self.switches[0], self.switches[0].ports[2]), (self.switches[1], self.switches[1].ports[2])) self.g.link((self.hosts[0], self.hosts[0].interfaces[0]), (self.switches[0], self.switches[0].ports[1])) self.g.link((self.hosts[1], self.hosts[1].interfaces[0]), (self.switches[1], self.switches[1].ports[1])) topology = Topology.populate_from_topology(self.g) self.patch = BufferedPatchPanelForTopology(self.g) self.switches_calc = topology.switches self.hosts_calc = topology.hosts self.access_links = topology.access_links # Line 152 is throwing an exception! See: # https://github.com/noxrepo/pox/issues/36 for more information.
[docs] def xxx_marked_as_broken_generakted_topology(self): self.assertEqual(len(self.access_links), len(self.hosts)) self.assertEqual(len(self.hosts_calc), len(self.hosts)) self.assertEqual(len(self.switches_calc), len(self.switches))
[docs]class BufferedPanelTest(unittest.TestCase): _io_loop = RecocoIOLoop() _io_ctor = _io_loop.create_worker_for_socket
[docs] def setUp(self): class MockSwitch(SoftwareSwitch): _eventMixin_events = set([DpPacketOut]) def __init__(self, dpid, ports): self.has_forwarded = False self.dpid = dpid self.ports = {} for port in ports: self.ports[port.port_no] = port def process_packet(self, packet, in_port): self.has_forwarded = True def create_mock_switch(num_ports, switch_id): ports = [] for port_no in range(1, num_ports+1): port = ofp_phy_port( port_no=port_no, hw_addr=EthAddr("00:00:00:00:%02x:%02x" % (switch_id, port_no)) ) # monkey patch an IP address onto the port for anteater purposes port.ip_addr = "1.1.%d.%d" % (switch_id, port_no) ports.append(port) return MockSwitch(switch_id, ports) num_ports = 1 self.dpid2switch = { dpid: create_mock_switch(num_ports, dpid) for dpid in xrange(1,3) } self.switches = self.dpid2switch.values() self.m = BufferedPatchPanel(self.switches, [], MeshTopology.FullyMeshedLinks(self.dpid2switch)) self.traffic_generator = TrafficGenerator() self.switch1 = self.switches[0] self.switch2 = self.switches[1] self.port = self.switch1.ports.values()[0] self.icmp_packet = self.traffic_generator.icmp_ping(self.port, None) self.dp_out_event = DpPacketOut(self.switch1, self.icmp_packet, self.port)
[docs] def test_buffering(self): self.switch1.raiseEvent(self.dp_out_event) self.assertFalse(self.switch2.has_forwarded, "should not have forwarded yet") self.assertFalse(len(self.m.queued_dataplane_events) == 0, "should have buffered packet") self.m.permit_dp_event(self.dp_out_event) self.assertTrue(len(self.m.queued_dataplane_events) == 0, "should have cleared buffer") self.assertTrue(self.switch2.has_forwarded, "should have forwarded")
[docs] def test_drop(self): # raise the event self.switch1.raiseEvent(self.dp_out_event) self.assertFalse(self.switch2.has_forwarded, "should not have forwarded yet") self.assertFalse(len(self.m.queued_dataplane_events) == 0, "should have buffered packet") self.m.drop_dp_event(self.dp_out_event) self.assertTrue(len(self.m.queued_dataplane_events) == 0, "should have cleared buffer") self.assertFalse(self.switch2.has_forwarded, "should not have forwarded")
if __name__ == '__main__': unittest.main()