""" Evaluating optimized models with test data """ # Libraries # -------------------------------------------------------------------------------------------------------- import pandas as pd import numpy as np from xgboost import XGBClassifier from sklearn.metrics import confusion_matrix from sklearn.metrics import f1_score, make_scorer, precision_score, recall_score, accuracy_score, roc_auc_score, average_precision_score from sklearn.ensemble import RandomForestClassifier, BaggingClassifier, AdaBoostClassifier from sklearn.neural_network import MLPClassifier from sklearn.svm import SVC from sklearn.linear_model import LogisticRegression from sklearn.tree import DecisionTreeClassifier from sklearn.metrics import RocCurveDisplay, roc_curve from sklearn.metrics import PrecisionRecallDisplay, precision_recall_curve import matplotlib.pyplot as plt from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay import ast # String to dictionary import seaborn as sns from mpl_toolkits.axes_grid1 import make_axes_locatable # -------------------------------------------------------------------------------------------------------- # Reading data # -------------------------------------------------------------------------------------------------------- def read_data(): # Load test data X_test_pre = np.load('../gen_train_data/data/output/pre/X_test_pre.npy', allow_pickle=True) y_test_pre = np.load('../gen_train_data/data/output/pre/y_test_pre.npy', allow_pickle=True) X_test_post = np.load('../gen_train_data/data/output/post/X_test_post.npy', allow_pickle=True) y_test_post = np.load('../gen_train_data/data/output/post/y_test_post.npy', allow_pickle=True) # Load ORIGINAL training data X_train_pre = np.load('../gen_train_data/data/output/pre/X_train_pre.npy', allow_pickle=True) y_train_pre = np.load('../gen_train_data/data/output/pre/y_train_pre.npy', allow_pickle=True) X_train_post = np.load('../gen_train_data/data/output/post/X_train_post.npy', allow_pickle=True) y_train_post = np.load('../gen_train_data/data/output/post/y_train_post.npy', allow_pickle=True) # Load oversampled training data X_train_over_pre = np.load('../gen_train_data/data/output/pre/X_train_over_pre.npy', allow_pickle=True) y_train_over_pre = np.load('../gen_train_data/data/output/pre/y_train_over_pre.npy', allow_pickle=True) X_train_over_post = np.load('../gen_train_data/data/output/post/X_train_over_post.npy', allow_pickle=True) y_train_over_post = np.load('../gen_train_data/data/output/post/y_train_over_post.npy', allow_pickle=True) # Load undersampled training data X_train_under_pre = np.load('../gen_train_data/data/output/pre/X_train_under_pre.npy', allow_pickle=True) y_train_under_pre = np.load('../gen_train_data/data/output/pre/y_train_under_pre.npy', allow_pickle=True) X_train_under_post = np.load('../gen_train_data/data/output/post/X_train_under_post.npy', allow_pickle=True) y_train_under_post = np.load('../gen_train_data/data/output/post/y_train_under_post.npy', allow_pickle=True) data_dic = { "X_test_pre": X_test_pre, "y_test_pre": y_test_pre, "X_test_post": X_test_post, "y_test_post": y_test_post, "X_train_pre": X_train_pre, "y_train_pre": y_train_pre, "X_train_post": X_train_post, "y_train_post": y_train_post, "X_train_over_pre": X_train_over_pre, "y_train_over_pre": y_train_over_pre, "X_train_over_post": X_train_over_post, "y_train_over_post": y_train_over_post, "X_train_under_pre": X_train_under_pre, "y_train_under_pre": y_train_under_pre, "X_train_under_post": X_train_under_post, "y_train_under_post": 
y_train_under_post, } return data_dic # -------------------------------------------------------------------------------------------------------- # Returning tuned models for each situation # -------------------------------------------------------------------------------------------------------- def get_tuned_models(group_str, method_str): # Read sheet corresponding to group and method with tuned models and their hyperparam tuned_models_df = pd.read_excel("./output/hyperparam/hyperparamers.xlsx",sheet_name=f"{group_str}_{method_str}") # Mapping from model abbreviations to sklearn model classes model_mapping = { 'DT': DecisionTreeClassifier, 'RF': RandomForestClassifier, 'Bagging': BaggingClassifier, 'AB': AdaBoostClassifier, 'XGB': XGBClassifier, 'LR': LogisticRegression, 'SVM': SVC, 'MLP': MLPClassifier } tuned_models = {} # Iterate through each row of the DataFrame for _, row in tuned_models_df.iterrows(): model_name = row.iloc[0] # Read dictionary parameters = ast.literal_eval(row['Best Parameters']) # Add extra parameters if model_name == 'AB': parameters['algorithm'] = 'SAMME' elif model_name == 'LR': parameters['max_iter'] = 1000 elif model_name == 'SVM': parameters['max_iter'] = 1000 parameters['probability'] = True elif model_name == "MLP": parameters['max_iter'] = 500 # Add class_weight argument for cost-sensitive learning method if 'CW' in method_str: if model_name == 'Bagging' or model_name == 'AB': parameters['estimator'] = DecisionTreeClassifier(class_weight='balanced') else: parameters['class_weight'] = 'balanced' # Fetch class model_class = model_mapping[model_name] # Initialize model tuned_models[model_name] = model_class(**parameters) return tuned_models # -------------------------------------------------------------------------------------------------------- # Scorers # -------------------------------------------------------------------------------------------------------- def TN_scorer(clf, X, y): """Gives the number of samples predicted as true negatives""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) return cm[0, 0] def FN_scorer(clf, X, y): """Gives the number of samples predicted as false negatives""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) return cm[1, 0] def FP_scorer(clf, X, y): """Gives the number of samples predicted as false positives""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) return cm[0, 1] def TP_scorer(clf, X, y): """Gives the number of samples predicted as true positives""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) return cm[1, 1] def negative_recall_scorer(clf, X, y): """Gives the negative recall defined as the (number of true_negative_samples)/(total number of negative samples)""" y_pred = clf.predict(X) cm = confusion_matrix(y, y_pred) TN_prop = cm[0,0]/(cm[0,1]+cm[0,0]) return TN_prop # Custom scorers for AUROC and AUPRC def AUROC_scorer(clf, X, y): if hasattr(clf, "decision_function"): y_score = clf.decision_function(X) else: y_score = clf.predict_proba(X)[:, 1] return roc_auc_score(y, y_score) def AUPRC_scorer(clf, X, y): if hasattr(clf, "decision_function"): y_score = clf.decision_function(X) else: y_score = clf.predict_proba(X)[:, 1] return average_precision_score(y, y_score) # -------------------------------------------------------------------------------------------------------- if __name__ == "__main__": # Reading data data_dic = read_data() # Setup # -------------------------------------------------------------------------------------------------------- # Scorings to use for model 
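
# --------------------------------------------------------------------------------------------------------
# Optional sanity check (illustrative sketch only; the function name is hypothetical and nothing in this
# script calls it). It documents the sklearn confusion_matrix layout [[TN, FP], [FN, TP]] that the
# count-based scorers above rely on, using a toy constant classifier that predicts the positive class
# for every sample.
def _scorer_sanity_check():
    from sklearn.dummy import DummyClassifier
    X_toy = [[0], [1], [2], [3]]
    y_toy = [0, 0, 1, 1]
    clf = DummyClassifier(strategy="constant", constant=1).fit(X_toy, y_toy)
    # Every prediction is 1, so both negatives become false positives and both positives are true positives
    assert TN_scorer(clf, X_toy, y_toy) == 0
    assert FP_scorer(clf, X_toy, y_toy) == 2
    assert FN_scorer(clf, X_toy, y_toy) == 0
    assert TP_scorer(clf, X_toy, y_toy) == 2
    assert negative_recall_scorer(clf, X_toy, y_toy) == 0.0
# --------------------------------------------------------------------------------------------------------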
if __name__ == "__main__":

    # Reading data
    data_dic = read_data()

    # Setup
    # --------------------------------------------------------------------------------------------------------
    # Scorings to use for model evaluation
    scorings = {
        'F1': make_scorer(f1_score),
        'NREC': negative_recall_scorer,
        'REC': make_scorer(recall_score),
        'PREC': make_scorer(precision_score),
        'ACC': make_scorer(accuracy_score),
        'TN': TN_scorer,
        'FN': FN_scorer,
        'FP': FP_scorer,
        'TP': TP_scorer,
        'AUROC': AUROC_scorer,
        'AUPRC': AUPRC_scorer
    }
    method_names = {
        0: "ORIG",
        1: "ORIG_CW",
        2: "OVER",
        3: "UNDER"
    }
    # --------------------------------------------------------------------------------------------------------

    # Evaluating performance using test dataset
    # --------------------------------------------------------------------------------------------------------
    scores_sheets = {}  # To store score dfs as sheets in the same excel file
    for i, group in enumerate(['pre', 'post']):
        # Get test dataset based on group
        X_test = data_dic['X_test_' + group]
        y_test = data_dic['y_test_' + group]
        # Training-set prefixes: ORIG and ORIG_CW both use the original (unresampled) training data,
        # hence the two empty strings; 'over_' and 'under_' select the resampled versions
        for j, method in enumerate(['', '', 'over_', 'under_']):
            # Get train dataset based on group and method
            X_train = data_dic['X_train_' + method + group]
            y_train = data_dic['y_train_' + method + group]
            # Get tuned models for this group and method
            models = get_tuned_models(group, method_names[j])
            # Scores df
            scores_df = pd.DataFrame(index=models.keys(), columns=scorings.keys())
            # Create a figure for all models in this group-method
            fig, axes = plt.subplots(len(models), 3, figsize=(10, 8 * len(models)))
            # Evaluate each model with test dataset
            for model_idx, (model_name, model) in enumerate(models.items()):
                print(f"{group}-{method_names[j]}-{model_name}")
                # Fit the model on the training data
                model.fit(X_train, y_train)
                # --------------------- SCORINGS ---------------------------
                # Calculate and store the scores for each metric
                for metric_name, scorer in scorings.items():
                    score = scorer(model, X_test, y_test)
                    scores_df.at[model_name, metric_name] = round(score, 4)
                # -----------------------------------------------------------
                # --------------------- PLOTS ---------------------------
                # Check if the model has a decision_function method
                if hasattr(model, "decision_function"):
                    # Use the decision function to get scores
                    y_score = model.decision_function(X_test)
                else:
                    # Otherwise, use the probability estimates and take the probability of the positive class
                    y_score = model.predict_proba(X_test)[:, 1]
                # Calculate ROC curve for the positive class
                fpr, tpr, _ = roc_curve(y_test, y_score, pos_label=model.classes_[1])
                # Plot the ROC curve with thicker line
                roc_display = RocCurveDisplay(fpr=fpr, tpr=tpr)
                roc_display.plot(ax=axes[model_idx][0], lw=2)
                # Plot the diagonal line for the ROC curve
                axes[model_idx][0].plot([0, 1], [0, 1], 'k--', lw=2, label='Random Classifier')
                axes[model_idx][0].set_title(f'ROC Curve for {group}-{method_names[j]}-{model_name}')
                axes[model_idx][0].set_xlabel('False Positive Rate')
                axes[model_idx][0].set_ylabel('True Positive Rate')
                axes[model_idx][0].legend(loc='lower right')
                # Calculate precision-recall curve
                precision, recall, _ = precision_recall_curve(y_test, y_score, pos_label=model.classes_[1])
                # Plot the precision-recall curve with thicker line
                pr_display = PrecisionRecallDisplay(precision=precision, recall=recall)
                pr_display.plot(ax=axes[model_idx][1], lw=2)
                # Plot the baseline for the PR curve
                no_skill = len(y_test[y_test == 1]) / len(y_test)
                axes[model_idx][1].plot([0, 1], [no_skill, no_skill], 'k--', lw=2, label='No Skill')
                axes[model_idx][1].set_title(f'PR Curve for {group}-{method_names[j]}-{model_name}')
                axes[model_idx][1].set_xlabel('Recall')
                axes[model_idx][1].set_ylabel('Precision')
                axes[model_idx][1].legend(loc='lower left')
                # Predict the test data to get confusion matrix
                y_pred = model.predict(X_test)
                # Compute confusion matrix
                cm = confusion_matrix(y_test, y_pred)
                # Plot the confusion matrix
                cmp = ConfusionMatrixDisplay(cm)
                # Deactivate default colorbar
                cmp.plot(ax=axes[model_idx][2], colorbar=False, cmap=sns.color_palette("light:b", as_cmap=True))
                # Adding custom colorbar using make_axes_locatable
                divider = make_axes_locatable(axes[model_idx][2])
                cax = divider.append_axes("right", size="5%", pad=0.05)
                plt.colorbar(cmp.im_, cax=cax)
                axes[model_idx][2].set_title(f'CM for {group}-{method_names[j]}-{model_name}')
                axes[model_idx][2].set_xlabel('Predicted label')
                axes[model_idx][2].set_ylabel('True label')
                # ----------------------------------------------------------
            # Adjust layout and save figure
            plt.tight_layout()
            plt.savefig(f'./output/testing/plots/{group}_{method_names[j]}.svg', format='svg', dpi=500)
            plt.close(fig)
            # Store the DataFrame in the dictionary with a unique key for each sheet
            sheet_name = f"{group}_{method_names[j]}"
            scores_sheets[sheet_name] = scores_df
    # Write results to Excel file
    with pd.ExcelWriter('./output/testing/testing_tuned_models.xlsx') as writer:
        for sheet_name, data in scores_sheets.items():
            data.to_excel(writer, sheet_name=sheet_name)
    print("Successful evaluation with test dataset")
# --------------------------------------------------------------------------------------------------------
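
# --------------------------------------------------------------------------------------------------------
# Illustrative sketch (hypothetical helper, not used above): the workbook written by this script can be
# read back with one DataFrame per group_method sheet, e.g. for downstream comparison of the methods.
def _load_test_scores(path='./output/testing/testing_tuned_models.xlsx'):
    # sheet_name=None makes pandas return a dict mapping each sheet name to its DataFrame
    return pd.read_excel(path, sheet_name=None, index_col=0)
# --------------------------------------------------------------------------------------------------------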