""" Metric generation for each tuned model. Done in a different script for perfomance and clarity purposes. """ # Libraries # -------------------------------------------------------------------------------------------------------- import pandas as pd import numpy as np from xgboost import XGBClassifier from sklearn.metrics import confusion_matrix from sklearn.metrics import f1_score, make_scorer, precision_score, recall_score, accuracy_score, roc_auc_score, average_precision_score from sklearn.ensemble import RandomForestClassifier, BaggingClassifier, AdaBoostClassifier from sklearn.neural_network import MLPClassifier from sklearn.svm import SVC from sklearn.linear_model import LogisticRegression from sklearn.tree import DecisionTreeClassifier from sklearn.model_selection import StratifiedKFold, cross_validate from sklearn.metrics import RocCurveDisplay, auc from sklearn.metrics import PrecisionRecallDisplay, precision_recall_curve import matplotlib.pyplot as plt import ast # String to dictionary # -------------------------------------------------------------------------------------------------------- # Function to read training datasets # -------------------------------------------------------------------------------------------------------- def read_data(): # Load ORIGINAL training data X_train_pre = np.load('../gen_train_data/data/output/pre/X_train_pre.npy', allow_pickle=True) y_train_pre = np.load('../gen_train_data/data/output/pre/y_train_pre.npy', allow_pickle=True) X_train_post = np.load('../gen_train_data/data/output/post/X_train_post.npy', allow_pickle=True) y_train_post = np.load('../gen_train_data/data/output/post/y_train_post.npy', allow_pickle=True) # Load oversampled training data X_train_over_pre = np.load('../gen_train_data/data/output/pre/X_train_over_pre.npy', allow_pickle=True) y_train_over_pre = np.load('../gen_train_data/data/output/pre/y_train_over_pre.npy', allow_pickle=True) X_train_over_post = np.load('../gen_train_data/data/output/post/X_train_over_post.npy', allow_pickle=True) y_train_over_post = np.load('../gen_train_data/data/output/post/y_train_over_post.npy', allow_pickle=True) # Load undersampled training data X_train_under_pre = np.load('../gen_train_data/data/output/pre/X_train_under_pre.npy', allow_pickle=True) y_train_under_pre = np.load('../gen_train_data/data/output/pre/y_train_under_pre.npy', allow_pickle=True) X_train_under_post = np.load('../gen_train_data/data/output/post/X_train_under_post.npy', allow_pickle=True) y_train_under_post = np.load('../gen_train_data/data/output/post/y_train_under_post.npy', allow_pickle=True) data_dic = { "X_train_pre": X_train_pre, "y_train_pre": y_train_pre, "X_train_post": X_train_post, "y_train_post": y_train_post, "X_train_over_pre": X_train_over_pre, "y_train_over_pre": y_train_over_pre, "X_train_over_post": X_train_over_post, "y_train_over_post": y_train_over_post, "X_train_under_pre": X_train_under_pre, "y_train_under_pre": y_train_under_pre, "X_train_under_post": X_train_under_post, "y_train_under_post": y_train_under_post, } return data_dic # -------------------------------------------------------------------------------------------------------- # Returning tuned models for each situation # -------------------------------------------------------------------------------------------------------- def get_tuned_models(group_str, method_str): # Read sheet corresponding to group and method with tuned models and their hyperparam tuned_models_df = 
pd.read_excel("./output_hyperparam/hyperparamers.xlsx",sheet_name=f"{group_str}_{method_str}") # Mapping from model abbreviations to sklearn model classes model_mapping = { 'DT': DecisionTreeClassifier, 'RF': RandomForestClassifier, 'Bagging': BaggingClassifier, 'AB': AdaBoostClassifier, 'XGB': XGBClassifier, 'LR': LogisticRegression, 'SVM': SVC, 'MLP': MLPClassifier } tuned_models = {} # Iterate through each row of the DataFrame for _, row in tuned_models_df.iterrows(): model_name = row.iloc[0] # Read dictionary parameters = ast.literal_eval(row['Best Parameters']) # Add extra parameters if model_name == 'AB': parameters['algorithm'] = 'SAMME' elif model_name == 'LR': parameters['max_iter'] = 1000 elif model_name == 'SVM': parameters['max_iter'] = 1000 parameters['probability'] = True elif model_name == "MLP": parameters['max_iter'] = 500 # Add class_weight argument for cost-sensitive learning method if 'CW' in method_str: if model_name == 'Bagging' or model_name == 'AB': parameters['estimator'] = DecisionTreeClassifier(class_weight='balanced') else: parameters['class_weight'] = 'balanced' # Fetch class model_class = model_mapping[model_name] # Initialize model tuned_models[model_name] = model_class(**parameters) return tuned_models # -------------------------------------------------------------------------------------------------------- # Scorers # -------------------------------------------------------------------------------------------------------- def TN_scorer(clf, X, y): """Gives the number of samples predicted as true negatives""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) TN = cm[0,0] return TN def FN_scorer(clf, X, y): """Gives the number of samples predicted as false negatives""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) FN = cm[0,1] return FN def FP_scorer(clf, X, y): """Gives the number of samples predicted as false positive""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) FP = cm[1,0] return FP def TP_scorer(clf, X, y): """Gives the number of samples predicted as true positive""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) TP = cm[1,1] return TP def negative_recall_scorer(clf, X, y): """Gives the negative recall defined as the (number of true_negative_samples)/(total number of negative samples)""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) TN_prop = cm[0,0]/(cm[0,1]+cm[0,0]) return TN_prop # -------------------------------------------------------------------------------------------------------- if __name__ == "__main__": # Setup # -------------------------------------------------------------------------------------------------------- # Reading training data data_dic = read_data() # Scorings to use for cv metric generation scorings = { 'F1':make_scorer(f1_score), 'PREC':make_scorer(precision_score), 'REC':make_scorer(recall_score), 'ACC': make_scorer(accuracy_score), 'NREC': negative_recall_scorer, 'TN':TN_scorer, 'FN':FN_scorer, 'FP':FP_scorer, 'TP':TP_scorer, 'AUROC': make_scorer(roc_auc_score), 'AUPRC': make_scorer(average_precision_score) } method_names = { 0: "ORIG", 1: "ORIG_CW", 2: "OVER", 3: "UNDER" } # Defining cross-validation protocol cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=1) # -------------------------------------------------------------------------------------------------------- # Metric generation through cv for tuned models3 # -------------------------------------------------------------------------------------------------------- scores_sheets = {} # To store score dfs as 

# --------------------------------------------------------------------------------------------------------
# Scorers
# --------------------------------------------------------------------------------------------------------
# Note: sklearn's confusion_matrix for binary labels {0, 1} is laid out as
#   [[TN, FP],
#    [FN, TP]]
# i.e. cm[i, j] counts samples with true label i predicted as label j.
def TN_scorer(clf, X, y):
    """Gives the number of samples predicted as true negatives"""
    y_pred = clf.predict(X)
    cm = confusion_matrix(y, y_pred)
    TN = cm[0, 0]
    return TN

def FN_scorer(clf, X, y):
    """Gives the number of samples predicted as false negatives"""
    y_pred = clf.predict(X)
    cm = confusion_matrix(y, y_pred)
    FN = cm[1, 0]
    return FN

def FP_scorer(clf, X, y):
    """Gives the number of samples predicted as false positives"""
    y_pred = clf.predict(X)
    cm = confusion_matrix(y, y_pred)
    FP = cm[0, 1]
    return FP

def TP_scorer(clf, X, y):
    """Gives the number of samples predicted as true positives"""
    y_pred = clf.predict(X)
    cm = confusion_matrix(y, y_pred)
    TP = cm[1, 1]
    return TP

def negative_recall_scorer(clf, X, y):
    """Gives the negative recall, defined as (number of true negatives)/(total number of negative samples)"""
    y_pred = clf.predict(X)
    cm = confusion_matrix(y, y_pred)
    TN_prop = cm[0, 0] / (cm[0, 1] + cm[0, 0])
    return TN_prop
# --------------------------------------------------------------------------------------------------------

if __name__ == "__main__":
    # Setup
    # --------------------------------------------------------------------------------------------------------
    # Reading training data
    data_dic = read_data()
    # Scorings to use for cv metric generation
    scorings = {
        'F1': make_scorer(f1_score),
        'PREC': make_scorer(precision_score),
        'REC': make_scorer(recall_score),
        'ACC': make_scorer(accuracy_score),
        'NREC': negative_recall_scorer,
        'TN': TN_scorer,
        'FN': FN_scorer,
        'FP': FP_scorer,
        'TP': TP_scorer,
        'AUROC': make_scorer(roc_auc_score),
        'AUPRC': make_scorer(average_precision_score)
    }
    method_names = {
        0: "ORIG",
        1: "ORIG_CW",
        2: "OVER",
        3: "UNDER"
    }
    # Defining cross-validation protocol
    cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=1)
    # --------------------------------------------------------------------------------------------------------

    # Metric generation through cv for tuned models
    # --------------------------------------------------------------------------------------------------------
    scores_sheets = {}  # To store score dfs as sheets in the same Excel file
    for i, group in enumerate(['pre', 'post']):
        # ORIG and ORIG_CW both use the original training data; OVER and UNDER use the resampled sets
        for j, method in enumerate(['', '', 'over_', 'under_']):
            # Get train dataset based on group and method
            X_train = data_dic['X_train_' + method + group]
            y_train = data_dic['y_train_' + method + group]
            # Get tuned models for this group and method
            models = get_tuned_models(group, method_names[j])
            # Scores df -> one column per cv split, one row for each model-metric pair
            scores_df = pd.DataFrame(columns=range(1, 11),
                                     index=[f"{model_name}_{metric_name}"
                                            for model_name in models.keys()
                                            for metric_name in scorings.keys()])
            # Create a figure for all models in this group-method
            fig, axes = plt.subplots(len(models), 2, figsize=(10, 8 * len(models)))
            if len(models) == 1:  # Adjustment if there's only one model (axes indexing issue)
                axes = [axes]
            # Metric generation for each model
            for model_idx, (model_name, model) in enumerate(models.items()):
                print(f"{group}-{method_names[j]}-{model_name}")
                # # Retrieve cv scores for our metrics of interest
                # scores = cross_validate(model, X_train, y_train, scoring=scorings, cv=cv, return_train_score=True, n_jobs=10)
                # # Save results of each fold
                # for metric_name in scorings.keys():
                #     scores_df.loc[model_name + f'_{metric_name}'] = list(np.around(np.array(scores[f"test_{metric_name}"]), 4))
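                # The manual fold loop below is used instead of cross_validate (commented out above) so that,
                # in addition to the per-fold scores, the ROC and Precision-Recall curves of every fold can be
                # drawn on the shared axes for this model.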
                mean_fpr = np.linspace(0, 1, 100)
                tprs, aucs = [], []
                mean_recall = np.linspace(0, 1, 100)
                precisions, pr_aucs = [], []
                cmap = plt.get_cmap('tab10')  # Colormap
                # Initialize storage for the scores of each fold
                fold_scores = {metric_name: [] for metric_name in scorings.keys()}
                # Loop through each fold in the cross-validation
                for fold_idx, (train_idx, test_idx) in enumerate(cv.split(X_train, y_train)):
                    X_train_fold, X_test_fold = X_train[train_idx], X_train[test_idx]
                    y_train_fold, y_test_fold = y_train[train_idx], y_train[test_idx]
                    # Fit the model on the training data
                    model.fit(X_train_fold, y_train_fold)
                    # Predict on the test data
                    if hasattr(model, "decision_function"):
                        y_score = model.decision_function(X_test_fold)
                    else:
                        y_score = model.predict_proba(X_test_fold)[:, 1]  # Use probability of the positive class
                    y_pred = model.predict(X_test_fold)
                    # Calculate and store the scores for each metric
                    for metric_name, scorer in scorings.items():
                        if metric_name in ['AUROC', 'AUPRC']:
                            # Threshold-based metrics need the continuous scores
                            score = scorer._score_func(y_test_fold, y_score)
                        elif metric_name in ['NREC', 'TN', 'FN', 'FP', 'TP']:
                            # Custom scorers are plain callables with signature (clf, X, y)
                            score = scorer(model, X_test_fold, y_test_fold)
                        else:
                            score = scorer._score_func(y_test_fold, y_pred)
                        fold_scores[metric_name].append(score)
                    # --------------------- CURVES ---------------------------
                    # Generate ROC curve for the fold
                    roc_display = RocCurveDisplay.from_estimator(model, X_test_fold, y_test_fold,
                                                                 name=f"ROC fold {fold_idx}", alpha=0.6, lw=2,
                                                                 ax=axes[model_idx][0], color=cmap(fold_idx % 10))
                    interp_tpr = np.interp(mean_fpr, roc_display.fpr, roc_display.tpr)
                    interp_tpr[0] = 0.0
                    tprs.append(interp_tpr)
                    aucs.append(roc_display.roc_auc)
                    # Generate Precision-Recall curve for the fold
                    pr_display = PrecisionRecallDisplay.from_estimator(model, X_test_fold, y_test_fold,
                                                                       name=f"PR fold {fold_idx}", alpha=0.6, lw=2,
                                                                       ax=axes[model_idx][1], color=cmap(fold_idx % 10))
                    interp_precision = np.interp(mean_recall, pr_display.recall[::-1], pr_display.precision[::-1])
                    precisions.append(interp_precision)
                    pr_aucs.append(pr_display.average_precision)
                # Store the fold scores in the dataframe
                for metric_name, scores in fold_scores.items():
                    scores_df.loc[f"{model_name}_{metric_name}"] = np.around(scores, 4)
                # Plot diagonal line for random guessing in the ROC curve
                axes[model_idx][0].plot([0, 1], [0, 1], linestyle='--', lw=2, color='r', alpha=.8, label='Random guessing')
                # Compute mean ROC curve
                mean_tpr = np.mean(tprs, axis=0)
                mean_tpr[-1] = 1.0
                mean_auc = auc(mean_fpr, mean_tpr)
                axes[model_idx][0].plot(mean_fpr, mean_tpr, color='b', lw=4,
                                        label=r'Mean ROC (AUC = %0.2f)' % mean_auc, alpha=.8)
                # Set ROC plot limits and title
                axes[model_idx][0].set(xlim=[-0.05, 1.05], ylim=[-0.05, 1.05],
                                       title=f"ROC Curve - {model_name} ({group}-{method_names[j]})")
                axes[model_idx][0].legend(loc="lower right")
                # Compute mean Precision-Recall curve
                mean_precision = np.mean(precisions, axis=0)
                mean_pr_auc = np.mean(pr_aucs)
                axes[model_idx][1].plot(mean_recall, mean_precision, color='b', lw=4,
                                        label=r'Mean PR (AUC = %0.2f)' % mean_pr_auc, alpha=.8)
                # Plot baseline precision (proportion of positive samples)
                baseline = np.sum(y_train) / len(y_train)
                axes[model_idx][1].plot([0, 1], [baseline, baseline], linestyle='--', lw=2, color='r', alpha=.8, label='Baseline')
                # Set Precision-Recall plot limits and title
                axes[model_idx][1].set(xlim=[-0.05, 1.05], ylim=[-0.05, 1.05],
                                       title=f"Precision-Recall Curve - {model_name} ({group}-{method_names[j]})")
                axes[model_idx][1].legend(loc="lower right")
            # Store the DataFrame in the dictionary with a unique key for each sheet
            sheet_name = f"{group}_{method_names[j]}"
            scores_sheets[sheet_name] = scores_df
            # Adjust layout and save figure
            plt.tight_layout()
            plt.savefig(f'./output_cv_metrics/curves/{group}_{method_names[j]}.svg', format='svg', dpi=500)
            plt.close(fig)
    # Write results to Excel file
    with pd.ExcelWriter('./output_cv_metrics/metrics.xlsx') as writer:
        for sheet_name, data in scores_sheets.items():
            data.to_excel(writer, sheet_name=sheet_name)
    print("Successful cv metric generation for tuned models")
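
# NOTE: the output folders './output_cv_metrics/' and './output_cv_metrics/curves/' are assumed to exist
# beforehand; neither plt.savefig nor pd.ExcelWriter creates missing directories. An optional guard would
# be something like:
#   import os
#   os.makedirs('./output_cv_metrics/curves', exist_ok=True)
# placed before the main loop.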