import requests
from Bio import Entrez
import GEOparse
import os
import gzip
import shutil
import pandas as pd
from os.path import join
from collections import defaultdict
import numpy as np
# =================================================================================
def extract_tsv(tsv_file):
    """
    Extracts the disease identifier and disease name columns from a TSV file.

    Only the 'cui' and 'disease_name' columns are read; 'cui' is then renamed
    to 'disease_id'.

    Input Parameters:
    - tsv_file (str): Path to the TSV file to be read.

    Returns:
    - pd.DataFrame: A DataFrame holding the 'disease_id' (formerly 'cui') and
      'disease_name' columns.
    """
    use_cols = ['cui', 'disease_name']
    new_column_names = {'cui': 'disease_id'}
    disease_df = pd.read_csv(tsv_file, delimiter='\t', usecols=use_cols)
    return disease_df.rename(columns=new_column_names)
def download_and_save_gds(email_request, disease_df, gds_path):
    """
    Queries GEO GDS to find datasets associated with given disease names, downloads
    each matching GDS into gds_path and returns the disease/GDS relationship.

    Input Parameters:
    - email_request (str): Email address for Entrez API usage.
    - disease_df (pd.DataFrame): DataFrame containing 'disease_id' and 'disease_name'
      columns with disease names to query.
    - gds_path (str): Directory where the GDS files are stored; created if missing.

    Returns:
    - pd.DataFrame: DataFrame with columns 'disease_id' and 'gds_id' representing the
      relationship between disease IDs and GDS identifiers. The columns are present
      even when no dataset was found.
    """
    Entrez.email = email_request
    rows = []

    # Ensure the directory exists
    os.makedirs(gds_path, exist_ok=True)

    # Map each disease name to the first disease_id listed for it, computed once
    # instead of re-filtering the DataFrame for every matching summary.
    first_id_by_name = {}
    for name, dis_id in zip(disease_df['disease_name'], disease_df['disease_id']):
        first_id_by_name.setdefault(name, dis_id)

    for disease_name, disease_id in first_id_by_name.items():
        query = f'"Homo sapiens"[Organism] AND "disease state" AND "{disease_name}"'
        with Entrez.esearch(db="gds", term=query, retmax=1000) as handle:
            record = Entrez.read(handle)
        id_list = record.get("IdList", [])
        if not id_list:
            print(f"No matching records found for: {disease_name}")

        for gds_id in id_list:
            with Entrez.esummary(db="gds", id=gds_id) as handle:
                gds_summaries = Entrez.read(handle)

            for summary in gds_summaries:
                geo_accession = summary['Accession']
                # The "gds" Entrez database also returns other record types;
                # only curated datasets (GDS accessions) are kept.
                if not geo_accession.startswith("GDS"):
                    continue
                # Called for its side effect: the file is saved into gds_path.
                GEOparse.get_GEO(geo=geo_accession, destdir=gds_path, annotate_gpl=True)
                rows.append({
                    "disease_id": disease_id,
                    "gds_id": geo_accession,
                })

    return pd.DataFrame(rows, columns=["disease_id", "gds_id"])
# =================================================================================
def decompress_gds_gz_files(gds_path):
    """
    Decompresses the GDS .gz files found in a directory.

    Every file whose name starts with "GDS" and ends with ".gz" is written,
    uncompressed, next to the original under the same name without the ".gz"
    suffix. The compressed files are left in place.

    Input Parameters:
    - gds_path (str): Path to the directory containing the .gz files.
    """
    for filename in os.listdir(gds_path):
        if filename.startswith("GDS") and filename.endswith(".gz"):
            filepath = os.path.join(gds_path, filename)
            # [:-3] removes the ".gz" extension
            uncompressed_filepath = os.path.join(gds_path, filename[:-3])
            with gzip.open(filepath, 'rb') as compressed_file, \
                    open(uncompressed_filepath, 'wb') as uncompressed_file:
                shutil.copyfileobj(compressed_file, uncompressed_file)
