import gzip
import os
import shutil
from collections import defaultdict
from os.path import join

import GEOparse
import numpy as np
import pandas as pd
import requests
from Bio import Entrez


# =================================================================================
def extract_tsv(tsv_file):
    """
    Reads the disease table from a TSV file and keeps only the disease
    identifier and disease name, exposing the 'cui' column as 'disease_id'.

    Input Parameters:
    - tsv_file (str): Path to the TSV file to be read. It must contain the
      columns 'cui' and 'disease_name'.

    Returns:
    - pd.DataFrame: A DataFrame with the columns 'disease_id' (renamed from
      'cui') and 'disease_name'.
    """
    use_cols = ['cui', 'disease_name']
    new_column_names = {'cui': 'disease_id'}

    disease_df = pd.read_csv(tsv_file, delimiter='\t', usecols=use_cols)
    return disease_df.rename(columns=new_column_names)


###########################
def download_and_save_gds(email_request, disease_df, gds_path):
    """
    Queries GEO GDS for datasets associated with the given disease names,
    downloads every matching GDS into `gds_path` and returns the
    disease-to-GDS relationship as a DataFrame.

    Input Parameters:
    - email_request (str): Email address for Entrez API usage.
    - disease_df (pd.DataFrame): DataFrame containing 'disease_id' and
      'disease_name' columns with the disease names to query.
    - gds_path (str): Directory where the downloaded GDS files are stored.
      It is created if it does not exist.

    Returns:
    - pd.DataFrame: DataFrame with columns 'disease_id' and 'gds_id'
      representing the relationship between disease IDs and GDS identifiers.
      The columns are present even when nothing was found.
    """
    Entrez.email = email_request

    # Ensure the directory exists (no check-then-create race)
    os.makedirs(gds_path, exist_ok=True)

    # Map every distinct disease name to the first disease_id listed for it.
    # Missing names are dropped: they cannot be queried and have no id to
    # look up.
    name_to_id = (
        disease_df.dropna(subset=['disease_name'])
        .drop_duplicates(subset='disease_name')
        .set_index('disease_name')['disease_id']
    )

    rows = []
    for disease_name, disease_id in name_to_id.items():
        query = f'"Homo sapiens"[Organism] AND "disease state" AND "{disease_name}"'
        with Entrez.esearch(db="gds", term=query, retmax=1000) as handle:
            record = Entrez.read(handle)

        id_list = record.get("IdList", [])
        if not id_list:
            print(f"No matching records found for: {disease_name}")
            continue

        for gds_id in id_list:
            with Entrez.esummary(db="gds", id=gds_id) as handle:
                gds_summaries = Entrez.read(handle)

            for summary in gds_summaries:
                accession = summary['Accession']
                # The "gds" database also returns other accession types;
                # only curated GDS datasets are wanted here.
                if accession.startswith("GDS"):
                    # Called for its side effect: the file lands in gds_path.
                    GEOparse.get_GEO(geo=accession, destdir=gds_path,
                                     annotate_gpl=True)
                    rows.append({
                        "disease_id": disease_id,
                        "gds_id": accession
                    })
                    # One GDS accession per queried id is enough.
                    break

    # Fixing the columns keeps the schema stable for an empty result, so
    # downstream code can always select 'gds_id'.
    dis_gds_df = pd.DataFrame(rows, columns=['disease_id', 'gds_id'])
    return dis_gds_df


# =================================================================================
def _decompress_gz_files(directory, prefix):
    """
    Decompresses every '<prefix>*.gz' file found in `directory`, writing the
    result next to the archive under the same name without the '.gz' suffix.
    """
    for filename in os.listdir(directory):
        if not (filename.startswith(prefix) and filename.endswith(".gz")):
            continue

        compressed_path = os.path.join(directory, filename)
        # [:-3] removes the trailing ".gz" extension
        uncompressed_path = os.path.join(directory, filename[:-3])

        with gzip.open(compressed_path, 'rb') as compressed_file, \
                open(uncompressed_path, 'wb') as uncompressed_file:
            shutil.copyfileobj(compressed_file, uncompressed_file)


def decompress_gds_gz_files(gds_path):
    """
    Decompresses the GDS .gz files in a directory.

    Input Parameters:
    gds_path (str): Path to the directory containing the 'GDS*.gz' files.

    Returns:
    None
    """
    _decompress_gz_files(gds_path, "GDS")


def _first_metadata_value(metadata, key):
    """
    Returns the first value stored under `key` in a GEOparse metadata dict,
    or None when the key is absent or its value list is empty.
    """
    values = metadata.get(key)
    return values[0] if values else None


