# Source code for bpo.miners

import pandas
import datetime
from statistics import mean
from problems import MinedProblem
from distributions import DistributionType, CategoricalDistribution, BetaDistribution, GammaDistribution, NormalDistribution, StratifiedNumericDistribution, UniformDistribution


def mine_problem(log, task_type_filter=None, datetime_format="%Y/%m/%d %H:%M:%S",
                 earliest_start=None, latest_completion=None, min_resource_count=2,
                 resource_schedule_timeunit=datetime.timedelta(hours=1),
                 resource_schedule_repeat=168, datafields=None, max_error_std=None):
    """
    Mines a problem and returns it as a :class:`.problems.Problem` that can be
    simulated. The log from which the model is mined must at least have the
    columns Case ID, Activity, Resource, Start Timestamp, Complete Timestamp,
    which identify the corresponding event log information. Activity labels
    are the same as Task Types for the purposes of the problem definition.
    The timing distributions associated with the problem are all in hours.
    Only cases that start on or after earliest_start and complete on or
    before latest_completion will be taken into account.

    Datafields specifies the columns in the log for which data fields will be
    learned. Datafields is a dictionary that maps column names to probability
    distributions. For the corresponding log column, a data type will be
    learned according to the specified distribution. The simulator can then
    draw samples for the distribution.

    :param log: a pandas dataframe from which the problem must be mined.
    :param task_type_filter: a function that takes the name of a task
        type/activity and returns if it should be included, or None to
        include all task types.
    :param datetime_format: the datetime format the Start Timestamp and
        Complete Timestamp columns use.
    :param earliest_start: a datetime object that, if not None, indicates
        that only cases that start on or after this datetime should be
        included. earliest_start and latest_completion should either both be
        None or both have a value.
    :param latest_completion: a datetime object that, if not None, indicates
        that only cases that complete on or before this datetime should be
        included. earliest_start and latest_completion should either both be
        None or both have a value.
    :param min_resource_count: the minimum number of times a resource must
        have executed a task of a particular type, for it to be considered in
        the pool of resources for the task type. This must be greater than 1,
        otherwise the standard deviation of the processing time cannot be
        computed.
    :param resource_schedule_timeunit: the timeunit in which resource
        schedules should be represented. Default is 1 hour.
    :param resource_schedule_repeat: the number of timeunits after which the
        resource schedule is expected to repeat itself. Default is 168
        repeats (with a 1 hour timeunit that is a week).
    :param datafields: a mapping of string to DistributionType, where string
        must be the name of one of the columns of the log. Defaults to no
        data fields.
    :param max_error_std: a distribution that describes the maximum standard
        deviation of the error that the processing times can have. It must be
        specified as a fraction of the mean processing time. The actual
        fraction will be sampled from this probability distribution. By
        default, no maximum is set.
    :return: a :class:`.problems.Problem`.
    """
    # Avoid a shared mutable default argument; an empty dict means no data fields.
    if datafields is None:
        datafields = dict()

    df = log.copy()
    df['Start Timestamp'] = pandas.to_datetime(df['Start Timestamp'], format=datetime_format)
    df['Complete Timestamp'] = pandas.to_datetime(df['Complete Timestamp'], format=datetime_format)

    # Aggregate per case: the case start, case completion, and the trace of
    # activities (in event order).
    # BUG FIX: case_complete was previously aggregated as
    # ('Start Timestamp', 'min'), i.e. the case *start*, which made the
    # latest_completion filter below compare against the wrong timestamp.
    df_cases = df.groupby('Case ID').agg(
        case_start=('Start Timestamp', 'min'),
        case_complete=('Complete Timestamp', 'max'),
        trace=('Activity', lambda tss: list(tss)))
    df_cases = df_cases.sort_values(by='case_start')
    if earliest_start is not None and latest_completion is not None:
        df_cases = df_cases[(df_cases['case_start'] >= earliest_start) &
                            (df_cases['case_complete'] <= latest_completion)]
    # Restrict the event data to the (possibly filtered) cases.
    relevant_ids = list(df_cases.index)
    df = df[df['Case ID'].isin(relevant_ids)]

    # Filter the data to the relevant columns and add the durations (in hours)
    # to the tasks. The vectorized subtraction is equivalent to a row-wise
    # (complete - start).total_seconds()/3600, but far faster.
    df = df[['Case ID', 'Activity', 'Resource', 'Start Timestamp', 'Complete Timestamp'] + list(datafields.keys())]
    df['Duration'] = (df['Complete Timestamp'] - df['Start Timestamp']).dt.total_seconds() / 3600

    # Sort the data.
    df = df.sort_values(by=['Case ID', 'Complete Timestamp'])

    # Mine the datatypes and corresponding distributions.
    numeric_distribution_classes = {
        DistributionType.GAMMA: GammaDistribution,
        DistributionType.NORMAL: NormalDistribution,
        DistributionType.BETA: BetaDistribution,
    }
    data_types = dict()
    for datafield, distribution_type in datafields.items():
        if distribution_type == DistributionType.CATEGORICAL:
            distribution = CategoricalDistribution()
            # Compute value_counts once instead of twice.
            value_counts = df[datafield].value_counts()
            distribution.learn(list(value_counts.index), list(value_counts.values))
            data_types[datafield] = distribution
        elif distribution_type in numeric_distribution_classes:
            distribution = numeric_distribution_classes[distribution_type]()
            distribution.learn(list(df[datafield]))
            data_types[datafield] = distribution

    # Get the task types (optionally filtered) and the resources.
    task_types = df['Activity'].unique()
    if task_type_filter is not None:
        task_types = [tt for tt in task_types if task_type_filter(tt)]
    resources = df['Resource'].unique()

    # Mine the processing time distributions as they depend on other elements.
    # For each task, add the tasks that preceded it as columns.
    # tt_happened: task type -> []
    # each entry i in the list for task type tt (i.e. tt_happened[tt][i])
    # is the number of times tasks of type tt have happened in a case in
    # dataframe df up to but not including row i.
    tt_happened = dict()
    for tt in task_types:
        tt_happened[tt] = []
    current_case = None
    previous_task = None
    i = 0
    for index, row in df.iterrows():
        if row['Case ID'] != current_case:
            # Starting a new case: set all task counts to 0 again.
            current_case = row['Case ID']
            for tt in task_types:
                tt_happened[tt].append(0)
        else:
            # Within a case: task counts carry over, but increase for the
            # previous task.
            for tt in task_types:
                if tt == previous_task:
                    tt_happened[tt].append(tt_happened[tt][i - 1] + 1)
                else:
                    tt_happened[tt].append(tt_happened[tt][i - 1])
        previous_task = row['Activity']
        i += 1
    for tt in task_types:
        df[tt] = tt_happened[tt]

    # Now generate the processing time distribution.
    processing_times = StratifiedNumericDistribution()
    features = ['Activity', 'Resource'] + list(datafields.keys()) + list(task_types)
    onehot = ['Activity', 'Resource'] + [datafield for datafield in datafields
                                         if datafields[datafield] == DistributionType.CATEGORICAL]
    standardization = [datafield for datafield in datafields
                       if datafields[datafield] != DistributionType.CATEGORICAL]
    processing_times.learn(df[features + ['Duration']], 'Duration', features, onehot,
                           standardization, 'Activity', max_error_std)

    # Mine the control flow: initial task counts, direct-follows counts, and
    # the case interarrival times (in hours).
    initial_tasks = dict()
    following_task = dict()
    interarrival_times = []
    last_arrival_time = None
    for index, row in df_cases.iterrows():
        if last_arrival_time is not None:
            interarrival_times.append((row['case_start'] - last_arrival_time).total_seconds() / 3600)
        last_arrival_time = row['case_start']
        first_task = row['trace'][0]
        if first_task not in initial_tasks:
            initial_tasks[first_task] = 0
        initial_tasks[first_task] += 1
        for j in range(len(row['trace'])):
            predecessor = row['trace'][j]
            # A successor of None marks the end of a trace.
            successor = row['trace'][j + 1] if j + 1 < len(row['trace']) else None
            if (predecessor, successor) not in following_task:
                following_task[(predecessor, successor)] = 0
            following_task[(predecessor, successor)] += 1
    interarrival_time = GammaDistribution()
    interarrival_time.learn(interarrival_times)
    # Normalize the counts into probability distributions.
    initial_task_distribution = []
    for it in initial_tasks:
        initial_task_distribution.append((initial_tasks[it] / len(df_cases), it))
    next_task_distribution = dict()
    task_occurrences = dict()
    for (predecessor, successor) in following_task:
        if predecessor not in next_task_distribution:
            next_task_distribution[predecessor] = dict()
            task_occurrences[predecessor] = 0
        next_task_distribution[predecessor][successor] = following_task[(predecessor, successor)]
        task_occurrences[predecessor] += following_task[(predecessor, successor)]
    for predecessor in next_task_distribution:
        successors = []
        for successor in next_task_distribution[predecessor]:
            successors.append((next_task_distribution[predecessor][successor] / task_occurrences[predecessor],
                               successor))
        next_task_distribution[predecessor] = successors

    # Mine the resource pools.
    df_resources = df.groupby(['Activity', 'Resource'], as_index=False).agg(
        Duration_mean=('Duration', 'mean'),
        Duration_std=('Duration', 'std'),
        Resource_count=('Resource', 'count'))
    resource_pools = dict()
    for tt in task_types:
        resource_pools[tt] = []
    for index, row in df_resources.iterrows():
        # BUG FIX: was a strict '>', which excluded resources that executed a
        # task exactly min_resource_count times, contradicting the documented
        # "minimum number of times" semantics of min_resource_count.
        if row["Resource_count"] >= min_resource_count:
            resource_pools[row['Activity']].append(row['Resource'])

    # Mine the resource schedules.
    begin = min(df['Start Timestamp'])
    end = max(df['Complete Timestamp'])
    hr = (begin, begin + resource_schedule_timeunit)
    schedule = [[] for _ in range(resource_schedule_repeat)]
    resource_presence = dict()  # nr of timeunits during which a resource was present
    for r in resources:
        resource_presence[r] = 0
    x = 0
    while hr[1] <= end:
        # Tasks are within the timeunit window hr if the window start or end
        # falls between the start and completion of the task.
        tasks_in_hour = df[((df['Start Timestamp'] <= hr[0]) & (df['Complete Timestamp'] >= hr[0])) |
                           ((df['Start Timestamp'] <= hr[1]) & (df['Complete Timestamp'] >= hr[1]))]
        resources_in_hour = tasks_in_hour['Resource'].unique()
        for r in resources_in_hour:
            resource_presence[r] += 1
        schedule[x % resource_schedule_repeat].append(len(resources_in_hour))
        x += 1
        # BUG FIX: the window was previously always advanced by
        # datetime.timedelta(hours=1), regardless of the chosen
        # resource_schedule_timeunit, so window width and step disagreed for
        # any non-default timeunit.
        hr = (hr[0] + resource_schedule_timeunit, hr[1] + resource_schedule_timeunit)
    for x in range(resource_schedule_repeat):
        # Guard against slots never observed (log shorter than one repeat
        # cycle); statistics.mean raises on an empty list.
        schedule[x] = round(mean(schedule[x])) if schedule[x] else 0
    resource_weights = []
    for r in resources:
        resource_weights.append(resource_presence[r])

    # CREATE THE PROBLEM
    result = MinedProblem()
    result.schedule = schedule
    result.resource_weights = resource_weights
    result.task_types = list(task_types)  # The task types
    result.resources = list(resources)  # The resources
    result.initial_task_distribution = initial_task_distribution  # The initial task type distribution
    result.next_task_distribution = next_task_distribution  # The next task type distribution per task type
    result.interarrival_time = interarrival_time  # The interarrival time
    result.resource_pools = resource_pools  # The resource pool per task type
    result.data_types = data_types  # The learned data field distributions
    result.processing_times = processing_times  # The processing time distributions
    return result