import random
import pickle
from math import factorial
from abc import ABC, abstractmethod
class Task:
    """
    A task.

    :param task_id: the identifier of the task.
    :param case_id: the identifier of the case to which the task belongs.
    :param task_type: the type of the task, i.e. one of the :attr:`.Problem.task_types`.
    """

    def __init__(self, task_id, case_id, task_type):
        self.id = task_id
        self.case_id = case_id
        self.task_type = task_type
        #: A dictionary with additional data that is the result of the task;
        #: each item is a label -> value pair.
        self.data = dict()

    def __str__(self):
        """
        Returns ``<task_type>(<case_id>)_<id>``, followed by the string form of
        :attr:`data` when that dictionary is not empty.
        """
        # An f-string converts every part with str(), so task types that are
        # not plain strings (e.g. integers or enum members) are rendered too.
        data_suffix = str(self.data) if self.data else ""
        return f"{self.task_type}({self.case_id})_{self.id}{data_suffix}"
class Problem(ABC):
    """
    Abstract class that all problems must implement.

    An object of the class is an instance of the problem, which is equivalent to a business process.
    An object has a next_case_id, which is the identifier of the next case to arrive for the problem.
    case_id are sequential, starting at 0 for the first case to arrive, 1 for the next case to arrive, etc.
    An object also has a dictionary ``history`` that maps case_id -> list of :class:`.Task` that
    were completed for that case, in the order in which :meth:`complete_task` was called for them.

    A problem must define:

    * the resources that exist
    * the interarrival time distribution of cases
    * the types of tasks that can be performed for cases
    * rules for what the next task will be when a case first arrives or when a task is completed
    * the processing time distribution for each task
    * the data that is generated by a task
    """

    def __init__(self):
        # Defaults: every resource has weight 1 and all resources are available
        # in every time interval (a schedule of length one).
        self._resource_weights = [1]*len(self.resources)
        self._schedule = [len(self.resources)]
        self.next_case_id = 0
        self.previous_case_arrival_time = 0
        self.next_task_id = 0
        self.history = dict()
        # restart() is also invoked here so that subclasses that override it
        # get their own per-run state initialised on construction.
        self.restart()

    @property
    @abstractmethod
    def resources(self):
        """A list of identifiers (typically names) of resources."""
        raise NotImplementedError

    @property
    def resource_weights(self):
        """A list of weights, in the same order as the list of resources.
        resource_weights[i] represents how much resources[i] works compared to the other resources.
        A resource with a weight 2 is expected to work twice as much as a resource with weight 1.
        Using resource_weights, we can randomly select a resource using random.choices(resources, resource_weights)[0],
        to get a resource with the likelihood that that resource is indeed supposed to work.
        By default all resources have equal weight, i.e. are equally available."""
        return self._resource_weights

    @resource_weights.setter
    def resource_weights(self, value):
        self._resource_weights = value

    @property
    def schedule(self):
        """A schedule that represents how many resources are available at a particular point in simulation time.
        The schedule is a list, where schedule[t % len(schedule)] represents the number of resources that are
        available during time interval t in simulation time. For example, if simulation time is measured in hours,
        schedule[3 % len(schedule)] represents the number of resources available during the third hour.
        By default all resources are always available."""
        return self._schedule

    @schedule.setter
    def schedule(self, value):
        self._schedule = value

    @property
    @abstractmethod
    def task_types(self):
        """
        A list of identifiers (typically labels) of task types.
        """
        raise NotImplementedError

    def is_event(self, task_type):
        """
        Returns True if the task type is actually an event.
        Events start immediately when they are activated.
        They do not require a resource.
        This default implementation treats no task type as an event.

        :param task_type: one of :attr:`.Problem.task_types`
        :return: True or False, depending on whether the specified task type is an event or not
        """
        return False

    @abstractmethod
    def sample_initial_task_type(self):
        """Returns an element of :attr:`.Problem.task_types` that is the first to execute in a case."""
        raise NotImplementedError

    def resource_pool(self, task_type):
        """
        Returns for each task_type the subset of resources that can perform tasks of that type.
        This default implementation allows every resource to perform every task type.

        :param task_type: one of :attr:`.Problem.task_types`
        :return: a list with elements of :attr:`.Problem.resources`
        """
        return self.resources

    @classmethod
    def from_file(cls, filename):
        """
        Instantiates the problem by reading it from file.

        :param filename: the name of the file from which to read the problem.
        :return: an instance of the :class:`.Problem`.
        """
        # SECURITY: unpickling can execute arbitrary code. Only load files that
        # come from a trusted source (e.g. files written by Problem.save).
        with open(filename, 'rb') as handle:
            instance = pickle.load(handle)
        return instance

    def save(self, filename):
        """
        Saves the problem to file.

        :param filename: the name of the file to save the problem to.
        """
        with open(filename, 'wb') as handle:
            pickle.dump(self, handle, protocol=pickle.HIGHEST_PROTOCOL)

    @abstractmethod
    def processing_time_sample(self, resource, task):
        """
        Randomly samples the duration of the simulation time it will take the resource to perform the task.

        :param resource: one of the :attr:`.Problem.resources` of the problem.
        :param task: a :class:`.Task` that should come from the problem.
        :return: a float representing a duration in simulation time.
        """
        raise NotImplementedError

    @abstractmethod
    def interarrival_time_sample(self):
        """
        Randomly samples the interarrival time of cases.

        :return: a float representing a duration in simulation time.
        """
        raise NotImplementedError

    def data_sample(self, task):
        """
        Randomly samples data for the task.
        This default implementation generates no data.

        :param task: the :class:`.Task` for which the data will be sampled.
        :return: a dictionary with additional data that can be stored in :attr:`.Task.data`.
        """
        return dict()

    def next_task_types_sample(self, task):
        """
        Randomly samples the task types that will be performed for the case of the specified task, when that task completes.
        This default implementation never enables a next task.

        :param task: a :class:`.Task` of this problem.
        :return: a sublist of :attr:`.Problem.task_types`.
        """
        return []

    def restart(self):
        """
        Restarts this problem instance, i.e.: sets the next case to arrive to the first case
        and clears the case history.
        """
        self.next_case_id = 0
        self.previous_case_arrival_time = 0
        self.next_task_id = 0
        self.history = dict()

    def next_case(self):
        """
        Returns the next case to arrive.

        :return: (arrival_time, initial_task), where
                 arrival_time is the simulation time at which the case arrives, and
                 initial_task is the first task to perform for the case.
        """
        arrival_time = self.previous_case_arrival_time + self.interarrival_time_sample()
        initial_task_type = self.sample_initial_task_type()
        case_id = self.next_case_id
        initial_task = Task(self.next_task_id, case_id, initial_task_type)
        initial_task.data = self.data_sample(initial_task)
        # Advance the counters only after the task was created successfully.
        self.next_case_id += 1
        self.next_task_id += 1
        self.previous_case_arrival_time = arrival_time
        self.history[case_id] = []
        return arrival_time, initial_task

    def nr_cases_generated(self):
        """
        Returns the number of cases that has been generated.

        :return: an integer number of cases that has been generated for the problem so far.
        """
        # Case identifiers are sequential and start at 0, so the next
        # identifier equals the number of cases generated so far.
        return self.next_case_id

    def complete_task(self, task):
        """
        Adds the specified task to the case history.
        Returns the list of next tasks that are enabled after the specified task is completed.

        :param task: the :class:`.Task` that completed.
        :return: a list of tasks
        """
        self.history[task.case_id].append(task)
        next_tasks = []
        for tt in self.next_task_types_sample(task):
            new_task = Task(self.next_task_id, task.case_id, tt)
            new_task.data = self.data_sample(new_task)
            self.next_task_id += 1
            next_tasks.append(new_task)
        return next_tasks