def filter_gds(gds_path, disease_gds_df):
    """
    Filters GDS files based on specific criteria (`value_type` of the dataset must be
    "count", the `channel_count` must be 1 and the 'disease state' column must be
    present) and returns the rows of the provided DataFrame whose GDS ID meets them.

    Input Parameters:
    - gds_path (str): Path to the directory containing GDS .soft files.
    - disease_gds_df (pandas.DataFrame): DataFrame containing GDS IDs and associated
      disease information. It must have a column named 'gds_id'.

    Returns:
    - pandas.DataFrame: The rows of disease_gds_df whose 'gds_id' meets the criteria.
      GDS files lacking 'value_type' or 'channel_count' metadata are treated as not
      meeting them.
    """
    # Collect the IDs of the GDS files that satisfy every criterion
    valid_gds_ids = []

    for filename in os.listdir(gds_path):
        # Only decompressed GDS .soft files are inspected
        if not (filename.startswith("GDS") and filename.endswith(".soft")):
            continue

        filepath = os.path.join(gds_path, filename)
        gds = GEOparse.get_GEO(filepath=filepath)  # Load the GDS file using GEOparse
        # NOTE(review): the original expression was lost; `gds.name` is assumed to
        # hold the GDS accession (e.g. "GDS507") - confirm against GEOparse.
        gds_id = gds.name

        # Metadata values are lists; take the first entry when present
        value_type = gds.metadata.get('value_type', [None])[0]
        channel_count = gds.metadata.get('channel_count', [None])[0]
        if value_type is None or channel_count is None:
            # Missing metadata cannot satisfy the criteria (and would otherwise
            # crash on None.lower() / int(None)).
            continue

        if value_type.lower() != 'count' or int(channel_count) != 1:
            continue

        # gds.columns holds the per-sample annotations; require 'disease state'
        if 'disease state' in gds.columns.columns:
            valid_gds_ids.append(gds_id)

    # Keep only the rows whose 'gds_id' is in the list of valid GDS IDs
    diseases_gds_filtered = disease_gds_df[disease_gds_df['gds_id'].isin(valid_gds_ids)]
    return diseases_gds_filtered
def create_gds_gpl_mapping_and_download(gds_path, gpl_path):
    """
    Creates a mapping of GDS IDs to corresponding GPL IDs and downloads each GPL.

    Input Parameters:
    - gds_path (str): Path to the directory containing GDS .soft files.
    - gpl_path (str): Path to the directory where the GPL files are stored.

    Returns:
    - pd.DataFrame: DataFrame with columns 'gds_id' and 'gpl_id', mapping GDS IDs to
      GPL IDs. The columns are present even when no GDS file was found.
    """
    rows = []
    fetched_gpl_ids = set()  # each platform only needs to be fetched once per call

    for filename in os.listdir(gds_path):
        if not (filename.startswith("GDS") and filename.endswith(".soft")):
            continue

        filepath = os.path.join(gds_path, filename)
        gds = GEOparse.get_GEO(filepath=filepath)
        # NOTE(review): the original expression was lost; `gds.name` is assumed to
        # hold the GDS accession (e.g. "GDS507") - confirm against GEOparse.
        gds_id = gds.name

        for gpl_id in gds.metadata.get('platform', []):
            rows.append({'gds_id': gds_id, 'gpl_id': gpl_id})
            if gpl_id in fetched_gpl_ids:
                continue
            # Called for its side effect: the file is saved into gpl_path.
            GEOparse.get_GEO(geo=gpl_id, destdir=gpl_path, annotate_gpl=True)
            fetched_gpl_ids.add(gpl_id)
            print(f"Downloaded GPL {gpl_id} for GDS {gds_id} to {os.path.join(gpl_path, gpl_id)}")

    gds_gpl_df = pd.DataFrame(rows, columns=['gds_id', 'gpl_id'])
    return gds_gpl_df
# =================================================================================
def download_gpl(gds_gpl_df, gpl_path):
    """
    Fetches unique GPL data for the provided GDS-GPL mapping from a DataFrame and
    stores them in the specified directory.

    Input Parameters:
    - gds_gpl_df (pandas.DataFrame): DataFrame containing GDS IDs and corresponding
      GPL IDs; only its 'gpl_id' column is read.
    - gpl_path (str): Path to the directory to store GPL files; created if missing.
    """
    # Ensure the directory exists
    os.makedirs(gpl_path, exist_ok=True)

    # Iterate through the unique GPL IDs and download GPL files
    for gpl_id in gds_gpl_df['gpl_id'].unique():
        # Called for its side effect: the file is saved into gpl_path.
        GEOparse.get_GEO(geo=gpl_id, destdir=gpl_path, annotate_gpl=True)
# =================================================================================
def decompress_gpl_gz_files(gpl_path):
    """
    Decompresses the GPL .gz files found in a directory.

    Every file whose name starts with "GPL" and ends with ".gz" is written,
    uncompressed, next to the original under the same name without the ".gz"
    suffix. The compressed files are left in place.

    Input Parameters:
    - gpl_path (str): Path to the directory containing the .gz files.
    """
    for filename in os.listdir(gpl_path):
        if filename.startswith("GPL") and filename.endswith(".gz"):
            filepath = os.path.join(gpl_path, filename)
            # [:-3] removes the ".gz" extension
            uncompressed_filepath = os.path.join(gpl_path, filename[:-3])
            with gzip.open(filepath, 'rb') as compressed_file, \
                    open(uncompressed_filepath, 'wb') as uncompressed_file:
                shutil.copyfileobj(compressed_file, uncompressed_file)
# =================================================================================
