import json, os, logging, csv, gzip, numpy

from compound import Compound

base_path = os.path.split(os.path.realpath(__file__))[0]

### Input Files:
# original version of the KEGG compound file
OLD_COMPOUND_JSON_FNAME = os.path.join(base_path, './data_cc/equilibrator_compounds.json.gz')

# a TSV file with additional names and InChIs (mostly compounds missing from KEGG
# and added manually)
KEGG_ADDITIONS_TSV_FNAME = os.path.join(base_path, './data_cc/kegg_additions.tsv')

### Files created by this module:
# names and InChIs only
KEGG_COMPOUND_JSON_FNAME = os.path.join(base_path, './data_cc/kegg_compounds.json.gz')

# names, InChIs and pKa data
DEFAULT_CACHE_FNAME = os.path.join(base_path, './data_cc/compounds.json.gz')


class CompoundEncoder(json.JSONEncoder):
    """JSON encoder that serializes Compound objects via their to_json_dict()."""

    def default(self, obj):
        if isinstance(obj, Compound):
            return obj.to_json_dict()
        return json.JSONEncoder.default(self, obj)


class Singleton(type):
    """Metaclass that turns a class into a singleton: the first call to the
    constructor creates the instance, and every later call returns it."""

    def __init__(cls, name, bases, dic):
        super(Singleton, cls).__init__(name, bases, dic)
        cls.instance = None

    def __call__(cls, *args, **kw):
        if cls.instance is None:
            cls.instance = super(Singleton, cls).__call__(*args, **kw)
        return cls.instance
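

# A minimal sketch of how the Singleton metaclass behaves (the class name
# 'Example' is purely illustrative and not part of this module):
#
#     class Example(metaclass=Singleton):
#         pass
#
#     a = Example()
#     b = Example()
#     assert a is b   # both calls return the same cached instance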


class CompoundCacher(metaclass=Singleton):
    """
    CompoundCacher is a singleton that handles caching of Compound objects
    for the component-contribution package. Compounds are retrieved by
    their ID (the KEGG ID in most cases).

    The first time a Compound is requested, it is obtained from the relevant
    database and a Compound object is created. This takes a while, because
    it usually involves internet communication and then invoking the ChemAxon
    plugin to calculate the pKa values for that structure.

    Any further request for the same compound ID draws the object from
    the cache. When the method dump() is called, all cached data is written
    to a file that will be loaded in future Python sessions.
    """

    def __init__(self, cache_fname=None):
        self.cache_fname = cache_fname
        if self.cache_fname is None:
            self.cache_fname = DEFAULT_CACHE_FNAME

        # map every known compound ID to its InChI string
        # (gzip is opened in text mode so that json.load receives str, not bytes)
        with gzip.open(KEGG_COMPOUND_JSON_FNAME, 'rt') as fp:
            compounds = json.load(fp)
        self.compound_id2inchi = {d['compound_id']: d['inchi']
                                  for d in compounds}
        self.need_to_update_cache_file = False
        self.load()

    def get_all_compound_ids(self):
        return sorted(self.compound_id2inchi.keys())

    def load(self):
        # parse the JSON cache file and store its entries in 'compound_dict'
        self.compound_dict = {}
        self.compound_ids = []
        if os.path.exists(self.cache_fname):
            with gzip.open(self.cache_fname, 'rt') as fp:
                for d in json.load(fp):
                    self.compound_ids.append(d['compound_id'])
                    self.compound_dict[d['compound_id']] = Compound.from_json_dict(d)

    def dump(self):
        if self.need_to_update_cache_file:
            # gzip must be opened in text mode ('wt'), since json.dump writes str
            with gzip.open(self.cache_fname, 'wt') as fp:
                data = sorted(self.compound_dict.values(),
                              key=lambda d: d.compound_id)
                dict_data = [x.to_json_dict() for x in data]
                json.dump(dict_data, fp, cls=CompoundEncoder,
                          sort_keys=True, indent=4, separators=(',', ': '))
            self.need_to_update_cache_file = False

    def get_compound(self, compound_id, kegg_additions_cids=None):
        if compound_id not in self.compound_dict:
            logging.debug('Cache miss: %s' % str(compound_id))
            inchi = self.compound_id2inchi[compound_id]
            comp = Compound.from_inchi('KEGG', compound_id, inchi)
            self.add(comp)
        elif kegg_additions_cids is not None and compound_id in kegg_additions_cids:
            # the compound appears in kegg_additions.tsv: drop the cached entry
            # and replace it with a freshly generated one
            self.remove(compound_id)
            logging.debug('Cache update: %s' % str(compound_id))
            inchi = self.compound_id2inchi[compound_id]
            comp = Compound.from_inchi('KEGG', compound_id, inchi)
            self.add(comp)
        else:
            logging.debug('Cache hit: %s' % str(compound_id))
        return self.compound_dict[compound_id]

    def remove(self, compound_id):
        if compound_id in self.compound_dict:
            del self.compound_dict[compound_id]
        else:
            logging.debug('%s is not cached, cannot remove it' % str(compound_id))

    def add(self, comp):
        self.compound_dict[comp.compound_id] = comp
        self.need_to_update_cache_file = True

    def get_element_matrix(self, compound_ids):
        if isinstance(compound_ids, str):
            compound_ids = [compound_ids]

        # gather the "atom bags" of all compounds in a list 'atom_bag_list'
        elements = set()
        atom_bag_list = []
        for compound_id in compound_ids:
            comp = self.get_compound(compound_id)
            atom_bag = comp.atom_bag
            if atom_bag is not None:
                elements = elements.union(atom_bag.keys())
            atom_bag_list.append(atom_bag)
        elements.discard('H')  # don't balance H (it's enough to balance e-)
        elements = sorted(elements)

        # create the elemental matrix, where each row is a compound and each
        # column is an element (or e-)
        Ematrix = numpy.matrix(numpy.zeros((len(atom_bag_list), len(elements))))
        for i, atom_bag in enumerate(atom_bag_list):
            if atom_bag is None:
                Ematrix[i, :] = numpy.nan
            else:
                for j, elem in enumerate(elements):
                    Ematrix[i, j] = atom_bag.get(elem, 0)
        return elements, Ematrix
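
    # Example: checking the elemental balance of a reaction with the matrix
    # returned above (a sketch; the stoichiometry shown, ATP + H2O -> ADP + Pi,
    # uses illustrative KEGG IDs):
    #
    #     ccache = CompoundCacher()
    #     elements, Ematrix = ccache.get_element_matrix(
    #         ['C00002', 'C00001', 'C00008', 'C00009'])
    #     coeffs = numpy.matrix([[-1, -1, 1, 1]])  # one row of stoichiometric coefficients
    #     imbalance = coeffs * Ematrix             # an all-zero row means the reaction is balanced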


###############################################################################
def RebuildCompoundJSON():
    kegg_dict = {}
    with gzip.open(OLD_COMPOUND_JSON_FNAME, 'rt') as fp:
        for d in json.load(fp):
            cid = d['CID']
            kegg_dict[cid] = {'compound_id': cid,
                              'name': d['name'],
                              'names': d['names'],
                              'inchi': d['InChI']}

    # override some of the compounds or add new ones with 'fake' IDs,
    # i.e. C80000 or higher.
    kegg_additions_cids = []
    with open(KEGG_ADDITIONS_TSV_FNAME, 'r') as tsv_file:
        for d in csv.DictReader(tsv_file, delimiter='\t'):
            cid = 'C%05d' % int(d['cid'])
            kegg_additions_cids.append(cid)
            kegg_dict[cid] = {'compound_id': cid,
                              'name': d['name'],
                              'names': [d['name']],
                              'inchi': d['inchi']}

    compound_json = [kegg_dict[compound_id] for compound_id in sorted(kegg_dict.keys())]
    # gzip must be opened in text mode ('wt'), since json.dump writes str
    with gzip.open(KEGG_COMPOUND_JSON_FNAME, 'wt') as new_json:
        json.dump(compound_json, new_json, sort_keys=True, indent=4)
    return kegg_additions_cids


###############################################################################
def BuildCache(start_from_scratch=False, kegg_additions_cids=None):
    if start_from_scratch and os.path.exists(DEFAULT_CACHE_FNAME):
        os.remove(DEFAULT_CACHE_FNAME)

    ccache = CompoundCacher(cache_fname=DEFAULT_CACHE_FNAME)
    i = 0
    for compound_id in ccache.get_all_compound_ids():
        logging.debug('Caching %s' % compound_id)
        comp = ccache.get_compound(compound_id, kegg_additions_cids=kegg_additions_cids)
        logging.debug(str(comp))
        i += 1
        if i % 100 == 0:
            logging.debug('Dumping Cache ...')
            ccache.dump()
    ccache.dump()


###############################################################################
if __name__ == '__main__':
    logger = logging.getLogger('')
    #logger.setLevel(logging.WARNING)
    logger.setLevel(logging.DEBUG)

    # RebuildCompoundJSON() and BuildCache() are module-level functions,
    # not methods of CompoundCacher
    kegg_additions_cids = RebuildCompoundJSON()
    BuildCache(start_from_scratch=False, kegg_additions_cids=kegg_additions_cids)