# Copyright 2011-2013 Colin Scott
# Copyright 2011-2013 Andreas Wundsam
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sts.replay_event import *
import time
from collections import Counter
import operator
import logging
# Module-level logger shared by every scheduler in this file; it is registered
# under the fixed name "event_scheduler" rather than the module's __name__.
log = logging.getLogger("event_scheduler")
class EventSchedulerStats(object):
    """Per-event-class tallies of replay outcomes.

    Counts are keyed by the class name of the event
    (``event.__class__.__name__``).  ``start_replay()`` has to be called
    before ``time()``, ``event_matched()`` or ``event_timed_out()``, because
    those compute offsets relative to the start values it records.
    """

    def __init__(self):
        # Event class name -> number of events of that class that matched.
        self.event2matched = Counter()
        # Event class name -> number of events of that class that timed out.
        self.event2timeouts = Counter()
        # time.time() at the moment start_replay() ran; None until then.
        self.replay_start = None
        # event.time.as_float() of the event given to start_replay(); None
        # until then.
        self.record_start = None

    def start_replay(self, event):
        """Record the replay wall-clock start and the recorded start time.

        ``event`` must expose ``event.time.as_float()``.
        """
        self.replay_start = time.time()
        self.record_start = event.time.as_float()

    def time(self, event):
        """Return "<replay offset> <recorded offset>" for ``event``.

        The first part is the wall-clock time elapsed since start_replay();
        the second is ``event.time`` relative to the recorded start.  Both are
        rendered with ``format_time`` (not defined in this file; presumably
        supplied by the star import -- verify).
        """
        # NOTE: inside method bodies the bare name ``time`` is the module, not
        # this method; class attributes are not visible as bare names there.
        replay_offset = time.time() - self.replay_start
        record_offset = event.time.as_float() - self.record_start
        return format_time(replay_offset) + " " + format_time(record_offset)

    def event_matched(self, event):
        """Report a successfully matched event and count it."""
        msg.event_success(self.time(event) + " Successfully matched event " + str(event))
        # TODO(cs): maybe want more info than just class name? (e.g. fingerprint)
        self.event2matched[event.__class__.__name__] += 1

    def event_timed_out(self, event):
        """Report an event that timed out and count it."""
        msg.event_timeout(self.time(event) + " Event timed out " + str(event))
        self.event2timeouts[event.__class__.__name__] += 1

    @staticmethod
    def _sorted_counts(counter):
        """Return the (name, count) pairs of ``counter`` in ascending count order."""
        return sorted(counter.items(), key=operator.itemgetter(1))

    def sorted_match_counts(self):
        """Yield (event class name, match count) pairs, smallest count first."""
        for pair in self._sorted_counts(self.event2matched):
            yield pair

    def sorted_timeout_counts(self):
        """Yield (event class name, timeout count) pairs, smallest count first."""
        for pair in self._sorted_counts(self.event2timeouts):
            yield pair

    def __str__(self):
        """Render the totals followed by the per-class breakdowns."""
        total_matched = sum(self.event2matched.values())
        total_timeouts = sum(self.event2timeouts.values())
        lines = ["Events matched: %d, timed out: %d\n" % (total_matched,
                                                          total_timeouts)]
        lines.append("Matches per event type:\n")
        lines.extend(" %s %d\n" % pair for pair in self.sorted_match_counts())
        lines.append("Timeouts per event type:\n")
        lines.extend(" %s %d\n" % pair for pair in self.sorted_timeout_counts())
        return "".join(lines)
class EventSchedulerBase(object):
    """Common base for event schedulers: optional logging of scheduled events."""

    def __init__(self):
        # Object with a log_input_event(event, **kws) method, or None.  Nothing
        # in this class assigns it; while it is None, _log_event() is a no-op.
        self._input_logger = None

    def _log_event(self, event, **kws):
        """Forward ``event`` (and any keyword arguments) to the input logger.

        Does nothing when no input logger has been attached.
        """
        if self._input_logger is None:
            return
        self._input_logger.log_input_event(event, **kws)