class MMcProblem(Problem):
    """
    A specific :class:`.Problem` that represents an M/M/c queue, i.e.:
    it has one task type, multiple resources and exponential arrival and processing times.
    This problem can be simulated, but it also has a method :meth:`.MMcProblem.waiting_time_analytical`
    to compute the waiting time analytically for comparison.
    """

    resources = ["R" + str(i) for i in range(1, 3)]
    task_types = ["T"]

    def __init__(self):
        super().__init__()
        # c: the number of servers (resources) of the queue.
        self.c = len(self.resources)
        # rate: the arrival rate of cases; it scales with the number of resources.
        self.rate = (1/10) * max(self.c-1, 1)
        # ep: the expected processing time of a task.
        self.ep = 9

    def sample_initial_task_type(self):
        """Returns the only task type, "T"."""
        return "T"

    def processing_time_sample(self, resource, task):
        """
        Samples an exponentially distributed processing time with mean :attr:`ep`.
        The resource and the task do not influence the sample.
        """
        return random.expovariate(1/self.ep)

    def interarrival_time_sample(self):
        """Samples an exponentially distributed interarrival time with rate :attr:`rate`."""
        return random.expovariate(self.rate)

    def waiting_time_analytical(self):
        """
        Computes the expected waiting time of a task in the queue analytically.

        :return: the expected waiting time in simulation time, or ``inf`` when the
                 utilization is 1 or higher, in which case the queue is not stable
                 and the waiting time grows without bound.
        """
        rate = self.rate
        c = self.c
        ep = self.ep
        # rho: the utilization per resource.
        rho = (rate*ep)/c
        if rho >= 1:
            # The steady-state formula below does not apply: it divides by
            # (1 - rho) and yields negative values for rho > 1.
            return float("inf")
        all_busy_term = ((c*rho)**c)/factorial(c)
        not_all_busy_terms = sum(((c*rho)**n)/factorial(n) for n in range(c))
        # piw: the probability that an arriving task has to wait (Erlang C).
        piw = all_busy_term/((1-rho) * not_all_busy_terms + all_busy_term)
        ew = piw*(1/(1-rho))*(ep/c)
        return ew
