import sys
import os
import json
import ConceptExtractor
import mysql.connector
import textdistance
import configparser
import inflect
import re
from ConceptExtractor import extractionOfConcepts
from umlsExtractor import umls_concept_extractor, get_words

configuration = configparser.ConfigParser()
configuration.read('config.ini')

# Keyword arguments for mysql.connector.connect(), taken from the [ARES]
# section of config.ini.
config = {
    'user': configuration['ARES']['DB_USER'],
    'password': configuration['ARES']['DB_PASSWORD'],
    'port': configuration['ARES']['DB_PORT'],
    'host': configuration['ARES']['DB_HOST'],
    'db': configuration['ARES']['DB_NAME'],
    'auth_plugin': configuration['ARES']['DB_AUTH_PLUGIN'],
}


def _collect_cuis(cursor):
    """Return the non-blank first-column values of the pending rows as strings."""
    return [
        str(row[0])
        for row in cursor
        if row[0] is not None and row[0].strip() != ""
    ]


def select_query_umls_jkes(tuplesConcepts):
    """Look up the CUIs stored in ``umls_old_dx`` for one annotated concept.

    Args:
        tuplesConcepts: Concept tuple; only element 0 (the concept text) is
            used, matched exactly against the ``concept`` column.

    Returns:
        List of non-blank ``cui`` values (as ``str``) for the matching rows;
        empty when nothing matches.
    """
    cnx = mysql.connector.connect(**config)
    try:
        cursor = cnx.cursor()
        # Parameterized so quotes in the concept text cannot break or inject
        # into the statement.
        cursor.execute(
            "SELECT cui FROM umls_old_dx where concept=%s",
            (tuplesConcepts[0],),
        )
        return _collect_cuis(cursor)
    finally:
        # Release the connection even when the query raises.
        cnx.close()


def select_query_jkes(concepts):
    """Collect the CUIs of ``umls_old_dx`` rows whose sentence contains a concept.

    Args:
        concepts: Iterable of concept strings. Each one is searched as a
            substring of the ``sentence`` column (``LIKE '%concept%'``).

    Returns:
        List of non-blank ``cui`` values (as ``str``) gathered over *all*
        concepts, in query order and possibly with duplicates; empty when
        ``concepts`` is empty.
    """
    lUmls = []
    cnx = mysql.connector.connect(**config)
    try:
        cursor = cnx.cursor()
        for concept in concepts:
            # The wildcards live in the bound value, not in the SQL text, so
            # the concept is always treated as data.
            cursor.execute(
                "SELECT cui FROM umls_old_dx where sentence like %s",
                ("%" + concept + "%",),
            )
            # Accumulate instead of overwriting: every concept contributes.
            lUmls.extend(_collect_cuis(cursor))
    finally:
        cnx.close()
    return lUmls


