Commit c4385bc7 authored by aarongitrepos

*/__pycache__/
import os, sys
import pandas as pd
currentdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.dirname(currentdir)
sys.path.append(parentdir)
sys.path.append(currentdir)
import pysubgroup_mod as ps
import argparse
import numpy as np
def eliminate_reps(elem, l):
    """Return True if `elem` is not contained in any other subgroup description in `l`.

    Rules whose description is a substring of a longer rule in the result set are redundant
    and are filtered out by the caller.
    """
    l.remove(elem)
    for li in l:
        if elem in li:
            return False
    return True
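# Illustrative behaviour of eliminate_reps (hypothetical subgroup strings, each call made on a
# fresh copy of the list, as in the DataFrame apply below):
#   l = ["AGE>=65", "AGE>=65 AND SEX==male", "STAGE==IV"]
#   eliminate_reps("AGE>=65", l)    -> False  (contained in the longer rule, so that row is dropped)
#   eliminate_reps("STAGE==IV", l)  -> True   (not contained in any other rule, so that row is kept)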
def info_gained_algorithm(dataname,class_column,class_value,mode_parameter="default",depth=5,list_ignore=[],list_conds=[]):
"""
Parameters
----------
    dataname: string
        The name of the dataset located in the datasets directory to be used for the analysis.
    class_column: string
        The column of the dataset that will be used as the target.
    class_value: string or int
        A value of class_column. It corresponds to the condition that has to be met, i.e., class_column == class_value.
    mode_parameter: string, optional (default="default")
        Controls the behaviour of the algorithm's search. For the InfoGained algorithm, the
        dynamic or maximum option must be used. For the other algorithms, the parameter keeps the value "default".
        (possible values: dynamic, maximum, default)
    depth: int, optional (default=5)
        The maximum number of variables that can be added to a rule.
    list_ignore: list of strings, optional (default=[])
        List containing the column names that will not be used in the search.
    list_conds: list of strings, optional (default=[])
        List containing the column names that are required to appear in the rules. It only works with the InfoGained algorithm.
"""
df = pd.read_csv(currentdir+"/datasets/"+dataname+".csv",index_col=[0])
    if class_column not in df.columns:
        sys.exit("The class_column specified is not a column of the dataset")
    if df[class_column].dtype in [np.int16, np.int32, np.int64]:
        class_value = int(class_value)
    if class_value not in df[class_column].unique():
        sys.exit("The class_value specified is not an option")
target = ps.BinaryTarget (class_column, class_value)
searchspace = ps.create_selectors(df, ignore=list_ignore)
mode_parameter = {'dynamic' : 0, 'maximum': 1, "default":2}[mode_parameter]
task = ps.SubgroupDiscoveryTask (
df,
target,
searchspace,
mode=mode_parameter,
depth=depth,
filter_vars = list_conds,
qf=ps.WRAccQF())
result, result_cut = ps.InfoGainedSearch().execute(task)
#df_result = result.to_dataframe()
df_result_cut = result_cut.to_dataframe(mode=mode_parameter)
df_result_cut.drop_duplicates(inplace=True)
df_result_cut.reset_index(drop=True,inplace=True)
#df_result = df_result[df_result.apply(lambda row: eliminate_reps(row['subgroup'],list(df_result["subgroup"])), axis=1)]
df_result_cut = df_result_cut[df_result_cut.apply(lambda row: eliminate_reps(row['subgroup'],list(df_result_cut["subgroup"])), axis=1)]
df_result_cut["target"] = [class_value] * df_result_cut.shape[0]
#route = currentdir+"/results/"+dataname+"_"+class_column+"_"+str(class_value)+".csv"
""" dir_type = "max"
if mode_parameter == 0:
dir_type = "threshold"
route = parentdir+"/datasets_compared/"+dataname+"/InfoGained/"+dir_type+"/"+dataname+"_"+class_column+"_"+str(class_value)+".csv" """
route = currentdir+"/results/"+dataname+"_"+class_column+"_"+str(class_value)+".csv"
df_result_cut.to_csv(route, encoding="UTF-8",index=True)
if __name__ == "__main__":
#list_ignore=['Prog_Rec', 'ToxBin', 'boolenProg', 'booleanTox', 'NoProg-Tox', 'SiProg-Tox',"orgfam","target","target_num"]
# Cancer_stage, FirstTreatment
parser=argparse.ArgumentParser()
parser.add_argument('--dataname', type=str, required=True)
parser.add_argument('--class_column', type=str, required=True)
parser.add_argument('--class_value', type=str, required=True)
parser.add_argument('--mode', type=str, choices=["dynamic","maximum","default"], default="default")
parser.add_argument('--depth', type=int, required=True)
parser.add_argument("--list_ignore", nargs="*", type=str, default=[])
parser.add_argument("--list_conds", nargs="*", type=str, default=[])
args = parser.parse_args()
info_gained_algorithm(args.dataname,args.class_column,args.class_value,args.mode,args.depth,args.list_ignore,args.list_conds)
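# Example invocation (a minimal sketch; "this_script.py", "mydata", "Outcome", "Treatment" and the
# value 1 are placeholder names, not files or columns shipped with this commit):
#   python this_script.py --dataname mydata --class_column Outcome --class_value 1 \
#       --mode dynamic --depth 4 --list_conds Treatment
# or, equivalently, calling the function directly:
#   info_gained_algorithm("mydata", "Outcome", "1", mode_parameter="dynamic", depth=4,
#                         list_conds=["Treatment"])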
from pysubgroup_mod.subgroup_description import *
from pysubgroup_mod.algorithms import *
from pysubgroup_mod.measures import *
from pysubgroup_mod.utils import *
from pysubgroup_mod.binary_target import *
from pysubgroup_mod.numeric_target import *
from pysubgroup_mod.fi_target import *
from pysubgroup_mod.visualization import *
from pysubgroup_mod.refinement_operator import *
from pysubgroup_mod.representations import *
from pysubgroup_mod.constraints import *
'''
Created on 29.04.2016
@author: lemmerfn
'''
import copy
from time import time
from math import factorial
from itertools import combinations, chain
from heapq import heappush, heappop
from collections import Counter, namedtuple
import warnings
import numpy as np
import pysubgroup_mod as ps
class SubgroupDiscoveryTask:
'''
    Encapsulates all parameters required to perform standard subgroup discovery
'''
def __init__(self, data, target, search_space, qf, result_set_size=10, depth=3, min_quality=0, mode=0, timeout=3600, filter_vars=None, constraints=None):
self.data = data
self.target = target
self.search_space = search_space
self.qf = qf
self.result_set_size = result_set_size
self.depth = depth
self.min_quality = min_quality
self.mode = mode
self.timeout = timeout
if constraints is None:
constraints = []
if filter_vars is None:
filter_vars = []
self.filter_vars = filter_vars
self.constraints = constraints
self.constraints_monotone = [constr for constr in constraints if constr.is_monotone]
self.constraints_other = [constr for constr in constraints if not constr.is_monotone]
def constraints_satisfied(constraints, subgroup, statistics=None, data=None):
return all(constr.is_satisfied(subgroup, statistics, data) for constr in constraints)
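# Illustrative construction of a SubgroupDiscoveryTask (a minimal sketch kept as comments so it
# does not run at import time; `df`, "Outcome", "PatientID" and "Treatment" are placeholders,
# while ps.BinaryTarget, ps.create_selectors and ps.WRAccQF are used exactly as in the driver script):
#   target = ps.BinaryTarget("Outcome", 1)
#   searchspace = ps.create_selectors(df, ignore=["PatientID"])
#   task = ps.SubgroupDiscoveryTask(df, target, searchspace, qf=ps.WRAccQF(),
#                                   depth=4, mode=0, filter_vars=["Treatment"])
#   result, result_cut = ps.InfoGainedSearch().execute(task)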
class Apriori:
def __init__(self, representation_type=None, combination_name='Conjunction', use_numba=True):
self.combination_name = combination_name
if representation_type is None:
representation_type = ps.BitSetRepresentation
self.representation_type = representation_type
self.use_vectorization = False
self.use_repruning = True
#self.optimistic_estimate_name = 'optimistic_estimate'
self.optimistic_estimate_name = 'calculate_statistics'
self.next_level = self.get_next_level
self.compiled_func = None
if use_numba:
try:
import numba # pylint: disable=unused-import, import-outside-toplevel
self.next_level = self.get_next_level_numba
print('Apriori: Using numba for speedup')
except ImportError:
pass
def get_next_level_candidates(self, task, result, next_level_candidates):
promising_candidates = []
optimistic_estimate_function = getattr(task.qf, self.optimistic_estimate_name)
for sg in next_level_candidates:
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
#print(statistics)
""" if statistics.positives_count < 1:
continue """
ps.add_if_required(result, sg, task.qf.evaluate(sg, statistics, task.target, task.data), task, statistics=statistics)
#optimistic_estimate = optimistic_estimate_function(sg, task.target, task.data, statistics)
optimistic_estimate = task.qf.evaluate(sg, statistics, task.target, task.data)
#if optimistic_estimate >= ps.minimum_required_quality(result, task):
if optimistic_estimate >= task.min_quality:
#if ps.constraints_hold(task.constraints_monotone, sg, statistics, task.data):
promising_candidates.append((optimistic_estimate, sg.selectors))
#min_quality = ps.minimum_required_quality(result, task)
min_quality = task.min_quality
promising_candidates = [selectors for estimate, selectors in promising_candidates if estimate > min_quality]
#promising_candidates = [selectors for estimate, selectors in promising_candidates]
return promising_candidates
def get_next_level_candidates_vectorized(self, task, result, next_level_candidates):
promising_candidates = []
statistics = []
optimistic_estimate_function = getattr(task.qf, self.optimistic_estimate_name)
for sg in next_level_candidates:
statistics.append(task.qf.calculate_statistics(sg, task.target, task.data))
tpl_class = statistics[0].__class__
vec_statistics = tpl_class._make(np.array(tpl) for tpl in zip(*statistics))
qualities = task.qf.evaluate(None, task.target, task.data, vec_statistics)
optimistic_estimates = optimistic_estimate_function(None, None, None, vec_statistics)
for sg, quality, stats in zip(next_level_candidates, qualities, statistics):
ps.add_if_required(result, sg, quality, task, statistics=stats)
min_quality = ps.minimum_required_quality(result, task)
for sg, optimistic_estimate in zip(next_level_candidates, optimistic_estimates):
if optimistic_estimate >= min_quality:
promising_candidates.append(sg.selectors)
return promising_candidates
def reprune_lower_levels(self, promising_candidates, depth):
for k in range(1, depth):
promising_candidates_k = (combinations(selectors, k) for selectors in promising_candidates)
combination_counter = Counter(chain.from_iterable(promising_candidates_k))
d = depth + 1 - k
unpromising_combinations = set(frozenset(sel) for sel, count in combination_counter.items() if count < d)
promising_candidates = list(selectors for selectors in promising_candidates
if all(frozenset(comb) not in unpromising_combinations for comb in combinations(selectors, k)))
return promising_candidates
def get_next_level_numba(self, promising_candidates):
from numba import jit # pylint: disable=import-error, import-outside-toplevel
if not hasattr(self, 'compiled_func') or self.compiled_func is None:
@jit
def getNewCandidates(l, hashes):
result = []
for i in range(len(l)-1):
for j in range(i + 1, len(l)):
if hashes[i] == hashes[j]:
if np.all(l[i, :-1] == l[j, :-1]):
result.append((i, j))
return result
self.compiled_func = getNewCandidates
all_selectors = Counter(chain.from_iterable(promising_candidates))
d = {selector:i for i, selector in enumerate(all_selectors)}
l = [tuple(d[sel] for sel in selectors) for selectors in promising_candidates]
arr = np.array(l, dtype=int)
print(len(arr))
hashes = np.array([hash(tuple(x[:-1])) for x in l], dtype=np.int64)
candidates_int = self.compiled_func(arr, hashes)
return list((*promising_candidates[i], promising_candidates[j][-1]) for i, j in candidates_int)
def get_next_level(self, promising_candidates):
precomputed_list = list((tuple(sg), sg[-1], hash(tuple(sg[:-1])), tuple(sg[:-1])) for sg in promising_candidates)
return list((*sg1, new_selector) for (sg1, _, hash_l, selectors_l), (_, new_selector, hash_r, selectors_r) in combinations(precomputed_list, 2)
if (hash_l == hash_r) and (selectors_l == selectors_r))
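    # Illustrative join (hypothetical selector names): given promising_candidates
    # [("A", "B"), ("A", "C"), ("B", "C")], only the first two share the (k-1)-prefix ("A",),
    # so get_next_level yields [("A", "B", "C")]; ("B", "C") has no partner with an equal
    # prefix and produces no candidate.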
def execute(self, task):
""" if not isinstance(task.qf, ps.BoundedInterestingnessMeasure):
raise RuntimeWarning("Quality function is unbounded, long runtime expected") """
task.qf.calculate_constant_statistics(task.data, task.target)
with self.representation_type(task.data, task.search_space) as representation:
combine_selectors = getattr(representation.__class__, self.combination_name)
result = []
# init the first level
next_level_candidates = []
for sel in task.search_space:
next_level_candidates.append(combine_selectors([sel]))
# level-wise search
depth = 1
while next_level_candidates:
# check sgs from the last level
if self.use_vectorization:
promising_candidates = self.get_next_level_candidates_vectorized(task, result, next_level_candidates)
else:
promising_candidates = self.get_next_level_candidates(task, result, next_level_candidates)
if depth == task.depth:
break
if self.use_repruning:
promising_candidates = self.reprune_lower_levels(promising_candidates, depth)
next_level_candidates_no_pruning = self.next_level(promising_candidates)
# select those selectors and build a subgroup from them
# for which all subsets of length depth (=candidate length -1) are in the set of promising candidates
set_promising_candidates = set(tuple(p) for p in promising_candidates)
next_level_candidates = [combine_selectors(selectors) for selectors in next_level_candidates_no_pruning
if all((subset in set_promising_candidates) for subset in combinations(selectors, depth))]
depth = depth + 1
result.sort(key=lambda x: x[0], reverse=True)
return ps.SubgroupDiscoveryResult(result, task)
class BestFirstSearch:
def execute(self, task):
result = []
queue = [(float("-inf"), ps.Conjunction([]))]
operator = ps.StaticSpecializationOperator(task.search_space)
task.qf.calculate_constant_statistics(task.data, task.target)
while queue:
q, old_description = heappop(queue)
q = -q
if not q > ps.minimum_required_quality(result, task):
break
for candidate_description in operator.refinements(old_description):
sg = candidate_description
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
ps.add_if_required(result, sg, task.qf.evaluate(sg, task.target, task.data, statistics), task, statistics=statistics)
if len(candidate_description) < task.depth:
optimistic_estimate = task.qf.optimistic_estimate(sg, task.target, task.data, statistics)
# compute refinements and fill the queue
if optimistic_estimate >= ps.minimum_required_quality(result, task):
if ps.constraints_satisfied(task.constraints_monotone, candidate_description, statistics, task.data):
heappush(queue, (-optimistic_estimate, candidate_description))
result.sort(key=lambda x: x[0], reverse=True)
return ps.SubgroupDiscoveryResult(result, task)
class GeneralisingBFS:
def __init__(self):
self.alpha = 1.10
self.discarded = [0, 0, 0, 0, 0, 0, 0]
self.refined = [0, 0, 0, 0, 0, 0, 0]
def execute(self, task):
result = []
queue = []
operator = ps.StaticGeneralizationOperator(task.search_space)
# init the first level
for sel in task.search_space:
queue.append((float("-inf"), ps.Disjunction([sel])))
task.qf.calculate_constant_statistics(task.data, task.target)
while queue:
q, candidate_description = heappop(queue)
q = -q
if q < ps.minimum_required_quality(result, task):
break
sg = candidate_description
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
quality = task.qf.evaluate(sg, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
qual = ps.minimum_required_quality(result, task)
if (quality, sg) in result:
new_queue = []
for q_tmp, c_tmp in queue:
if (-q_tmp) > qual:
heappush(new_queue, (q_tmp, c_tmp))
queue = new_queue
optimistic_estimate = task.qf.optimistic_estimate(sg, task.target, task.data, statistics)
# else:
# ps.add_if_required(result, sg, task.qf.evaluate_from_dataset(task.data, sg), task)
# optimistic_estimate = task.qf.optimistic_generalisation_from_dataset(task.data, sg) if qf_is_bounded else float("inf")
# compute refinements and fill the queue
if len(candidate_description) < task.depth and (optimistic_estimate / self.alpha ** (len(candidate_description) + 1)) >= ps.minimum_required_quality(result, task):
# print(qual)
# print(optimistic_estimate)
self.refined[len(candidate_description)] += 1
# print(str(candidate_description))
for new_description in operator.refinements(candidate_description):
heappush(queue, (-optimistic_estimate, new_description))
else:
self.discarded[len(candidate_description)] += 1
result.sort(key=lambda x: x[0], reverse=True)
for qual, sg in result:
print("{} {}".format(qual, sg))
print("discarded " + str(self.discarded))
return ps.SubgroupDiscoveryResult(result, task)
class BeamSearch:
'''
    Implements the BeamSearch algorithm. This is a basic implementation
'''
def __init__(self, beam_width=20, beam_width_adaptive=False):
self.beam_width = beam_width
self.beam_width_adaptive = beam_width_adaptive
def execute(self, task):
# adapt beam width to the result set size if desired
if self.beam_width_adaptive:
self.beam_width = task.result_set_size
        # check if the beam size is too small for the result set
if self.beam_width < task.result_set_size:
raise RuntimeError('Beam width in the beam search algorithm is smaller than the result set size!')
if task.mode != 2:
raise RuntimeError('Mode parameter must be 2 for BeamSearch algorithm')
task.qf.calculate_constant_statistics(task.data, task.target)
# init
beam = [(0, 0, ps.Conjunction([],task.mode), task.qf.calculate_statistics(slice(None), task.target, task.data))]
last_beam = None
depth = 0
while beam != last_beam and depth < task.depth:
last_beam = beam.copy()
for (_, _,last_sg, _) in last_beam:
if not getattr(last_sg, 'visited', False):
setattr(last_sg, 'visited', True)
for sel in task.search_space:
# create a clone
new_selectors = list(last_sg.selectors)
if sel not in new_selectors:
new_selectors.append(sel)
sg = ps.Conjunction(new_selectors,task.mode)
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
                            quality, _, _ = task.qf.evaluate(sg, task.target, task.data, statistics, measures=True)
#p_value = ps.ChiSquaredQF(stat="p").evaluate(sg, task.target, task.data, statistics) # Calculate pvalue stat.
ps.add_if_required(beam, sg, quality, task, check_for_duplicates=True, statistics=statistics)
depth += 1
# TODO make sure there is no bug here
result = beam[:task.result_set_size]
result.sort(key=lambda x: x[0], reverse=True)
return ps.SubgroupDiscoveryResult(result, task)
class InfoGainedSearch:
'''
Implements the InfoGained algorithm.
'''
def execute(self, task):
if task.mode not in [0,1]:
raise RuntimeError('Mode parameter must be dynamic or maximum for InfoGained algorithm')
task.qf.calculate_constant_statistics(task.data, task.target)
        # beam = [wracc list, sg, stats list, info_gained list, odd_value list, idx, p_value list]
        # Each list is built by appending, at every step, the information of the parent node plus the child node.
        # The idx entry is used to track how many of the required variables are present in the sg.
if len(task.filter_vars) == 0:
beam = [([], ps.Conjunction([],task.mode), [],[],[],-1,[])]
else:
beam = [([], ps.Conjunction([],task.mode), [],[],[],0,[])]
last_beam = None
depth = 0
start = time()
while beam != last_beam and depth < task.depth:
#print(depth)
last_beam = beam.copy()
beam.clear() # List used to save all the candidates of iteration n.
            smt = False # Flag used to track whether there is already something in the beam list
for index_beam, (quality_l, last_sg, stats_l, info_l, odd_l,value_idx,pvalue_l) in enumerate(last_beam): # For each node in list, all possible nodes are generated.
aux_beam = [] # List used for each parent to save candidates.
for sel in task.search_space:
                    # Generate a sg using the parent's labels plus a candidate label.
new_selectors = list(last_sg.selectors)
                    if sel not in new_selectors: # A sg cannot contain the same selector twice.
new_selectors.append(sel) # New sg generated
sg = ps.Conjunction(new_selectors,task.mode)
statistics = task.qf.calculate_statistics(sg, task.target, task.data) ## Calculate some stats
quality, info_gain, odd_v = task.qf.evaluate(sg, task.target, task.data, statistics, measures=True) ## Calculate wracc, info gained and odd value
                        # If the WRAcc stat is < 0, the generated sg is not taken into account.
if quality >= 0:
sel_idx = value_idx
if len(task.filter_vars) > 0:
if any(filter_var in str(sel) for filter_var in task.filter_vars):
sel_idx = value_idx + 1
                            # If there are no elements in beam, add the element. If the list is not empty, check
                            # whether the newly generated sg is already in it (i.e. the same sg with a different label order).
if smt is False or sorted(new_selectors) not in [sorted(elem[1]._selectors) for elem in beam]:
p_value = ps.ChiSquaredQF(stat="p").evaluate(sg, task.target, task.data, statistics) # Calculate pvalue stat.
aux_beam.append((quality_l.copy() + [quality], sg, stats_l.copy() + [statistics], info_l.copy() + [info_gain], odd_l.copy() + [odd_v],sel_idx, pvalue_l.copy() + [p_value]))
# After adding all the candidates that satisfy the conditions
if len(aux_beam) > 1:
filter_list = []
                # If there is more than one candidate, we are past the first iteration, and the parent
                # node does not yet contain all the required variables:
if depth > 0 and value_idx not in [-1,len(task.filter_vars)]:
                    # Depending on the number of required variables in the parent, filter the generated sgs.
filter_list = list(filter(lambda x:x[5] == value_idx + 1, aux_beam))
                # Using the info-gained threshold, obtain the final list of candidates.
aux_beam = ps.calculate_threshold(filter_list,aux_beam,depth,task.mode)
beam.extend(aux_beam)
smt = True
if time()-start>task.timeout:
beam.extend(last_beam[index_beam+1:])
break
if time()-start>task.timeout:
break
depth += 1
        beam_cut = [] # Final groups after cutting.
for elem in beam:
tp = ps.best_complex(elem,task.mode,task.filter_vars)
            if tp is not None:
beam_cut.append(tp)
beam = [(elem[0][-1],elem[1],elem[2][-1],elem[3][-1],elem[4][-1],elem[5],elem[6][-1]) for elem in beam] # Final groups.
return ps.SubgroupDiscoveryResult(beam, task), ps.SubgroupDiscoveryResult(beam_cut, task)
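# Illustrative use of InfoGainedSearch (a minimal sketch; `task` is assumed to be a
# SubgroupDiscoveryTask built with mode 0 (dynamic) or 1 (maximum), as in the driver script):
#   result, result_cut = ps.InfoGainedSearch().execute(task)
#   df_cut = result_cut.to_dataframe(mode=task.mode)  # one row per cut rule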
class SimpleSearch:
def __init__(self, show_progress=True):
self.show_progress = show_progress
def execute(self, task):
task.qf.calculate_constant_statistics(task.data, task.target)
result = []
all_selectors = chain.from_iterable(combinations(task.search_space, r) for r in range(1, task.depth + 1))
if self.show_progress:
try:
from tqdm import tqdm # pylint: disable=import-outside-toplevel
def binomial(x, y):
try:
binom = factorial(x) // factorial(y) // factorial(x - y)
except ValueError:
binom = 0
return binom
total = sum(binomial(len(task.search_space), k) for k in range(1, task.depth + 1))
all_selectors = tqdm(all_selectors, total=total)
except ImportError:
pass
for selectors in all_selectors:
sg = ps.Conjunction(selectors)
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
result.sort(key=lambda x: x[0], reverse=True)
return ps.SubgroupDiscoveryResult(result, task)
class SimpleDFS:
def execute(self, task, use_optimistic_estimates=True):
task.qf.calculate_constant_statistics(task.data, task.target)
result = self.search_internal(task, [], task.search_space, [], use_optimistic_estimates)
result.sort(key=lambda x: x[0], reverse=True)
return ps.SubgroupDiscoveryResult(result, task)
def search_internal(self, task, prefix, modification_set, result, use_optimistic_estimates):
sg = ps.Conjunction(copy.copy(prefix))
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
if use_optimistic_estimates and len(prefix) < task.depth and isinstance(task.qf, ps.BoundedInterestingnessMeasure):
optimistic_estimate = task.qf.optimistic_estimate(sg, task.target, task.data, statistics)
if not optimistic_estimate > ps.minimum_required_quality(result, task):
return result
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
if not ps.constraints_satisfied(task.constraints_monotone, sg, statistics=statistics, data=task.data):
return
if len(prefix) < task.depth:
new_modification_set = copy.copy(modification_set)
for sel in modification_set:
prefix.append(sel)
new_modification_set.pop(0)
self.search_internal(task, prefix, new_modification_set, result, use_optimistic_estimates)
# remove the sel again
prefix.pop(-1)
return result
class DFS:
"""
Implementation of a depth-first-search with look-ahead using a provided datastructure.
"""
def __init__(self, apply_representation):
self.target_bitset = None
self.apply_representation = apply_representation
self.operator = None
self.params_tpl = namedtuple('StandardQF_parameters', ('size_sg', 'positives_count'))
def execute(self, task):
self.operator = ps.StaticSpecializationOperator(task.search_space)
task.qf.calculate_constant_statistics(task.data, task.target)
result = []
with self.apply_representation(task.data, task.search_space) as representation:
self.search_internal(task, result, representation.Conjunction([]))
result.sort(key=lambda x: x[0], reverse=True)
return ps.SubgroupDiscoveryResult(result, task)
def search_internal(self, task, result, sg):
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
if not constraints_satisfied(task.constraints_monotone, sg, statistics, task.data):
return
optimistic_estimate = task.qf.optimistic_estimate(sg, task.target, task.data, statistics)
if not optimistic_estimate > ps.minimum_required_quality(result, task):
return
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
if sg.depth < task.depth:
for new_sg in self.operator.refinements(sg):
self.search_internal(task, result, new_sg)
class DFSNumeric:
tpl = namedtuple('size_mean_parameters', ('size_sg', 'mean'))
def __init__(self):
self.pop_size = 0
self.f = None
self.target_values = None
self.bitsets = {}
self.num_calls = 0
def execute(self, task):
if not isinstance(task.qf, ps.StandardQFNumeric):
warnings.warn("BSD_numeric so far is only implemented for StandardQFNumeric")
self.pop_size = len(task.data)
sorted_data = task.data.sort_values(task.target.get_attributes(), ascending=False)
# generate target bitset
self.target_values = sorted_data[task.target.get_attributes()[0]].to_numpy()
task.qf.calculate_constant_statistics(task.data, task.target)
# generate selector bitsets
self.bitsets = {}
for sel in task.search_space:
# generate bitset
self.bitsets[sel] = sel.covers(sorted_data)
result = self.search_internal(task, [], task.search_space, [], np.ones(len(sorted_data), dtype=bool))
result.sort(key=lambda x: x[0], reverse=True)
return ps.SubgroupDiscoveryResult(result, task)
def search_internal(self, task, prefix, modification_set, result, bitset):
self.num_calls += 1
sg_size = bitset.sum()
if sg_size == 0:
return result
target_values_sg = self.target_values[bitset]
target_values_cs = np.cumsum(target_values_sg)
sizes = np.arange(1, len(target_values_cs) + 1)
mean_values_cs = target_values_cs / sizes
tpl = DFSNumeric.tpl(sizes, mean_values_cs)
qualities = task.qf.evaluate(None, None, None, tpl)
optimistic_estimate = np.max(qualities)
if optimistic_estimate <= ps.minimum_required_quality(result, task):
return result
sg = ps.Conjunction(copy.copy(prefix))
quality = qualities[-1]
ps.add_if_required(result, sg, quality, task)
if len(prefix) < task.depth:
new_modification_set = copy.copy(modification_set)
for sel in modification_set:
prefix.append(sel)
new_bitset = bitset & self.bitsets[sel]
new_modification_set.pop(0)
self.search_internal(task, prefix, new_modification_set, result, new_bitset)
# remove the sel again
prefix.pop(-1)
return result
'''
Created on 29.09.2017
@author: lemmerfn
'''
from collections import namedtuple
from functools import total_ordering
import numpy as np
import scipy.stats
from pysubgroup_mod import utils
import pysubgroup_mod as ps
from pysubgroup_mod.subgroup_description import EqualitySelector
@total_ordering
class BinaryTarget:
statistic_types = ('size_sg', 'size_dataset', 'positives_sg', 'positives_dataset', 'size_complement',
'relative_size_sg', 'relative_size_complement', 'coverage_sg', 'coverage_complement',
'target_share_sg', 'target_share_complement', 'target_share_dataset', 'lift')
def __init__(self, target_attribute=None, target_value=None, target_selector=None):
"""
Creates a new target for the boolean model class (classic subgroup discovery).
If target_attribute and target_value are given, the target_selector is computed using attribute and value
"""
if target_attribute is not None and target_value is not None:
if target_selector is not None:
raise BaseException("BinaryTarget is to be constructed EITHER by a selector OR by attribute/value pair")
target_selector = EqualitySelector(target_attribute, target_value)
if target_selector is None:
raise BaseException("No target selector given")
self.target_selector = target_selector
def __repr__(self):
return "T: " + str(self.target_selector)
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __lt__(self, other):
return str(self) < str(other)
def covers(self, instance):
return self.target_selector.covers(instance)
def get_attributes(self):
return [self.target_selector.get_attribute_name()]
def get_base_statistics(self, subgroup, data):
cover_arr, size_sg = ps.get_cover_array_and_size(subgroup, len(data), data)
positives = self.covers(data)
instances_subgroup = size_sg
positives_dataset = np.sum(positives)
instances_dataset = len(data)
positives_subgroup = np.sum(positives[cover_arr])
return instances_dataset, positives_dataset, instances_subgroup, positives_subgroup
def calculate_statistics(self, subgroup, data, cached_statistics=None):
if cached_statistics is None or not isinstance(cached_statistics, dict):
statistics = dict()
elif all(k in cached_statistics for k in BinaryTarget.statistic_types):
return cached_statistics
else:
statistics = cached_statistics
(instances_dataset, positives_dataset, instances_subgroup, positives_subgroup) = \
self.get_base_statistics(subgroup, data)
statistics['size_sg'] = instances_subgroup
statistics['size_dataset'] = instances_dataset
statistics['positives_sg'] = positives_subgroup
statistics['positives_dataset'] = positives_dataset
statistics['size_complement'] = instances_dataset - instances_subgroup
statistics['relative_size_sg'] = instances_subgroup / instances_dataset
statistics['relative_size_complement'] = (instances_dataset - instances_subgroup) / instances_dataset
statistics['coverage_sg'] = positives_subgroup / positives_dataset
statistics['coverage_complement'] = (positives_dataset - positives_subgroup) / positives_dataset
statistics['target_share_sg'] = positives_subgroup / instances_subgroup
statistics['target_share_complement'] = (positives_dataset - positives_subgroup) / (instances_dataset - instances_subgroup)
statistics['target_share_dataset'] = positives_dataset / instances_dataset
statistics['lift'] = statistics['target_share_sg'] / statistics['target_share_dataset']
return statistics
class SimplePositivesQF(ps.AbstractInterestingnessMeasure): # pylint: disable=abstract-method
tpl = namedtuple('PositivesQF_parameters', ('size_sg', 'positives_count'))
def __init__(self):
self.dataset_statistics = None
self.positives = None
self.has_constant_statistics = False
self.required_stat_attrs = ('size_sg', 'positives_count')
def calculate_constant_statistics(self, data, target):
assert isinstance(target, BinaryTarget)
self.positives = target.covers(data)
self.dataset_statistics = SimplePositivesQF.tpl(len(data), np.sum(self.positives))
self.has_constant_statistics = True
def calculate_statistics(self, subgroup, target, data, statistics=None):
cover_arr, size_sg = ps.get_cover_array_and_size(subgroup, len(self.positives), data)
return SimplePositivesQF.tpl(size_sg, np.count_nonzero(self.positives[cover_arr]))
# TODO Make ChiSquared useful for real nominal data not just binary
# TODO Introduce Enum for direction
# TODO Maybe it is possible to give a optimistic estimate for ChiSquared
class ChiSquaredQF(SimplePositivesQF):
"""
    ChiSquaredQF, which tests for statistical independence of a subgroup against its complement
...
"""
@staticmethod
def chi_squared_qf(instances_dataset, positives_dataset, instances_subgroup, positives_subgroup, min_instances=1, bidirect=True, direction_positive=True, index=0):
"""
        Performs a chi2 test of statistical independence.
        Tests whether a subgroup is statistically independent of its complement (see scipy.stats.chi2_contingency).
        Parameters
        ----------
        instances_dataset, positives_dataset, instances_subgroup, positives_subgroup : int
            counts of subgroup and dataset
        min_instances : int, optional
            minimum number of instances required; if there are fewer, -inf is returned for that subgroup
        bidirect : bool, optional
            If True, both directions are considered interesting; otherwise direction_positive decides which direction is interesting
        direction_positive : bool, optional
            Only used if bidirect=False; specifies whether you are interested in positive (True) or negative deviations
        index : {0, 1}, optional
            decides whether the test statistic (0) or the p-value (1) should be used
"""
if (instances_subgroup < min_instances) or ((instances_dataset - instances_subgroup) < min_instances):
#if (instances_subgroup < min_instances):
return float("-inf")
negatives_subgroup = instances_subgroup - positives_subgroup # pylint: disable=bad-whitespace
negatives_dataset = instances_dataset - positives_dataset # pylint: disable=bad-whitespace
negatives_complement = negatives_dataset - negatives_subgroup
positives_complement = positives_dataset - positives_subgroup
val = scipy.stats.chi2_contingency([[positives_subgroup, positives_complement],
[negatives_subgroup, negatives_complement]], correction=False)[index]
if bidirect:
return val
p_subgroup = positives_subgroup / instances_subgroup
p_dataset = positives_dataset / instances_dataset
if direction_positive and p_subgroup > p_dataset:
return val
elif not direction_positive and p_subgroup < p_dataset:
return val
return -val
@staticmethod
def chi_squared_qf_weighted(subgroup, data, weighting_attribute, effective_sample_size=0, min_instances=5, ):
(instancesDataset, positivesDataset, instancesSubgroup, positivesSubgroup) = subgroup.get_base_statistics(data, weighting_attribute)
if (instancesSubgroup < min_instances) or ((instancesDataset - instancesSubgroup) < 5):
return float("inf")
if effective_sample_size == 0:
effective_sample_size = ps.effective_sample_size(data[weighting_attribute])
# p_subgroup = positivesSubgroup / instancesSubgroup
# p_dataset = positivesDataset / instancesDataset
negatives_subgroup = instancesSubgroup - positivesSubgroup
negatives_dataset = instancesDataset - positivesDataset
positives_complement = positivesDataset - positivesSubgroup
negatives_complement = negatives_dataset - negatives_subgroup
val = scipy.stats.chi2_contingency([[positivesSubgroup, positives_complement],
[negatives_subgroup, negatives_complement]], correction=True)[0]
return scipy.stats.chi2.sf(val * effective_sample_size / instancesDataset, 1)
def __init__(self, direction='both', min_instances=1, stat='chi2'):
"""
Parameters
----------
direction : {'both', 'positive', 'negative'}
direction of deviation that is of interest
        min_instances : int, optional
            minimum number of instances required; if there are fewer, -inf is returned for that subgroup
stat : {'chi2', 'p'}
whether to report the test statistic or the p-value (see scipy.stats.chi2_contingency)
"""
if direction == 'both':
self.bidirect = True
self.direction_positive = True
if direction == 'positive':
self.bidirect = False
self.direction_positive = True
if direction == 'negative':
self.bidirect = False
self.direction_positive = False
self.min_instances = min_instances
self.index = {'chi2' : 0, 'p': 1}[stat]
super().__init__()
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
dataset = self.dataset_statistics
return ChiSquaredQF.chi_squared_qf(dataset.size_sg, dataset.positives_count, statistics.size_sg, statistics.positives_count, self.min_instances, self.bidirect, self.direction_positive, self.index)
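# Illustrative use (mirrors the call inside InfoGainedSearch above); with stat="p" the quality
# function returns the chi2 p-value of the subgroup against its complement:
#   p_value = ps.ChiSquaredQF(stat="p").evaluate(sg, task.target, task.data, statistics)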
class StandardQF(SimplePositivesQF, ps.BoundedInterestingnessMeasure):
"""
StandardQF which weights the relative size against the difference in averages
    The StandardQF is a general form of quality function which, for different values of a, is order equivalent to
many popular quality measures.
Attributes
----------
a : float
used as an exponent to scale the relative size to the difference in averages
"""
@staticmethod
    def standard_qf(subg, a, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup, measures=False):
        if not hasattr(instances_subgroup, '__array_interface__') and (instances_subgroup == 0):
            return (np.nan, np.nan, np.nan) if measures else np.nan
p_subgroup = np.divide(positives_subgroup, instances_subgroup)
#if instances_subgroup == 0:
# return 0
#p_subgroup = positives_subgroup / instances_subgroup
p_dataset = positives_dataset / instances_dataset
if measures is True:
return (instances_subgroup / instances_dataset) ** a * (p_subgroup - p_dataset), utils.calculate_info_gained(instances_dataset,instances_subgroup,positives_dataset,positives_subgroup), utils.calculate_odd_value(instances_dataset,instances_subgroup,positives_dataset,positives_subgroup)
return (instances_subgroup / instances_dataset) ** a * (p_subgroup - p_dataset)
def __init__(self, a):
"""
Parameters
----------
a : float
exponent to trade-off the relative size with the difference in means
"""
self.a = a
super().__init__()
def evaluate(self, subgroup, target, data, statistics=None, measures=False):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
dataset = self.dataset_statistics
return StandardQF.standard_qf(subgroup,self.a, dataset.size_sg, dataset.positives_count, statistics.size_sg, statistics.positives_count,measures)
def optimistic_estimate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
dataset = self.dataset_statistics
return StandardQF.standard_qf(subgroup,self.a, dataset.size_sg, dataset.positives_count, statistics.positives_count, statistics.positives_count)
def optimistic_generalisation(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
dataset = self.dataset_statistics
pos_remaining = dataset.positives_count - statistics.positives_count
return StandardQF.standard_qf(subgroup,self.a, dataset.size_sg, dataset.positives_count, statistics.size_sg + pos_remaining, dataset.positives_count)
class LiftQF(StandardQF):
"""
Lift Quality Function
LiftQF is a StandardQF with a=0.
Thus it treats the difference in ratios as the quality without caring about the relative size of a subgroup.
"""
def __init__(self):
"""
"""
super().__init__(0.0)
# TODO add true binomial quality function as in https://opus.bibliothek.uni-wuerzburg.de/opus4-wuerzburg/frontdoor/index/index/docId/1786
class SimpleBinomialQF(StandardQF):
"""
Simple Binomial Quality Function
SimpleBinomialQF is a StandardQF with a=0.5.
It is an order equivalent approximation of the full binomial test if the subgroup size is much smaller than the size of the entire dataset.
"""
def __init__(self):
"""
"""
super().__init__(0.5)
class WRAccQF(StandardQF):
"""
Weighted Relative Accuracy Quality Function
WRAccQF is a StandardQF with a=1.
It is order equivalent to the difference in the observed and expected number of positive instances.
"""
def __init__(self):
"""
"""
super().__init__(1.0)
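# Worked example of the value computed by StandardQF with a=1 (WRAcc), using toy counts that are
# not taken from any dataset in this commit: with 100 instances, 40 of them positive, and a
# subgroup of size 20 containing 15 positives,
#   WRAcc = (20/100) * (15/20 - 40/100) = 0.2 * 0.35 = 0.07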
#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAware_StandardQF(ps.GeneralizationAwareQF_stats):
def __init__(self, a):
super().__init__(StandardQF(0))
self.a = a
def get_max(self, *args):
max_ratio = 0.0
max_stats = None
for stat in args:
if stat.size_sg > 0:
ratio = stat.positives_count / stat.size_sg
if ratio > max_ratio:
max_ratio = ratio
max_stats = stat
return max_stats
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
sg_stats = statistics.subgroup_stats
general_stats = statistics.generalisation_stats
if sg_stats.size_sg == 0 or general_stats.size_sg == 0:
return np.nan
sg_ratio = sg_stats.positives_count / sg_stats.size_sg
general_ratio = general_stats.positives_count / general_stats.size_sg
return (sg_stats.size_sg / self.stats0.size_sg) ** self.a * (sg_ratio - general_ratio)
import pysubgroup_mod as ps
class MinSupportConstraint:
def __init__(self, min_support):
self.min_support = min_support
@property
def is_monotone(self):
return True
def is_satisfied(self, subgroup, statistics=None, data=None):
if hasattr(statistics, 'size'):
return statistics.size >= self.min_support
elif hasattr(statistics, 'size_sg'):
return statistics.size_sg >= self.min_support
else:
return ps.get_size(subgroup, len(data), data) >= self.min_support
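# Illustrative use (a minimal sketch; `df`, `target` and `searchspace` are placeholders): a
# minimum-support constraint can be passed to the task and is enforced by the searchers that
# check task.constraints_monotone (e.g. DFS and BestFirstSearch):
#   task = ps.SubgroupDiscoveryTask(df, target, searchspace, qf=ps.WRAccQF(),
#                                   constraints=[MinSupportConstraint(30)])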
'''
Created on 29.09.2017
@author: lemmerfn
'''
from collections import namedtuple
from functools import total_ordering
import pysubgroup_mod as ps
@total_ordering
class FITarget:
statistic_types = ('size_sg', 'size_dataset')
def __repr__(self):
return "T: Frequent Itemsets"
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __lt__(self, other):
return str(self) < str(other)
def get_attributes(self):
return []
def get_base_statistics(self, subgroup, data):
_, size = ps.get_cover_array_and_size(subgroup, len(data), data)
return size
def calculate_statistics(self, subgroup_description, data, cached_statistics=None):
if cached_statistics is None or not isinstance(cached_statistics, dict):
statistics = dict()
elif all(k in cached_statistics for k in FITarget.statistic_types):
return cached_statistics
else:
statistics = cached_statistics
_, size = ps.get_cover_array_and_size(subgroup_description, len(data), data)
statistics['size_sg'] = size
statistics['size_dataset'] = len(data)
return statistics
class SimpleCountQF(ps.AbstractInterestingnessMeasure):
tpl = namedtuple('CountQF_parameters', ('subgroup_size'))
def __init__(self):
self.required_stat_attrs = ('subgroup_size',)
self.has_constant_statistics = True
self.size_dataset = None
def calculate_constant_statistics(self, data, target):
self.size_dataset = len(data)
def calculate_statistics(self, subgroup_description, target, data, statistics=None):
_, size = ps.get_cover_array_and_size(subgroup_description, self.size_dataset, data)
return SimpleCountQF.tpl(size)
def gp_get_stats(self, _):
return {"subgroup_size" : 1}
def gp_get_null_vector(self):
return {"subgroup_size":0}
def gp_merge(self, l, r):
l["subgroup_size"] += r["subgroup_size"]
def gp_get_params(self, _cover_arr, v):
return SimpleCountQF.tpl(v['subgroup_size'])
def gp_to_str(self, stats):
return str(stats['subgroup_size'])
@property
def gp_requires_cover_arr(self):
return False
class CountQF(SimpleCountQF, ps.BoundedInterestingnessMeasure):
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
return statistics.subgroup_size
def optimistic_estimate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
return statistics.subgroup_size
class AreaQF(SimpleCountQF):
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
return statistics.subgroup_size * subgroup.depth
from collections import namedtuple, defaultdict
from itertools import combinations
import numpy as np
import pysubgroup_mod as ps
from tqdm import tqdm
from copy import copy
import itertools
class GpGrowth:
def __init__(self, mode='b_u' ):
self.GP_node = namedtuple('GP_node', ['cls', 'id', 'parent', 'children', 'stats'])
self.minSupp = 10
self.tqdm = tqdm
self.depth = 0
        self.mode = mode  # specify either b_u (bottom up) or t_d (top down)
# Future: There also is the option of a stable mode which never creates the prefix trees
    def prepare_selectors(self, task, search_space):
        self.get_stats = task.qf.gp_get_stats
        self.get_null_vector = task.qf.gp_get_null_vector
        self.merge = task.qf.gp_merge
        l = []
        for selector in search_space:
            cov_arr = selector.covers(task.data)
            l.append((np.count_nonzero(cov_arr), selector, cov_arr))
l = [(size, selector, arr) for size, selector, arr in l if size > self.minSupp]
s = sorted(l, reverse=True)
selectors_sorted = [selector for size, selector, arr in s]
arrs = np.vstack([arr for size, selector, arr in s]).T
return selectors_sorted, arrs
def nodes_to_cls_nodes(self, nodes):
cls_nodes = defaultdict(list)
for node in nodes:
cls_nodes[node.cls].append(node)
return cls_nodes
def execute(self, task):
assert(self.mode in ('b_u', 't_d'))
        task.qf.calculate_constant_statistics(task.data, task.target)
self.depth = task.depth
        selectors_sorted, arrs = self.prepare_selectors(task, task.search_space)
self.requires_cover_arr = task.qf.gp_requires_cover_arr
# Create tree
root = self.GP_node(-1, -1, None, {}, self.get_null_vector())
nodes = []
for row_index, row in self.tqdm(enumerate(arrs), 'creating tree', total=len(arrs)):
self.normal_insert(root, nodes, self.get_stats(row_index), np.nonzero(row)[0])
nodes.append(root)
# mine tree
cls_nodes = self.nodes_to_cls_nodes(nodes)
if self.mode == 'b_u':
patterns = self.recurse(cls_nodes, [])
elif self.mode == 't_d':
patterns = self.recurse_top_down(cls_nodes, root)
else:
raise RuntimeError('mode needs to be either b_u or t_d')
# compute quality functions
        return self.calculate_quality_function_for_patterns(task, patterns, selectors_sorted, arrs)
    def calculate_quality_function_for_patterns(self, task, patterns, selectors_sorted, arrs):
out = []
for indices, gp_params in self.tqdm(patterns, 'computing quality function',):
if len(indices) > 0:
selectors = [selectors_sorted[i] for i in indices]
#print(selectors, stats)
sg = ps.Conjunction(selectors)
                if self.requires_cover_arr:
                    statistics = task.qf.gp_get_params(np.all(arrs[:, list(indices)], axis=1), gp_params)
                else:
                    statistics = task.qf.gp_get_params(None, gp_params)
                #qual1 = task.qf.evaluate(sg, task.qf.calculate_statistics(sg, task.data))
                qual2 = task.qf.evaluate(sg, task.target, task.data, statistics)
out.append((qual2, sg))
return out
def normal_insert(self, root, nodes, new_stats, classes):
node = root
for cls in classes:
if cls not in node.children:
new_child = self.GP_node(cls, len(nodes), node, {}, self.get_null_vector())
nodes.append(new_child)
node.children[cls] = new_child
self.merge(node.stats, new_stats)
node = node.children[cls]
self.merge(node.stats, new_stats)
return node
def insert_into_tree(self, root, nodes, new_stats, classes, max_depth):
''' Creates a tree of a maximum depth = depth
'''
if len(classes) <= max_depth:
self.normal_insert(root, nodes, new_stats, classes)
return
for prefix in combinations(classes, max_depth -1):
node = self.normal_insert(root, nodes, new_stats, classes)
# do normal insert for prefix
index_for_remaining = classes.index(prefix) + 1
for cls in classes[index_for_remaining:]:
if cls not in node.children:
new_child = self.GP_node(cls, len(nodes), node, {}, self.get_null_vector())
nodes.append(new_child)
node.children[cls] = new_child
self.merge(node.stats, new_stats)
def check_constraints(self, node):
#return node[0] >= self.minSupp
return node['size'] >= self.minSupp
def recurse(self, cls_nodes, prefix, is_single_path=False):
if len(cls_nodes) == 0:
raise RuntimeError
results = []
results.append((prefix, cls_nodes[-1][0].stats))
if len(prefix) >= self.depth:
return results
stats_dict = self.get_stats_for_class(cls_nodes)
if is_single_path:
root_stats = cls_nodes[-1][0].stats
del stats_dict[-1]
all_combinations = ps.powerset(stats_dict.keys(), max_length=self.depth - len(prefix))
for comb in all_combinations:
results.append((prefix+comb, root_stats))
else:
for cls, nodes in cls_nodes.items():
if cls >= 0:
if self.check_constraints(stats_dict[cls]):
if len(prefix) == (self.depth - 1):
results.append(((*prefix, cls), stats_dict[cls]))
else:
is_single_path_now = len(nodes) == 1
new_tree = self.create_new_tree_from_nodes(nodes)
if len(new_tree) > 0:
results.extend(self.recurse(new_tree, (*prefix, cls), is_single_path_now))
return results
def get_prefixes_top_down(self, alpha, max_length):
if len(alpha) == 0:
return [()]
if len(alpha) == 1 or max_length == 1:
return [(alpha[0],)]
prefixes = [(alpha[0],)]
prefixes.extend([(alpha[0], *suffix) for suffix in self.get_prefixes_top_down(alpha[1:], max_length-1)])
return prefixes
def recurse_top_down(self, cls_nodes, root, depth_in=0):
alpha = []
curr_depth = depth_in
while True:
if root.cls == -1:
pass
else:
alpha.append(root.cls)
if len(root.children) == 1 and curr_depth <= self.depth:
curr_depth += 1
root = next(iter(root.children.values()))
else:
break
prefixes = self.get_prefixes_top_down(alpha, max_length=self.depth - depth_in + 1)
# Bug: If we have a longer path that branches. eg. consider the tree from items A - B - C and A - B - D
# and depth - depth_in == 2 then prefixes = [(A), (A, B)] but the sets
# (A, C) and (A, D) are also valid
        # basically, if we have prefixes of different lengths this does not work properly
if len(root.children) == 0 or curr_depth >= self.depth:
results = []
stats_dict = self.get_stats_for_class(cls_nodes)
for prefix in prefixes:
cls = max(prefix)
if self.check_constraints(stats_dict[cls]):
results.append((prefix, stats_dict[cls]))
return results
else:
suffixes = [((), root.stats)]
stats_dict = self.get_stats_for_class(cls_nodes)
for cls in cls_nodes:
if cls >= 0 and cls not in alpha:
if self.check_constraints(stats_dict[cls]):
# Future: There is also the possibility to compute the stats_dict of the prefix tree
# without creating the prefix tree first
# This might be useful if curr_depth == self.depth - 2
# as we need not recreate the tree
if curr_depth == (self.depth - 1):
suffixes.append(((cls,), stats_dict[cls]))
else:
new_root, nodes = self.get_top_down_tree_for_class(cls_nodes, cls)
if len(nodes) > 0:
new_cls_nodes = self.nodes_to_cls_nodes(nodes)
print(" " * curr_depth, cls, curr_depth, len(new_cls_nodes))
suffixes.extend(self.recurse_top_down(new_cls_nodes, new_root, curr_depth+1))
return [((*pre, *(suf[0])), suf[1]) for pre, suf in itertools.product(prefixes, suffixes)]
def remove_infrequent_class(self, nodes, cls_nodes, stats_dict):
# returns cleaned tree
infrequent_classes = []
for cls in cls_nodes:
if not self.check_constraints(stats_dict[cls]):
infrequent_classes.append(cls)
infrequent_classes = sorted(infrequent_classes, reverse=True)
for cls in infrequent_classes:
for node_to_remove in cls_nodes[cls]:
self.merge_trees_top_down(nodes, node_to_remove.parent, node_to_remove)
def get_top_down_tree_for_class(self, cls_nodes, cls):
# Future: Can eventually also remove infrequent nodes already during tree creation
base_root = None
nodes = []
if len(cls_nodes[cls]) > 0:
base_root = self.create_copy_of_tree_top_down(cls_nodes[cls][0], nodes)
for other_root in cls_nodes[cls][1:]:
self.merge_trees_top_down(nodes, base_root, other_root)
return base_root, nodes
def create_copy_of_tree_top_down(self, root, nodes=None, parent=None):
if nodes is None:
nodes = []
#if len(nodes) == 0:
# root_cls = -1
children = {}
new_root = self.GP_node(root.cls, len(nodes), parent, children, root.stats.copy())
nodes.append(new_root)
for child_cls, child in root.children.items():
new_child = self.create_copy_of_tree_top_down(child, nodes, new_root)
children[child_cls] = new_child
return new_root
def merge_trees_top_down(self, nodes, mutable_root, other_root):
self.merge(mutable_root.stats, other_root.stats)
for cls in other_root.children:
if cls not in mutable_root.children:
self.create_copy_of_tree_top_down(other_root.children[cls], nodes, mutable_root)
else:
self.merge_trees_top_down(nodes, mutable_root.children[cls], other_root.children[cls])
def get_stats_for_class(self, cls_nodes):
out = {}
for key, nodes in cls_nodes.items():
s = self.get_null_vector()
for node in nodes:
self.merge(s, node.stats)
out[key] = s
return out
def create_new_tree_from_nodes(self, nodes):
new_nodes = {}
for node in nodes:
nodes_upwards = self.get_nodes_upwards(node)
self.create_copy_of_path(nodes_upwards[1:], new_nodes, node.stats)
#self.remove_infrequent_nodes(new_nodes)
cls_nodes = defaultdict(list)
for new_node in new_nodes.values():
cls_nodes[new_node.cls].append(new_node)
return cls_nodes
def remove_infrequent_nodes(self, new_nodes):
keys = list(new_nodes.keys())
for key in keys:
node = new_nodes[key]
if node.stats["size"] < self.minSupp:
del new_nodes[key]
def create_copy_of_path(self, nodes, new_nodes, stats):
parent = None
for node in reversed(nodes):
if node.id not in new_nodes:
new_node = self.GP_node(node.cls, node.id, parent, {}, stats.copy())
new_nodes[node.id] = new_node
else:
new_node = new_nodes[node.id]
self.merge(new_node.stats, stats)
if parent is not None:
parent.children[new_node.cls] = new_node
parent = new_node
def get_nodes_upwards(self, node):
ref = node
path = []
while True:
path.append(ref)
ref = ref.parent
if ref is None:
break
return path
def to_file(self, task, path):
        task.qf.calculate_constant_statistics(task.data, task.target)
self.depth = task.depth
        selectors_sorted, arrs = self.prepare_selectors(task, task.search_space)
# Create tree
root = self.GP_node(-1, -1, None, {}, self.get_null_vector())
nodes = []
with open(path, 'w') as f:
for row_index, row in self.tqdm(enumerate(arrs), 'creating tree', total=len(arrs)):
#print(np.nonzero(row)[0])
f.write(" ".join(map(str, np.nonzero(row)[0])) + " "+ task.qf.gp_to_str(self.get_stats(row_index))+"\r\n")
if __name__ == '__main__':
from pysubgroup.tests.DataSets import get_credit_data
from pysubgroup import model_target
data = get_credit_data()
#warnings.filterwarnings("error")
print(data.columns)
searchSpace_Nominal = ps.create_nominal_selectors(data, ignore=['duration', 'credit_amount'])
searchSpace_Numeric = ps.create_numeric_selectors(data, ignore=['duration', 'credit_amount'])
searchSpace = searchSpace_Nominal + searchSpace_Numeric
target = ps.FITarget()
#QF=model_target.EMM_Likelihood(model_target.PolyRegression_ModelClass(x_name='duration', y_name='credit_amount'))
QF=ps.CountQF()
task = ps.SubgroupDiscoveryTask(data, target, searchSpace, result_set_size=200, depth=4, qf=QF)
GpGrowth(mode='b_u').to_file(task,'E:/tmp/gp_credit.txt')
import time
start_time = time.time()
gp = GpGrowth(mode='b_u').execute(task)
print("--- %s seconds ---" % (time.time() - start_time))
#gp = [(qual, sg) for qual, sg in gp if sg.depth <= task.depth]
gp = sorted(gp)
quit()
start_time = time.time()
dfs1 = ps.SimpleDFS().execute(task)
print("--- %s seconds ---" % (time.time() - start_time))
dfs = [(qual, sg.subgroup_description) for qual, sg in dfs1]
dfs = sorted(dfs, reverse=True)
gp = sorted(gp, reverse=True)
def better_sorted(l):
the_dict=defaultdict(list)
prev_key=l[0][0]
for key, val in l:
if abs(prev_key-key)<10**-11:
the_dict[prev_key].append(val)
else:
the_dict[key].append(val)
prev_key = key
print(len(the_dict))
result = []
for key, vals in the_dict.items():
for val in sorted(vals):
result.append((key, val))
return result
dfs = better_sorted(dfs)
gp = better_sorted(gp)
gp = gp[:task.result_set_size]
for i, (l, r) in enumerate(zip(gp, dfs)):
print(i)
print('gp:', l)
print('df:', r)
assert(abs(l[0]-r[0]) < 10 ** -7)
assert(l[1] == r[1])
'''
Created on 28.04.2016
@author: lemmerfn
'''
from abc import ABC, abstractmethod
from collections import namedtuple
from itertools import combinations
import numpy as np
import pysubgroup_mod as ps
class AbstractInterestingnessMeasure(ABC):
# pylint: disable=no-member
def ensure_statistics(self, subgroup, target, data, statistics=None):
if not self.has_constant_statistics:
self.calculate_constant_statistics(data, target)
if any(not hasattr(statistics, attr) for attr in self.required_stat_attrs):
if getattr(subgroup, 'statistics', False):
return subgroup.statistics
else:
return self.calculate_statistics(subgroup, target, data, statistics)
return statistics
# pylint: enable=no-member
#def optimistic_estimate_from_dataset(self, data, subgroup, weighting_attribute=None): #pylint: disable=unused-argument
# return float("inf")
class BoundedInterestingnessMeasure(AbstractInterestingnessMeasure):
pass
#@abstractmethod
#def optimistic_estimate_from_dataset(self, data, subgroup, weighting_attribute=None):
# pass
#####
# FIX ME: This is currently not working anymore
#####
class CombinedInterestingnessMeasure(BoundedInterestingnessMeasure):
def __init__(self, measures, weights=None):
self.measures = measures
if weights is None:
weights = [1] * len(measures)
assert len(weights) == len(measures)
self.weights = weights
def calculate_constant_statistics(self, data, target):
pass
def calculate_statistics(self, subgroup, target, data, cached_statistics=None):
pass
def evaluate(self, subgroup, target, data, statistics=None):
#FIX USE of constant statistics
return np.dot([m.evaluate(subgroup, target, data, None) for m in self.measures], self.weights)
def optimistic_estimate(self, subgroup, target, data, statistics=None):
# FIX USE of constant statistics
return np.dot([m.optimistic_estimate(subgroup, target, data, None) for m in self.measures], self.weights)
def evaluate_from_statistics(self, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup):
return np.dot([m.evaluate_from_statistics(instances_dataset, positives_dataset, instances_subgroup, positives_subgroup) for m in self.measures], self.weights)
#def optimistic_estimate_from_statistics(self, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup):
# return np.dot(
# [m.evaluate_from_statistics(instances_dataset, positives_dataset, instances_subgroup, positives_subgroup) for m in self.measures],
# self.weights)
##########
# Filter
##########
def unique_attributes(result_set, data):
result = []
used_attributes = []
for (q, sg) in result_set:
atts = sg.subgroup_description.get_attributes()
if atts not in used_attributes or all([ps.is_categorical_attribute(data, x) for x in atts]):
result.append((q, sg))
used_attributes.append(atts)
return result
def minimum_statistic_filter(result_set, statistic, minimum, data):
result = []
for (q, sg) in result_set:
if len(sg.statistics) == 0:
sg.calculate_statistics(data)
if sg.statistics[statistic] >= minimum:
result.append((q, sg))
return result
def minimum_quality_filter(result_set, minimum):
result = []
for (q, sg) in result_set:
if q >= minimum:
result.append((q, sg))
return result
def maximum_statistic_filter(result_set, statistic, maximum):
result = []
for (q, sg) in result_set:
if sg.statistics[statistic] <= maximum:
result.append((q, sg))
return result
def overlap_filter(result_set, data, similarity_level=0.9):
result = []
result_sgs = []
for (q, sg) in result_set:
if not overlaps_list(sg, result_sgs, data, similarity_level):
result_sgs.append(sg)
result.append((q, sg))
return result
def overlaps_list(sg, list_of_sgs, data, similarity_level=0.9):
for anotherSG in list_of_sgs:
if ps.overlap(sg, anotherSG, data) > similarity_level:
return True
return False
class CountCallsInterestingMeasure(BoundedInterestingnessMeasure):
def __init__(self, qf):
self.qf = qf
self.calls = 0
def calculate_statistics(self, sg, target, data, statistics=None):
self.calls += 1
return self.qf.calculate_statistics(sg, target, data, statistics)
def __getattr__(self, name):
return getattr(self.qf, name)
def __hasattr__(self, name):
return hasattr(self.qf, name)
#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAwareQF(AbstractInterestingnessMeasure):
ga_tuple = namedtuple('ga_tuple', ['subgroup_quality', 'generalisation_quality'])
def __init__(self, qf):
self.qf = qf
# this cache maps the representation of descriptions to tuples
# the first entry is the quality and the second one is
        # the largest quality of all its predecessors
self.cache = {}
self.has_constant_statistics = False
self.required_stat_attrs = ['subgroup_quality', 'generalisation_quality']
self.q0 = 0
def calculate_constant_statistics(self, data, target):
self.cache = {}
self.qf.calculate_constant_statistics(data, target)
self.q0 = self.qf.evaluate(slice(None), target, data)
self.has_constant_statistics = self.qf.has_constant_statistics
def calculate_statistics(self, subgroup, target, data, statistics=None):
sg_repr = repr(subgroup)
if sg_repr in self.cache:
return GeneralizationAwareQF.ga_tuple(*self.cache[sg_repr])
else:
(q_sg, q_prev) = self.get_qual_and_previous_qual(subgroup, target, data)
self.cache[sg_repr] = (q_sg, q_prev)
return GeneralizationAwareQF.ga_tuple(q_sg, q_prev)
def get_qual_and_previous_qual(self, subgroup, target, data):
q_subgroup = self.qf.evaluate(subgroup, target, data)
max_q = 0
selectors = subgroup.selectors
if len(selectors) > 0:
# compute quality of all generalizations
generalizations = combinations(selectors, len(selectors)-1)
for sels in generalizations:
sgd = ps.Conjunction(list(sels))
(q_sg, q_prev) = self.calculate_statistics(sgd, target, data)
max_q = max(max_q, q_sg, q_prev)
return (q_subgroup, max_q)
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
return statistics.subgroup_quality - statistics.generalisation_quality
#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAwareQF_stats(AbstractInterestingnessMeasure):
ga_tuple = namedtuple('ga_stats_tuple', ['subgroup_stats', 'generalisation_stats'])
def __init__(self, qf):
self.qf = qf
# this cache maps the representation of descriptions to tuples
# the first entry is the quality and the second one is
# the largest quality of all its predecessors
self.cache = {}
self.has_constant_statistics = False
self.required_stat_attrs = GeneralizationAwareQF_stats.ga_tuple._fields
self.stats0 = None
def calculate_constant_statistics(self, data, target):
self.cache = {}
self.qf.calculate_constant_statistics(data, target)
self.stats0 = self.qf.calculate_statistics(slice(None), target, data)
self.has_constant_statistics = self.qf.has_constant_statistics
def calculate_statistics(self, subgroup, target, data, statistics=None):
sg_repr = repr(subgroup)
if sg_repr in self.cache:
return GeneralizationAwareQF_stats.ga_tuple(*self.cache[sg_repr])
else:
(stats_sg, stats_prev) = self.get_stats_and_previous_stats(subgroup, target, data)
self.cache[sg_repr] = (stats_sg, stats_prev)
return GeneralizationAwareQF_stats.ga_tuple(stats_sg, stats_prev)
def get_stats_and_previous_stats(self, subgroup, target, data):
stats_subgroup = self.qf.calculate_statistics(subgroup, target, data)
max_stats = self.stats0
selectors = subgroup.selectors
if len(selectors) > 0:
# compute quality of all generalizations
generalizations = combinations(selectors, len(selectors)-1)
for sels in generalizations:
sgd = ps.Conjunction(list(sels))
(stats_sg, stats_prev) = self.calculate_statistics(sgd, target, data)
max_stats = self.get_max(max_stats, stats_sg, stats_prev)
return (stats_subgroup, max_stats)
def evaluate(self, subgroup, statistics_or_data=None):
raise NotImplementedError
def get_max(self, *args):
raise NotImplementedError
from collections import namedtuple
from scipy.stats import norm
import numpy as np
import pysubgroup_mod as ps
beta_tuple = namedtuple('beta_tuple', ['beta', 'size'])
class EMM_Likelihood(ps.AbstractInterestingnessMeasure):
tpl = namedtuple('EMM_Likelihood', ['model_params', 'subgroup_likelihood', 'inverse_likelihood', 'size'])
def __init__(self, model):
self.model = model
self.has_constant_statistics = False
self.required_stat_attrs = EMM_Likelihood.tpl._fields
self.data_size = None
def calculate_constant_statistics(self, task):
self.model.calculate_constant_statistics(task)
self.data_size = len(task.data)
self.has_constant_statistics = True
def calculate_statistics(self, subgroup, data=None):
cover_arr, sg_size = ps.get_cover_array_and_size(subgroup, self.data_size, data)
params = self.model.fit(cover_arr, data)
return self.get_tuple(sg_size, params, cover_arr)
def get_tuple(self, sg_size, params, cover_arr):
#numeric stability?
all_likelihood = self.model.likelihood(params, np.ones(self.data_size, dtype=bool))
sg_likelihood_sum = np.sum(all_likelihood[cover_arr])
total_likelihood_sum = np.sum(all_likelihood)
dataset_average = np.nan
if (self.data_size - sg_size) > 0:
dataset_average = (total_likelihood_sum - sg_likelihood_sum)/(self.data_size - sg_size)
sg_average = np.nan
if sg_size > 0:
sg_average = sg_likelihood_sum/sg_size
return EMM_Likelihood.tpl(params, sg_average, dataset_average, sg_size)
def evaluate(self, subgroup, statistics=None):
statistics = self.ensure_statistics(subgroup, statistics)
#numeric stability?
return statistics.subgroup_likelihood - statistics.inverse_likelihood
def gp_get_params(self, cover_arr, v):
params = self.model.gp_get_params(v)
sg_size = params.size
return self.get_tuple(sg_size, params, cover_arr)
def supports_weights(self):
return False
def is_applicable(self, _):
return True
def __getattr__(self, name):
return getattr(self.model, name)
class PolyRegression_ModelClass:
def __init__(self, x_name='x', y_name='y', degree=1):
self.x_name = x_name
self.y_name = y_name
if degree != 1:
raise ValueError('Currently only degree == 1 is supported')
self.degree = degree
self.x = None
self.y = None
self.has_constant_statistics = True
super().__init__()
def calculate_constant_statistics(self, task):
data = task.data
self.x = data[self.x_name].to_numpy()
self.y = data[self.y_name].to_numpy()
self.has_constant_statistics = True
@staticmethod
def gp_merge(u, v):
v0 = v[0]
u0 = u[0]
if v0 == 0 or u0 == 0:
d = 0
else:
d = v0 * u0/(v0 + u0)*(v[1]/v0 - u[1]/u0)*(v[2]/v0 - u[2]/u0)
u += v
u[3] += d
def gp_get_null_vector(self):
return np.zeros(5)
def gp_get_stats(self, row_index):
x = self.x[row_index]
return np.array([1, x, self.y[row_index], 0, x*x])
def gp_get_params(self, v):
size = v[0]
if size < self.degree:
return beta_tuple(np.full(self.degree + 1, np.nan), size)
v1 = v[1]
slope = v[0] * v[3] / (v[0]*v[4] - v1 * v1)
        intercept = v[2]/v[0] - slope * v[1]/v[0]
        return beta_tuple(np.array([slope, intercept]), v[0])
def fit(self, subgroup, data=None):
cover_arr, size = ps.get_cover_array_and_size(subgroup, len(self.x), data)
if size <= self.degree + 1:
return beta_tuple(np.full(self.degree + 1, np.nan), size)
return beta_tuple(np.polyfit(self.x[cover_arr], self.y[cover_arr], deg=self.degree), size)
def likelihood(self, stats, sg):
if any(np.isnan(stats.beta)):
return np.full(self.x[sg].shape, np.nan)
return norm.pdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg])
def loglikelihood(self, stats, sg):
return norm.logpdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg])
'''
Created on 29.09.2017
@author: lemmerfn
'''
import numbers
from collections import namedtuple
from functools import total_ordering
import numpy as np
import pysubgroup_mod as ps
@total_ordering
class NumericTarget:
statistic_types = (
'size_sg', 'size_dataset', 'mean_sg', 'mean_dataset', 'std_sg', 'std_dataset', 'median_sg', 'median_dataset',
'max_sg', 'max_dataset', 'min_sg', 'min_dataset', 'mean_lift', 'median_lift')
def __init__(self, target_variable):
self.target_variable = target_variable
def __repr__(self):
return "T: " + str(self.target_variable)
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __lt__(self, other):
return str(self) < str(other)
def get_attributes(self):
return [self.target_variable]
def get_base_statistics(self, subgroup, data):
cover_arr, size_sg = ps.get_cover_array_and_size(subgroup, len(data), data)
all_target_values = data[self.target_variable]
sg_target_values = all_target_values[cover_arr]
instances_dataset = len(data)
instances_subgroup = size_sg
mean_sg = np.mean(sg_target_values)
mean_dataset = np.mean(all_target_values)
return (instances_dataset, mean_dataset, instances_subgroup, mean_sg)
def calculate_statistics(self, subgroup, data, cached_statistics=None):
if cached_statistics is None or not isinstance(cached_statistics, dict):
statistics = dict()
elif all(k in cached_statistics for k in NumericTarget.statistic_types):
return cached_statistics
else:
statistics = cached_statistics
cover_arr, _ = ps.get_cover_array_and_size(subgroup, len(data), data)
all_target_values = data[self.target_variable].to_numpy()
sg_target_values = all_target_values[cover_arr]
statistics['size_sg'] = len(sg_target_values)
statistics['size_dataset'] = len(data)
statistics['mean_sg'] = np.mean(sg_target_values)
statistics['mean_dataset'] = np.mean(all_target_values)
statistics['std_sg'] = np.std(sg_target_values)
statistics['std_dataset'] = np.std(all_target_values)
statistics['median_sg'] = np.median(sg_target_values)
statistics['median_dataset'] = np.median(all_target_values)
statistics['max_sg'] = np.max(sg_target_values)
statistics['max_dataset'] = np.max(all_target_values)
statistics['min_sg'] = np.min(sg_target_values)
statistics['min_dataset'] = np.min(all_target_values)
statistics['mean_lift'] = statistics['mean_sg'] / statistics['mean_dataset']
statistics['median_lift'] = statistics['median_sg'] / statistics['median_dataset']
return statistics
class StandardQFNumeric(ps.BoundedInterestingnessMeasure):
tpl = namedtuple('StandardQFNumeric_parameters', ('size_sg', 'mean', 'estimate'))
@staticmethod
def standard_qf_numeric(a, _, mean_dataset, instances_subgroup, mean_sg):
return instances_subgroup ** a * (mean_sg - mean_dataset)
def __init__(self, a, invert=False, estimator='sum'):
if not isinstance(a, numbers.Number):
raise ValueError(f'a is not a number. Received a={a}')
self.a = a
self.invert = invert
self.required_stat_attrs = ('size_sg', 'mean')
self.dataset_statistics = None
self.all_target_values = None
self.has_constant_statistics = False
if estimator == 'sum':
self.estimator = StandardQFNumeric.Summation_Estimator(self)
elif estimator == 'average':
self.estimator = StandardQFNumeric.Average_Estimator(self)
elif estimator == 'order':
self.estimator = StandardQFNumeric.Ordering_Estimator(self)
else:
raise ValueError('estimator is not one of the following: ' + str(['sum', 'average', 'order']))
def calculate_constant_statistics(self, data, target):
data = self.estimator.get_data(data, target)
self.all_target_values = data[target.target_variable].to_numpy()
target_mean = np.mean(self.all_target_values)
data_size = len(data)
self.dataset_statistics = StandardQFNumeric.tpl(data_size, target_mean, None)
self.estimator.calculate_constant_statistics(data, target)
self.has_constant_statistics = True
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
dataset = self.dataset_statistics
return StandardQFNumeric.standard_qf_numeric(self.a, dataset.size_sg, dataset.mean, statistics.size_sg, statistics.mean)
def calculate_statistics(self, subgroup, target, data, statistics=None):
cover_arr, sg_size = ps.get_cover_array_and_size(subgroup, len(self.all_target_values), data)
sg_mean = np.array([0])
sg_target_values = 0
if sg_size > 0:
sg_target_values = self.all_target_values[cover_arr]
sg_mean = np.mean(sg_target_values)
estimate = self.estimator.get_estimate(subgroup, sg_size, sg_mean, cover_arr, sg_target_values)
else:
estimate = float('-inf')
return StandardQFNumeric.tpl(sg_size, sg_mean, estimate)
def optimistic_estimate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
return statistics.estimate
class Summation_Estimator:
def __init__(self, qf):
self.qf = qf
self.indices_greater_mean = None
self.target_values_greater_mean = None
def get_data(self, data, target):
return data
def calculate_constant_statistics(self, data, target): # pylint: disable=unused-argument
self.indices_greater_mean = self.qf.all_target_values > self.qf.dataset_statistics.mean
self.target_values_greater_mean = self.qf.all_target_values#[self.indices_greater_mean]
def get_estimate(self, subgroup, sg_size, sg_mean, cover_arr, _): # pylint: disable=unused-argument
larger_than_mean = self.target_values_greater_mean[cover_arr][self.indices_greater_mean[cover_arr]]
size_greater_mean = len(larger_than_mean)
sum_greater_mean = np.sum(larger_than_mean)
return sum_greater_mean - size_greater_mean * self.qf.dataset_statistics.mean
class Average_Estimator:
def __init__(self, qf):
self.qf = qf
self.indices_greater_mean = None
self.target_values_greater_mean = None
def get_data(self, data, target):
return data
def calculate_constant_statistics(self, data, target): # pylint: disable=unused-argument
self.indices_greater_mean = self.qf.all_target_values > self.qf.dataset_statistics.mean
self.target_values_greater_mean = self.qf.all_target_values
def get_estimate(self, subgroup, sg_size, sg_mean, cover_arr, _): # pylint: disable=unused-argument
larger_than_mean = self.target_values_greater_mean[cover_arr][self.indices_greater_mean[cover_arr]]
size_greater_mean = len(larger_than_mean)
max_greater_mean = np.sum(larger_than_mean)
return size_greater_mean ** self.qf.a * (max_greater_mean - self.qf.dataset_statistics.mean)
class Ordering_Estimator:
def __init__(self, qf):
self.qf = qf
self.indices_greater_mean = None
self._get_estimate = self.get_estimate_numpy
self.use_numba = True
self.numba_in_place = False
def get_data(self, data, target):
data.sort_values(target.get_attributes(), ascending=False, inplace=True)
return data
def calculate_constant_statistics(self, data, target):
if self.use_numba and not self.numba_in_place:
try:
from numba import njit # pylint: disable=unused-import, import-outside-toplevel
#print('StandardQf_Numeric: Using numba for speedup')
except ImportError:
return
@njit
def estimate_numba(values_sg, a, mean_dataset):
n = 1
sum_values = 0
max_value = -10 ** 10
for val in values_sg:
sum_values += val
mean_sg = sum_values / n
quality = n ** a * (mean_sg - mean_dataset)
if quality > max_value:
max_value = quality
n += 1
return max_value
self._get_estimate = estimate_numba
self.numba_in_place = True
        def get_estimate(self, subgroup, sg_size, sg_mean, cover_arr, target_values_sg):  # pylint: disable=unused-argument
            # the numba and the numpy implementations share the same call signature,
            # so a single dispatch through self._get_estimate suffices
            return self._get_estimate(target_values_sg, self.qf.a, self.qf.dataset_statistics.mean)
def get_estimate_numpy(self, values_sg, _, mean_dataset):
target_values_cs = np.cumsum(values_sg)
sizes = np.arange(1, len(target_values_cs) + 1)
mean_values = target_values_cs / sizes
stats = StandardQFNumeric.tpl(sizes, mean_values, mean_dataset)
qualities = self.qf.evaluate(None, None, None, stats)
optimistic_estimate = np.max(qualities)
return optimistic_estimate
# TODO Update to new format
#class GAStandardQFNumeric(ps.AbstractInterestingnessMeasure):
# def __init__(self, a, invert=False):
# self.a = a
# self.invert = invert
#
# def evaluate_from_dataset(self, data, subgroup, weighting_attribute=None):
# (instances_dataset, _, instances_subgroup, mean_sg) = subgroup.get_base_statistics(data, weighting_attribute)
# if instances_subgroup in (0, instances_dataset):
# return 0
# max_mean = get_max_generalization_mean(data, subgroup, weighting_attribute)
# relative_size = (instances_subgroup / instances_dataset)
# return ps.conditional_invert(relative_size ** self.a * (mean_sg - max_mean), self.invert)
# def supports_weights(self):
# return True
# def is_applicable(self, subgroup):
# return isinstance(subgroup.target, NumericTarget)
#def get_max_generalization_mean(data, subgroup, weighting_attribute=None):
# selectors = subgroup.subgroup_description.selectors
# generalizations = ps.powerset(selectors)
# max_mean = 0
# for sels in generalizations:
# sg = ps.Subgroup(subgroup.target, ps.Conjunction(list(sels)))
# mean_sg = sg.get_base_statistics(data, weighting_attribute)[3]
# max_mean = max(max_mean, mean_sg)
# return max_mean
import pysubgroup_mod as ps
from collections import defaultdict
from itertools import chain
class RefinementOperator:
pass
class StaticSpecializationOperator:
def __init__(self, selectors):
search_space_dict = defaultdict(list)
for selector in selectors:
search_space_dict[selector.attribute_name].append(selector)
self.search_space = list(search_space_dict.values())
self.search_space_index = {key: i for i, key in enumerate(search_space_dict.keys())}
def refinements(self, subgroup):
if subgroup.depth > 0:
index_of_last = self.search_space_index[subgroup._selectors[-1].attribute_name]
new_selectors = chain.from_iterable(self.search_space[index_of_last + 1:])
else:
new_selectors = chain.from_iterable(self.search_space)
return (subgroup & sel for sel in new_selectors)
class StaticGeneralizationOperator:
def __init__(self, selectors):
self.search_space = selectors
def refinements(self, sG):
index_of_last_selector = min(self.search_space.index(sG._selectors[-1]), len(self.search_space) - 1)
new_selectors = self.search_space[index_of_last_selector + 1:]
return (sG | sel for sel in new_selectors)
import numpy as np
import pysubgroup_mod as ps
class RepresentationBase():
def __init__(self, new_conjunction, selectors_to_patch):
self._new_conjunction = new_conjunction
self.previous_conjunction = None
self.selectors_to_patch = selectors_to_patch
def patch_all_selectors(self):
for sel in self.selectors_to_patch:
self.patch_selector(sel)
def patch_selector(self, sel):
raise NotImplementedError
def patch_classes(self):
pass
def undo_patch_classes(self):
pass
def __enter__(self):
self.patch_classes()
self.patch_all_selectors()
return self
def __exit__(self, * args):
self.undo_patch_classes()
class BitSet_Conjunction(ps.Conjunction):
n_instances = 0
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.representation = self.compute_representation()
def compute_representation(self):
# empty description ==> return a list of all '1's
if not self._selectors:
return np.full(BitSet_Conjunction.n_instances, True, dtype=bool)
# non-empty description
return np.all([sel.representation for sel in self._selectors], axis=0)
@property
def size_sg(self):
return np.count_nonzero(self.representation)
def append_and(self, to_append):
super().append_and(to_append)
self.representation = np.logical_and(self.representation, to_append.representation)
@property
def __array_interface__(self):
return self.representation.__array_interface__
class BitSet_Disjunction(ps.Disjunction):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.representation = self.compute_representation()
def compute_representation(self):
# empty description ==> return a list of all '1's
if not self._selectors:
return np.full(BitSet_Conjunction.n_instances, False, dtype=bool)
# non-empty description
return np.any([sel.representation for sel in self._selectors], axis=0)
@property
def size_sg(self):
return np.count_nonzero(self.representation)
def append_or(self, to_append):
super().append_or(to_append)
self.representation = np.logical_or(self.representation, to_append.representation)
@property
def __array_interface__(self):
return self.representation.__array_interface__
class BitSetRepresentation(RepresentationBase):
Conjunction = BitSet_Conjunction
Disjunction = BitSet_Disjunction
def __init__(self, df, selectors_to_patch):
self.df = df
super().__init__(BitSet_Conjunction, selectors_to_patch)
def patch_selector(self, sel):
sel.representation = sel.covers(self.df)
sel.size_sg = np.count_nonzero(sel.representation)
def patch_classes(self):
BitSet_Conjunction.n_instances = len(self.df)
super().patch_classes()
class Set_Conjunction(ps.Conjunction):
all_set = set()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.representation = self.compute_representation()
self.arr_for_interface = np.array(list(self.representation), dtype=int)
def compute_representation(self):
# empty description ==> return a list of all '1's
if not self._selectors:
return Set_Conjunction.all_set
# non-empty description
return set.intersection(*[sel.representation for sel in self._selectors])
@property
def size_sg(self):
return len(self.representation)
#def __copy__(self):
# tmp = super().__copy__()
# tmp.representation = self.representation.copy()
# return tmp
def append_and(self, to_append):
super().append_and(to_append)
self.representation = self.representation.intersection(to_append.representation)
self.arr_for_interface = np.array(list(self.representation), dtype=int)
@property
def __array_interface__(self):
return self.arr_for_interface.__array_interface__ # pylint: disable=no-member
class SetRepresentation(RepresentationBase):
Conjunction = Set_Conjunction
def __init__(self, df, selectors_to_patch):
self.df = df
super().__init__(Set_Conjunction, selectors_to_patch)
def patch_selector(self, sel):
sel.representation = set(*np.nonzero(sel.covers(self.df)))
sel.size_sg = len(sel.representation)
def patch_classes(self):
Set_Conjunction.all_set = set(self.df.index)
super().patch_classes()
class NumpySet_Conjunction(ps.Conjunction):
all_set = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.representation = self.compute_representation()
def compute_representation(self):
# empty description ==> return a list of all '1's
if not self._selectors:
return NumpySet_Conjunction.all_set
start = self._selectors[0].representation
for sel in self._selectors[1:]:
start = np.intersect1d(start, sel.representation, assume_unique=True)
return start
@property
def size_sg(self):
return len(self.representation)
#def __copy__(self):
# tmp = super().__copy__()
# tmp.representation = self.representation.copy()
# return tmp
def append_and(self, to_append):
super().append_and(to_append)
#self._selectors.append(to_append)
self.representation = np.intersect1d(self.representation, to_append.representation, True)
@property
def __array_interface__(self):
return self.representation.__array_interface__
class NumpySetRepresentation(RepresentationBase):
Conjunction = NumpySet_Conjunction
def __init__(self, df, selectors_to_patch):
self.df = df
super().__init__(NumpySet_Conjunction, selectors_to_patch)
def patch_selector(self, sel):
sel.representation = np.nonzero(sel.covers(self.df))[0]
sel.size_sg = len(sel.representation)
def patch_classes(self):
NumpySet_Conjunction.all_set = np.arange(len(self.df))
super().patch_classes()
'''
Created on 28.04.2016
@author: lemmerfn
'''
from abc import ABC, abstractmethod
import weakref
from functools import total_ordering
import pandas as pd
import pysubgroup_mod as ps
from itertools import chain
import copy
import numpy as np
@total_ordering
class SelectorBase(ABC):
__refs__ = weakref.WeakSet()
def __new__(cls, *args, **kwargs):
tmp = super().__new__(cls)
tmp.set_descriptions(*args, **kwargs)
if tmp in SelectorBase.__refs__:
for ref in SelectorBase. __refs__:
if ref == tmp:
return ref
return tmp
def __init__(self):
SelectorBase.__refs__.add(self)
def __eq__(self, other):
if other is None:
return False
return repr(self) == repr(other)
def __lt__(self, other):
return repr(self) < repr(other)
def __hash__(self):
return self._hash #pylint: disable=no-member
@abstractmethod
def set_descriptions(self, *args, **kwargs):
pass
def get_cover_array_and_size(subgroup, data_len=None, data=None):
if hasattr(subgroup, "representation"):
cover_arr = subgroup
size = subgroup.size_sg
elif isinstance(subgroup, slice):
cover_arr = subgroup
if data_len is None:
if isinstance(data, pd.DataFrame):
data_len = len(data)
else:
raise ValueError("if you pass a slice, you need to pass either data_len or data")
# https://stackoverflow.com/questions/36188429/retrieve-length-of-slice-from-slice-object-in-python
size = len(range(*subgroup.indices(data_len)))
elif hasattr(subgroup, '__array_interface__'):
cover_arr = subgroup
type_char = subgroup.__array_interface__['typestr'][1]
if type_char == 'b': # boolean indexing is used
size = np.count_nonzero(cover_arr)
elif type_char == 'u' or type_char == 'i': # integer indexing
size = subgroup.__array_interface__['shape'][0]
else:
print(type_char)
raise NotImplementedError(f"Currently a typechar of {type_char} is not supported.")
else:
assert isinstance(data, pd.DataFrame)
cover_arr = subgroup.covers(data)
size = np.count_nonzero(cover_arr)
return cover_arr, size
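# Illustrative note (not part of the original code): several "subgroup" representations
# are accepted by get_cover_array_and_size, e.g.
#   get_cover_array_and_size(slice(None), data_len=10)        -> (slice(None), 10)
#   get_cover_array_and_size(np.array([True, False, True]))   -> (bool array, 2)
#   get_cover_array_and_size(np.array([0, 2, 5]))             -> (index array, 3)
#   get_cover_array_and_size(conjunction, data=df)            -> (conjunction.covers(df), its nonzero count)
# where conjunction is an unpatched description object that implements covers().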
def get_size(subgroup, data_len=None, data=None):
if hasattr(subgroup, "representation"):
size = subgroup.size_sg
elif isinstance(subgroup, slice):
if data_len is None:
if isinstance(data, pd.DataFrame):
data_len = len(data)
else:
raise ValueError("if you pass a slice, you need to pass either data_len or data")
# https://stackoverflow.com/questions/36188429/retrieve-length-of-slice-from-slice-object-in-python
size = len(range(*subgroup.indices(data_len)))
elif hasattr(subgroup, '__array_interface__'):
type_char = subgroup.__array_interface__['typestr'][1]
if type_char == 'b': # boolean indexing is used
size = np.count_nonzero(subgroup)
elif type_char == 'u' or type_char == 'i': # integer indexing
size = subgroup.__array_interface__['shape'][0]
else:
print(type_char)
raise NotImplementedError(f"Currently a typechar of {type_char} is not supported.")
else:
assert isinstance(data, pd.DataFrame)
size = np.count_nonzero(subgroup.covers(data))
return size
class EqualitySelector(SelectorBase):
def __init__(self, attribute_name, attribute_value, selector_name=None):
if attribute_name is None:
raise TypeError()
if attribute_value is None:
raise TypeError()
self._attribute_name = attribute_name
self._attribute_value = attribute_value
self._selector_name = selector_name
self.set_descriptions(self._attribute_name, self._attribute_value, self._selector_name)
super().__init__()
@property
def attribute_name(self):
return self._attribute_name
@property
def attribute_value(self):
return self._attribute_value
def set_descriptions(self, attribute_name, attribute_value, selector_name=None): # pylint: disable=arguments-differ
self._hash, self._query, self._string = EqualitySelector.compute_descriptions(attribute_name, attribute_value, selector_name=selector_name)
@classmethod
def compute_descriptions(cls, attribute_name, attribute_value, selector_name):
if isinstance(attribute_value, (str, bytes)):
query = str(attribute_name) + "==" + "'" + str(attribute_value) + "'"
elif np.isnan(attribute_value):
query = attribute_name + ".isnull()"
else:
query = str(attribute_name) + "==" + str(attribute_value)
if selector_name is not None:
string_ = selector_name
else:
string_ = query
hash_value = hash(query)
return (hash_value, query, string_)
def __repr__(self):
return self._query
def covers(self, data):
row = data[self.attribute_name].to_numpy()
if pd.isnull(self.attribute_value):
return pd.isnull(row)
return row == self.attribute_value
def __str__(self, open_brackets="", closing_brackets=""):
return open_brackets + self._string + closing_brackets
@property
def selectors(self):
return (self,)
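# Illustrative example (not part of the original code):
#   repr(EqualitySelector("sex", "male")) == "sex=='male'"
#   repr(EqualitySelector("age", 25))     == "age==25"
#   EqualitySelector("age", 25).covers(df) returns a boolean numpy array over df's rows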
class NegatedSelector(SelectorBase):
def __init__(self, selector):
self._selector = selector
self.set_descriptions(selector)
super().__init__()
def covers(self, data_instance):
return np.logical_not(self._selector.covers(data_instance))
def __repr__(self):
return self._query
def __str__(self, open_brackets="", closing_brackets=""):
return "NOT " + self._selector.__str__(open_brackets, closing_brackets)
def set_descriptions(self, selector): # pylint: disable=arguments-differ
self._query = "(not " + repr(selector) + ")"
self._hash = hash(repr(self))
@property
def attribute_name(self):
return self._selector.attribute_name
@property
def selectors(self):
return self._selector.selectors
# Including the lower bound, excluding the upper_bound
class IntervalSelector(SelectorBase):
def __init__(self, attribute_name, lower_bound, upper_bound, selector_name=None):
self._attribute_name = attribute_name
self._lower_bound = lower_bound
self._upper_bound = upper_bound
self.selector_name = selector_name
self.set_descriptions(attribute_name, lower_bound, upper_bound, selector_name)
super().__init__()
@property
def attribute_name(self):
return self._attribute_name
@property
def lower_bound(self):
return self._lower_bound
@property
def upper_bound(self):
return self._upper_bound
def covers(self, data_instance):
val = data_instance[self.attribute_name].to_numpy()
return np.logical_and((val >= self.lower_bound), (val < self.upper_bound))
def __repr__(self):
return self._query
def __hash__(self):
return self._hash
def __str__(self):
return self._string
@classmethod
def compute_descriptions(cls, attribute_name, lower_bound, upper_bound, selector_name=None):
if selector_name is None:
_string = cls.compute_string(attribute_name, lower_bound, upper_bound, rounding_digits=2)
else:
_string = selector_name
_query = cls.compute_string(attribute_name, lower_bound, upper_bound, rounding_digits=None)
_hash = _query.__hash__()
return (_hash, _query, _string)
def set_descriptions(self, attribute_name, lower_bound, upper_bound, selector_name=None): # pylint: disable=arguments-differ
self._hash, self._query, self._string = IntervalSelector.compute_descriptions(attribute_name, lower_bound, upper_bound, selector_name=selector_name)
@classmethod
def compute_string(cls, attribute_name, lower_bound, upper_bound, rounding_digits):
if rounding_digits is None:
formatter = "{}"
else:
formatter = "{0:." + str(rounding_digits) + "f}"
ub = upper_bound
lb = lower_bound
if ub % 1:
ub = formatter.format(ub)
if lb % 1:
lb = formatter.format(lb)
if lower_bound == float("-inf") and upper_bound == float("inf"):
repre = attribute_name + "= anything"
elif lower_bound == float("-inf"):
repre = attribute_name + "<" + str(ub)
elif upper_bound == float("inf"):
repre = attribute_name + ">=" + str(lb)
else:
repre = attribute_name + ": [" + str(lb) + ":" + str(ub) + "["
return repre
@property
def selectors(self):
return (self,)
def create_selectors(data, nbins=5, intervals_only=True, ignore=None):
if ignore is None:
ignore = []
sels = create_nominal_selectors(data, ignore)
sels.extend(create_numeric_selectors(data, nbins, intervals_only, ignore=ignore))
return sels
def create_nominal_selectors(data, ignore=None):
if ignore is None:
ignore = []
nominal_selectors = []
# for attr_name in [x for x in data.select_dtypes(exclude=['number']).columns.values if x not in ignore]:
# nominal_selectors.extend(create_nominal_selectors_for_attribute(data, attr_name))
nominal_dtypes = data.select_dtypes(exclude=['number'])
dtypes = data.dtypes
# print(dtypes)
for attr_name in [x for x in nominal_dtypes.columns.values if x not in ignore]:
nominal_selectors.extend(create_nominal_selectors_for_attribute(data, attr_name, dtypes))
return nominal_selectors
def create_nominal_selectors_for_attribute(data, attribute_name, dtypes=None):
nominal_selectors = []
for val in pd.unique(data[attribute_name]):
nominal_selectors.append(EqualitySelector(attribute_name, val))
# setting the is_bool flag for selector
if dtypes is None:
dtypes = data.dtypes
if dtypes[attribute_name] == 'bool':
for s in nominal_selectors:
s.is_bool = True
return nominal_selectors
def create_numeric_selectors(data, nbins=5, intervals_only=True, weighting_attribute=None, ignore=None):
if ignore is None:
ignore = []
numeric_selectors = []
for attr_name in [x for x in data.select_dtypes(include=['number']).columns.values if x not in ignore]:
numeric_selectors.extend(create_numeric_selectors_for_attribute(
data, attr_name, nbins, intervals_only, weighting_attribute))
return numeric_selectors
def create_numeric_selectors_for_attribute(data, attr_name, nbins=5, intervals_only=True, weighting_attribute=None):
numeric_selectors = []
data_not_null = data[data[attr_name].notnull()]
uniqueValues = np.unique(data_not_null[attr_name])
if len(data_not_null.index) < len(data.index):
numeric_selectors.append(EqualitySelector(attr_name, np.nan))
if len(uniqueValues) <= nbins:
for val in uniqueValues:
numeric_selectors.append(EqualitySelector(attr_name, val))
else:
cutpoints = ps.equal_frequency_discretization(data, attr_name, nbins, weighting_attribute)
if intervals_only:
old_cutpoint = float("-inf")
for c in cutpoints:
numeric_selectors.append(IntervalSelector(attr_name, old_cutpoint, c))
old_cutpoint = c
numeric_selectors.append(IntervalSelector(attr_name, old_cutpoint, float("inf")))
else:
for c in cutpoints:
numeric_selectors.append(IntervalSelector(attr_name, c, float("inf")))
numeric_selectors.append(IntervalSelector(attr_name, float("-inf"), c))
return numeric_selectors
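# Illustrative example (not part of the original code): with cutpoints [3, 7]
# returned by equal_frequency_discretization, intervals_only=True yields
# IntervalSelector(attr, -inf, 3), IntervalSelector(attr, 3, 7) and
# IntervalSelector(attr, 7, inf); intervals_only=False instead yields the
# one-sided selectors attr>=3, attr<3, attr>=7 and attr<7.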
def remove_target_attributes(selectors, target):
result = []
for sel in selectors:
        if sel.attribute_name not in target.get_attributes():
result.append(sel)
return result
##############
# Boolean expressions
##############
class BooleanExpressionBase(ABC):
def __or__(self, other):
tmp = copy.copy(self)
tmp.append_or(other)
return tmp
def __and__(self, other):
tmp = self.__copy__()
tmp.append_and(other)
return tmp
@abstractmethod
def append_and(self, to_append):
pass
@abstractmethod
def append_or(self, to_append):
pass
@abstractmethod
def __copy__(self):
pass
@total_ordering
class Conjunction(BooleanExpressionBase):
    def __init__(self, selectors, mode=2):
        # mode=2 corresponds to the "default" behaviour; it is assigned before the
        # try block so that it is also set when a single selector is passed
        self.mode = mode
        try:
            it = iter(selectors)
            self._selectors = list(it)
        except TypeError:
            self._selectors = [selectors]
def covers(self, instance):
# empty description ==> return a list of all '1's
if not self._selectors:
return np.full(len(instance), True, dtype=bool)
# non-empty description
return np.all([sel.covers(instance) for sel in self._selectors], axis=0)
def __len__(self):
return len(self._selectors)
def __str__(self, open_brackets="", closing_brackets="", and_term=" AND "):
if not self._selectors:
return "Dataset"
if self.mode == 2:
attrs = sorted(str(sel) for sel in self._selectors)
else:
attrs = [str(sel) for sel in self._selectors]
return "".join((open_brackets, and_term.join(attrs), closing_brackets))
def __repr__(self):
if hasattr(self, "_repr"):
return self._repr
else:
self._repr = self._compute_repr()
return self._repr
def __eq__(self, other):
return repr(self) == repr(other)
def __lt__(self, other):
return repr(self) < repr(other)
def __hash__(self):
if hasattr(self, "_hash"):
return self._hash
else:
self._hash = self._compute_hash()
return self._hash
def _compute_representations(self):
self._repr = self._compute_repr()
self._hash = self._compute_hash()
def _compute_repr(self):
if not self._selectors:
return "True"
if self.mode == 2:
reprs = sorted(repr(sel) for sel in self._selectors)
else:
reprs = [repr(sel) for sel in self._selectors]
return "".join(("(", " and ".join(reprs), ")"))
def _compute_hash(self):
return hash(repr(self))
def _invalidate_representations(self):
if hasattr(self, '_repr'):
delattr(self, '_repr')
if hasattr(self, '_hash'):
delattr(self, '_hash')
def append_and(self, to_append):
if isinstance(to_append, SelectorBase):
self._selectors.append(to_append)
elif isinstance(to_append, Conjunction):
self._selectors.extend(to_append.selectors)
else:
try:
self._selectors.extend(to_append)
except TypeError:
self._selectors.append(to_append)
self._invalidate_representations()
def append_or(self, to_append):
raise RuntimeError("Or operations are not supported by a pure Conjunction. Consider using DNF.")
def pop_and(self):
return self._selectors.pop()
def pop_or(self):
raise RuntimeError("Or operations are not supported by a pure Conjunction. Consider using DNF.")
def __copy__(self):
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
result._selectors = list(self._selectors)
return result
@property
def depth(self):
return len(self._selectors)
@property
def selectors(self):
return tuple(chain.from_iterable(sel.selectors for sel in self._selectors))
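# Illustrative example (not part of the original code):
#   sel_a, sel_b = EqualitySelector("a", 1), EqualitySelector("b", 2)
#   conj = Conjunction([sel_a, sel_b], mode=2)
#   str(conj)  == "a==1 AND b==2"
#   repr(conj) == "(a==1 and b==2)"
#   conj.covers(df) is the element-wise AND of the selectors' cover arrays.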
@total_ordering
class Disjunction(BooleanExpressionBase):
def __init__(self, selectors):
if isinstance(selectors, (list, tuple)):
self._selectors = selectors
else:
self._selectors = [selectors]
def covers(self, instance):
# empty description ==> return a list of all '1's
if not self._selectors:
return np.full(len(instance), False, dtype=bool)
# non-empty description
return np.any([sel.covers(instance) for sel in self._selectors], axis=0)
def __len__(self):
return len(self._selectors)
def __str__(self, open_brackets="", closing_brackets="", or_term=" OR "):
if not self._selectors:
return "Dataset"
attrs = sorted(str(sel) for sel in self._selectors)
return "".join((open_brackets, or_term.join(attrs), closing_brackets))
def __repr__(self):
if not self._selectors:
return "True"
reprs = sorted(repr(sel) for sel in self._selectors)
return "".join(("(", " or ".join(reprs), ")"))
def __eq__(self, other):
return repr(self) == repr(other)
def __lt__(self, other):
return repr(self) < repr(other)
def __hash__(self):
return hash(repr(self))
def append_and(self, to_append):
raise RuntimeError("And operations are not supported by a pure Conjunction. Consider using DNF.")
def append_or(self, to_append):
try:
self._selectors.extend(to_append)
except TypeError:
self._selectors.append(to_append)
def __copy__(self):
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
result._selectors = copy.copy(self._selectors)
return result
@property
def selectors(self):
return tuple(chain.from_iterable(sel.selectors for sel in self._selectors))
class DNF(Disjunction):
def __init__(self, selectors=None):
if selectors is None:
selectors = []
super().__init__([])
self.append_or(selectors)
@staticmethod
def _ensure_pure_conjunction(to_append):
if isinstance(to_append, Conjunction):
return to_append
elif isinstance(to_append, SelectorBase):
return Conjunction(to_append)
else:
it = iter(to_append)
if all(isinstance(sel, SelectorBase) for sel in to_append):
return Conjunction(it)
else:
raise ValueError("DNFs only accept an iterable of pure Selectors")
def append_or(self, to_append):
try:
it = iter(to_append)
conjunctions = [DNF._ensure_pure_conjunction(part) for part in it]
except TypeError:
conjunctions = DNF._ensure_pure_conjunction(to_append)
super().append_or(conjunctions)
def append_and(self, to_append):
conj = DNF._ensure_pure_conjunction(to_append)
if len(self._selectors) > 0:
for conjunction in self._selectors:
conjunction.append_and(conj)
else:
self._selectors.append(conj)
def pop_and(self):
out_list = [s.pop_and() for s in self._selectors]
return_val = out_list[0]
if all(x == return_val for x in out_list):
return return_val
else:
raise RuntimeError("pop_and failed as the result was inconsistent")
import itertools
from functools import partial
from heapq import heappush, heappop
from collections.abc import Iterable
import math
import numpy as np
import pandas as pd
import pysubgroup_mod as ps
from math import sqrt
# Computes the binary (Shannon) entropy of a proportion x
def calculate_entriopia(x):
if x in [0.0,1.0]:
return 0
return -x*math.log(x,2) - (1-x)*math.log(1-x,2)
# Computes the information gain for a subgroup.
# ID/IS are the numbers of instances in the dataset/subgroup; PD/PS are the numbers of positives in the dataset/subgroup.
def calculate_info_gained(ID,IS,PD,PS):
a = ID - IS
b = PD - PS
x1 = IS / ID
x2 = a / ID
p1 = PD / ID
p2 = PS / IS
p3 = 0
if a != 0:
p3 = b / a
return calculate_entriopia(p1) - (x1)*calculate_entriopia(p2) - (x2)*calculate_entriopia(p3)
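# Illustrative worked example (not part of the original code):
#   calculate_info_gained(ID=100, IS=20, PD=30, PS=15)
#   p1 = 30/100 = 0.3, p2 = 15/20 = 0.75, p3 = (30-15)/(100-20) = 0.1875
#   result = H(0.3) - 0.2*H(0.75) - 0.8*H(0.1875)
#          ~ 0.881 - 0.2*0.811 - 0.8*0.696 ~ 0.162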
# Computes the odds ratio (odd value) for a subgroup
def calculate_odd_value(ID,IS,PD,PS):
b = IS - PS
c = PD - PS
d = (ID - PD) - b
if b == 0 or c == 0 or d == 0:
odd_value = 100
else:
odd_value = (PS*d) / (b*c)
return odd_value
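# Illustrative worked example (not part of the original code): with
# ID=100, IS=20, PD=30, PS=15 the implied 2x2 contingency table gives
#   b = IS - PS = 5 (negatives inside), c = PD - PS = 15 (positives outside),
#   d = (ID - PD) - b = 65 (negatives outside), so
#   calculate_odd_value(100, 20, 30, 15) == (15*65)/(5*15) == 13.0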
# Computes an optimal threshold for a given list of values, based on their standard deviation.
def threshold(info_list,depth,mode):
if len(np.unique(info_list)) == 1:
return list(info_list)[0]
n = len(info_list)
c1 = np.sum(np.square(info_list))
c2 = np.sum(info_list)
a = n*c1
b = c2**2
c = n*(n-1)
s = sqrt((a-b)/c)
if depth > 0: # Parameter used to check if algorithm is in the first iteration (Beams: length == 1).
if mode == 1: ## Parameter used to check if user wants dynamic threshold or max value.
s = max(info_list)
else:
s = max(info_list) - s
return s
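# Illustrative worked example (not part of the original code):
#   threshold([0.1, 0.2, 0.3], depth=0, mode=0)
#     -> s = sqrt((3*0.14 - 0.36) / 6) = 0.1 (the sample standard deviation)
#   threshold([0.1, 0.2, 0.3], depth=1, mode=0) -> max - s = 0.3 - 0.1 = 0.2
#   threshold([0.1, 0.2, 0.3], depth=1, mode=1) -> max = 0.3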
def calculate_threshold(l1,l2,depth,mode):
if len(l1) == 0:
thr = threshold([elem[3][-1] for elem in l2],depth,mode)
return [elem for elem in l2 if elem[3][-1] >= thr]
thr = threshold([elem[3][-1] for elem in l1],depth,mode)
return [elem for elem in l1 if elem[3][-1] >= thr]
def best_complex(elem,mode,filter_vars):
    # Odds-ratio (OR) ranges used below: <1.68, 1.68 - 3.47, 3.47 - 6.71, >6.71
#l_quali, sg, l_stats, l_info, l_odd, idx
    ## Keep the attribute labels of the items in the group, e.g. "estadioini==IV" -> "estadioini"
    group_labels = [item.attribute_name for item in elem[1]._selectors]
    lengths = [i for i in range(1, len(group_labels) + 1)]
    selectors = list(elem[1]._selectors)
    odd_list = [(4, odd) if odd > 6.71 else (3, odd) if 3.47 < odd <= 6.71 else (2, odd) if 1.68 < odd <= 3.47 else (1, odd) for odd in elem[4]]
    l3 = [(i, v, g, l, p) for i, v, g, l, p in zip(elem[3], odd_list, group_labels, lengths, elem[6])]
    ## Check whether the group contains PrimTratCon & estadioini (i.e. all the filter_vars)
if all(var in group_labels for var in filter_vars):
l3 = [cand for idx,cand in enumerate(l3[1:],2) if(all(var in group_labels[:idx] for var in filter_vars))]
    ## Compute the threshold
thr = threshold([it[0] for it in l3],len(l3),0)
    ## List of cut candidates above the info_gain threshold and with p-value <= 0.05
x_filter = [cand for cand in l3 if cand[0] >= thr and cand[4] <= 0.05]
#x_filter = [cand for cand in l3 if cand[0] >= thr]
if len(x_filter) == 0:
return None
if len(x_filter) == 1:
index = group_labels.index(x_filter[0][2])
sg = ps.Conjunction(selectors[:index+1],mode)
tup = (elem[0][index],sg,elem[2][index],elem[3][index],elem[4][index],elem[5],elem[6][index])
return tup
return_cand = x_filter[0]
for idx, cand in enumerate(x_filter[1:],start=1):
if cand[1][0] > return_cand[1][0]: # If candidate upgrades odd range, it is selected as new return_cand
return_cand = cand
        # If the candidate's odd range is lower than return_cand's odd range, the algorithm stops.
        # Likewise, if the candidate is not consecutive and its odd range does not improve on return_cand's, the algorithm stops.
elif (cand[1][0] == return_cand[1][0] and cand[3] > x_filter[idx-1][3] + 1) or (cand[1][0] < return_cand[1][0]):
break
#if return_cand[1][1] == 100:
if return_cand[1][0] == 4: # Also, if return_cand has maximum odd range (odd value > 6.71), algorithm stops and returns return_cand.
break
index = group_labels.index(return_cand[2])
sg = ps.Conjunction(selectors[:index+1],mode)
tup = (elem[0][index],sg,elem[2][index],elem[3][index],elem[4][index],elem[5],elem[6][index])
return tup
def add_if_required(result, sg, quality, task, check_for_duplicates=False, statistics=None):
if quality > task.min_quality:
p_value = ps.ChiSquaredQF(stat="p").evaluate(sg, task.target, task.data, statistics) # Calculate pvalue stat.
if not ps.constraints_satisfied(task.constraints, sg, statistics, task.data):
return
if check_for_duplicates and (quality, p_value, sg, statistics) in result:
return
if len(result) < task.result_set_size:
heappush(result, (quality, p_value, sg, statistics))
elif quality > result[0][0]:
heappop(result)
heappush(result, (quality, p_value, sg, statistics))
def minimum_required_quality(result, task):
if len(result) < task.result_set_size:
return task.min_quality
else:
return result[0][0]
# Returns the cutpoints for discretization
def equal_frequency_discretization(data, attribute_name, nbins=5, weighting_attribute=None):
cutpoints = []
if weighting_attribute is None:
cleaned_data = data[attribute_name]
cleaned_data = cleaned_data[~np.isnan(cleaned_data)]
sorted_data = sorted(cleaned_data)
number_instances = len(sorted_data)
for i in range(1, nbins):
position = i * number_instances // nbins
while True:
if position >= number_instances:
break
val = sorted_data[position]
if val not in cutpoints:
break
position += 1
# print (sorted_data [position])
if val not in cutpoints:
cutpoints.append(val)
else:
cleaned_data = data[[attribute_name, weighting_attribute]]
cleaned_data = cleaned_data[~np.isnan(cleaned_data[attribute_name])]
cleaned_data.sort(order=attribute_name)
overall_weights = cleaned_data[weighting_attribute].sum()
remaining_weights = overall_weights
bin_size = overall_weights / nbins
sum_of_weights = 0
for row in cleaned_data:
sum_of_weights += row[weighting_attribute]
if sum_of_weights > bin_size:
if not row[attribute_name] in cutpoints:
cutpoints.append(row[attribute_name])
remaining_weights = remaining_weights - sum_of_weights
if remaining_weights < 1.5 * (bin_size):
break
sum_of_weights = 0
return cutpoints
def conditional_invert(val, invert):
return - 2 * (invert - 0.5) * val
def results_df_autoround(df):
return df.round({
'quality': 3,
'size_sg': 0,
'size_dataset': 0,
'positives_sg': 0,
'positives_dataset': 0,
'size_complement': 0,
'relative_size_sg': 3,
'relative_size_complement': 3,
'coverage_sg': 3,
'coverage_complement': 3,
'target_share_sg': 3,
'target_share_complement': 3,
'target_share_dataset': 3,
'lift': 3,
'size_sg_weighted': 1,
'size_dataset_weighted': 1,
'positives_sg_weighted': 1,
'positives_dataset_weighted': 1,
'size_complement_weighted': 1,
'relative_size_sg_weighted': 3,
'relative_size_complement_weighted': 3,
'coverage_sg_weighted': 3,
'coverage_complement_weighted': 3,
'target_share_sg_weighted': 3,
'target_share_complement_weighted': 3,
'target_share_dataset_weighted': 3,
'lift_weighted': 3})
def perc_formatter(x):
return "{0:.1f}%".format(x * 100)
def float_formatter(x, digits=2):
return ("{0:." + str(digits) + "f}").format(x)
def is_categorical_attribute(data, attribute_name):
return attribute_name in data.select_dtypes(exclude=['number']).columns.values
def is_numerical_attribute(data, attribute_name):
return attribute_name in data.select_dtypes(include=['number']).columns.values
def remove_selectors_with_attributes(selector_list, attribute_list):
    return [x for x in selector_list if x.attribute_name not in attribute_list]
def effective_sample_size(weights):
return sum(weights) ** 2 / sum(weights ** 2)
# from https://docs.python.org/3/library/itertools.html#recipes
def powerset(iterable, max_length=None):
"powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
s = list(iterable)
if max_length is None:
max_length = len(s)
if max_length < len(s):
max_length = len(s)
return itertools.chain.from_iterable(itertools.combinations(s, r) for r in range(max_length))
def overlap(sg, another_sg, data):
cover_sg = sg.covers(data)
cover_another_sg = another_sg.covers(data)
union = np.logical_or(cover_sg, cover_another_sg)
intercept = np.logical_and(cover_sg, cover_another_sg)
sim = np.sum(intercept) / np.sum(union)
return sim
#####
# bitset operations
#####
def to_bits(list_of_ints):
v = 0
for x in list_of_ints:
v += 1 << x
return v
def count_bits(bitset_as_int):
c = 0
while bitset_as_int > 0:
c += 1
bitset_as_int &= bitset_as_int - 1
return c
def find_set_bits(bitset_as_int):
while bitset_as_int > 0:
x = bitset_as_int.bit_length() - 1
yield x
bitset_as_int = bitset_as_int - (1 << x)
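# Illustrative example (not part of the original code):
#   to_bits([0, 2, 5]) == 0b100101 == 37
#   count_bits(37) == 3
#   list(find_set_bits(37)) == [5, 2, 0]   # highest set bit first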
#####
# TID-list operations
#####
def intersect_of_ordered_list(list_1, list_2):
result = []
i = 0
j = 0
while i < len(list_1) and j < len(list_2):
if list_1[i] < list_2[j]:
i += 1
elif list_2[j] < list_1[i]:
j += 1
else:
result.append(list_1[i])
j += 1
i += 1
return result
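# Illustrative example (not part of the original code): both inputs must be
# sorted in ascending order, e.g.
#   intersect_of_ordered_list([1, 3, 5, 7], [3, 4, 5, 8]) == [3, 5]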
class SubgroupDiscoveryResult:
def __init__(self, results, task):
self.task = task
self.results = results
assert isinstance(results, Iterable)
def to_descriptions(self):
return [(qual, sgd) for qual, sgd, stats in self.results]
def to_table(self, statistics_to_show=None, print_header=True, include_target=False, mode=2):
if statistics_to_show is None:
statistics_to_show = type(self.task.target).statistic_types
table = []
if print_header:
row = ["quality", "subgroup"]
for stat in statistics_to_show:
row.append(stat)
if mode != 2:
row.append("pvalue")
table.append(row)
if mode !=2:
for (q, sg, stats,_,_,_,p_value) in self.results:
stats = self.task.target.calculate_statistics(sg, self.task.data, stats)
row = [str(q), str(sg)]
if include_target:
row.append(str(self.task.target))
for stat in statistics_to_show:
row.append(str(stats[stat]))
row.append(str(p_value))
table.append(row)
else:
for (q, sg, stats) in self.results:
stats = self.task.target.calculate_statistics(sg, self.task.data, stats)
row = [str(q), str(sg)]
if include_target:
row.append(str(self.task.target))
                for stat in statistics_to_show:
                    row.append(str(stats[stat]))
                table.append(row)
        return table
def to_dataframe(self, statistics_to_show=None, autoround=False, include_target=False, mode=2):
if statistics_to_show is None:
statistics_to_show = type(self.task.target).statistic_types
res = self.to_table(statistics_to_show, True, include_target, mode)
headers = res.pop(0)
df = pd.DataFrame(res, columns=headers, dtype=np.float64)
if autoround:
df = results_df_autoround(df)
return df
def to_latex(self, statistics_to_show=None):
if statistics_to_show is None:
statistics_to_show = type(self.task.target).statistic_types
df = self.to_dataframe(statistics_to_show)
latex = df.to_latex(index=False, col_space=10, formatters={
'quality': partial(float_formatter, digits=3),
'size_sg': partial(float_formatter, digits=0),
'size_dataset': partial(float_formatter, digits=0),
'positives_sg': partial(float_formatter, digits=0),
'positives_dataset': partial(float_formatter, digits=0),
'size_complement': partial(float_formatter, digits=0),
'relative_size_sg': perc_formatter,
'relative_size_complement': perc_formatter,
'coverage_sg': perc_formatter,
'coverage_complement': perc_formatter,
'target_share_sg': perc_formatter,
'target_share_complement': perc_formatter,
'target_share_dataset': perc_formatter,
'lift': partial(float_formatter, digits=1)})
latex = latex.replace(' AND ', r' $\wedge$ ')
return latex
from functools import partial
import pandas as pd
import numpy as np
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.spatial.distance import squareform
from matplotlib import pyplot as plt
import pysubgroup_mod as ps
def plot_sgbars(result_df, _, ylabel="target share", title="Discovered Subgroups", dynamic_widths=False, _suffix=""):
shares_sg = result_df["target_share_sg"]
shares_compl = result_df["target_share_complement"]
sg_relative_sizes = result_df["relative_size_sg"]
x = np.arange(len(result_df))
base_width = 0.8
if dynamic_widths:
width_sg = 0.02 + base_width * sg_relative_sizes
width_compl = base_width - width_sg
else:
width_sg = base_width / 2
width_compl = base_width / 2
fig, ax = plt.subplots()
rects1 = ax.bar(x, shares_sg, width_sg, align='edge')
rects2 = ax.bar(x + width_sg, shares_compl, width_compl, align='edge', color='#61b76f')
ax.set_ylabel(ylabel)
ax.set_title(title)
ax.set_xticks(x + base_width / 2)
ax.set_xticklabels(result_df.index, rotation=90)
ax.legend((rects1[0], rects2[0]), ('subgroup', 'complement'))
fig.set_size_inches(12, len(result_df))
return fig
def plot_roc(result_df, data, qf=ps.StandardQF(0.5), levels=40, annotate=False):
instances_dataset = len(data)
positives_dataset = np.max(result_df['positives_dataset'])
negatives_dataset = instances_dataset - positives_dataset
xlist = np.linspace(0.01, 0.99, 100)
ylist = np.linspace(0.01, 0.99, 100)
X, Y = np.meshgrid(xlist, ylist)
    f = np.vectorize(partial(qf.evaluate, instances_dataset, positives_dataset), otypes=[float])
Z = f(X * negatives_dataset + Y * positives_dataset, Y * positives_dataset)
max_val = np.max([np.max(Z), -np.min(Z)])
fig, ax = plt.subplots()
cm = plt.cm.get_cmap("bwr")
plt.contourf(X, Y, Z, levels, cmap=cm, vmin=-max_val, vmax=max_val)
for i, sg in result_df.iterrows():
rel_positives_sg = sg['positives_sg'] / positives_dataset
rel_negatives_sg = (sg['size_sg'] - sg['positives_sg']) / negatives_dataset
ax.plot(rel_negatives_sg, rel_positives_sg, 'ro', color='black')
if annotate:
label_margin = 0.01
ax.annotate(str(i), (rel_negatives_sg + label_margin, rel_positives_sg + label_margin))
# plt.colorbar(cp)
plt.title('Discovered subgroups')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
return fig
def plot_npspace(result_df, data, annotate=True, fixed_limits=False):
fig, ax = plt.subplots()
for i, sg in result_df.iterrows():
target_share_sg = sg['target_share_sg']
size_sg = sg['size_sg']
ax.plot(size_sg, target_share_sg, 'ro', color='black')
if annotate:
ax.annotate(str(i), (size_sg + 5, target_share_sg + 0.001))
if fixed_limits:
plt.xlim((0, len(data)))
plt.ylim((0, 1))
plt.title('Discovered subgroups')
plt.xlabel('Size of Subgroup')
plt.ylabel('Target Share Subgroup')
return fig
def plot_distribution_numeric(sg, data, bins):
fig, _ = plt.subplots()
target_values_sg = data[sg.covers(data)][sg.target.get_attributes()].values
target_values_data = data[sg.target.get_attributes()].values
plt.hist(target_values_sg, bins, alpha=0.5, label=str(sg.subgroup_description), density=True)
plt.hist(target_values_data, bins, alpha=0.5, label="Overall Data", density=True)
plt.legend(loc='upper right')
return fig
def compare_distributions_numeric(sgs, data, bins):
fig, _ = plt.subplots()
for sg in sgs:
target_values_sg = data[sg.covers(data)][sg.target.get_attributes()].values
plt.hist(target_values_sg, bins, alpha=0.3, label=str(sg.subgroup_description), density=True)
plt.legend(loc='upper right')
return fig
def similarity_sgs(sgd_results, data, color=True):
sgs = [x[1] for x in sgd_results]
#sgNames = [str(sg.subgroup_description) for sg in sgs]
dists = [[ps.overlap(sg, sg2, data) for sg2 in sgs] for sg in sgs]
dist_df = pd.DataFrame(dists)
if color:
dist_df = dist_df.style.background_gradient()
return dist_df
def similarity_dendrogram(result, data):
fig, _ = plt.subplots()
dist_df = similarity_sgs(result, data, color=False)
mat = 1 - dist_df.values
dists = squareform(mat)
linkage_matrix = linkage(dists, "single")
dendrogram(linkage_matrix, labels=dist_df.index)
return fig
def supportSetVisualization(result, in_order=True, drop_empty=True):
df = result.task.data
n_items = len(result.task.data)
n_SGDs = len(result.results)
covs = np.zeros((n_items, n_SGDs), dtype=bool)
for i, (_, r, _) in enumerate(result.to_subgroups):
covs[:, i] = r.covers(df)
img_arr = covs.copy()
sort_inds_x = np.argsort(np.sum(covs, axis=1))[::-1]
img_arr = img_arr[sort_inds_x, :]
if not in_order:
sort_inds_y = np.argsort(np.sum(covs, axis=0))
img_arr = img_arr[:, sort_inds_y]
if drop_empty:
keep_entities = np.sum(img_arr, axis=1) > 0
print("Discarding {} entities that are not covered".format(n_items - np.count_nonzero(keep_entities)))
img_arr = img_arr[keep_entities, :]
return img_arr.T
IGSD
This repository contains the material referring to the paper: "". It contains:
1. datasets: Directory in which the datasets to be used by the algorithm are stored.
2. results: Directory in which the algorithm will store the results it produces.
3. pysubgroup_mod: The project code.
1. IGSD Project Scripts
Contains the scripts of IGSD and other algorithms such as BeamSearch, DFS, BestFirstSearch, etc. Moreover, main.py is the
main script file, which launches the chosen algorithm.
main.py requires several arguments, so the following command line executes the Python file:
py main.py --dataname <FILE> --class_column <CLASS_COLUMN> --class_value <CLASS_VALUE> --mode <MODE> --depth <DEPTH> --list_ignore <LIST_IGNORE> --list_conds <LIST_CONDS>
With:
- <FILE>: The name of the dataset input file.
- <CLASS_COLUMN>: The attribute (column) used as target (studied class).
- <CLASS_VALUE>: The value of <CLASS_COLUMN> that we want to analyze.
- <MODE>: The mode that IGSD will employ to perform the analysis when the IG threshold is calculated (dynamic, maximum). To employ another algorithm, use the default value.
- <DEPTH>: The number of attributes that the algorithms will consider.
- <LIST_IGNORE>: A list with the attributes (columns) of the dataset that the user does not want to be considered in the analysis.
- <LIST_CONDS>: A list with the attributes (columns) of the dataset that the user wants to be present in the patterns obtained.
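For illustration only, a hypothetical call could look like the line below; the dataset name, column names and values are placeholders (not files shipped with this repository), and the exact syntax accepted for the two list arguments depends on how main.py parses them:
py main.py --dataname heart --class_column disease --class_value 1 --mode dynamic --depth 5 --list_ignore id --list_conds age
This would run IGSD with the dynamic IG threshold on the heart dataset stored in the datasets directory, searching for patterns of up to 5 attributes that describe the records with disease == 1, ignoring the id column and requiring age to appear in the reported patterns.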