measures.py 8.91 KB
Newer Older
aarongitrepos's avatar
All  
aarongitrepos committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
'''
Created on 28.04.2016

@author: lemmerfn
'''
from abc import ABC, abstractmethod
from collections import namedtuple
from itertools import combinations
import numpy as np
import pysubgroup_mod as ps


class AbstractInterestingnessMeasure(ABC):

    # pylint: disable=no-member
    def ensure_statistics(self, subgroup, target, data, statistics=None):
        if not self.has_constant_statistics:
            self.calculate_constant_statistics(data, target)
        if any(not hasattr(statistics, attr) for attr in self.required_stat_attrs):
            if getattr(subgroup, 'statistics', False):
                return subgroup.statistics
            else:
                return self.calculate_statistics(subgroup, target, data, statistics)
        return statistics
    # pylint: enable=no-member
    #def optimistic_estimate_from_dataset(self, data, subgroup, weighting_attribute=None): #pylint: disable=unused-argument
    #    return float("inf")


class BoundedInterestingnessMeasure(AbstractInterestingnessMeasure):
    pass
    #@abstractmethod
    #def optimistic_estimate_from_dataset(self, data, subgroup, weighting_attribute=None):
    #    pass



#####
# FIX ME: This is currently not working anymore
#####
class CombinedInterestingnessMeasure(BoundedInterestingnessMeasure):
    def __init__(self, measures, weights=None):
        self.measures = measures

        if weights is None:
            weights = [1] * len(measures)
        assert len(weights) == len(measures)
        self.weights = weights

    def calculate_constant_statistics(self, data, target):
        pass

    def calculate_statistics(self, subgroup, target, data, cached_statistics=None):
        pass

    def evaluate(self, subgroup, target, data, statistics=None):
        #FIX USE of constant statistics
        return np.dot([m.evaluate(subgroup, target, data, None) for m in self.measures], self.weights)

    def optimistic_estimate(self, subgroup, target, data, statistics=None):
        # FIX USE of constant statistics
        return np.dot([m.optimistic_estimate(subgroup, target, data, None) for m in self.measures], self.weights)

    def evaluate_from_statistics(self, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup):
        return np.dot([m.evaluate_from_statistics(instances_dataset, positives_dataset, instances_subgroup, positives_subgroup) for m in self.measures], self.weights)

    #def optimistic_estimate_from_statistics(self, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup):
    #    return np.dot(
    #        [m.evaluate_from_statistics(instances_dataset, positives_dataset, instances_subgroup, positives_subgroup) for m in self.measures],
    #        self.weights)


##########
# Filter
##########
def unique_attributes(result_set, data):
    result = []
    used_attributes = []
    for (q, sg) in result_set:
        atts = sg.subgroup_description.get_attributes()
        if atts not in used_attributes or all([ps.is_categorical_attribute(data, x) for x in atts]):
            result.append((q, sg))
            used_attributes.append(atts)
    return result


def minimum_statistic_filter(result_set, statistic, minimum, data):
    result = []
    for (q, sg) in result_set:
        if len(sg.statistics) == 0:
            sg.calculate_statistics(data)
        if sg.statistics[statistic] >= minimum:
            result.append((q, sg))
    return result


def minimum_quality_filter(result_set, minimum):
    result = []
    for (q, sg) in result_set:
        if q >= minimum:
            result.append((q, sg))
    return result


def maximum_statistic_filter(result_set, statistic, maximum):
    result = []
    for (q, sg) in result_set:
        if sg.statistics[statistic] <= maximum:
            result.append((q, sg))
    return result


def overlap_filter(result_set, data, similarity_level=0.9):
    result = []
    result_sgs = []
    for (q, sg) in result_set:
        if not overlaps_list(sg, result_sgs, data, similarity_level):
            result_sgs.append(sg)
            result.append((q, sg))
    return result


def overlaps_list(sg, list_of_sgs, data, similarity_level=0.9):
    for anotherSG in list_of_sgs:
        if ps.overlap(sg, anotherSG, data) > similarity_level:
            return True
    return False