class DumbEventScheduler(EventSchedulerBase):
    """Scheduler that replays the recorded gap between consecutive events.

    Before each event (except the first) it sleeps for the difference between
    the event's recorded time and the previous event's recorded time, then
    polls ``event.proceed(simulation)`` until it returns true or
    ``epsilon_seconds`` have elapsed.
    """

    # Names of the keyword arguments accepted by __init__ besides `simulation`.
    kwargs = set(['epsilon_seconds', 'sleep_interval_seconds'])

    def __init__(self, simulation, epsilon_seconds=0.0, sleep_interval_seconds=0.2):
        """
        simulation             -- passed to event.proceed(); its io_master is
                                  used for sleep() and select().
        epsilon_seconds        -- how long to keep polling an event before
                                  declaring it timed out.
        sleep_interval_seconds -- argument handed to io_master.select()
                                  between polls.
        """
        super(DumbEventScheduler, self).__init__()
        self.simulation = simulation
        self.epsilon_seconds = epsilon_seconds
        self.sleep_interval_seconds = sleep_interval_seconds
        # Most recently scheduled event; None before the first schedule().
        self.last_event = None
        # Recorded (pre-replay) timestamp of last_event.  Kept separately
        # because schedule() overwrites event.time with the replay time.
        self._last_recorded_time = None
        self.stats = EventSchedulerStats()

    def schedule(self, event):
        """Sleep for the recorded inter-event gap, then poll ``event``.

        Side effects: sets ``event.timed_out``, updates ``self.stats``,
        overwrites ``event.time`` with ``SyncTime.now()``, logs the event via
        ``_log_event`` and remembers it as ``self.last_event``.
        """
        # Grab the recorded timestamp now: event.time is replaced below.
        recorded_time = event.time
        if self.last_event is not None:
            # Fix: the delta used to be taken against last_event.time, which by
            # then held the replay clock (SyncTime.now()), not the recorded
            # time, so the two timelines were mixed.
            rec_delta = (recorded_time.as_float() -
                         self._last_recorded_time.as_float())
            if rec_delta > 0:
                log.info("Sleeping for %.0f ms before next event", rec_delta * 1000)
                self.simulation.io_master.sleep(rec_delta)
        else:
            self.stats.start_replay(event)

        # Fix: `end` was read by the polling loop but never assigned
        # (NameError).  The deadline is epsilon_seconds from now, which is what
        # the log line below advertises as the maximum wait time.
        end = time.time() + self.epsilon_seconds
        log.debug("Waiting for %s (maximum wait time: %.0f ms)",
                  str(event).replace("\n", ""), self.epsilon_seconds * 1000)

        proceed = False
        while True:
            # Sample the clock before proceed() so that an event is always
            # tried at least once, even with epsilon_seconds == 0.
            now = time.time()
            if event.proceed(self.simulation):
                proceed = True
                break
            elif now > end:
                break
            self.simulation.io_master.select(self.sleep_interval_seconds)

        if proceed:
            event.timed_out = False
            # Fix: was `event_matches`, which EventSchedulerStats does not
            # define (AttributeError on every matched event).
            self.stats.event_matched(event)
        else:
            event.timed_out = True
            self.stats.event_timed_out(event)

        # Stamp the event with the replay time before it is logged.
        event.time = SyncTime.now()
        self._log_event(event)
        self.last_event = event
        self._last_recorded_time = recorded_time
