import gzip
import os
import shutil
from collections import defaultdict
from os.path import join

import GEOparse
import numpy as np
import pandas as pd
import requests
from Bio import Entrez

# Substrings that mark a sample as a control; matched case-insensitively as a
# regular-expression alternation against the 'disease_state' column.
_CONTROL_PATTERN = '|'.join(['control', 'normal', 'healthy', 'not diseased', 'wild-type'])

# Maps raw GDS annotation column names to the snake_case names used downstream.
_ANNOTATION_RENAMES = {
    'index': 'gsm_id',
    'disease state': 'disease_state',
    'cell type': 'cell_type',
    'development stage': 'development_stage',
    'genotype/variation': 'genotype',
}


def _first_metadata_value(metadata, key):
    """Return the first value stored under `key` in `metadata`, or None if absent/empty."""
    values = metadata.get(key)
    return values[0] if values else None


def _concat_frames(frames):
    """Concatenate `frames` row-wise, returning an empty DataFrame when there are none."""
    # pd.concat raises ValueError on an empty list, so guard that case explicitly.
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


def fetch_gds_data(gds_path):
    """
    Fetches and processes Gene Expression Omnibus (GEO) GDS data files from the specified directory.

    Every file in `gds_path` whose name starts with "GDS" and ends with ".soft" is parsed
    with GEOparse. Datasets whose 'value_type' metadata does not include 'count' are skipped.
    For each remaining dataset the expression table is melted into long format (one row per
    probe/sample pair), the sample annotations are collected, and a metadata row is recorded.

    Input Parameters:
    gds_path (str): The path to the directory containing GDS files.

    Returns:
    tuple: A tuple containing three pandas DataFrames:
        - expression_df: DataFrame with gene expression data.
        - annotation_df: DataFrame with annotation data.
        - metadata_df: DataFrame with metadata about the GDS files.
      All three are empty when no file in the directory qualifies.
    """
    expression_frames = []
    annotation_frames = []
    metadata_rows = []

    # Sort so the row order of the combined frames is reproducible across runs.
    for filename in sorted(os.listdir(gds_path)):
        if not (filename.startswith("GDS") and filename.endswith(".soft")):
            continue

        gds = GEOparse.get_GEO(filepath=os.path.join(gds_path, filename))

        # Only datasets that report a 'count' value type are kept.
        value_types = gds.metadata.get('value_type', [])
        if 'count' not in value_types:
            continue

        # Collect metadata
        metadata_rows.append({
            'gds_id': gds.name,
            'gds_title': _first_metadata_value(gds.metadata, 'title'),
            'gds_type': _first_metadata_value(gds.metadata, 'type'),
            'gpl_id': _first_metadata_value(gds.metadata, 'platform'),
            'channel_count': _first_metadata_value(gds.metadata, 'channel_count'),
            'value_type': value_types[0],
        })

        # Extract gene expression data: sample (GSM*) columns become rows.
        table = gds.table
        gsm_columns = [col for col in table.columns if col.startswith("GSM")]
        id_columns = [col for col in table.columns if not col.startswith("GSM")]
        melted = pd.melt(
            table,
            id_vars=id_columns,
            value_vars=gsm_columns,
            var_name='gsm_id',
            value_name='value',
        )
        melted = (
            melted
            .dropna(subset=['IDENTIFIER', 'value'])
            .rename(columns={'ID_REF': 'id_ref', 'IDENTIFIER': 'gene_symbol'})
        )
        melted['gds_id'] = gds.name
        expression_frames.append(melted)

        # Extract annotation data; the free-text description column is not kept.
        annotation = gds.columns.reset_index().drop(columns=['description'], errors='ignore')
        annotation['gds_id'] = gds.name
        annotation_frames.append(annotation)

    # Combine all expression and annotation dataframes
    expression_df = _concat_frames(expression_frames)
    annotation_df = _concat_frames(annotation_frames)
    annotation_df = annotation_df.rename(
        columns={old: new for old, new in _ANNOTATION_RENAMES.items() if old in annotation_df.columns}
    )
    metadata_df = pd.DataFrame(metadata_rows)

    return expression_df, annotation_df, metadata_df


# =================================================================================


