'''
Created on 29.09.2017

@author: lemmerfn
'''
from collections import namedtuple
from functools import total_ordering
import numpy as np
import scipy.stats
from pysubgroup_mod import utils
import pysubgroup_mod as ps

from pysubgroup_mod.subgroup_description import EqualitySelector


@total_ordering
class BinaryTarget:

    statistic_types = ('size_sg', 'size_dataset', 'positives_sg', 'positives_dataset', 'size_complement',
                      'relative_size_sg', 'relative_size_complement', 'coverage_sg', 'coverage_complement',
                      'target_share_sg', 'target_share_complement', 'target_share_dataset', 'lift')

    def __init__(self, target_attribute=None, target_value=None, target_selector=None):
        """
        Creates a new target for the boolean model class (classic subgroup discovery).
        If target_attribute and target_value are given, the target_selector is computed using attribute and value
        """
        if target_attribute is not None and target_value is not None:
            if target_selector is not None:
                raise BaseException("BinaryTarget is to be constructed EITHER by a selector OR by attribute/value pair")
            target_selector = EqualitySelector(target_attribute, target_value)
        if target_selector is None:
            raise BaseException("No target selector given")
        self.target_selector = target_selector

    def __repr__(self):
        return "T: " + str(self.target_selector)

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    def __lt__(self, other):
        return str(self) < str(other)

    def covers(self, instance):
        return self.target_selector.covers(instance)

    def get_attributes(self):
        return [self.target_selector.get_attribute_name()]

    def get_base_statistics(self, subgroup, data):
        cover_arr, size_sg = ps.get_cover_array_and_size(subgroup, len(data), data)
        positives = self.covers(data)
        instances_subgroup = size_sg
        positives_dataset = np.sum(positives)
        instances_dataset = len(data)
        positives_subgroup = np.sum(positives[cover_arr])
        return instances_dataset, positives_dataset, instances_subgroup, positives_subgroup

    def calculate_statistics(self, subgroup, data, cached_statistics=None):
        if cached_statistics is None or not isinstance(cached_statistics, dict):
            statistics = dict()
        elif all(k in cached_statistics for k in BinaryTarget.statistic_types):
            return cached_statistics
        else:
            statistics = cached_statistics

        (instances_dataset, positives_dataset, instances_subgroup, positives_subgroup) = \
            self.get_base_statistics(subgroup, data)
        statistics['size_sg'] = instances_subgroup
        statistics['size_dataset'] = instances_dataset
        statistics['positives_sg'] = positives_subgroup
        statistics['positives_dataset'] = positives_dataset
        statistics['size_complement'] = instances_dataset - instances_subgroup
        statistics['relative_size_sg'] = instances_subgroup / instances_dataset
        statistics['relative_size_complement'] = (instances_dataset - instances_subgroup) / instances_dataset
        statistics['coverage_sg'] = positives_subgroup / positives_dataset
        statistics['coverage_complement'] = (positives_dataset - positives_subgroup) / positives_dataset
        statistics['target_share_sg'] = positives_subgroup / instances_subgroup
        statistics['target_share_complement'] = (positives_dataset - positives_subgroup) / (instances_dataset - instances_subgroup)
        statistics['target_share_dataset'] = positives_dataset / instances_dataset
        statistics['lift'] = statistics['target_share_sg'] / statistics['target_share_dataset']
        
        return statistics
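
# Minimal usage sketch (hypothetical column name "class"; assumes a pandas
# DataFrame `data` and a subgroup description `sg` are already defined):
#
#   target = BinaryTarget(target_attribute="class", target_value=True)
#   stats = target.calculate_statistics(sg, data)
#   stats["target_share_sg"]   # share of positives inside the subgroup
#   stats["lift"]              # target_share_sg / target_share_dataset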


class SimplePositivesQF(ps.AbstractInterestingnessMeasure):  # pylint: disable=abstract-method
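    """Base class for quality functions on a binary target.

    calculate_constant_statistics caches the boolean positives array and the
    dataset-level counts (size, number of positives); subclasses combine these
    cached values with the per-subgroup counts.
    """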
    tpl = namedtuple('PositivesQF_parameters', ('size_sg', 'positives_count'))

    def __init__(self):
        self.dataset_statistics = None
        self.positives = None
        self.has_constant_statistics = False
        self.required_stat_attrs = ('size_sg', 'positives_count')

    def calculate_constant_statistics(self, data, target):
        assert isinstance(target, BinaryTarget)
        self.positives = target.covers(data)
        self.dataset_statistics = SimplePositivesQF.tpl(len(data), np.sum(self.positives))
        self.has_constant_statistics = True

    def calculate_statistics(self, subgroup, target, data, statistics=None):
        cover_arr, size_sg = ps.get_cover_array_and_size(subgroup, len(self.positives), data)
        return SimplePositivesQF.tpl(size_sg, np.count_nonzero(self.positives[cover_arr]))



# TODO Make ChiSquared useful for real nominal data not just binary
# TODO Introduce Enum for direction
# TODO Maybe it is possible to give an optimistic estimate for ChiSquared
class ChiSquaredQF(SimplePositivesQF):
    """
    ChiSquaredQF, which tests for statistical independence of a subgroup against its complement

    ...

    """

    @staticmethod
    def chi_squared_qf(instances_dataset, positives_dataset, instances_subgroup, positives_subgroup, min_instances=1, bidirect=True, direction_positive=True, index=0):
        """
        Performs a chi2 test of statistical independence

        Tests whether a subgroup is statistically independent of its complement (see scipy.stats.chi2_contingency).


        Parameters
        ----------
        instances_dataset, positives_dataset, instances_subgroup, positives_subgroup : int
            counts of subgroup and dataset
        min_instances : int, optional
            minimum number of instances required; if there are fewer, -inf is returned for that subgroup
        bidirect : bool, optional
            If True, both directions are considered interesting; otherwise direction_positive decides which direction is interesting
        direction_positive: bool, optional
            Only used if bidirect=False; specifies whether positive (True) or negative (False) deviations are of interest
        index : {0, 1}, optional
            decides whether the test statistic (0) or the p-value (1) should be used
        """
        
        if (instances_subgroup < min_instances) or ((instances_dataset - instances_subgroup) < min_instances):
        #if (instances_subgroup < min_instances):
            return float("-inf")
        negatives_subgroup = instances_subgroup - positives_subgroup # pylint: disable=bad-whitespace
        negatives_dataset = instances_dataset - positives_dataset # pylint: disable=bad-whitespace
        negatives_complement = negatives_dataset - negatives_subgroup
        positives_complement = positives_dataset - positives_subgroup
        val = scipy.stats.chi2_contingency([[positives_subgroup, positives_complement],
                                            [negatives_subgroup, negatives_complement]], correction=False)[index]
        if bidirect:
            return val
        p_subgroup = positives_subgroup / instances_subgroup
        p_dataset = positives_dataset / instances_dataset
        if direction_positive and p_subgroup > p_dataset:
            return val
        elif not direction_positive and p_subgroup < p_dataset:
            return val
        return -val
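
    # Worked sketch of the 2x2 contingency table the test is built on
    # (hypothetical counts): with instances_dataset=100, positives_dataset=40,
    # instances_subgroup=20 and positives_subgroup=15, the table is
    #   [[15, 25],    # positives in subgroup, positives in complement
    #    [ 5, 55]]    # negatives in subgroup, negatives in complement
    # and scipy.stats.chi2_contingency returns the chi2 statistic (index=0)
    # or the p-value (index=1).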

    @staticmethod
    def chi_squared_qf_weighted(subgroup, data, weighting_attribute, effective_sample_size=0, min_instances=5, ):
        (instancesDataset, positivesDataset, instancesSubgroup, positivesSubgroup) = subgroup.get_base_statistics(data, weighting_attribute)
        if (instancesSubgroup < min_instances) or ((instancesDataset - instancesSubgroup) < 5):
            return float("inf")
        if effective_sample_size == 0:
            effective_sample_size = ps.effective_sample_size(data[weighting_attribute])
        # p_subgroup = positivesSubgroup / instancesSubgroup
        # p_dataset = positivesDataset / instancesDataset

        negatives_subgroup = instancesSubgroup - positivesSubgroup
        negatives_dataset = instancesDataset - positivesDataset
        positives_complement = positivesDataset - positivesSubgroup
        negatives_complement = negatives_dataset - negatives_subgroup
        val = scipy.stats.chi2_contingency([[positivesSubgroup, positives_complement],
                                            [negatives_subgroup, negatives_complement]], correction=True)[0]
        return scipy.stats.chi2.sf(val * effective_sample_size / instancesDataset, 1)

    def __init__(self, direction='both', min_instances=1, stat='chi2'):
        """
        Parameters
        ----------
        direction : {'both', 'positive', 'negative'}
            direction of deviation that is of interest
        min_instances : int, optional
            minimum number of instances required; if there are fewer, -inf is returned for that subgroup
        stat : {'chi2', 'p'}
            whether to report the test statistic or the p-value (see scipy.stats.chi2_contingency)
        """
        if direction == 'both':
            self.bidirect = True
            self.direction_positive = True
        elif direction == 'positive':
            self.bidirect = False
            self.direction_positive = True
        elif direction == 'negative':
            self.bidirect = False
            self.direction_positive = False
        else:
            raise ValueError("direction must be one of 'both', 'positive' or 'negative'")
        self.min_instances = min_instances
        self.index = {'chi2' : 0, 'p': 1}[stat]
        super().__init__()

    def evaluate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return ChiSquaredQF.chi_squared_qf(dataset.size_sg, dataset.positives_count, statistics.size_sg, statistics.positives_count, self.min_instances, self.bidirect, self.direction_positive, self.index)
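
# Usage sketch (assumes a DataFrame `data`, a BinaryTarget `target` and a
# subgroup description `sg` are already defined):
#
#   qf = ChiSquaredQF(direction='positive', min_instances=5, stat='p')
#   qf.calculate_constant_statistics(data, target)
#   score = qf.evaluate(sg, target, data)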


class StandardQF(SimplePositivesQF, ps.BoundedInterestingnessMeasure):
    """
    StandardQF which weights the relative size against the difference in averages

    The StandardQF is a general form of quality function which, for different values of a, is order equivalent to
    many popular quality measures.

    Attributes
    ----------
    a : float
        exponent used to trade off the relative subgroup size against the difference in averages

    """

    @staticmethod
    def standard_qf(subg, a, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup, measures=False):
        if not hasattr(instances_subgroup, '__array_interface__') and (instances_subgroup == 0):
            return (np.nan, np.nan, np.nan) if measures else np.nan
        p_subgroup = np.divide(positives_subgroup, instances_subgroup)
        p_dataset = positives_dataset / instances_dataset
        quality = (instances_subgroup / instances_dataset) ** a * (p_subgroup - p_dataset)
        if measures:
            return (quality,
                    utils.calculate_info_gained(instances_dataset, instances_subgroup, positives_dataset, positives_subgroup),
                    utils.calculate_odd_value(instances_dataset, instances_subgroup, positives_dataset, positives_subgroup))
        return quality
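
    # Worked sketch with hypothetical counts: instances_dataset=100,
    # positives_dataset=40, instances_subgroup=20, positives_subgroup=15:
    #   p_dataset = 0.40, p_subgroup = 0.75
    #   a = 1.0 (WRAcc-like): (20 / 100) ** 1.0 * (0.75 - 0.40) = 0.07
    #   a = 0.0 (lift-like):  (20 / 100) ** 0.0 * (0.75 - 0.40) = 0.35
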
    def __init__(self, a):
        """
        Parameters
        ----------
        a : float
            exponent to trade-off the relative size with the difference in means
        """
        self.a = a
        super().__init__()

    def evaluate(self, subgroup, target, data, statistics=None, measures=False):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return StandardQF.standard_qf(subgroup, self.a, dataset.size_sg, dataset.positives_count, statistics.size_sg, statistics.positives_count, measures)

    def optimistic_estimate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return StandardQF.standard_qf(subgroup, self.a, dataset.size_sg, dataset.positives_count, statistics.positives_count, statistics.positives_count)

    def optimistic_generalisation(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        pos_remaining = dataset.positives_count - statistics.positives_count
        return StandardQF.standard_qf(subgroup, self.a, dataset.size_sg, dataset.positives_count, statistics.size_sg + pos_remaining, dataset.positives_count)


class LiftQF(StandardQF):
    """
    Lift Quality Function

    LiftQF is a StandardQF with a=0.
    Thus it treats the difference in ratios as the quality without caring about the relative size of a subgroup.
    """

    def __init__(self):
        """
        """

        super().__init__(0.0)



# TODO add true binomial quality function as in https://opus.bibliothek.uni-wuerzburg.de/opus4-wuerzburg/frontdoor/index/index/docId/1786
class SimpleBinomialQF(StandardQF):
    """
    Simple Binomial Quality Function

    SimpleBinomialQF is a StandardQF with a=0.5.
    It is an order equivalent approximation of the full binomial test if the subgroup size is much smaller than the size of the entire dataset.
    """

    def __init__(self):
        """
        """

        super().__init__(0.5)


class WRAccQF(StandardQF):
    """
    Weighted Relative Accuracy Quality Function

    WRAccQF is a StandardQF with a=1.
    It is order equivalent to the difference in the observed and expected number of positive instances.
    """

    def __init__(self):
        """
        """

        super().__init__(1.0)
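
# The three specializations above differ only in the exponent they pass to
# StandardQF. With the hypothetical counts used in the StandardQF sketch
# (100/40 dataset, 20/15 subgroup):
#   LiftQF()           -> StandardQF(0.0): quality 0.35
#   SimpleBinomialQF() -> StandardQF(0.5): (20 / 100) ** 0.5 * 0.35 ≈ 0.157
#   WRAccQF()          -> StandardQF(1.0): quality 0.07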


#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAware_StandardQF(ps.GeneralizationAwareQF_stats):
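    """Generalization-aware variant of the StandardQF.

    A subgroup is scored by its relative size (raised to the exponent a) times
    the difference between its target share and the largest target share among
    its generalisations (see get_max).
    """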
    def __init__(self, a):
        super().__init__(StandardQF(0))
        self.a = a

    def get_max(self, *args):
        max_ratio = 0.0
        max_stats = None
        for stat in args:
            if stat.size_sg > 0:
                ratio = stat.positives_count / stat.size_sg
                if ratio > max_ratio:
                    max_ratio = ratio
                    max_stats = stat
        return max_stats

    def evaluate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        sg_stats = statistics.subgroup_stats
        general_stats = statistics.generalisation_stats
        if sg_stats.size_sg == 0 or general_stats.size_sg == 0:
            return np.nan

        sg_ratio = sg_stats.positives_count / sg_stats.size_sg
        general_ratio = general_stats.positives_count / general_stats.size_sg
        return (sg_stats.size_sg / self.stats0.size_sg) ** self.a * (sg_ratio - general_ratio)