def filter_gds(gds_path, disease_gds_df):
    """
    Filters GDS files based on specific criteria (`value_type` of the dataset
    must be "count", the `channel_count` must be 1 and the disease state
    column must be present) and keeps only the rows of the provided DataFrame
    whose GDS ID meets the criteria. Datasets whose metadata lacks
    `value_type` or a numeric `channel_count` are treated as not meeting
    the criteria.

    Input Parameters:
    gds_path (str): Path to the directory containing GDS .soft files.
    disease_gds_df (pandas.DataFrame): DataFrame containing GDS IDs and
        associated disease information. It must have a column named 'gds_id'.

    Returns:
    pandas.DataFrame: The rows of `disease_gds_df` whose 'gds_id' meets the
        criteria.
    """
    valid_gds_ids = []

    for filename in os.listdir(gds_path):
        # Only GDS .soft files are of interest
        if not (filename.startswith("GDS") and filename.endswith(".soft")):
            continue

        gds = GEOparse.get_GEO(filepath=os.path.join(gds_path, filename))

        value_type = _first_metadata_value(gds.metadata, 'value_type')
        raw_channel_count = _first_metadata_value(gds.metadata, 'channel_count')
        if value_type is None or raw_channel_count is None:
            continue

        try:
            channel_count = int(raw_channel_count)
        except (TypeError, ValueError):
            # Non-numeric channel count: cannot be a single-channel dataset
            continue

        if str(value_type).lower() != 'count' or channel_count != 1:
            continue

        # gds.columns is the sample annotation table; its columns are the
        # annotation subsets of the dataset.
        if 'disease state' in gds.columns.columns:
            valid_gds_ids.append(gds.name)

    diseases_gds_filtered = disease_gds_df[disease_gds_df['gds_id'].isin(valid_gds_ids)]
    return diseases_gds_filtered


def create_gds_gpl_mapping_and_download(gds_path, gpl_path):
    """
    Creates a mapping of GDS IDs to their GPL (platform) IDs and downloads
    each referenced GPL into `gpl_path`.

    Input Parameters:
    gds_path (str): Path to the directory containing GDS .soft files.
    gpl_path (str): Path to the directory where GPL files are stored.
        It is created if it does not exist.

    Returns:
    pd.DataFrame: DataFrame with columns 'gds_id' and 'gpl_id', mapping GDS
        IDs to GPL IDs. The columns are present even when nothing was found.
    """
    os.makedirs(gpl_path, exist_ok=True)

    rows = []
    for filename in os.listdir(gds_path):
        if not (filename.startswith("GDS") and filename.endswith(".soft")):
            continue

        gds = GEOparse.get_GEO(filepath=os.path.join(gds_path, filename))
        gds_id = gds.name

        for gpl_id in gds.metadata.get('platform', []):
            rows.append({'gds_id': gds_id, 'gpl_id': gpl_id})
            # Called for its side effect: the file lands in gpl_path.
            GEOparse.get_GEO(geo=gpl_id, destdir=gpl_path, annotate_gpl=True)
            print(f"Downloaded GPL {gpl_id} for GDS {gds_id} to {os.path.join(gpl_path, gpl_id)}")

    # Fixing the columns keeps the schema stable for an empty result, so
    # downstream code can always select 'gpl_id'.
    gds_gpl_df = pd.DataFrame(rows, columns=['gds_id', 'gpl_id'])
    return gds_gpl_df


# =================================================================================
def download_gpl(gds_gpl_df, gpl_path):
    """
    Downloads each distinct GPL referenced in the provided GDS-GPL mapping
    and stores it in the specified directory.

    Input Parameters:
    gds_gpl_df (pandas.DataFrame): DataFrame containing GDS IDs and the
        corresponding GPL IDs in a 'gpl_id' column.
    gpl_path (str): Path to the directory to store GPL files. It is created
        if it does not exist.

    Returns:
    None
    """
    # Ensure the directory exists (no check-then-create race)
    os.makedirs(gpl_path, exist_ok=True)

    # Download every GPL only once, however many GDS reference it
    for gpl_id in gds_gpl_df['gpl_id'].unique():
        GEOparse.get_GEO(geo=gpl_id, destdir=gpl_path, annotate_gpl=True)


# =================================================================================
def decompress_gpl_gz_files(gpl_path):
    """
    Decompresses the GPL .gz files in a directory.

    Input Parameters:
    gpl_path (str): Path to the directory containing the 'GPL*.gz' files.

    Returns:
    None
    """
    _decompress_gz_files(gpl_path, "GPL")