# Source code for sts.control_flow.replayer

# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

'''
Control flow for running the simulation forward.
  - Replayer: takes as input a `superlog` with causal dependencies (either a
    path to a recorded trace or an already-built EventDag) and replays its
    events, in order, against a freshly bootstrapped simulation
'''

from sts.control_flow.interactive import Interactive
from sts.control_flow.event_scheduler import DumbEventScheduler, EventScheduler
from sts.replay_event import *
from sts.event_dag import EventDag
import sts.input_traces.log_parser as log_parser
from sts.util.console import color
from sts.control_flow.base import ControlFlow, ReplaySyncCallback
from sts.util.convenience import find, find_index
from sts.topology import BufferedPatchPanel

import signal
import sys
import time
import random
import logging

# Module-level logger used by the replayer classes defined in this file.
log = logging.getLogger("Replayer")

class Replayer(ControlFlow):
    '''
    Replay events from a trace.

    To set the event scheduling parameters, pass them as keyword args to the
    constructor of this class, which will pass them on to the EventScheduler
    object it creates.
    '''

    # Runtime stats (class-level, so they accumulate across Replayer instances):
    total_replays = 0
    total_inputs_replayed = 0
    # Interpolated time parameter. *not* the event scheduling epsilon:
    time_epsilon_microseconds = 500

    def __init__(self, simulation_cfg, superlog_path_or_dag,
                 create_event_scheduler=None, print_buffers=True,
                 wait_on_deterministic_values=False, default_dp_permit=False,
                 fail_to_interactive=False, input_logger=None, **kwargs):
        '''
        Args:
          simulation_cfg: handed to ControlFlow.__init__; simulate() later
            bootstraps the simulation from it.
          superlog_path_or_dag: either a string path, which is parsed with
            log_parser.parse_path() and wrapped in an EventDag, or an event dag
            object, which is used as-is.
          create_event_scheduler: optional callable taking the simulation and
            returning the event scheduler to use. When omitted, an
            EventScheduler is built from the entries of **kwargs whose names
            appear in EventScheduler.kwargs.
          print_buffers: if true, simulate() logs the pending message receives
            and pending state changes once the replay is done.
          wait_on_deterministic_values: if true, the ReplaySyncCallback is
            created without the interpolated-time getter.
          default_dp_permit: value assigned to `passive` on every event in the
            dag whose type is in dp_events; if true, a DataplaneChecker is
            also created and consulted before each event.
          fail_to_interactive: copied onto the simulation object in simulate().
          input_logger: optional logger; given to the event scheduler and used
            to record unexpected state changes.
          **kwargs: event scheduling parameters (see create_event_scheduler).
        '''
        ControlFlow.__init__(self, simulation_cfg)
        if wait_on_deterministic_values:
            self.sync_callback = ReplaySyncCallback()
        else:
            self.sync_callback = ReplaySyncCallback(self.get_interpolated_time)

        if isinstance(superlog_path_or_dag, str):
            superlog_path = superlog_path_or_dag
            # The dag is codefied as a list, where each element has
            # a list of its dependents
            self.dag = EventDag(log_parser.parse_path(superlog_path))
        else:
            self.dag = superlog_path_or_dag

        self.default_dp_permit = default_dp_permit
        # Set DataplanePermit and DataplaneDrop to passive if permit is set
        # to default
        for event in self.dag.events:
            if type(event) in dp_events:
                event.passive = default_dp_permit

        self.dp_checker = None
        if default_dp_permit:
            self.dp_checker = DataplaneChecker(self.dag)

        self.print_buffers_flag = print_buffers

        # compute interpolate to time to be just before first event
        self.compute_interpolated_time(self.dag.events[0])
        self.unexpected_state_changes = []
        self.early_state_changes = []
        self.event_scheduler_stats = None
        self.fail_to_interactive = fail_to_interactive
        self._input_logger = input_logger

        if create_event_scheduler:
            self.create_event_scheduler = create_event_scheduler
        else:
            # Only forward the keyword args the EventScheduler declares.
            scheduler_kwargs = {k: v for k, v in kwargs.items()
                                if k in EventScheduler.kwargs}
            self.create_event_scheduler = \
                lambda simulation: EventScheduler(simulation, **scheduler_kwargs)

    def _log_input_event(self, event, **kws):
        ''' Forward event to the input logger, if one was configured. '''
        if self._input_logger is not None:
            self._input_logger.log_input_event(event, **kws)

    def get_interpolated_time(self):
        '''
        During divergence, the controller may ask for the current time more or
        less times than they did in the original run. We control the time, so we
        need to give them some answer. The answers we give them should be
        (i) monotonically increasing, and (ii) between the time of the last
        recorded ("landmark") event and the next landmark event, and (iii)
        as close to the recorded times as possible

        Our temporary solution is to always return the time right before the next
        landmark
        '''
        # TODO(cs): implement Andi's improved time heuristic
        return self.interpolated_time

    def compute_interpolated_time(self, current_event):
        '''
        Set self.interpolated_time to time_epsilon_microseconds before
        current_event.time. The microsecond field is clamped at 0 rather than
        borrowing from the seconds field.
        '''
        next_time = current_event.time
        just_before_micro = max(
            0, next_time.microSeconds - self.time_epsilon_microseconds)
        self.interpolated_time = SyncTime(next_time.seconds, just_before_micro)

    def increment_round(self):
        ''' Announce the current logical round on the console. '''
        msg.event(color.CYAN + ("Round %d" % self.logical_time) + color.WHITE)

    def simulate(self, post_bootstrap_hook=None):
        '''
        Bootstrap a simulation, replay the whole dag against it and return the
        simulation object.

        Caller *must* call simulation.clean_up()
        '''
        Replayer.total_replays += 1
        Replayer.total_inputs_replayed += len(self.dag.input_events)
        self.simulation = self.simulation_cfg.bootstrap(self.sync_callback)
        assert(isinstance(self.simulation.patch_panel, BufferedPatchPanel))
        # TODO(aw): remove this hack
        self.simulation.fail_to_interactive = self.fail_to_interactive
        self.logical_time = 0
        self.run_simulation_forward(self.dag, post_bootstrap_hook)
        if self.print_buffers_flag:
            self._print_buffers()
        return self.simulation

    def _print_buffers(self):
        ''' Log (at debug level) whatever is still pending after the replay. '''
        log.debug("Pending Message Receives:")
        for p in self.simulation.god_scheduler.pending_receives():
            log.debug("- %s", p)
        log.debug("Pending State Changes:")
        for p in self.sync_callback.pending_state_changes():
            log.debug("- %s", p)

    def run_simulation_forward(self, dag, post_bootstrap_hook=None):
        '''
        Schedule every event of dag, in order, on the bootstrapped simulation.

        While the replay runs, SIGINT (^C) drops into an Interactive console;
        when the console returns, the replay continues with the next event.
        The SIGINT handler that was installed before the call is put back when
        the replay finishes, whether normally or through an exception.

        Args:
          dag: the event dag whose `events` are replayed.
          post_bootstrap_hook: optional zero-argument callable invoked once,
            after the event scheduler is created and before the first event.
        '''
        event_scheduler = self.create_event_scheduler(self.simulation)
        event_scheduler.set_input_logger(self._input_logger)
        self.event_scheduler_stats = event_scheduler.stats
        if post_bootstrap_hook is not None:
            post_bootstrap_hook()

        def interrupt(sgn, frame):
            msg.interactive("Interrupting replayer, dropping to console (press ^C again to terminate)")
            # Put the previous handler back so that a second ^C is not
            # intercepted by us again. signal.signal() reports None for a
            # handler that was not installed from Python; such a handler cannot
            # be re-installed, so fall back to the default action.
            previous = self.old_interrupt
            self.old_interrupt = None
            if previous is None:
                previous = signal.SIG_DFL
            signal.signal(signal.SIGINT, previous)
            raise KeyboardInterrupt()

        self.old_interrupt = signal.signal(signal.SIGINT, interrupt)

        try:
            for i, event in enumerate(dag.events):
                try:
                    self.compute_interpolated_time(event)
                    if self.default_dp_permit:
                        # NOTE(review): the event *index* is passed where
                        # check_dataplane names its parameter current_round --
                        # confirm which one is intended.
                        self.dp_checker.check_dataplane(i, self.simulation)
                    if isinstance(event, InputEvent):
                        self._check_early_state_changes(dag, i, event)
                    self._check_new_state_changes(dag, i)
                    # TODO(cs): quasi race-condition here. If unexpected state change
                    # happens *while* we're waiting for event, we basically have a
                    # deadlock (if controller logging is set to blocking) until the
                    # timeout occurs
                    # TODO(cs): we don't actually allow new internal message events
                    # through.. we only let new state changes through. Should experiment
                    # with whether we would get better fidelity if we let them through.
                    event_scheduler.schedule(event)
                    if self.logical_time != event.round:
                        self.logical_time = event.round
                        self.increment_round()
                except KeyboardInterrupt:
                    interactive = Interactive(self.simulation_cfg,
                                              input_logger=self._input_logger)
                    interactive.simulate(self.simulation,
                                         bound_objects=(('replayer', self),))
                    # Re-arm our handler for the remainder of the replay.
                    self.old_interrupt = signal.signal(signal.SIGINT, interrupt)
        finally:
            # Compare against None instead of testing truthiness: SIG_DFL has
            # the integer value 0 and would otherwise never be restored.
            if self.old_interrupt is not None:
                signal.signal(signal.SIGINT, self.old_interrupt)
            msg.event(color.B_BLUE+"Event Stats: %s" % str(event_scheduler.stats))
            if self.default_dp_permit:
                msg.event(color.B_BLUE+"DataplaneDrop Stats: %s" % str(self.dp_checker.stats))

    def _check_early_state_changes(self, dag, current_index, input):
        '''
        Check whether any pending state change that were supposed to come
        *after* the current input have occured. If so, we have violated
        causality.

        Raises:
          RuntimeError: if the pending state change is the next expected one
            and it originally occurred after `input`. The offending state
            change is recorded in self.early_state_changes before raising.
        '''
        pending_state_changes = self.sync_callback.pending_state_changes()
        if len(pending_state_changes) == 0:
            return
        # TODO(cs): currently assumes a single controller (-> single pending state
        # change)
        state_change = pending_state_changes[0]
        next_expected = dag.next_state_change(current_index)
        original_input_index = dag.get_original_index_for_event(input)
        if (next_expected is not None and
                state_change == next_expected.pending_state_change and
                dag.get_original_index_for_event(next_expected) > original_input_index):
            # Record first: anything placed after the raise never executes.
            self.early_state_changes.append(repr(next_expected))
            raise RuntimeError("State change happened before expected! Causality violated")

    def _check_new_state_changes(self, dag, current_index):
        '''
        If we are blocking controllers, it's bad news bears if an unexpected
        internal state change occurs. Check if there are any, and ACK them so
        that the execution can proceed.
        '''
        pending_state_changes = self.sync_callback.pending_state_changes()
        if len(pending_state_changes) == 0:
            return
        # TODO(cs): currently assumes a single controller (-> single pending state
        # change)
        state_change = pending_state_changes[0]
        next_expected = dag.next_state_change(current_index)
        if (next_expected is None or
                state_change != next_expected.pending_state_change):
            log.info("Unexpected state change. Ack'ing")
            log_event = ControllerStateChange.from_pending_state_change(state_change)
            # Monkeypatch a "new internal event" marker to be logged to the JSON trace
            # (All fields picked up by event.to_json())
            log_event.new_internal_event = True
            log_event.time = SyncTime.now()
            self._log_input_event(log_event)
            self.unexpected_state_changes.append(repr(state_change))
            self.sync_callback.ack_pending_state_change(state_change)


