# Source code for sts.control_flow.peeker

# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import time
from collections import Counter

from sts.event_dag import EventDag
from sts.control_flow.replayer import Replayer
from sts.replay_event import Event, InternalEvent, InputEvent, WaitTime

log = logging.getLogger("sts")

class Peeker(object):
    '''Infer which internal events will (or will not) occur when a sequence of
    input events is replayed, by replaying successively longer input prefixes
    and "peek()"ing at the internal events that follow each injected input.

    Results are memoized in a prefix trie (input-event prefix -> interleaved
    input + internal events) so shared prefixes are only replayed once.'''

    # { % of inferred fingerprints that were ambiguous ->
    #   # of replays where this % occurred }
    ambiguous_counts = Counter()
    # { class of event -> # occurences of ambiguity }
    ambiguous_events = Counter()

    def __init__(self, simulation_cfg, default_wait_time_seconds=0.5,
                 epsilon_time=0.2):
        '''
        Parameters:
         - simulation_cfg: config used by Replayer to boot a simulation for
           each peek()'ed prefix.
         - default_wait_time_seconds: how long to wait for internal events
           when there is no following input to compute a time delta from.
         - epsilon_time: slack (seconds) added to the recorded delta between
           two inputs when waiting for internal events.
        '''
        try:
            import pytrie
        except ImportError:
            raise RuntimeError("Need to install pytrie: `sudo pip install pytrie`")
        # The prefix trie stores lists of input events as keys,
        # and lists of both input and internal events as values
        # Note that we pass the trie around between DAG views
        self.simulation_cfg = simulation_cfg
        self._prefix_trie = pytrie.Trie()
        self.default_wait_time_seconds = default_wait_time_seconds
        self.epsilon_time = epsilon_time

    def peek(self, dag):
        ''' Infer which internal events are/aren't going to occur, by
        replaying each input prefix of dag and recording the internal events
        observed after each injected input. Returns a new EventDag containing
        the inputs interleaved with the inferred internal events. '''
        # TODO(cs): optimization: write the prefix trie to a file, in case we want to run
        # FindMCS again?
        input_events = dag.input_events
        if len(input_events) == 0:
            # Nothing to inject; return the dag unchanged.
            # Postcondition: input_events[-1] is not None
            #                and self._events_list[-1] is not None
            return dag
        # Initilize current_input_prefix to the longest_match prefix we've
        # inferred previously (or [] if this is an entirely new prefix)
        current_input_prefix = list(self._prefix_trie\
                                    .longest_prefix(input_events, default=[]))
        log.debug("Current input prefix: %s" % str(current_input_prefix))
        # The value is both internal events and input events (values of the trie)
        # leading up, but not including the next input following the tail of the
        # prefix.
        # Note that we assume that there are no internal events before the first
        # input event (i.e. we assume quiescence)
        inferred_events = list(self._prefix_trie\
                               .longest_prefix_value(input_events, default=[]))
        log.debug("Current inferred_events: %s" % str(inferred_events))
        # Start injecting at the first input not covered by the memoized prefix
        inject_input_idx = len(current_input_prefix)
        # While we still have inputs to inject
        while inject_input_idx < len(input_events):
            # The input we're about to inject
            inject_input = input_events[inject_input_idx]
            if inject_input_idx < len(input_events) - 1:
                # there is a following input_event
                following_input = input_events[inject_input_idx + 1]
            else:
                following_input = None
            # The input following the one we're going to inject
            log.debug("peek()'ing after input %d" % (inject_input_idx))
            expected_internal_events = \
                get_expected_internal_events(inject_input, following_input, dag.events)
            # Optimization: if no internal events occured between this input and the
            # next, no need to peek()
            if expected_internal_events == []:
                log.debug("Optimization: no expected internal events")
                newly_inferred_events = []
                Peeker.ambiguous_counts[0.0] += 1
            else:
                # Wait roughly as long as the original trace did between the
                # two inputs (plus epsilon slack)
                wait_time_seconds = self.get_wait_time_seconds(inject_input, following_input)
                replay_dag = EventDag(inferred_events + [ inject_input ])
                found_events = self.find_internal_events(replay_dag, wait_time_seconds)
                newly_inferred_events = self.match_and_filter(found_events, expected_internal_events)
            # Memoize what we learned for this (one-longer) prefix
            (current_input_prefix, inferred_events) = \
                self._update_trie(current_input_prefix, inject_input,
                                  inferred_events, newly_inferred_events)
            inject_input_idx += 1
        return EventDag(inferred_events)

    def get_wait_time_seconds(self, first_event, second_event):
        ''' Return how long to wait for internal events between two inputs:
        the recorded time delta between them plus epsilon_time, or
        default_wait_time_seconds when either input is missing. '''
        if first_event is None or second_event is None:
            return self.default_wait_time_seconds
        else:
            return second_event.time.as_float() - first_event.time.as_float() + \
                self.epsilon_time

    def find_internal_events(self, replay_dag, wait_time_seconds):
        ''' Replay the replay_dag, then wait for wait_time_seconds and collect
        internal events that occur. Return the list of internal events. '''
        replayer = Replayer(self.simulation_cfg, replay_dag)
        log.debug("Replaying prefix")
        simulation = replayer.simulate()
        # Directly after the last input has been injected, flush the internal
        # event buffers in case there were unaccounted internal events
        # Note that there isn't a race condition between flush()'ing and
        # incoming internal events, since sts is single-threaded
        # TODO(cs): flush() is no longer needed!
        simulation.god_scheduler.flush()
        simulation.controller_sync_callback.flush()
        # Now set all internal event buffers (GodScheduler for
        # ControlMessageReceives and ReplaySyncCallback for state changes)
        # to "pass through + record"
        simulation.set_pass_through()
        # Note that this is the monkey patched version of time.sleep
        log.debug("peek()'ing for %f seconds" % wait_time_seconds)
        time.sleep(wait_time_seconds)
        # Now turn off those pass-through and grab the inferred events
        newly_inferred_events = simulation.unset_pass_through()
        simulation.clean_up()
        return newly_inferred_events

    def match_and_filter(self, newly_inferred_events, expected_internal_events):
        ''' Match the internal events observed during peek() against those we
        expected, record ambiguity statistics, and rewrite timestamps so the
        inferred events line up with the original trace. '''
        log.debug("Matching fingerprints")
        log.debug("Expected: %s" % str(expected_internal_events))
        log.debug("Inferred: %s" % str(newly_inferred_events))
        # TODO(cs): currently not calling this, out of paranoia. May inadvertently
        # prune expected internal events -- largely serves as an optimization
        #newly_inferred_events = match_fingerprints(newly_inferred_events,
        #                                           expected_internal_events)
        # TODO(cs): need to prune any successors of e_i, in case we waited too
        # long
        count_overlapping_fingerprints(newly_inferred_events,
                                       expected_internal_events)
        newly_inferred_events = correct_timestamps(newly_inferred_events,
                                                   expected_internal_events)
        log.debug("Matched events: %s" % str(newly_inferred_events))
        return newly_inferred_events

    def _update_trie(self, current_input_prefix, inject_input, inferred_events,
                     newly_inferred_events):
        ''' Update the trie for this prefix '''
        # Copy both lists so the stored trie entries are independent of the
        # caller's lists on subsequent iterations.
        current_input_prefix = list(current_input_prefix)
        current_input_prefix.append(inject_input)
        # Make sure to prepend the input we just injected
        # (inject_input goes before the newly inferred internal events)
        inferred_events = list(inferred_events)
        inferred_events.append(inject_input)
        inferred_events += newly_inferred_events
        self._prefix_trie[current_input_prefix] = inferred_events
        return (current_input_prefix, inferred_events)