class ImbalancedProblem(Problem):
    """
    A specific :class:`.Problem` with two resources that have different processing times for the same task.
    The difference between the performance of the resources is indicated by the 0 <= spread < 2.0, where
    a higher spread means that the performance of the resources is more different. The resource that
    performs better on the task is indicated by the data['optimal_resource'] of that task.

    :param spread: the difference in performance between the resources, 0 <= spread < 2.0.
    """

    resources = ["R1", "R2"]
    task_types = ["T"]

    def __init__(self, spread=1.0):
        super().__init__()
        self.spread = spread

    def sample_initial_task_type(self):
        """Returns the only task type, "T"."""
        return "T"

    def processing_time_sample(self, resource, task):
        """
        Samples an exponentially distributed processing time. The mean is
        (1 - spread/2) * 18 for the optimal resource of the task and
        (1 + spread/2) * 18 for the other resource.

        :param resource: one of :attr:`resources`.
        :param task: a :class:`.Task` whose data contains 'optimal_resource'.
        :return: a float representing a duration in simulation time.
        """
        ep = 18
        half_spread = self.spread/2.0
        # NOTE: a spread of exactly 2.0 makes the mean for the optimal
        # resource 0, which leads to a division by zero below.
        if resource == task.data["optimal_resource"]:
            mean = (1.0-half_spread)*ep
        else:
            mean = (1.0+half_spread)*ep
        return random.expovariate(1/mean)

    def interarrival_time_sample(self):
        """Samples an exponentially distributed interarrival time with mean 10."""
        return random.expovariate(1/10)

    def data_sample(self, task):
        """
        Randomly selects one of the resources as the optimal resource for the task.

        :param task: the :class:`.Task` for which the data is sampled (not used).
        :return: a dictionary with the single key 'optimal_resource'.
        """
        data = dict()
        data["optimal_resource"] = random.choice(self.resources)
        return data