# --- Note: use DataplaneChecker at your own risk. I have observed it fail to
#     reproduce a bug that was reproducible with dataplane timeouts.

# TODO(cs): should this go in event_scheduler.py?
class DataplaneChecker(object):
    '''
    Dataplane permits are the default, *unless* they were explicitly dropped in
    the initial run. This class keeps track of whether pending dataplane events
    should be dropped or forwarded during replay
    '''

    def __init__(self, event_dag, slop_buffer=10):
        '''
        Consider the round i of a DataplaneDrop in the original event ordering.
        slop_buffer defines how tolerant we are to differences in the position of
        the correspdonding DataplaneDrop event in the pruned run. A slop_buffer
        of 4 says that we will tolerate the same DataplaneDrop in the pruned run
        occuring in the range [i-4,i+4]. If the corresponding DataplaneDrop occurs
        outside of this window, we won't detect it, and will treat it as a
        default DataplanePermit
        '''
        # Note it is not sufficient to simply track the entire list of drops/permits from
        # the original run, since the sequence of dataplane events may be different
        # in the pruned run.
        # Private copy: decide_drop() removes events from it as they are matched.
        self.events = list(event_dag.events)
        self.stats = DataplaneCheckerStats(self.events)
        # The current sequence of dataplane events we expect within the current
        # window
        self.current_dp_fingerprints = []
        # Mapping from fingerprint in current_dp_fingerprints to the index of the
        # event in self.events
        self.fingerprint_2_event_idx = {}
        self.slop_buffer = slop_buffer
        # Half-open index range [head, tail) of the current window in self.events
        self._window_head = 0
        self._window_tail = 0

    def _index_window(self):
        ''' Rebuild current_dp_fingerprints and fingerprint_2_event_idx from the
        events currently inside [self._window_head, self._window_tail). '''
        self.current_dp_fingerprints = []
        self.fingerprint_2_event_idx = {}
        for i in range(self._window_head, self._window_tail):
            event = self.events[i]
            if type(event) == DataplanePermit or type(event) == DataplaneDrop:
                fingerprint = event.fingerprint
                self.current_dp_fingerprints.append(fingerprint)
                # For repeated fingerprints the last index wins.
                self.fingerprint_2_event_idx[fingerprint] = i

    def decide_drop(self, dp_event):
        ''' Returns True if this event should be dropped, False otherwise '''
        # dp_event is a DpPacketOut object
        # TODO(cs): should have a sanity check in here somewhere to make sure
        # we're still dropping the right number of packets. Test on a high drop
        # rate fuzzer_params
        dp_fingerprint = (DPFingerprint.from_pkt(dp_event.packet),
                          dp_event.node.dpid, dp_event.port.port_no)

        # Skip over the class name (first element of the tuple)
        event_fingerprint = find(lambda f: f[1:] == dp_fingerprint,
                                 self.current_dp_fingerprints)
        if event_fingerprint is None:
            # Default to permit if we didn't expect this dp event
            return False

        # Flush this event from both our current window and our events list, so
        # that we don't accidentally conflate distinct dp_events with the same
        # fingerprint
        event_idx = self.fingerprint_2_event_idx[event_fingerprint]
        self.events.pop(event_idx)
        # The pop moved every later event down by one position, so the indices
        # recorded for the rest of the window are stale. Shrink the window by
        # the removed event and re-index it; this also drops the consumed
        # fingerprint from current_dp_fingerprints.
        self._window_tail -= 1
        self._index_window()

        # First element of the tuple is the Event class name
        if event_fingerprint[0] == "DataplanePermit":
            return False
        self.stats.record_drop(event_fingerprint)
        return True  # DataplaneDrop

    def update_window(self, current_round):
        ''' Update the current slop buffer ("the dp_events we expect to see") '''
        # TODO(cs): O(n) operation
        head_idx = find_index(lambda e: e.round == current_round - self.slop_buffer,
                              self.events)
        # No event at exactly that round: start the window at the beginning.
        # (An explicit check, because None cannot be ordered against ints on
        # Python 3.)
        head_idx = 0 if head_idx is None else max(head_idx, 0)
        tail_idx = find_index(lambda e: e.round == current_round + self.slop_buffer,
                              self.events)
        if tail_idx is None:
            tail_idx = len(self.events)
        self._window_head = head_idx
        self._window_tail = tail_idx
        self._index_window()

    def check_dataplane(self, current_round, simulation):
        '''
        Check dataplane events for before playing then next event.
          - current_round allows us to adjust our current slop buffer

        Every queued dataplane event is either dropped (not ok to send, or
        decide_drop() says so) or permitted.
        '''
        self.update_window(current_round)

        # Iterate over a snapshot: drop_dp_event()/permit_dp_event() presumably
        # remove the event from the patch panel's queue -- TODO confirm.
        for dp_event in list(simulation.patch_panel.queued_dataplane_events):
            if not simulation.topology.ok_to_send(dp_event):
                log.warning("Not valid to send dp_event %s", dp_event)
                simulation.patch_panel.drop_dp_event(dp_event)
            elif self.decide_drop(dp_event):
                simulation.patch_panel.drop_dp_event(dp_event)
            else:
                simulation.patch_panel.permit_dp_event(dp_event)