class EventScheduler(EventSchedulerBase):
    """Schedules replayed events: controls their admission and pacing.

    The scheduler tracks where it is in the recorded timeline
    (``last_rec_time``) and in wall-clock time (``last_real_time``) and uses
    the difference, scaled by ``speedup``, to decide how long to wait for the
    next event.
    """

    # Names of the keyword arguments accepted by __init__ besides `simulation`.
    kwargs = set(['speedup', 'delay_input_events', 'initial_wait',
                  'epsilon_seconds', 'sleep_interval_seconds'])

    def __init__(self, simulation, speedup=1.0,
                 delay_input_events=True, initial_wait=0.5, epsilon_seconds=0.5,
                 sleep_interval_seconds=0.2):
        """
        simulation             -- passed to event.proceed(); its io_master is
                                  used for sleep() and select().
        speedup                -- divisor applied to recorded time deltas.
        delay_input_events     -- whether input events are delayed by
                                  wait_time() before being injected.
        initial_wait           -- wait_time() result before any event has been
                                  processed.
        epsilon_seconds        -- extra slack added to polling deadlines.
        sleep_interval_seconds -- argument handed to io_master.select()
                                  between polls.
        """
        super(EventScheduler, self).__init__()
        self.simulation = simulation
        self.speedup = speedup
        self.delay_input_events = delay_input_events
        # Wall-clock time (time.time()) of the last update_event_time() call.
        self.last_real_time = None
        # Recorded event.time seen by the last update_event_time() call.
        self.last_rec_time = None
        self.initial_wait = initial_wait
        self.epsilon_seconds = epsilon_seconds
        self.sleep_interval_seconds = sleep_interval_seconds
        # Becomes True once the first event reached schedule().
        self.started = False
        self.stats = EventSchedulerStats()

    def schedule(self, event):
        """Inject or wait for ``event``, then stamp and log it.

        Events that are neither InputEvent nor InternalEvent are not executed,
        but they are still time-stamped and logged.
        """
        if not self.started:
            self.stats.start_replay(event)
            self.started = True

        # Remember the recorded timestamp: _poll_event() replaces event.time
        # with the replay clock.
        recorded_time = event.time
        if isinstance(event, InputEvent):
            self.inject_input(event)
        elif isinstance(event, InternalEvent):
            self.wait_for_internal(event)

        # Fix: update_event_time() used to run on the already overwritten
        # event.time, so last_rec_time held a replay timestamp and wait_time()
        # compared two different clocks.  Restore the recorded timestamp for
        # the update, then stamp the event for the replay log.
        event.time = recorded_time
        self.update_event_time(event)
        # Set event.time to now for the replay log
        event.time = SyncTime.now()
        self._log_event(event)

    def inject_input(self, event):
        """Optionally delay, then inject, an input event.

        Fix: schedule() called this method but no definition was visible on
        this class or its base, so scheduling any InputEvent raised
        AttributeError.  NOTE(review): this implementation is inferred from
        ``delay_input_events`` and the wait_time() docstring -- confirm
        against the intended behaviour.
        """
        if self.delay_input_events:
            wait_time_seconds = self.wait_time(event)
            if wait_time_seconds > 0:
                log.debug("Delaying input event %s for %.0f ms",
                          repr(event).replace("\n", ""),
                          wait_time_seconds * 1000)
                self.simulation.io_master.sleep(wait_time_seconds)
        log.debug("Injecting %s", repr(event).replace("\n", ""))
        self._poll_event(event, time.time() + self.epsilon_seconds)

    def wait_for_internal(self, event):
        """Poll for an internal event until it occurs or its deadline passes.

        The deadline is wait_time(event) plus epsilon_seconds from now, unless
        ``event.timeout_disallowed`` is set, in which case the deadline is
        placed effectively infinitely far in the future.
        """
        wait_time_seconds = self.wait_time(event)
        start = time.time()
        # TODO(cs): why - 0.01?
        end = start + wait_time_seconds - 0.01 + self.epsilon_seconds
        if event.timeout_disallowed:
            # Reaallllly far in the future
            end = 30000000000  # Fri, 30 Aug 2920 05:20:00 GMT
            log.debug("Waiting for %s forever", repr(event).replace("\n", ""))
        else:
            log.debug("Waiting for %s (maximum wait time: %.0f ms)",
                      repr(event).replace("\n", ""), self.epsilon_seconds * 1000)
        self._poll_event(event, end)

    def _poll_event(self, event, end_time):
        """Call event.proceed() repeatedly until it succeeds or ``end_time``.

        ``end_time`` is an absolute time.time() value.  Sets
        ``event.timed_out``, records the outcome in ``self.stats`` and finally
        overwrites ``event.time`` with ``SyncTime.now()``.
        """
        proceed = False
        while True:
            # Sample the clock before proceed() so that the event is always
            # tried at least once, even if the deadline has already passed.
            now = time.time()
            if event.proceed(self.simulation):
                proceed = True
                break
            elif now > end_time:
                break
            self.simulation.io_master.select(self.sleep_interval_seconds)

        if proceed:
            event.timed_out = False
            self.stats.event_matched(event)
            # Runs before event.time is overwritten below, so the bearings are
            # taken from the recorded timestamp.
            self.update_event_time(event)
        else:
            event.timed_out = True
            self.stats.event_timed_out(event)
        # NOTE(review): the extracted source lost its indentation; this
        # statement is placed at function level (applies to both outcomes),
        # mirroring DumbEventScheduler.schedule -- confirm.
        event.time = SyncTime.now()

    def update_event_time(self, event):
        """ update our bearing on where we currently are in the timeline """
        self.last_real_time = time.time()
        self.last_rec_time = event.time

    def wait_time(self, event):
        """ returns how long to wait in seconds for an event to occur or be injected.

        Before any event has been processed this is ``initial_wait``.
        Afterwards it is the recorded gap since the last event (divided by
        ``speedup``) minus the wall-clock time already spent, never negative.
        Raises RuntimeError if the result exceeds 10000 seconds.
        """
        if not self.last_real_time:
            return self.initial_wait

        rec_delta = (event.time.as_float() -
                     self.last_rec_time.as_float()) / self.speedup
        real_delta = time.time() - self.last_real_time
        to_wait = rec_delta - real_delta
        if to_wait > 10000:
            # Fix: to_wait is in seconds, but the message labels it "ms";
            # convert so the reported figure matches its unit.
            raise RuntimeError("to_wait %d ms is way too big for event %s" %
                               (to_wait * 1000, str(event)))
        return max(to_wait, 0)