from enum import Enum, auto
from abc import ABC, abstractmethod
from datetime import datetime, timedelta
from statistics import mean
import scipy.stats as st
import random
[docs]class EventType(Enum):
"""An enumeration for the types of event that can happen in the simulator."""
CASE_ARRIVAL = auto()
"""A case arrives.
:meta hide-value:"""
START_TASK = auto()
"""A task starts.
:meta hide-value:"""
COMPLETE_TASK = auto()
"""A task completes.
:meta hide-value:"""
PLAN_TASKS = auto()
"""An action is performed to assign tasks to resources.
:meta hide-value:"""
TASK_ACTIVATE = auto()
"""A task becomes ready to perform (but is not assigned to a resource).
:meta hide-value:"""
TASK_PLANNED = auto()
"""A task is assigned to a resource.
:meta hide-value:"""
COMPLETE_CASE = auto()
"""A case completes.
:meta hide-value:"""
SCHEDULE_RESOURCES = auto()
"""Resources are scheduled every full clock tick.
:meta hide-value:"""
[docs]class TimeUnit(Enum):
"""An enumeration for the unit in which simulation time is measured."""
SECONDS = auto()
"""
Measured in seconds.
:meta hide-value:
"""
MINUTES = auto()
"""
Measured in minutes.
:meta hide-value:
"""
HOURS = auto()
"""
Measured in hours.
:meta hide-value:
"""
DAYS = auto()
"""
Measured in days.
:meta hide-value:
"""
[docs]class Event:
"""
A simulation event.
:param event_type: the :class:`.EventType`.
:param moment: the moment in simulation time at which the event happens.
:param task: the task that triggered, or None for event_type == PLAN_TASKS.
:param resource: the resource that performs the task, or None for event_type not in [START_TASK, COMPLETE_TASK]
:param nr_tasks: the number of tasks that must be planned, or 0 for event_type != PLAN_TASKS
:param nr_resources: the number of resources that is available, or 0 for event_type != PLAN_TASKS
"""
def __init__(self, event_type, moment, task, resource=None, nr_tasks=0, nr_resources=0):
self.event_type = event_type
self.moment = moment
self.task = task
self.resource = resource
self.nr_tasks = nr_tasks
self.nr_resources = nr_resources
def __lt__(self, other):
return self.moment < other.moment
def __str__(self):
return str(self.event_type) + "\t(" + str(round(self.moment, 2)) + ")\t" + str(self.task) + "," + str(self.resource)
[docs]class ReporterElement(ABC):
"""
Abstract class that must be implemented by each concrete reporter element.
A reporter element is part of a :class:`.Reporter`. It receives a :meth:`.ReporterElement.report` call,
each time a simulation event occurs. It can then store information about that event. Once a simulation run
is completed, it receives a :meth:`.ReporterElement.summarize` call. It must then report aggregate
information that it stored about the events. Each time a simulation run starts, it receives
a :meth:`.ReporterElement.restart` call, upon which it must erase all previously stored information.
"""
[docs] @abstractmethod
def restart(self):
"""
Is invoked when a simulation run starts. Must erase all information to start a new report.
"""
raise NotImplementedError
[docs] @abstractmethod
def report(self, event):
"""
Is invoked when a simulation event occurs. Can store information about the event.
:param event: the simulation :class:`.Event` that occurred.
"""
raise NotImplementedError
[docs] @abstractmethod
def summarize(self):
"""
Is invoked when a simulation run ends. Must return aggregate information about stored event information.
:return: a list of tuples (label, value), where label is a meaningful name for the reported aggregate information,
and value is the corresponding information.
"""
raise NotImplementedError
[docs]class TasksReporterElement(ReporterElement):
"""
A :class:`.ReporterElement` that keeps information about tasks, specifically:
* tasks completed: the number of tasks that completed during the simulation run.
* task proc time: the average of the processing times of the completed tasks.
* task wait time: the average of the waiting times of the completed tasks.
This information is returned by the :meth:`.TasksReporterElement.summarize` method
by the specified labels.
"""
def __init__(self):
self.nr_tasks_completed = dict() # task_type X nr tasks completed of that type
self.nr_tasks_started = dict() # task_type X nr tasks started of that type
self.processing_time = dict() # task_type X total processing time of tasks of that type
self.waiting_time = dict() # task_type X total waiting time of tasks of that type
self.task_start_times = dict() # task id X start time
self.task_activation_times = dict() # task id X activation time
self.task_types = []
[docs] def restart(self):
self.__init__()
[docs] def report(self, event):
if event.event_type not in {EventType.TASK_ACTIVATE, EventType.START_TASK, EventType.COMPLETE_TASK}:
return
if event.task.task_type not in self.task_types:
self.task_types.append(event.task.task_type)
self.nr_tasks_completed[event.task.task_type] = 0
self.nr_tasks_started[event.task.task_type] = 0
self.processing_time[event.task.task_type] = 0
self.waiting_time[event.task.task_type] = 0
if event.event_type == EventType.TASK_ACTIVATE:
self.task_activation_times[event.task.id] = event.moment
elif event.event_type == EventType.START_TASK:
self.task_start_times[event.task.id] = event.moment
if event.task.id in self.task_activation_times.keys():
self.nr_tasks_started[event.task.task_type] += 1
self.waiting_time[event.task.task_type] += event.moment - self.task_activation_times[event.task.id]
del self.task_activation_times[event.task.id]
elif event.event_type == EventType.COMPLETE_TASK:
if event.task.id in self.task_start_times.keys():
self.nr_tasks_completed[event.task.task_type] += 1
self.processing_time[event.task.task_type] += event.moment - self.task_start_times[event.task.id]
del self.task_start_times[event.task.id]
[docs] def summarize(self):
result = []
for task_type in self.task_types:
result.append(("task " + task_type + " completed", self.nr_tasks_completed[task_type]))
result.append(("task " + task_type + " proc time", self.processing_time[task_type] / self.nr_tasks_completed[task_type]))
result.append(("task " + task_type + " wait time", self.waiting_time[task_type] / self.nr_tasks_started[task_type]))
return result
[docs]class CaseReporterElement(ReporterElement):
"""
A :class:`.ReporterElement` that keeps information about cases, specifically:
* cases completed: the number of cases that completed during the simulation run.
* cases cycle time: the average of the cycle times of the completed cases.
This information is returned by the :meth:`.CaseReporterElement.summarize` method
by the specified labels.
"""
def __init__(self):
self.nr_cases_completed = 0
self.cycle_time = 0
self.case_start_times = dict()
[docs] def restart(self):
self.__init__()
[docs] def report(self, event):
if event.event_type == EventType.CASE_ARRIVAL:
self.case_start_times[event.task.case_id] = event.moment
if event.event_type == EventType.COMPLETE_CASE and event.task.case_id in self.case_start_times.keys():
self.nr_cases_completed += 1
self.cycle_time += event.moment - self.case_start_times[event.task.case_id]
del self.case_start_times[event.task.case_id]
[docs] def summarize(self):
return [("cases completed", self.nr_cases_completed), ("case cycle time", self.cycle_time/self.nr_cases_completed)]
[docs]class EventLogReporterElement(ReporterElement):
"""
A :class:`.ReporterElement` that stored the simulation events that occur in an event log.
The :meth:`.EventLogReporterElement.summarize` method does not return any information.
As simulation time is a numerical value, some processing is done to convert simulation time
into a time format that can be read and interpreted by a process mining tool. To that end,
the timeunit in which simulation time is measured must be passed as well as the
initial_time moment in calendar time from which the simulation time will be measured.
A particular simulation_time moment will then be stored in the log as:
initial_time + simulation_time timeunits. Data can also be reported on by specifying the
corresponding data fields. The names of these data fields must correspond to names of data fields as
they appear in the problem.
:param filename: the name of the file in which the event log must be stored.
:param timeunit: the :class:`.TimeUnit` of simulation time.
:param initial_time: a datetime value.
:param time_format: a datetime formatting string.
:param data_fields: the data fields to report in the log.
"""
def __init__(self, filename, timeunit=TimeUnit.SECONDS, initial_time=datetime(2020, 1, 1), time_format="%Y-%m-%d %H:%M:%S.%f", data_fields=[]):
self.task_start_times = dict()
self.timeunit = timeunit
self.initial_time = initial_time
self.time_format = time_format
self.data_fields = data_fields
self.logfile = open(filename, "wt")
self.logfile.write("case_id,task,resource,start_time,completion_time")
for df in self.data_fields:
self.logfile.write(",")
self.logfile.write(df)
self.logfile.write("\n")
[docs] def restart(self):
raise NotImplementedError
[docs] def report(self, event):
def displace(time):
return self.initial_time + (timedelta(seconds=time) if self.timeunit == TimeUnit.SECONDS else timedelta(minutes=time) if self.timeunit == TimeUnit.MINUTES else timedelta(hours=time) if self.timeunit == TimeUnit.HOURS else timedelta(days=time) if self.timeunit == TimeUnit.DAYS else None)
if event.event_type == EventType.START_TASK:
self.task_start_times[event.task.id] = event.moment
elif event.event_type == EventType.COMPLETE_TASK and event.task.id in self.task_start_times.keys():
self.logfile.write(str(event.task.case_id) + ",")
self.logfile.write(str(event.task.task_type) + ",")
self.logfile.write(str(event.resource) + ",")
self.logfile.write(displace(self.task_start_times[event.task.id]).strftime(self.time_format) + ",")
self.logfile.write(displace(event.moment).strftime(self.time_format))
for df in self.data_fields:
self.logfile.write(",")
self.logfile.write('"' + str(event.task.data[df]) + '"')
self.logfile.write("\n")
self.logfile.flush()
del self.task_start_times[event.task.id]
[docs] def summarize(self):
self.logfile.close()
[docs]class Reporter:
"""
A Reporter consists of :class:`.ReporterElement` and reports on the information that is kept by
its elements. Consequently, it does not do much itself, it mainly passes on events to and
received aggregate information from its elements.
It receives a :meth:`.Reporter.report` call, each time a simulation event occurs, which it
forwards to its elements to enable them to store information about that event.
It receives a :meth:`.ReporterElement.summarize` call when a simulation run completes. It then
collects the summaries from each of its elements and returns them in one list.
It receives a :meth:`.ReporterElement.restart` call when a simulation run starts, which it
forwards to its elements to enable them to restart.
During the specified warmup time, the reporter will ignore all events.
:param warmup: a duration in simulation time.
:param reporter_elements: a list of :class:`.ReporterElement` instances, when None are provided,
creates a reporter with a :class:`.TaskReporterElement` and a :class:`.Case ReporterElement`.
"""
def __init__(self, warmup=0, reporter_elements=None):
self.warmup = warmup
if reporter_elements is None:
default_reporters = [TasksReporterElement(), CaseReporterElement()]
self.reporters = default_reporters
else:
self.reporters = reporter_elements
[docs] def restart(self):
for reporter in self.reporters:
reporter.restart()
[docs] def report(self, event):
if event.moment >= self.warmup:
for reporter in self.reporters:
reporter.report(event)
[docs] def summarize(self):
result = dict()
for reporter in self.reporters:
summary = reporter.summarize()
for summary_line in summary:
result[summary_line[0]] = summary_line[1]
return result
[docs] @staticmethod
def aggregate(summaries):
aggregation = dict()
for key in summaries:
avg = mean(summaries[key])
n = len(summaries[key])
ci = st.t.interval(0.95, n - 1, loc=avg, scale=st.sem(summaries[key]))
aggregation[key] = (avg, avg - ci[0])
return aggregation
[docs]class Simulator:
"""
A Simulator simulates a specified :class:`.Problem` using a specified :class:`.Planner`.
The results of the simulation are generated using the specified :class:`.Reporter`.
There are two main entry points into the simulator:
* :meth:`.simulate`, which simulates the (single) problem instance passed with the constructor; and
* :meth:`.replicate`, which simulates a collection of problem instances passed via the replicate method itself.
"""
def __init__(self, problem, reporter, planner):
self.events = []
self.unassigned_tasks = dict()
"""
The tasks that are currently not assigned. A dict task.id -> task, where task is an instance of :class:`.Task`.
A task is in this list if it must still be performed. After a task is completed does not re-appear in this list.
"""
self.assigned_tasks = dict()
"""
The tasks that are currently assigned. A dict task.id -> (task, resource, start), where:
* task is an instance of :class:`.Task`.
* start is the moment in simulation at which the resource will start or has started processing the task.
* resource is the label that identifies a resource in the :class:`.Problem`.
"""
self.available_resources = set()
"""
The set of resources that are currently available. Each resource is a label that identifies a resource in the :class:`.Problem`.
"""
self.away_resources = []
self.away_resources_weights = []
"""
The set of resources that are currently away (on a break, home, or working in another process) and consequently not available.
An away resource is a pair (resource, weight), such that the weight is the weight belonging to the resource according to the problem, i.e.:
away_resources_weights[i] == problem.resource_weights[problem.resources.index(away_resources[i])]
"""
self.busy_resources = dict()
"""
The resources that are currently busy. A dict resource -> (task, start), where:
* task is an instance of :class:`.Task` is the task that the resource is working on.
* start is the moment in simulation time at which the resource started working on the task.
"""
self.busy_cases = dict()
"""
The cases of which a task is currently being performed or must still be performed. A dict case_id -> [active task_id]
that maps case identifiers for which a task exists to the identifiers of those tasks.
"""
self.reserved_resources = dict()
"""
The resources that are currently reserved. A dict resource -> (task, start), where:
* resource is the label that identifies a resource in the :class:`.Problem`.
* task is an instance of :class:`.Task` is the task on which the resource is expected to work.
* start is the moment in simulation at which the resource starts processing the task.
"""
self.now = 0
"""
The current simulation time.
"""
self.reporter = reporter
self.planner = planner
self.problem = problem
self.init_simulation()
[docs] def init_simulation(self):
"""
Re-initializes the simulation, such that it can be run again.
"""
# set all resources to available
for r in self.problem.resources:
self.available_resources.add(r)
# generate resource scheduling event to start the schedule
self.events.append((0, Event(EventType.SCHEDULE_RESOURCES, 0, None)))
# reset the problem
self.problem.restart()
# generate arrival event for the first task of the first case
(t, task) = self.problem.next_case()
self.events.append((t, Event(EventType.CASE_ARRIVAL, t, task)))
[docs] def desired_nr_resources(self):
"""
The number of resources that is currently desired to be working now according to the problem schedule.
:return: a number of resources.
"""
return self.problem.schedule[int(self.now % len(self.problem.schedule))]
[docs] def working_nr_resources(self):
"""
The number of resources that is actually on the work floor now, either available, busy or reserved.
:return: a number of resources.
"""
return len(self.available_resources) + len(self.busy_resources) + len(self.reserved_resources)
[docs] def simulate(self, running_time):
"""
Runs the simulation for the instance that was passed in the constructor.
The simulator generates events (i.e. cases with their associated arrival times, data, and tasks)
as they are produced by the problem instance. The simulator generates a plan event each time a
task or resource becomes available. Events are handled by the reporter.
If the target utilization rate is set to a value (a fraction f) other than None, the simulator will ensure that resources
are occupied for approximately the targeted fraction f of the time. It does that by making resources busy with 'other
tasks' (i.e. tasks other than the ones of the problem being simulated, like tasks in another process). These 'other tasks'
are not specified further and their activity is not logged, but the resources are being kept busy on those tasks.
:param running_time: the amount of simulation time the simulation should be run for.
"""
tasks_per = resources_per = nr_per = 0
# repeat until the end of the simulation time:
while self.now <= running_time:
# get the first event e from the events
event = self.events.pop(0)
# t = time of e
self.now = event[0]
event = event[1]
self.reporter.report(event)
# if e is an arrival event:
if event.event_type == EventType.CASE_ARRIVAL:
# add new task
self.unassigned_tasks[event.task.id] = event.task
self.reporter.report(Event(EventType.TASK_ACTIVATE, self.now, event.task))
self.busy_cases[event.task.case_id] = [event.task.id]
self.events.append((self.now, Event(EventType.PLAN_TASKS, self.now, None, nr_tasks=len(self.unassigned_tasks), nr_resources=len(self.available_resources))))
# generate a new arrival event for the first task of the next case
(t, task) = self.problem.next_case()
self.events.append((t, Event(EventType.CASE_ARRIVAL, t, task)))
self.events.sort()
# if e is a start event:
elif event.event_type == EventType.START_TASK:
# create a complete event for task
t = self.now + self.problem.processing_time_sample(event.resource, event.task)
self.events.append((t, Event(EventType.COMPLETE_TASK, t, event.task, event.resource)))
self.events.sort()
if not self.problem.is_event(event.task.task_type): # for actual tasks (not events)
# set resource to busy
del self.reserved_resources[event.resource]
self.busy_resources[event.resource] = (event.task, self.now)
# if e is a complete event:
elif event.event_type == EventType.COMPLETE_TASK:
if not self.problem.is_event(event.task.task_type): # for actual tasks (not events)
# set resource to available, if it is still desired, otherwise set it to away
del self.busy_resources[event.resource]
if self.working_nr_resources() <= self.desired_nr_resources():
self.available_resources.add(event.resource)
else:
self.away_resources.append(event.resource)
self.away_resources_weights.append(self.problem.resource_weights[self.problem.resources.index(event.resource)])
# remove task from assigned tasks
del self.assigned_tasks[event.task.id]
self.busy_cases[event.task.case_id].remove(event.task.id)
# notify the tasks as complete to the problem and receive the next tasks
next_tasks = self.problem.complete_task(event.task)
# generate unassigned tasks for each next task
for next_task in next_tasks:
self.unassigned_tasks[next_task.id] = next_task
self.reporter.report(Event(EventType.TASK_ACTIVATE, self.now, next_task))
self.busy_cases[event.task.case_id].append(next_task.id)
if len(self.busy_cases[event.task.case_id]) == 0:
self.events.append((self.now, Event(EventType.COMPLETE_CASE, self.now, event.task)))
# generate a new planning event to start planning now for the newly available resource and next tasks
self.events.append((self.now, Event(EventType.PLAN_TASKS, self.now, None, nr_tasks=len(self.unassigned_tasks), nr_resources=len(self.available_resources))))
self.events.sort()
# if e is a schedule resources event: move resources between available/away,
# depending to how many resources should be available according to the schedule.
elif event.event_type == EventType.SCHEDULE_RESOURCES:
assert self.working_nr_resources() + len(self.away_resources) == len(self.problem.resources) # the number of resources must be constant
assert len(self.problem.resources) == len(self.problem.resource_weights) # each resource must have a resource weight
assert len(self.away_resources) == len(self.away_resources_weights) # each away resource must have a resource weight
if len(self.away_resources) > 0: # for each away resource, the resource weight must be taken from the problem resource weights
i = random.randrange(len(self.away_resources))
assert self.away_resources_weights[i] == self.problem.resource_weights[self.problem.resources.index(self.away_resources[i])]
required_resources = self.desired_nr_resources() - self.working_nr_resources()
if required_resources > 0:
# if there are not enough resources working
# randomly select away resources to work, as many as required
for i in range(required_resources):
random_resource = random.choices(self.away_resources, self.away_resources_weights)[0]
# remove them from away and add them to available resources
away_resource_i = self.away_resources.index(random_resource)
del self.away_resources[away_resource_i]
del self.away_resources_weights[away_resource_i]
self.available_resources.add(random_resource)
# generate a new planning event to put them to work
self.events.append((self.now, Event(EventType.PLAN_TASKS, self.now, None, nr_tasks=len(self.unassigned_tasks), nr_resources=len(self.available_resources))))
self.events.sort()
elif required_resources < 0:
# if there are too many resources working
# remove as many as possible, i.e. min(available_resources, -required_resources)
nr_resources_to_remove = min(len(self.available_resources), -required_resources)
resources_to_remove = random.sample(self.available_resources, nr_resources_to_remove)
for r in resources_to_remove:
# remove them from the available resources
self.available_resources.remove(r)
# add them to the away resources
self.away_resources.append(r)
self.away_resources_weights.append(self.problem.resource_weights[self.problem.resources.index(r)])
# plan the next resource schedule event
self.events.append((self.now+1, Event(EventType.SCHEDULE_RESOURCES, self.now+1, None)))
self.events.sort()
# if e is a planning event: do assignment
elif event.event_type == EventType.PLAN_TASKS:
# there only is an assignment if there are free resources and tasks
if len(self.unassigned_tasks) > 0 and len(self.available_resources) > 0:
tasks_per += len(self.unassigned_tasks)
resources_per += len(self.available_resources)
nr_per += 1
assignments = self.planner.assign(self)
# for each newly assigned task:
for (task, resource, moment) in assignments:
# create start event for task
self.events.append((moment, Event(EventType.START_TASK, moment, task, resource)))
self.reporter.report(Event(EventType.TASK_PLANNED, self.now, task))
# assign task
del self.unassigned_tasks[task.id]
self.assigned_tasks[task.id] = (task, resource, moment)
if not self.problem.is_event(task.task_type): # for actual tasks (not events)
# reserve resource
self.available_resources.remove(resource)
self.reserved_resources[resource] = (event.task, moment)
self.events.sort()
print(tasks_per/nr_per, resources_per/nr_per)
[docs] @staticmethod
def replicate(problem, planner, reporter, simulation_time, replications):
"""
Creates a simulator for each of the problem_instances, using the specified planner and reporter.
Simulates each problem instance by calling the :meth:`.simulate` method on it using the
specified simulation time. Returns the list of summaries generated by the reporter, one summary for
each simulated problem_instance.
:param problem: an instance of :class:`.Problem` to simulate.
:param planner: a :class:`.Planner`.
:param reporter: a :class:`.Reporter`.
:param simulation_time: the amount of simulation time for which each problem instance should be simulated.
:param replications: the number of replications to do.
:return: a list of summaries, generated by the reporter, one for each element of problem_instances.
"""
summaries = None
for i in range(replications):
reporter.restart()
problem.restart()
simulator = Simulator(problem, reporter, planner)
simulator.simulate(simulation_time)
summary = reporter.summarize()
if summaries is None:
summaries = summary.copy()
for key in summaries:
summaries[key] = []
for key in summary:
summaries[key].append(summary[key])
return summaries