class DataplaneCheckerStats(object):
    ''' Tracks how many drops we actually performed vs. how many we expected to
    perform '''

    def __init__(self, events):
        '''
        Args:
          events: iterable of events; the fingerprints of those whose type is
            exactly DataplaneDrop become the expected drops, in order.
        '''
        self.expected_drops = [e.fingerprint for e in events
                               if type(e) == DataplaneDrop]
        self.actual_drops = []

    def record_drop(self, fingerprint):
        ''' Remember that the event with this fingerprint was dropped. '''
        self.actual_drops.append(fingerprint)

    def __str__(self):
        '''
        Summarise expected vs. actual drops and list the expected drops that
        were missed.

        An expected drop counts as performed when it equals the next
        not-yet-matched actual drop; the two lists are compared in order.
        Rendering does not modify any state, so repeated calls give the same
        text.
        '''
        s = ["Expected drops (%d), Actual drops (%d)" %
             (len(self.expected_drops), len(self.actual_drops))]
        s.append("Missed Drops (expected if TrafficInjections pruned):")
        # Walk actual_drops with a cursor instead of popping from it.
        matched = 0
        for drop in self.expected_drops:
            if matched < len(self.actual_drops) and drop == self.actual_drops[matched]:
                matched += 1
            else:
                s.append(" %s" % str(drop))
        return "\n".join(s)