Source code for bpo.problems

import random
import pickle
from math import factorial
from abc import ABC, abstractmethod


[docs]class Task: """ A task. :param task_id: the identifier of the task. :param case_id: the identifier of the case to which the task belongs. :param task_type: the type of the task, i.e. one of the :attr:`.Problem.task_types`. """ def __init__(self, task_id, case_id, task_type): self.id = task_id self.case_id = case_id self.task_type = task_type """ a dictionary with additional data that is the result of the task, each item is a label -> value pair. """ self.data = dict() def __str__(self): return self.task_type + "(" + str(self.case_id) + ")_" + str(self.id) + (str(self.data) if len(self.data) > 0 else "")
[docs]class Problem(ABC): """ Abstract class that all problems must implement. An object of the class is an instance of the problem, which is equivalent to a business process case. An object has a next_case_id, which is the next case to arrive for the problem. case_id are sequential, starting at 0 for the first case to arrive, 1 for the next case to arrive, etc. An object also has a dictionary that maps case_id -> (arrival_time, initial_task), where arrival time is the simulation time at which the case will arrive and initial_task is the first :class:`.Task` that will be executed for the case. A problem must define: * the resources that exist * the interarrival time distribution of cases * the types of tasks that can be performed for cases * rules for what the next task will be when a case first arrives or when a task is completed * the processing time distribution for each task * the data that is generated by a task """ @property @abstractmethod def resources(self): """A list of identifiers (typically names) of resources.""" raise NotImplementedError @property def resource_weights(self): """A list of weights, in the same order as the list of resources. resource_weights[i] represents how much resources[i] works compared to the other resources. A resource with a weight 2 is expected to work twice as much as a resource with weight 1. Using resource_weights, we can randomly select a resource using random.choices(resources, resource_weights)[0], to get a resource with the likelihood that that resource is indeed supposed to work. By default all resources have equal weight, i.e. are equally available.""" return self._resource_weights @resource_weights.setter def resource_weights(self, value): self._resource_weights = value @property def schedule(self): """A schedule that represents how many resources are available at a particular point in simulation time. The schedule is a list, where schedule[t % len(schedule)] represents the number of resources that are available during time interval t in simulation time. For example, if simulation time is measured in hours, schedule[3 % len(schedule)] represents the number of resources available during the third hour. By default all resources are always available.""" return self._schedule @schedule.setter def schedule(self, value): self._schedule = value @property @abstractmethod def task_types(self): """ A list of identifiers (typically labels) of task types. """ raise NotImplementedError
[docs] def is_event(self, task_type): """ Returns True if the task type is actually an event. Events start immediately when they are activated. They do not require a resource. :param task_type: one of :attr:`.Problem.task_types` :return: True or False, depending on whether the specified task type is an event or not """ return False
[docs] @abstractmethod def sample_initial_task_type(self): """Returns an element of :attr:`.Problem.task_types` that is the first to execute in a case.""" raise NotImplementedError
[docs] def resource_pool(self, task_type): """ Returns for each task_type the subset of resources that can perform tasks of that type. :param task_type: one of :attr:`.Problem.task_types` :return: a list with elements of :attr:`.Problem.resources` """ return self.resources
def __init__(self): self._resource_weights = [1]*len(self.resources) self._schedule = [len(self.resources)] self.next_case_id = 0 self.previous_case_arrival_time = 0 self.next_task_id = 0 self.history = dict() self.restart()
[docs] @classmethod def from_file(cls, filename): """ Instantiates the problem by reading it from file. :param filename: the name of the file from which to read the problem. :return: an instance of the :class:`.Problem`. """ with open(filename, 'rb') as handle: instance = pickle.load(handle) return instance
[docs] def save(self, filename): """ Saves the problem to file. :param filename: the name of the file to save the problem to. """ with open(filename, 'wb') as handle: pickle.dump(self, handle, protocol=pickle.HIGHEST_PROTOCOL)
[docs] @abstractmethod def processing_time_sample(self, resource, task): """ Randomly samples the duration of the simulation time it will take the resource to perform the task. :param resource: one of the :attr:`.Problem.resources` of the problem. :param task: a :class:`.Task` that should come from the problem. :return: a float representing a duration in simulation time. """ raise NotImplementedError
[docs] @abstractmethod def interarrival_time_sample(self): """ Randomly samples the interarrival time of cases. :return: a float representing a duration in simulation time. """ raise NotImplementedError
[docs] def data_sample(self, task): """ Randomly samples data for the task. :param task: the :class:`.Task` for which the data will be sampled. :return: a dictionary with additional data that can be stored in :attr:`.Task.data`. """ return dict()
[docs] def next_task_types_sample(self, task): """ Randomly samples the task types that will be performed for the case of the specified task, when that task completes. :param task: a :class:`.Task` of this problem. :return: a sublist of :attr:`.Problem.task_types`. """ return []
[docs] def restart(self): """ Restarts this problem instance, i.e.: sets the next case to arrive to the first case. """ self.next_case_id = 0 self.previous_case_arrival_time = 0 self.next_task_id = 0 self.history = dict()
[docs] def next_case(self): """ Returns the next case to arrive. :return: (arrival_time, initial_task), where arrival_time is the simulation time at which the case arrives, and initial_task is the first task to perform for the case. """ arrival_time = self.previous_case_arrival_time + self.interarrival_time_sample() initial_task_type = self.sample_initial_task_type() case_id = self.next_case_id initial_task = Task(self.next_task_id, case_id, initial_task_type) initial_task.data = self.data_sample(initial_task) self.next_case_id += 1 self.next_task_id += 1 self.previous_case_arrival_time = arrival_time self.history[case_id] = [] return arrival_time, initial_task
[docs] def nr_cases_generated(self): """ Returns the number of cases that has been generated. :return: an integer number of cases that has been generated for the problem so far. """ return self.next_case_id
[docs] def complete_task(self, task): """ Adds the specified task to the case history. Returns the list of next tasks that are enabled after the task with the specified task_id in the specified case_id is completed. :param task: the :class:`.Task` that completed. :return: a list of tasks """ self.history[task.case_id].append(task) next_tasks = [] for tt in self.next_task_types_sample(task): new_task = Task(self.next_task_id, task.case_id, tt) new_task.data = self.data_sample(new_task) self.next_task_id += 1 next_tasks.append(new_task) return next_tasks
[docs]class MMcProblem(Problem): """ A specific :class:`.Problem` that represents an M/M/c queue, i.e.: it has one task type, multiple resources and exponential arrival and processing times. This problem can be simulated, but it also has a method :meth:`.Problem.waiting_time_analytical` to compute the waiting time analytically for comparison. """ resources = ["R" + str(i) for i in range(1, 3)] task_types = ["T"] def __init__(self): super().__init__() self.c = len(self.resources) self.rate = (1/10) * max(self.c-1, 1) self.ep = 9
[docs] def sample_initial_task_type(self): return "T"
[docs] def processing_time_sample(self, resource, task): return random.expovariate(1/self.ep)
[docs] def interarrival_time_sample(self): return random.expovariate(self.rate)
[docs] def waiting_time_analytical(self): rate = self.rate c = self.c ep = self.ep rho = (rate*ep)/c piw = (((c*rho)**c)/factorial(c))/((1-rho) * sum([(((c*rho)**n)/factorial(n)) for n in range(c)]) + (((c*rho)**c)/factorial(c))) ew = piw*(1/(1-rho))*(ep/c) return ew
[docs]class ImbalancedProblem(Problem): """ A specific :class:`.Problem` with two resources that have different processing times for the same task. The difference between the performance of the resources is indicated by the 0 <= spread < 2.0, where a higher spread means that the performance of the resources is more different. The resource that performs better on the task is indicated by the data['optimal_resource'] of that task. """ resources = ["R1", "R2"] task_types = ["T"] def __init__(self, spread=1.0): super().__init__() self.spread = spread
[docs] def sample_initial_task_type(self): return "T"
[docs] def processing_time_sample(self, resource, task): ep = 18 if resource == task.data["optimal_resource"]: return random.expovariate(1/((1.0-(self.spread/2.0))*ep)) else: return random.expovariate(1/((1.0+(self.spread/2.0))*ep))
[docs] def interarrival_time_sample(self): return random.expovariate(1/10)
[docs] def data_sample(self, task): data = dict() data["optimal_resource"] = random.choice(self.resources) return data
[docs]class SequentialProblem(Problem): """ A specific :class:`.Problem` with two resources and two task types. Each case starts with a task of type T1. After that is completed a task of type T2 must be processed. The resources have different processing times for the tasks. The resource that performs better on a task is indicated by the data['optimal_resource'] of that task. Resource R1 performs better on task T1 and resource R2 on task T2. """ resources = ["R1", "R2"] task_types = ["T1", "T2"]
[docs] def sample_initial_task_type(self): return "T1"
[docs] def processing_time_sample(self, resource, task): ep = 18 if resource == task.data["optimal_resource"]: return random.expovariate(1/(0.5*ep)) else: return random.expovariate(1/(1.5*ep))
[docs] def interarrival_time_sample(self): return random.expovariate(1/20)
[docs] def data_sample(self, task): data = dict() data["optimal_resource"] = "R" + task.task_type[1] return data
[docs] def next_task_types_sample(self, task): if task.task_type == "T1": return ["T2"] return []
[docs]class MinedProblem(Problem): """ A specific :class:`.Problem` that represents process that is mined from a business process event log, using :meth:`miners.mine_problem`. The problem generates customer cases that have the same properties as the cases in the event log from which it originate in terms of: arrival rate, next-task probabilities, and performance of resources on task types in terms of processing time distribution. """ resources = [] task_types = [] def __init__(self): super().__init__() """ The initial task type distribution. A list of tuples of probability/ task type, where probability is the probability that the task type is the initial task type of a case. """ self.initial_task_distribution = [] """ The next task type distribution per task type. Maps a task type to a list of probability/ task type tuples, where each pair is a next task type and the probability that that task type is the next task type. If a tuple is probability/ None, this represents the probability that there is no next task and the case completes. """ self.next_task_distribution = dict() """ The interarrival time distribution. """ self.interarrival_time = 0 """ The resource pool per task type. Maps each task type to the list of resources that can execute tasks of that type. """ self.resource_pools = dict() """ The data types. Is a mapping of data type names to distributions from which the data will be sampled. In this class data is associated with a case. For each task in a case the same data will be returned. """ self.data_types = dict() self.__case_data = dict() """ The processing time is a stratified distribution that depends on various features, specifically: the task that is performed, the resource performing the task, the history of the case (i.e. how many instanced of each task type completed so far), and the data of the case. """ self.processing_times = dict() self.__number_task_type_occurrences = dict()
[docs] def sample_initial_task_type(self): rd = random.random() rs = 0 for (p, tt) in self.initial_task_distribution: rs += p if rd < rs: return tt print("WARNING: the probabilities of initial tasks do not add up to 1.0") return self.initial_task_distribution[0]
[docs] def resource_pool(self, task_type): return self.resource_pools[task_type]
[docs] def interarrival_time_sample(self): return self.interarrival_time.sample()
[docs] def next_task_types_sample(self, task): rd = random.random() rs = 0 for (p, tt) in self.next_task_distribution[task.task_type]: rs += p if rd < rs: if tt is None: return [] else: return [tt] print("WARNING: the probabilities of next tasks do not add up to 1.0") if self.next_task_distribution[0][1] is None: return [] else: return [self.next_task_distribution[0][1]]
[docs] def processing_time_sample(self, resource, task): features = {**self.__number_task_type_occurrences[task.case_id], 'Activity': task.task_type, 'Resource': resource, **task.data} return self.processing_times.sample(features)
[docs] def data_sample(self, task): if task.case_id not in self.__case_data: self.__case_data[task.case_id] = dict() for dt in self.data_types: self.__case_data[task.case_id][dt] = self.data_types[dt].sample() return self.__case_data[task.case_id]
[docs] def restart(self): super().restart() self.__case_data = dict() self.__number_task_type_occurrences = dict()
[docs] def next_case(self): arrival_time, initial_task = super().next_case() self.__number_task_type_occurrences[initial_task.case_id] = dict() for tt in self.task_types: self.__number_task_type_occurrences[initial_task.case_id][tt] = 0 return arrival_time, initial_task
[docs] def complete_task(self, task): next_tasks = super().complete_task(task) self.__number_task_type_occurrences[task.case_id][task.task_type] += 1 return next_tasks