def get_expected_internal_events(left_input, right_input, events_list):
    ''' Return the previously-observed internal events lying between
    left_input and right_input in events_list.

    left_input may be None, in which case the window starts at the beginning
    of events_list. right_input may also be None, in which case the window
    extends to the end of events_list. '''
    start = 0 if left_input is None else events_list.index(left_input)
    end = (len(events_list) if right_input is None
           else events_list.index(right_input))
    return [event for event in events_list[start:end]
            if isinstance(event, InternalEvent)]
def count_overlapping_fingerprints(newly_inferred_events,
                                   expected_internal_events):
    ''' Track # of instances where an expected event matches 2 or more
    inferred events. Mutates Peeker.ambiguous_counts and
    Peeker.ambiguous_events. '''
    expected_counts = Counter([e.fingerprint for e in expected_internal_events])
    inferred_counts = Counter([e.fingerprint for e in newly_inferred_events])
    total_redundant = 0
    # NOTE: .items() rather than the Python-2-only .iteritems(), so this file
    # also runs under Python 3 (iteration behavior is identical).
    for fingerprint, count in expected_counts.items():
        # More inferred occurrences than expected: the surplus is ambiguous
        if fingerprint in inferred_counts and inferred_counts[fingerprint] > count:
            redundant = inferred_counts[fingerprint] - count
            total_redundant += redundant
            # fingerprint[0] is the class name of the event
            Peeker.ambiguous_events[fingerprint[0]] += redundant
    if len(newly_inferred_events) > 0:
        percent_redundant = total_redundant * 1.0 / len(newly_inferred_events)
    else:
        # Avoid ZeroDivisionError when nothing was inferred
        percent_redundant = 0.0
    Peeker.ambiguous_counts[percent_redundant] += 1