class CountCallsInterestingMeasure(BoundedInterestingnessMeasure):
    def __init__(self, qf):
        self.qf = qf
        self.calls = 0

    def calculate_statistics(self, sg, target, data, statistics=None):
        self.calls += 1
        return self.qf.calculate_statistics(sg, target, data, statistics)

    def __getattr__(self, name):
        return getattr(self.qf, name)

    def __hasattr__(self, name):
        return hasattr(self.qf, name)


#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAwareQF(AbstractInterestingnessMeasure):
    ga_tuple = namedtuple('ga_tuple', ['subgroup_quality', 'generalisation_quality'])
    def __init__(self, qf):
        self.qf = qf

        # this cache maps the representation of descriptions to tuples
        # the first entry is the quality and the second one is
        # the largest quality of all its predessors
        self.cache = {}
        self.has_constant_statistics = False
        self.required_stat_attrs = ['subgroup_quality', 'generalisation_quality']
        self.q0 = 0

    def calculate_constant_statistics(self, data, target):
        self.cache = {}
        self.qf.calculate_constant_statistics(data, target)
        self.q0 = self.qf.evaluate(slice(None), target, data)
        self.has_constant_statistics = self.qf.has_constant_statistics

    def calculate_statistics(self, subgroup, target, data, statistics=None):
        sg_repr = repr(subgroup)
        if sg_repr in self.cache:
            return GeneralizationAwareQF.ga_tuple(*self.cache[sg_repr])
        else:
            (q_sg, q_prev) = self.get_qual_and_previous_qual(subgroup, target, data)
            self.cache[sg_repr] = (q_sg, q_prev)
            return GeneralizationAwareQF.ga_tuple(q_sg, q_prev)

    def get_qual_and_previous_qual(self, subgroup, target, data):
        q_subgroup = self.qf.evaluate(subgroup, target, data)
        max_q = 0
        selectors = subgroup.selectors
        if len(selectors) > 0:
            # compute quality of all generalizations
            generalizations = combinations(selectors, len(selectors)-1)

            for sels in generalizations:
                sgd = ps.Conjunction(list(sels))
                (q_sg, q_prev) = self.calculate_statistics(sgd, target, data)
                max_q = max(max_q, q_sg, q_prev)
        return (q_subgroup, max_q)

    def evaluate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        return statistics.subgroup_quality - statistics.generalisation_quality


#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAwareQF_stats(AbstractInterestingnessMeasure):
    ga_tuple = namedtuple('ga_stats_tuple', ['subgroup_stats', 'generalisation_stats'])
    def __init__(self, qf):
        self.qf = qf

        # this cache maps the representation of descriptions to tuples
        # the first entry is the quality and the second one is
        # the largest quality of all its predecessors
        self.cache = {}
        self.has_constant_statistics = False
        self.required_stat_attrs = GeneralizationAwareQF_stats.ga_tuple._fields
        self.stats0 = None

    def calculate_constant_statistics(self, data, target):
        self.cache = {}
        self.qf.calculate_constant_statistics(data, target)
        self.stats0 = self.qf.calculate_statistics(slice(None), target, data)
        self.has_constant_statistics = self.qf.has_constant_statistics

    def calculate_statistics(self, subgroup, target, data, statistics=None):
        sg_repr = repr(subgroup)
        if sg_repr in self.cache:
            return GeneralizationAwareQF_stats.ga_tuple(*self.cache[sg_repr])
        else:
            (stats_sg, stats_prev) = self.get_stats_and_previous_stats(subgroup, target, data)
            self.cache[sg_repr] = (stats_sg, stats_prev)
            return GeneralizationAwareQF_stats.ga_tuple(stats_sg, stats_prev)

    def get_stats_and_previous_stats(self, subgroup, target, data):
        stats_subgroup = self.qf.calculate_statistics(subgroup, target, data)
        max_stats = self.stats0
        selectors = subgroup.selectors
        if len(selectors) > 0:
            # compute quality of all generalizations
            generalizations = combinations(selectors, len(selectors)-1)

            for sels in generalizations:
                sgd = ps.Conjunction(list(sels))
                (stats_sg, stats_prev) = self.calculate_statistics(sgd, target, data)
                max_stats = self.get_max(max_stats, stats_sg, stats_prev)
        return (stats_subgroup, max_stats)

    def evaluate(self, subgroup, statistics_or_data=None):
        raise NotImplementedError

    def get_max(self, *args):
        raise NotImplementedError