def fetch_gpl_annot(gpl_path):
    """
    Fetch GPL data from files in the specified directory.

    Every file in `gpl_path` whose name starts with "GPL" and ends with ".annot" is parsed
    with GEOparse and contributes one row to the result.

    Input Parameters:
    gpl_path (str): The directory path where the GPL annotation files are stored.

    Returns:
    pd.DataFrame: A dataframe containing GPL annotation data. Columns are 'gpl_id' and
    'gpl_title'. The dataframe is empty (no columns) when no file qualifies.
    """
    data_rows = []

    # Sort so the row order is reproducible across runs.
    for filename in sorted(os.listdir(gpl_path)):
        if not (filename.startswith("GPL") and filename.endswith(".annot")):
            continue

        gpl = GEOparse.get_GEO(filepath=os.path.join(gpl_path, filename))

        # Collect GPL (platform) data
        data_rows.append({
            'gpl_id': gpl.name,
            'gpl_title': _first_metadata_value(gpl.metadata, 'platform_title'),
        })

    return pd.DataFrame(data_rows)


# =================================================================================


def process_disease_state(df, fill_value=''):
    """
    Processes the 'disease_state' column in the given DataFrame.

    Missing values in the whole DataFrame are filled with `fill_value`, then every
    'disease_state' entry is replaced by 'c' if it contains (case-insensitively) one of
    'control', 'normal', 'healthy', 'not diseased' or 'wild-type', and by 'd' otherwise.
    The input DataFrame is not modified.

    Input Parameters:
    df (pandas.DataFrame): The input DataFrame containing a 'disease_state' column.
    fill_value (str, optional): The value to use for filling missing values in the
        DataFrame. Default is an empty string.

    Returns:
    pandas.DataFrame: A new DataFrame with the processed 'disease_state' column.

    Raises:
    KeyError: If `df` has no 'disease_state' column.
    """
    # fillna returns a new DataFrame, so the caller's frame is left untouched.
    result = df.fillna(fill_value)

    # NOTE(review): this is substring matching, so e.g. "abnormal" matches "normal",
    # and blank/filled-in entries fall through to 'd' - confirm this is intended.
    # na=False makes non-string entries count as "not control" instead of producing
    # a NaN mask that cannot be inverted or used for indexing.
    is_control = result['disease_state'].str.lower().str.contains(_CONTROL_PATTERN, na=False)

    # 'c' for control/normal/healthy states, 'd' for everything else.
    result.loc[is_control, 'disease_state'] = 'c'
    result.loc[~is_control, 'disease_state'] = 'd'

    return result


# =================================================================================


def extract_cuis(apikey, dataframe, column_name, timeout=30):
    """
    Extracts CUIs (Concept Unique Identifiers) for terms in a specified column of a DataFrame.

    For each distinct, non-blank string in the column, the UMLS search endpoint is queried
    with an exact-match search, page by page, until a page comes back with no results.

    Input Parameters:
    apikey (str): The UMLS API key sent with every request.
    dataframe (pandas.DataFrame): The input DataFrame containing the terms.
    column_name (str): The name of the column in the DataFrame containing the terms for
        which CUIs need to be extracted.
    timeout (float, optional): Seconds to wait for each HTTP request before giving up.
        Default is 30.

    Returns:
    dict: A dictionary mapping terms to their respective CUIs. Terms for which the search
    returned no results are absent from the dictionary.

    Raises:
    requests.HTTPError: If the API answers with an error status.
    requests.Timeout: If a request does not complete within `timeout` seconds.
    """
    # UMLS API settings
    version = 'current'
    search_url = "https://uts-ws.nlm.nih.gov" + "/rest/search/" + version
    search_type = 'exact'

    # Distinct terms from the column, skipping NaNs, non-strings and blank strings.
    # Sorted so requests are issued in a reproducible order.
    terms = sorted({
        value for value in dataframe[column_name].dropna()
        if isinstance(value, str) and value.strip()
    })

    cuis_by_term = {}

    # Fetch CUIs for each different term
    for term in terms:
        page = 0
        while True:
            page += 1
            query = {
                'string': term,
                'apiKey': apikey,
                'pageNumber': page,
                'searchType': search_type,
            }
            response = requests.get(search_url, params=query, timeout=timeout)
            response.raise_for_status()
            response.encoding = 'utf-8'

            results = response.json().get('result', {}).get('results', [])
            if not results:
                # An empty page means there is nothing (more) to read for this term.
                break

            # NOTE(review): when several results come back, the last one on the last
            # non-empty page wins - confirm this is preferred over the first result.
            for result in results:
                cuis_by_term[term] = result['ui']

    return cuis_by_term