# Truncate the newly inferred events based on the expected
# predecessors of next_input+1
# inferred_events (and ignore internal events that come afterward)
# TODO(cs): perhaps these should be unordered
[docs]def match_fingerprints(newly_inferred_events, expected_internal_events): # Find the last internal event in expected_internal_events that # matches an event in newly_inferred_events. That is the new causal # parent of following_input expected_internal_events.reverse() inferred_fingerprints = set([e.fingerprint for e in newly_inferred_events]) if len(inferred_fingerprints) != len(newly_inferred_events): log.warn("Overlapping fingerprints in peek() (%d unique, %d total)" % (len(inferred_fingerprints),len(newly_inferred_events))) expected_fingerprints = set([e.fingerprint for e in expected_internal_events]) if len(expected_fingerprints) != len(expected_internal_events): log.warn("Overlapping expected fingerprints (%d unique, %d total)" % (len(expected_fingerprints),len(expected_internal_events))) for expected in expected_internal_events: if expected.fingerprint in inferred_fingerprints: # We've found our truncation point # following_input goes after the expected internal event # (we ignore all internal events that come after it) # TODO(cs): if the inferred events show up in a different order # than the expected events did originally, this algorithm might # inadvertently prune expected events (it assumes events are ordered) # If there are multiple matching fingerprints, find the instance of # the expected fingerprint (e.g., 2nd instance of the expected # fingerprint), and match it up with the same instance # of the inferred fingerprints expected_internal_events = [e for e in expected_internal_events if e.fingerprint == expected.fingerprint] # 1-based indexing of observed instances # Note that we are iterating through expected_internal_events in reverse # order instance_of_expected = len(expected_internal_events) observed_instance = 0 idx_of_last_observed_instance = -1 parent_index = -1 for index, event in enumerate(newly_inferred_events): if event.fingerprint == expected.fingerprint: observed_instance += 1 idx_of_last_observed_instance = index if observed_instance 
== instance_of_expected: parent_index = index break if parent_index == -1: log.warn(('''There were fewer instances of ''' '''inferred %s fingerprint than expected %s ''' % (str(newly_inferred_events),str(expected_internal_events)))) # Set parent_index to the index of the last occurrence parent_index = idx_of_last_observed_instance newly_inferred_events = newly_inferred_events[:parent_index+1] return newly_inferred_events # Else, no expected internal event was observed. return []
def correct_timestamps(new_events, old_events):
    ''' Set new_events' timestamps to approximately the same timestamp as
    old_events.

    Lazy strategy: assign every new event the timestamp of the last event in
    old_events.

    Robustness fix: if old_events is empty there is no timestamp to copy, so
    new_events is returned unchanged instead of raising IndexError (the old
    docstring merely documented old_events != [] as a precondition). '''
    if not old_events:
        return new_events
    latest_old_ts = old_events[-1].time
    for e in new_events:
        e.time = latest_old_ts
    return new_events