class SequentialProblem(Problem):
    """
    A specific :class:`.Problem` with two resources and two task types. Each case starts
    with a task of type T1. After that is completed a task of type T2 must be processed.
    The resources have different processing times for the tasks. The resource that
    performs better on a task is indicated by the data['optimal_resource'] of that task.
    Resource R1 performs better on task T1 and resource R2 on task T2.
    """

    resources = ["R1", "R2"]
    task_types = ["T1", "T2"]

    def sample_initial_task_type(self):
        """Returns "T1", the task type with which each case starts."""
        return "T1"

    def processing_time_sample(self, resource, task):
        """
        Samples an exponentially distributed processing time. The mean is
        0.5 * 18 for the optimal resource of the task and 1.5 * 18 for the other resource.

        :param resource: one of :attr:`resources`.
        :param task: a :class:`.Task` whose data contains 'optimal_resource'.
        :return: a float representing a duration in simulation time.
        """
        ep = 18
        if resource == task.data["optimal_resource"]:
            return random.expovariate(1/(0.5*ep))
        else:
            return random.expovariate(1/(1.5*ep))

    def interarrival_time_sample(self):
        """Samples an exponentially distributed interarrival time with mean 20."""
        return random.expovariate(1/20)

    def data_sample(self, task):
        """
        Determines the optimal resource for the task.

        :param task: the :class:`.Task` for which the data is generated.
        :return: a dictionary with the single key 'optimal_resource'.
        """
        data = dict()
        # The second character of the task type is its number, which is also
        # the number of the optimal resource: T1 -> R1, T2 -> R2.
        data["optimal_resource"] = "R" + task.task_type[1]
        return data

    def next_task_types_sample(self, task):
        """
        Returns ["T2"] after a task of type T1 completes, and no next task otherwise.

        :param task: the :class:`.Task` that completed.
        :return: a sublist of :attr:`task_types`.
        """
        if task.task_type == "T1":
            return ["T2"]
        return []
