model_target.py
from collections import namedtuple
from scipy.stats import norm
import numpy as np
import pysubgroup_mod as ps
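# beta_tuple bundles a fitted coefficient vector with the number of rows used in the fit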
beta_tuple = namedtuple('beta_tuple', ['beta', 'size'])


class EMM_Likelihood(ps.AbstractInterestingnessMeasure):
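    """Interestingness measure for Exceptional Model Mining (EMM).

    Wraps a model class and scores a subgroup by the difference between the
    model's average likelihood inside the subgroup and in its complement.
    """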
    tpl = namedtuple('EMM_Likelihood', ['model_params', 'subgroup_likelihood', 'inverse_likelihood', 'size'])
    def __init__(self, model):
        self.model = model
        self.has_constant_statistics = False
        self.required_stat_attrs = EMM_Likelihood.tpl._fields
        self.data_size = None

    def calculate_constant_statistics(self, task):
        self.model.calculate_constant_statistics(task)
        self.data_size = len(task.data)
        self.has_constant_statistics = True

    def calculate_statistics(self, subgroup, data=None):
        cover_arr, sg_size = ps.get_cover_array_and_size(subgroup, self.data_size, data)

        params = self.model.fit(cover_arr, data)
        return self.get_tuple(sg_size, params, cover_arr)

    def get_tuple(self, sg_size, params, cover_arr):
        # Evaluate the likelihood of every row under the fitted model, then
        # split the total into subgroup and complement averages. Note: summing
        # raw likelihoods (rather than log-likelihoods) may be numerically
        # unstable for large datasets.
        all_likelihood = self.model.likelihood(params, np.ones(self.data_size, dtype=bool))
        sg_likelihood_sum = np.sum(all_likelihood[cover_arr])
        total_likelihood_sum = np.sum(all_likelihood)
        # Average likelihood over the complement (dataset minus subgroup)
        dataset_average = np.nan
        if (self.data_size - sg_size) > 0:
            dataset_average = (total_likelihood_sum - sg_likelihood_sum) / (self.data_size - sg_size)
        # Average likelihood over the subgroup itself
        sg_average = np.nan
        if sg_size > 0:
            sg_average = sg_likelihood_sum / sg_size
        return EMM_Likelihood.tpl(params, sg_average, dataset_average, sg_size)

    def evaluate(self, subgroup, statistics=None):
        statistics = self.ensure_statistics(subgroup, statistics)
        # Quality is the gap between the subgroup's average likelihood and the
        # complement's (see the numeric-stability note in get_tuple)
        return statistics.subgroup_likelihood - statistics.inverse_likelihood

    def gp_get_params(self, cover_arr, v):
        params = self.model.gp_get_params(v)
        sg_size = params.size
        return self.get_tuple(sg_size, params, cover_arr)

    def supports_weights(self):
        return False

    def is_applicable(self, _):
        return True

    def __getattr__(self, name):
        # Delegate unknown attribute lookups to the wrapped model
        return getattr(self.model, name)


class PolyRegression_ModelClass:
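    """Polynomial regression model (currently linear, degree == 1 only) of
    y on x, intended as the model component of EMM_Likelihood."""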
    def __init__(self, x_name='x', y_name='y', degree=1):
        self.x_name = x_name
        self.y_name = y_name
        if degree != 1:
            raise ValueError('Currently only degree == 1 is supported')
        self.degree = degree
        self.x = None
        self.y = None
        self.has_constant_statistics = True
        super().__init__()

    def calculate_constant_statistics(self, task):
        data = task.data
        self.x = data[self.x_name].to_numpy()
        self.y = data[self.y_name].to_numpy()
        self.has_constant_statistics = True

    @staticmethod
    def gp_merge(u, v):
        # Merge two statistics vectors [n, sum_x, sum_y, S_xy, sum_x2] in
        # place (into u). S_xy, the centered cross moment, requires the
        # standard pairwise-update correction term d when two parts combine.
        v0 = v[0]
        u0 = u[0]
        if v0 == 0 or u0 == 0:
            d = 0
        else:
            d = v0 * u0 / (v0 + u0) * (v[1] / v0 - u[1] / u0) * (v[2] / v0 - u[2] / u0)
        u += v
        u[3] += d

    def gp_get_null_vector(self):
        # Statistics vector layout: [n, sum_x, sum_y, S_xy, sum_x2]
        return np.zeros(5)

    def gp_get_stats(self, row_index):
        # Single-row contribution; the centered cross moment of one point is 0
        x = self.x[row_index]
        return np.array([1, x, self.y[row_index], 0, x * x])

    def gp_get_params(self, v):
        size = v[0]
        # More than `degree` points are needed; otherwise the least-squares
        # denominator below is zero
        if size <= self.degree:
            return beta_tuple(np.full(self.degree + 1, np.nan), size)
        v1 = v[1]
        # Closed-form simple linear regression from the merged statistics:
        # slope = S_xy / S_xx, intercept = mean_y - slope * mean_x
        slope = v[0] * v[3] / (v[0] * v[4] - v1 * v1)
        intercept = v[2] / v[0] - slope * v[1] / v[0]
        return beta_tuple(np.array([slope, intercept]), v[0])

    def fit(self, subgroup, data=None):
        cover_arr, size = ps.get_cover_array_and_size(subgroup, len(self.x), data)
        # Require more than degree + 1 points so the fit is overdetermined
        if size <= self.degree + 1:
            return beta_tuple(np.full(self.degree + 1, np.nan), size)
        return beta_tuple(np.polyfit(self.x[cover_arr], self.y[cover_arr], deg=self.degree), size)

    def likelihood(self, stats, sg):
        # Likelihood of each covered row under a standard-normal residual model
        if any(np.isnan(stats.beta)):
            return np.full(self.x[sg].shape, np.nan)
        return norm.pdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg])

    def loglikelihood(self, stats, sg):
        # Same NaN guard as likelihood(), for degenerate fits
        if any(np.isnan(stats.beta)):
            return np.full(self.x[sg].shape, np.nan)
        return norm.logpdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg])