sts Package

sts Package

control_flow_test Module

class tests.unit.sts.control_flow_test.ReplayerTest(methodName='runTest')[source]

Bases: unittest.case.TestCase

setup_controller_simulation()[source]
setup_dataplane_simulation()[source]
setup_migration_simulation()[source]
setup_simple_simulation()[source]
test_basic()[source]
test_controller_crash()[source]
test_dataplane_injection()[source]
test_migration()[source]
tmp_basic_superlog = '/tmp/superlog_basic.tmp'
tmp_controller_superlog = '/tmp/superlog_controller.tmp'
tmp_dataplane_superlog = '/tmp/superlog_dataplane.tmp'
tmp_migration_superlog = '/tmp/superlog_migration.tmp'
write_controller_crash_superlog()[source]
write_dataplane_trace_superlog()[source]
write_migration_superlog()[source]
write_simple_superlog()[source]

Make sure to delete afterwards!

tests.unit.sts.control_flow_test.handle_int(sigspec, frame)[source]

deferred_io_worker_test Module

Created on Mar 8, 2012

@author: rcs

class tests.unit.sts.deferred_io_worker_test.DeferredIOWorkerTest(methodName='runTest')[source]

Bases: unittest.case.TestCase

static call_later(func)[source]
test_not_received_until_permitted()[source]
test_not_sent_until_permitted()[source]
test_receive_consume()[source]

event_dag_test Module

class tests.unit.sts.event_dag_test.MockEvent(label=None, round=-1, time=None, dependent_labels=None, prunable=True)[source]

Bases: sts.replay_event.InputEvent

proceed(simulation)[source]
class tests.unit.sts.event_dag_test.MockInputEvent(label=None, round=-1, time=None, dependent_labels=None, prunable=True)[source]

Bases: sts.replay_event.InputEvent

proceed(simulation)[source]
class tests.unit.sts.event_dag_test.MockInternalEvent(fingerprint, label=None)[source]

Bases: sts.replay_event.InternalEvent

__init__(fingerprint, label=None)[source]
fingerprint[source]
proceed(simulation)[source]
class tests.unit.sts.event_dag_test.event_dag_test(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_event_dag()[source]
test_event_dag_complement()[source]
test_event_dag_subset()[source]
test_migration_prune_1()[source]
test_migration_prune_2()[source]
test_migration_prune_last()[source]
test_migration_simple()[source]
test_split_basic()[source]
test_split_odd()[source]
test_split_single()[source]
test_split_zero()[source]

mcs_finder_test Module

class tests.unit.sts.mcs_finder_test.MCSFinderTest(methodName='runTest')[source]

Bases: unittest.case.TestCase

all(mcs_finder_type)[source]
basic(mcs_finder_type)[source]
straddle(mcs_finder_type)[source]
test_all()[source]
test_all_efficient()[source]
test_basic()[source]
test_basic_efficient()[source]
test_straddle()[source]
test_straddle_efficient()[source]
class tests.unit.sts.mcs_finder_test.MockEfficientMCSFinder(event_dag, mcs)[source]

Bases: tests.unit.sts.mcs_finder_test.MockMCSFinderBase, sts.control_flow.mcs_finder.EfficientMCSFinder

__init__(event_dag, mcs)[source]
class tests.unit.sts.mcs_finder_test.MockInputEvent(fingerprint=None, **kws)[source]

Bases: sts.replay_event.InputEvent

__init__(fingerprint=None, **kws)[source]
fingerprint[source]
proceed(simulation)[source]
class tests.unit.sts.mcs_finder_test.MockMCSFinder(event_dag, mcs)[source]

Bases: tests.unit.sts.mcs_finder_test.MockMCSFinderBase, sts.control_flow.mcs_finder.MCSFinder

__init__(event_dag, mcs)[source]
class tests.unit.sts.mcs_finder_test.MockMCSFinderBase(event_dag, mcs)[source]

Bases: sts.control_flow.mcs_finder.MCSFinder

Overrides self.invariant_check and run_simulation_forward()

__init__(event_dag, mcs)[source]
log(message)[source]
replay(new_dag, hook=None)[source]

socket_multiplexer_test Module

class tests.unit.sts.socket_multiplexer_test.MultiplexerTest(methodName='runTest')[source]

Bases: unittest.case.TestCase

client_messages = ['foo', 'bar', 'baz']
setup_client(num_socks, address)[source]
setup_server(address)[source]
test_basic()[source]
test_three_incoming()[source]
wait_for_next_accept(listener, mux_select)[source]

topology_test Module

class tests.unit.sts.topology_test.BufferedPanelTest(methodName='runTest')[source]

Bases: unittest.case.TestCase

setUp()[source]
test_buffering()[source]
test_drop()[source]
class tests.unit.sts.topology_test.FullyMeshedLinkTest(methodName='runTest')[source]

Bases: unittest.case.TestCase

setUp()[source]
test_connected_ports()[source]
class tests.unit.sts.topology_test.TopologyUnitTest(methodName='runTest')[source]

Bases: unittest.case.TestCase

setUp()[source]
xxx_marked_as_broken_generakted_topology()[source]
class tests.unit.sts.topology_test.topology_test(methodName='runTest')[source]

Bases: unittest.case.TestCase

test_create_meshes()[source]

Create meshes of several sizes and ensure they are fully connected

test_create_switch()[source]