# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import sys
import os.path
import itertools
from copy import copy
import types
sys.path.append(os.path.dirname(__file__) + "/../../..")
from sts.replay_event import *
from sts.event_dag import *
[docs]class MockEvent(InputEvent):
[docs] def proceed(self, simulation):
pass
[docs]class MockInternalEvent(InternalEvent):
[docs] def __init__(self, fingerprint, label=None):
InternalEvent.__init__(self, label)
self.timed_out = False
self._fingerprint = fingerprint
@property
[docs] def fingerprint(self):
return self._fingerprint
[docs] def proceed(self, simulation):
pass
[docs]class event_dag_test(unittest.TestCase):
[docs] def test_split_basic(self):
events = [MockEvent(), MockEvent(), MockEvent(), MockEvent()]
dag = EventDag(events)
splits = split_list(dag.input_events, 2)
self.assertEqual(2, len(splits))
self.assertEqual(2, len(splits[0]))
self.assertEqual(2, len(splits[1]))
splits = split_list(dag.input_events, 4)
self.assertEqual(4, len(splits))
self.assertEqual(1, len(splits[0]))
self.assertEqual(1, len(splits[1]))
self.assertEqual(1, len(splits[2]))
self.assertEqual(1, len(splits[3]))
splits = split_list(dag.input_events, 3)
self.assertEqual(3, len(splits))
self.assertEqual(4, len(splits[0]) + len(splits[1]) + len(splits[2]))
[docs] def test_split_single(self):
events = [MockEvent()]
dag = EventDag(events)
splits = split_list(dag.input_events, 1)
self.assertEqual(1, len(splits))
self.assertEqual(1, len(splits[0]))
[docs] def test_split_zero(self):
events = []
dag = EventDag(events)
splits = split_list(dag.input_events, 1)
self.assertEqual(1, len(splits))
self.assertEqual(0, len(splits[0]))
[docs] def test_split_odd(self):
events = [MockEvent(), MockEvent(), MockEvent()]
dag = EventDag(events)
splits = split_list(dag.input_events, 2)
self.assertEqual(2, len(splits))
self.assertEqual(3, len(splits[0]) + len(splits[1]))
splits = split_list(dag.input_events, 3)
self.assertEqual(3, len(splits))
self.assertEqual(3, len(splits[0]) + len(splits[1]) + len(splits[2]))
[docs] def test_event_dag(self):
event_dag = EventDag( [ MockInternalEvent("a"), MockInputEvent() ])
self.assertEqual(2, len(event_dag))
self.assertEqual(2, len(event_dag.filter_unsupported_input_types()))
event_dag.mark_invalid_input_sequences()
self.assertEqual(2, len(event_dag))
[docs] def test_event_dag_subset(self):
mockInputEvent = MockInputEvent()
mockInputEvent2 = MockInputEvent()
events = [ MockInternalEvent('a'), mockInputEvent, mockInputEvent2 ]
event_dag = EventDag(events)
# subset of (full set) is a noop
self.assertEqual( event_dag.events, event_dag.input_subset(events).events)
# subset of () empty retains the internal event
self.assertEqual( event_dag.events[0:1], event_dag.input_subset([]).events)
sub_graph = event_dag.input_subset([mockInputEvent])
self.assertEqual( event_dag.events[0:2], sub_graph.events)
[docs] def test_event_dag_complement(self):
mockInputEvent = MockInputEvent()
mockInputEvent2 = MockInputEvent()
events = [ MockInternalEvent('a'), mockInputEvent, mockInputEvent2 ]
event_dag = EventDag(events)
# complement of (nothing) is full set
self.assertEqual( event_dag.events, event_dag.input_complement([]).events)
# complement of (all elements) retains only the internal event
self.assertEqual( event_dag.events[0:1], event_dag.input_complement(events).events)
sub_graph = event_dag.input_complement([mockInputEvent])
self.assertEqual( [ e for (i, e) in enumerate(event_dag.events) if i==0 or i==2 ], sub_graph.events)
[docs] def test_migration_simple(self):
events = [ MockInternalEvent('a'), HostMigration(1,1,2,2,"host1"),
MockInternalEvent('b'), HostMigration(2,2,3,3,"host1"),
MockInputEvent() ]
# Don't prune anything
event_dag = EventDag(events)
new_dag = event_dag.input_subset(events)
self.assertEqual(events, new_dag.events)
[docs] def test_migration_prune_1(self):
events = [ MockInternalEvent('a'), HostMigration(1,1,2,2,"host1"),
MockInternalEvent('b'), HostMigration(2,2,3,3,"host1"),
MockInputEvent() ]
# Prune the first migration
subset = [events[1]]
event_dag = EventDag(events)
new_dag = event_dag.input_complement(subset)
fingerprint = ('HostMigration',1,1,3,3,"host1")
self.assertEqual(fingerprint, new_dag.events[2].fingerprint)
[docs] def test_migration_prune_2(self):
events = [ MockInternalEvent('a'), HostMigration(1,1,2,2,"host1"),
MockInternalEvent('b'), HostMigration(2,2,3,3,"host1"),
MockInputEvent(), HostMigration(3,3,4,4,"host1"),
HostMigration(4,4,5,5,"host1") ]
# Prune the seconds and third migration
subset = [events[3], events[4], events[5]]
event_dag = EventDag(events)
new_dag = event_dag.input_complement(subset)
fingerprint = ('HostMigration',2,2,5,5,"host1")
self.assertEqual(fingerprint, new_dag.events[3].fingerprint)
[docs] def test_migration_prune_last(self):
events = [ MockInternalEvent('a'), HostMigration(1,1,2,2,"host1"),
MockInternalEvent('b'), HostMigration(2,2,3,3,"host1"),
MockInputEvent() ]
# Prune the last migration
subset = [events[3]]
event_dag = EventDag(events)
new_dag = event_dag.input_complement(subset)
fingerprint = ('HostMigration',1,1,2,2,"host1")
self.assertEqual(fingerprint, new_dag.events[1].fingerprint)
if __name__ == '__main__':
unittest.main()