# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
# Copyright 2012-2013 Sam Whitlock
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sts.fingerprints.messages import *
from sts.replay_event import *
from pox.openflow.software_switch import DpPacketOut
import logging
import time
import math
from sys import maxint
from collections import defaultdict
from sts.util.convenience import find_index
log = logging.getLogger("event_dag")
[docs]def split_list(l, split_ways):
''' Split our inputs into split_ways separate lists '''
if split_ways < 1:
raise ValueError("Split ways must be greater than 0")
splits = []
split_interval = len(l) / split_ways # integer division = floor
remainder = len(l) % split_ways # remainder is guaranteed to be less than splitways
start_idx = 0
while len(splits) < split_ways:
split_idx = start_idx + split_interval
# the first 'remainder' chunks are made one element larger to chew
# up the remaining elements (remainder < splitways)
# note: len(l) = split_ways * split_interval + remainder
if remainder > 0:
split_idx += 1
remainder -= 1
splits.append(l[start_idx:split_idx])
start_idx = split_idx
return splits
[docs]class EventDagView(object):
[docs] def __init__(self, parent, events_list):
''' subset is a list '''
self._parent = parent
self._events_list = list(events_list)
self._events_set = set(self._events_list)
@property
[docs] def events(self):
'''Return the events in the DAG'''
return self._events_list
@property
@property
[docs] def next_state_change(self, index):
return self._parent.next_state_change(index, events=self.events)
[docs] def get_original_index_for_event(self, event):
return self._parent.get_original_index_for_event(event)
def __len__(self):
return len(self._events_list)
# TODO(cs): move these somewhere else
[docs]def migrations_per_host(events):
host2migrations = defaultdict(list)
for e in events:
if type(e) == HostMigration:
host2migrations[e.host_id].append(e)
return host2migrations
[docs]def replace_migration(replacee, old_location, new_location, event_list):
# `replacee' is the migration to be replaced
# Don't mutate replacee -- instead, replace it
new_migration = HostMigration(old_location[0], old_location[1],
new_location[0], new_location[1],
host_id=replacee.host_id,
time=replacee.time, label=replacee.label)
# TODO(cs): O(n^2)
index = event_list.index(replacee)
event_list[index] = new_migration
return new_migration
[docs]class EventDag(object):
'''A collection of Event objects. EventDags are primarily used to present a
view of the underlying events with some subset of the input events pruned
'''
# We peek ahead this many seconds after the timestamp of the subseqeunt
# event
# TODO(cs): be smarter about this -- peek() too far, and peek()'ing not far
# enough can both have negative consequences
_peek_seconds = 0.3
# If we prune a failure, make sure that the subsequent
# recovery doesn't occur
_failure_types = set([SwitchFailure, LinkFailure, ControllerFailure, ControlChannelBlock])
# NOTE: we treat failure/recovery as an atomic pair, since it doesn't make
# much sense to prune a recovery event
_recovery_types = set([SwitchRecovery, LinkRecovery, ControllerRecovery, ControlChannelUnblock])
# ignoring these input types
_ignored_input_types = set([WaitTime])
[docs] def __init__(self, events, prefix_trie=None):
'''events is a list of EventWatcher objects. Refer to log_parser.parse to
see how this is assembled.'''
# TODO(cs): ugly that the superclass has to keep track of
# PeekingEventDag's data
self._prefix_trie = prefix_trie
self._events_list = events
self._events_set = set(self._events_list)
self._label2event = {
event.label : event
for event in self._events_list
}
self._event2idx = {
event : i
for i, event in enumerate(self._events_list)
}
# TODO(cs): this should be moved to a dag transformer class
self._host2initial_location = {
host : migrations[0].old_location
for host, migrations in migrations_per_host(self._events_list).iteritems()
}
@property
[docs] def events(self):
'''Return the events in the DAG'''
return self._events_list
@property
@property
def _atomic_input_events(self, inputs):
# TODO(cs): memoize?
skipped_recoveries = set()
atomic_inputs = []
for e in inputs:
if e in skipped_recoveries:
continue
if type(e) in self._failure_types and e.dependent_labels != []:
if len(e.dependent_labels) != 1:
raise RuntimeError("Not expected to have more than one dependent label")
recovery = self._label2event[e.dependent_labels[0]]
skipped_recoveries.add(recovery)
atomic_inputs.append(AtomicInput(e, recovery))
else:
atomic_inputs.append(e)
return atomic_inputs
def _expand_atomics(self, atomic_inputs):
inputs = []
for e in atomic_inputs:
if type(e) == AtomicInput:
inputs.append(e.failure)
inputs.append(e.recovery)
else:
inputs.append(e)
inputs.sort(key=lambda e: self._event2idx[e])
return inputs
[docs] def compute_remaining_input_events(self, ignored_portion, events_list=None):
''' ignore all input events in ignored_inputs,
as well all of their dependent input events'''
if events_list is None:
events_list = self.events
remaining = []
for event in events_list:
if event not in ignored_portion:
remaining.append(event)
else:
# Add dependent to ignored_portion
for label in event.dependent_labels:
# Note that recoveries will be a dependent of preceding failures
dependent_event = self._label2event[label]
ignored_portion.add(dependent_event)
# Update the migration locations in remaining
self.update_migrations(remaining, ignored_portion, events_list)
return remaining
[docs] def update_migrations(self, remaining, ignored_portion, events_list):
''' Walk through remaining input events, and update the source location of
the host migration. For example, if one host migrates twice:
location A -> location B -> location C
And the first migration is pruned, update the second HostMigration event
to look like:
location A -> location C
Note: mutates remaining
'''
# TODO(cs): this should be moved outside of EventDag
# TODO(cs): this algorithm could be simplified substantially by invoking
# migrations_per_host()
# keep track of the most recent location of the host that did not involve
# a pruned HostMigration event
# location is: (ingress dpid, ingress port no)
currentloc2unprunedloc = {}
for m in [e for e in events_list if type(e) == HostMigration]:
src = m.old_location
dst = m.new_location
if m in ignored_portion:
if src in currentloc2unprunedloc:
# There was a prior migration in ignored_portion
# Update the new dst to point back to the unpruned location
unprunedlocation = currentloc2unprunedloc[src]
del currentloc2unprunedloc[src]
currentloc2unprunedloc[dst] = unprunedlocation
else:
# We are the first migration for this host in ignored_portion
# Point to our tail
currentloc2unprunedloc[dst] = src
else: # m in remaining
if src in currentloc2unprunedloc:
# There was a prior migration in ignored_portion
# Replace this HostMigration with a new one, with source at the
# last unpruned location
unpruned_loc = currentloc2unprunedloc[src]
del currentloc2unprunedloc[src]
new_loc = dst
replace_migration(m, unpruned_loc, new_loc, remaining)
def _ignored_except_internals_and_recoveries(self, ignored_portion):
# Note that dependent_labels only contains dependencies between input
# events. Dependencies with internal events are inferred by EventScheduler.
# Also note that we treat failure/recovery as an atomic pair, so we don't prune
# recovery events on their own.
return set(e for e in ignored_portion
if (isinstance(e, InputEvent) and e.prunable and
type(e) not in self._recovery_types))
def _ignored_except_internals(self, ignored_portion):
return set(e for e in ignored_portion if isinstance(e, InputEvent) and
e.prunable)
def _straighten_inserted_migrations(self, remaining_events):
''' This is a bit hairy: when migrations are added back in, there may be
gaps in host locations. We need to straighten out those gaps -- i.e. make
the series of host migrations for any given host a line.
Pre: remaining_events is sorted in the same relative order as the original
trace
'''
host2migrations = migrations_per_host(remaining_events)
for host, migrations in host2migrations.iteritems():
# Prime the loop with the initial location
previous_location = self._host2initial_location[host]
for m in migrations:
if m.old_location != previous_location:
replacement = replace_migration(m, previous_location,
m.new_location, remaining_events)
else:
replacement = m
previous_location = replacement.new_location
return remaining_events
[docs] def next_state_change(self, index, events=None):
''' Return the next ControllerStateChange that occurs at or after
index.'''
if events is None:
events = self.events
# TODO(cs): for now, assumes a single controller
for event in events[index:]:
if type(event) == ControllerStateChange:
return event
return None
[docs] def get_original_index_for_event(self, event):
return self._event2idx[event]
def __len__(self):
return len(self._events_list)