# Searches the clarifyv2 annotation table (umls_old_dx) for the concepts found by BERT.
# Concepts found in JKES are added to a list to be saved later; otherwise the
# most similar concepts are looked up in UMLS.
# Input: path of the annotations file
def jkes_concept_extractor(pathAnnotations):
    """Split annotated concepts into JKES-resolved ones and UMLS candidates.

    Loads the JSON annotations at ``pathAnnotations``, turns them into concept
    tuples with ``extractionOfConcepts`` and reads ``entity_cuis.json`` from
    the working directory. Each tuple is looked up with
    ``select_query_umls_jkes``; concepts that yield CUIs are recorded as JKES
    hits. The rest are normalised (the abbreviations ``ca`` / ``adenoca`` /
    ``car`` are expanded, otherwise the text is singularised with ``inflect``)
    and handed to ``umls_concept_extractor`` together with the CUIs returned by
    ``select_query_jkes``.

    The code reads tuple positions 0-4: [0] is the concept text and [1] is
    compared with the entity values of ``entity_cuis.json``; [4] presumably
    identifies the source document and [2]/[3] a position within it -- TODO
    confirm against ``extractionOfConcepts``.

    NOTE(review): the nesting below was restored from a whitespace-collapsed
    source; confirm the block boundaries against the original file.

    Args:
        pathAnnotations: Path of the JSON annotations file.

    Returns:
        ``(listConceptsJkes, listConceptsUmls)``. The first holds unique
        ``(concept, cui, "JKES", t[4], t[2], t[3])`` tuples. The second holds
        the tuples returned by ``umls_concept_extractor`` with the original
        concept text restored in position 0 and the recorded
        ``(t[4], t[2], t[3])`` occurrences appended.
    """
    with open(pathAnnotations) as json_file:
        annotations = json.load(json_file)
    data = extractionOfConcepts(annotations)
    dictAnnotationsEntities = {}
    dictUmls = {}
    conceptsSearchUmls = []
    cuiSearchUMLS = []
    # Maps the original concept text to the normalised text sent to UMLS.
    dictTraduccion = {}
    # Maps the concept text to the list of (t[4], t[2], t[3]) occurrences.
    dictConceptDoc = {}
    jkes = 0
    umls = 0
    notCui = 0
    p = inflect.engine()
    with open('entity_cuis.json') as file:
        dictAnnotationsEntities = json.load(file)
    # tuplesConcepts = concept annotated by BIO
    for tuplesConcepts in data:
        # annotator = annotator taken from the JKES annotator list
        # NOTE(review): dictUmls is keyed by tuplesConcepts[4] below, while this
        # test uses tuplesConcepts[0] -- confirm that is intended.
        if(not(tuplesConcepts[0] in dictUmls.keys())):
            lUmls = select_query_umls_jkes(tuplesConcepts)
            if(len(lUmls)>0):
                dictUmls[tuplesConcepts[4]] = {tuplesConcepts[0]:(lUmls,tuplesConcepts[2],tuplesConcepts[3])}
                jkes+=1
            else:
                if(not(tuplesConcepts[0] in conceptsSearchUmls)):
                    for annotator in dictAnnotationsEntities.keys():
                        # Check whether the entity mapped to this annotator equals the concept's entity
                        if (dictAnnotationsEntities[annotator] == tuplesConcepts[1]):
                            # Whole-word, case-insensitive abbreviation patterns.
                            regex = r"(?i)(\bca\b)"
                            regexAdenoca = r"(?i)(\badenoca\b)"
                            regexCar = r"(?i)(\bcar\b)"
                            aux = ""
                            changed=False
                            if(re.search(regex,tuplesConcepts[0])):
                                tuplesConcepts = list(tuplesConcepts)
                                aux = tuplesConcepts[0]
                                # "ca" becomes "carcinoma" next to "infiltrante", otherwise "cancer".
                                if("infiltrante" in aux):
                                    aux = re.sub(regex,"carcinoma",aux)
                                else:
                                    aux = re.sub(regex,"cancer",aux)
                                changed = True
                            elif(re.search(regexAdenoca,tuplesConcepts[0])):
                                tuplesConcepts = list(tuplesConcepts)
                                aux = tuplesConcepts[0]
                                aux = re.sub(regexAdenoca,"adenocarcinoma",aux)
                                changed = True
                            elif(re.search(regexCar,tuplesConcepts[0])):
                                tuplesConcepts = list(tuplesConcepts)
                                aux = tuplesConcepts[0]
                                aux = re.sub(regexCar,"carcinoma",aux)
                                changed = True
                            else:
                                tuplesConcepts = list(tuplesConcepts)
                                # A falsy result is treated as "nothing to singularise".
                                aux = p.singular_noun(tuplesConcepts[0])
                                if(not aux):
                                    changed=False
                                else:
                                    changed = True
                            if(changed):
                                dictTraduccion[tuplesConcepts[0]] = aux
                                conceptsSearchUmls.append(aux)
                            else:
                                conceptsSearchUmls.append(tuplesConcepts[0])
                            dictConceptDoc[tuplesConcepts[0]] = [(tuplesConcepts[4],tuplesConcepts[2],tuplesConcepts[3])]
                            umls+=1
                else:
                    # Concept already queued for UMLS: record one more occurrence.
                    lAux = dictConceptDoc[tuplesConcepts[0]]
                    lAux.append((tuplesConcepts[4],tuplesConcepts[2],tuplesConcepts[3]))
                    dictConceptDoc[tuplesConcepts[0]] = lAux
    # Flatten the JKES hits into unique (concept, cui, "JKES", t[4], t[2], t[3]) tuples.
    listConceptsJkes = []
    for key in dictUmls.keys():
        for key2 in dictUmls[key]:
            lAux = dictUmls[key][key2][0]
            for i in range(0,len(lAux)):
                if(not((key2,lAux[i],"JKES",key,dictUmls[key][key2][1],dictUmls[key][key2][2]) in listConceptsJkes)):
                    listConceptsJkes.append((key2,lAux[i],"JKES",key,dictUmls[key][key2][1],dictUmls[key][key2][2]))
                    print((key2,lAux[i],"JKES",key,dictUmls[key][key2][1],dictUmls[key][key2][2]))
    print("Checking UMLS CUIS")
    # De-duplicate the CUIs while keeping first-seen order.
    listCuisUmls = []
    cuisUmls = select_query_jkes(conceptsSearchUmls)
    for cui in cuisUmls:
        if(cui not in listCuisUmls):
            listCuisUmls.append(cui)
    listConceptsUmls = umls_concept_extractor(conceptsSearchUmls,listCuisUmls)
    # Put the original concept text back where a normalised form was searched.
    for i in range(0,len(listConceptsUmls)):
        lAux = list(listConceptsUmls[i])
        if(lAux[0] in dictTraduccion.values()):
            position = list(dictTraduccion.values()).index(lAux[0])
            lAux[0] = list(dictTraduccion.keys())[position]
        listConceptsUmls[i] = tuple(lAux)
    # Append the recorded occurrences of each concept to its result tuple.
    for i in range(0,len(listConceptsUmls)):
        lAux = list(dictConceptDoc[listConceptsUmls[i][0]])
        lAux2 = list(listConceptsUmls[i])
        lAux2 = lAux2 + lAux
        listConceptsUmls[i] = tuple(lAux2)
    print(jkes,umls)
    return listConceptsJkes,listConceptsUmls