class MinedProblem(Problem):
    """
    A specific :class:`.Problem` that represents a process that is mined from
    a business process event log, using :meth:`miners.mine_problem`.
    The problem generates customer cases that have the same properties
    as the cases in the event log from which it originates in terms of:
    arrival rate, next-task probabilities, and performance of resources on
    task types in terms of processing time distribution.
    """

    resources = []
    task_types = []

    def __init__(self):
        super().__init__()
        #: The initial task type distribution. A list of tuples of probability/ task type, where probability is
        #: the probability that the task type is the initial task type of a case.
        self.initial_task_distribution = []
        #: The next task type distribution per task type. Maps a task type to a list of probability/ task type tuples,
        #: where each pair is a next task type and the probability that that task type is the next task type. If
        #: a tuple is probability/ None, this represents the probability that there is no next task and the
        #: case completes.
        self.next_task_distribution = dict()
        #: The interarrival time distribution. Must be replaced by an object with a ``sample()`` method
        #: before cases are generated; the value 0 is only a placeholder.
        self.interarrival_time = 0
        #: The resource pool per task type. Maps each task type to the list of resources that can execute tasks of that
        #: type.
        self.resource_pools = dict()
        #: The data types. Is a mapping of data type names to distributions from which the data will be sampled.
        #: In this class data is associated with a case. For each task in a case the same data will be returned.
        self.data_types = dict()
        # Maps case_id -> the data dictionary that was sampled for that case.
        self.__case_data = dict()
        #: The processing time is a stratified distribution that depends on various features, specifically:
        #: the task that is performed, the resource performing the task, the history of the case (i.e. how many
        #: instances of each task type completed so far), and the data of the case. Must be replaced by an object
        #: with a ``sample(features)`` method before processing times are sampled.
        self.processing_times = dict()
        # Maps case_id -> {task type -> number of completed tasks of that type}.
        self.__number_task_type_occurrences = dict()

    def sample_initial_task_type(self):
        """
        Randomly samples the initial task type of a case from :attr:`initial_task_distribution`.

        :return: an element of :attr:`task_types`. If the probabilities do not add up to 1.0
                 and no task type was selected, a warning is printed and the task type
                 of the first entry of the distribution is returned.
        """
        rd = random.random()
        rs = 0
        for (p, tt) in self.initial_task_distribution:
            rs += p
            if rd < rs:
                return tt
        print("WARNING: the probabilities of initial tasks do not add up to 1.0")
        # Entries are (probability, task type) tuples: return the task type only.
        return self.initial_task_distribution[0][1]

    def resource_pool(self, task_type):
        """Returns the list of resources in :attr:`resource_pools` for the task type."""
        return self.resource_pools[task_type]

    def interarrival_time_sample(self):
        """Samples an interarrival time from :attr:`interarrival_time`."""
        return self.interarrival_time.sample()

    def next_task_types_sample(self, task):
        """
        Randomly samples the next task type from the distribution that
        :attr:`next_task_distribution` holds for the type of the specified task.

        :param task: the :class:`.Task` that completed.
        :return: a list with the next task type, or an empty list if the case completes.
                 If the probabilities do not add up to 1.0 and nothing was selected,
                 a warning is printed and the first entry of the distribution is used.
        """
        distribution = self.next_task_distribution[task.task_type]
        rd = random.random()
        rs = 0
        for (p, tt) in distribution:
            rs += p
            if rd < rs:
                if tt is None:
                    return []
                else:
                    return [tt]
        print("WARNING: the probabilities of next tasks do not add up to 1.0")
        # Fall back on the first entry for this task type. The distribution
        # dictionary is keyed by task type, not by position.
        fallback = distribution[0][1]
        if fallback is None:
            return []
        else:
            return [fallback]

    def processing_time_sample(self, resource, task):
        """
        Samples the processing time from :attr:`processing_times`, based on the number of
        completed tasks per task type in the case, the task type, the resource and the task data.

        :param resource: the resource that performs the task.
        :param task: the :class:`.Task` that is performed.
        :return: the value sampled by ``processing_times.sample``.
        """
        features = {**self.__number_task_type_occurrences[task.case_id], 'Activity': task.task_type, 'Resource': resource, **task.data}
        return self.processing_times.sample(features)

    def data_sample(self, task):
        """
        Returns the data of the case to which the task belongs. The data is sampled from
        :attr:`data_types` the first time it is requested for a case; after that the same
        dictionary object is returned for each task of the case.

        :param task: the :class:`.Task` for which the data is requested.
        :return: a dictionary that maps each data type name to the sampled value.
        """
        if task.case_id not in self.__case_data:
            self.__case_data[task.case_id] = dict()
            for dt in self.data_types:
                self.__case_data[task.case_id][dt] = self.data_types[dt].sample()
        return self.__case_data[task.case_id]

    def restart(self):
        """Restarts the problem and clears the per-case data and task type counts."""
        super().restart()
        self.__case_data = dict()
        self.__number_task_type_occurrences = dict()

    def next_case(self):
        """
        Returns the next case to arrive and starts counting completed tasks for it.

        :return: (arrival_time, initial_task), as returned by :meth:`.Problem.next_case`.
        """
        arrival_time, initial_task = super().next_case()
        self.__number_task_type_occurrences[initial_task.case_id] = dict()
        for tt in self.task_types:
            self.__number_task_type_occurrences[initial_task.case_id][tt] = 0
        return arrival_time, initial_task

    def complete_task(self, task):
        """
        Completes the task and increases the number of completed tasks of its type in its case.

        :param task: the :class:`.Task` that completed.
        :return: the list of next tasks, as returned by :meth:`.Problem.complete_task`.
        """
        next_tasks = super().complete_task(task)
        # The count is increased after the next tasks were created.
        self.__number_task_type_occurrences[task.case_id][task.task_type] += 1
        return next_tasks