dGPredictor / CC /compound_cacher.py
vuu10's picture
Upload 6 files
6d990bb
raw
history blame
8.34 kB
import json, os, logging, csv, gzip, numpy, pdb
from compound import Compound
# Directory containing this module; every data file below is resolved
# relative to it.
base_path = os.path.dirname(os.path.realpath(__file__))

### Input Files:

# Original version of the KEGG compound file.
OLD_COMPOUND_JSON_FNAME = os.path.join(
    base_path, './data_cc/equilibrator_compounds.json.gz')

# A tab-separated file with additional names and InChIs (mostly compounds
# that are missing from KEGG and were added manually).
KEGG_ADDITIONS_TSV_FNAME = os.path.join(
    base_path, './data_cc/kegg_additions.tsv')

### Files created by this module:

# Names and InChIs only.
KEGG_COMPOUND_JSON_FNAME = os.path.join(
    base_path, './data_cc/kegg_compounds.json.gz')

# Names, InChIs and pKa data.
DEFAULT_CACHE_FNAME = os.path.join(
    base_path, './data_cc/compounds.json.gz')
class CompoundEncoder(json.JSONEncoder):
    """JSON encoder that knows how to serialize Compound objects."""

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for 'obj'.

        Compound instances are converted through their to_json_dict()
        method; anything else is delegated to the base JSONEncoder, which
        raises TypeError for unsupported types.
        """
        if not isinstance(obj, Compound):
            return super().default(obj)
        return obj.to_json_dict()
class Singleton(type):
    """
    Metaclass that lets each class using it be instantiated only once.

    The single object is stored on the class as the attribute 'instance'.
    Every class created with this metaclass (subclasses included) starts
    with its own 'instance' set to None.
    """

    def __init__(cls, name, bases, dic):
        # Regular class creation, then mark the class as not yet instantiated.
        super().__init__(name, bases, dic)
        cls.instance = None

    def __call__(cls, *args, **kw):
        # Arguments are only used for the very first construction; later
        # calls return the stored object and ignore whatever was passed.
        existing = cls.instance
        if existing is not None:
            return existing
        cls.instance = super().__call__(*args, **kw)
        return cls.instance
class CompoundCacher(object, metaclass=Singleton):
    """
    CompoundCacher is a singleton that handles caching of Compound objects
    for the component-contribution package. The Compounds are retrieved by
    their ID (which is the KEGG ID in most cases).
    The first time a Compound is requested, it is obtained from the relevant
    database and a Compound object is created (this takes a while because
    it usually involves internet communication and then invoking the ChemAxon
    plugin for calculating the pKa values for that structure).
    Any further request for the same Compound ID will draw the object from
    the cache. When the method dump() is called, all cached data is written
    to a file that will be loaded in future python sessions.

    Attributes:
        cache_fname: path of the gzipped JSON cache file.
        compound_id2inchi: dict mapping every known compound ID to its InChI.
        compound_dict: dict mapping compound ID to the cached Compound.
        compound_ids: list of the compound IDs found in the cache file
            when it was last loaded.
        need_to_update_cache_file: True when the in-memory cache holds
            changes that dump() has not yet written to disk.
    """

    def __init__(self, cache_fname=None):
        """
        Read the compound ID -> InChI table and load the existing cache.

        Because of the Singleton metaclass, this only runs for the first
        construction; later calls return the same object unchanged.

        Args:
            cache_fname: path of the cache file; DEFAULT_CACHE_FNAME is
                used when None.
        """
        self.cache_fname = cache_fname
        if self.cache_fname is None:
            self.cache_fname = DEFAULT_CACHE_FNAME

        # Read as bytes so that json can auto-detect the encoding, and use a
        # context manager so that the file handle is always released.
        with gzip.open(KEGG_COMPOUND_JSON_FNAME, 'rb') as fp:
            compounds = json.load(fp)
        self.compound_id2inchi = {d['compound_id']: d['inchi']
                                  for d in compounds}
        self.need_to_update_cache_file = False
        self.load()

    def get_all_compound_ids(self):
        """Return a sorted list of all compound IDs that have an InChI entry."""
        return sorted(self.compound_id2inchi.keys())

    def load(self):
        """
        Reset the in-memory cache and refill it from the cache file.

        If the cache file does not exist, the cache is simply left empty.
        """
        # parse the JSON cache file and store in a dictionary 'compound_dict'
        self.compound_dict = {}
        self.compound_ids = []
        if not os.path.exists(self.cache_fname):
            return

        with gzip.open(self.cache_fname, 'rb') as fp:
            records = json.load(fp)
        for d in records:
            self.compound_ids.append(d['compound_id'])
            self.compound_dict[d['compound_id']] = Compound.from_json_dict(d)

    def dump(self):
        """
        Write the cache to 'cache_fname' if it changed since the last dump.

        The compounds are written sorted by compound ID.
        """
        if not self.need_to_update_cache_file:
            return

        # Build the full payload before touching the file, so that a failure
        # during conversion does not leave a truncated cache behind.
        data = sorted(self.compound_dict.values(),
                      key=lambda comp: comp.compound_id)
        dict_data = [comp.to_json_dict() for comp in data]

        # json.dump() produces str, so the gzip stream must be opened in text
        # mode ('wt'); plain 'w' is binary and raises TypeError on Python 3.
        with gzip.open(self.cache_fname, 'wt', encoding='utf-8') as fp:
            json.dump(dict_data, fp, cls=CompoundEncoder,
                      sort_keys=True, indent=4, separators=(',', ': '))
        self.need_to_update_cache_file = False

    def _create_compound(self, compound_id):
        """Build a fresh Compound for 'compound_id' from its stored InChI."""
        inchi = self.compound_id2inchi[compound_id]
        return Compound.from_inchi('KEGG', compound_id, inchi)

    def get_compound(self, compound_id, kegg_additions_cids=None):
        """
        Return the Compound for 'compound_id', creating it when needed.

        Args:
            compound_id: the ID of the requested compound.
            kegg_additions_cids: optional collection of compound IDs taken
                from kegg_additions.tsv. A cached compound whose ID is in
                this collection is rebuilt from its current InChI and
                replaces the cached one.

        Raises:
            KeyError: if a compound has to be created and 'compound_id' has
                no entry in the ID -> InChI table.
        """
        if compound_id not in self.compound_dict:
            logging.debug('Cache miss: %s', compound_id)
            self.add(self._create_compound(compound_id))
        elif (kegg_additions_cids is not None
              and compound_id in kegg_additions_cids):
            # The new Compound is built before the old entry is replaced, so
            # a failure here leaves the previously cached compound intact.
            logging.debug('Cache update: %s', compound_id)
            self.add(self._create_compound(compound_id))
        else:
            logging.debug('Cache hit: %s', compound_id)
        return self.compound_dict[compound_id]

    def remove(self, compound_id):
        """Drop 'compound_id' from the in-memory cache, if it is there."""
        if compound_id in self.compound_dict:
            del self.compound_dict[compound_id]
        else:
            logging.debug('%s is not cached, cannot remove it', compound_id)

    def add(self, comp):
        """Store 'comp' in the cache (replacing any entry with the same ID)
        and mark the cache file as out of date."""
        self.compound_dict[comp.compound_id] = comp
        self.need_to_update_cache_file = True

    def get_element_matrix(self, compound_ids):
        """
        Build the elemental matrix for a list of compounds.

        Args:
            compound_ids: a list of compound IDs, or a single ID as a string.

        Returns:
            A tuple (elements, Ematrix). 'elements' is the sorted list of
            the keys found in the compounds' atom bags, without 'H'.
            'Ematrix' is a numpy.matrix with one row per compound and one
            column per entry of 'elements'; the row of a compound whose
            atom bag is None is filled with NaN.
        """
        if isinstance(compound_ids, str):
            compound_ids = [compound_ids]

        # gather the "atom bags" of all compounds in a list 'atom_bag_list'
        elements = set()
        atom_bag_list = []
        for compound_id in compound_ids:
            atom_bag = self.get_compound(compound_id).atom_bag
            if atom_bag is not None:
                elements.update(atom_bag.keys())
            # None is appended as well, to keep the rows aligned with the
            # order of 'compound_ids'.
            atom_bag_list.append(atom_bag)
        elements.discard('H')  # don't balance H (it's enough to balance e-)
        elements = sorted(elements)

        # create the elemental matrix, where each row is a compound and each
        # column is an element (or e-)
        Ematrix = numpy.matrix(numpy.zeros((len(atom_bag_list), len(elements))))
        for i, atom_bag in enumerate(atom_bag_list):
            if atom_bag is None:
                Ematrix[i, :] = numpy.nan
            else:
                for j, elem in enumerate(elements):
                    Ematrix[i, j] = atom_bag.get(elem, 0)
        return elements, Ematrix

    ###############################################################################
    @staticmethod
    def RebuildCompoundJSON():
        """
        Regenerate KEGG_COMPOUND_JSON_FNAME from the input files.

        The compounds of OLD_COMPOUND_JSON_FNAME are read first, and then
        the rows of KEGG_ADDITIONS_TSV_FNAME override them or add new ones.

        Returns:
            The list of compound IDs found in the additions file.
        """
        kegg_dict = {}
        with gzip.open(OLD_COMPOUND_JSON_FNAME, 'rb') as fp:
            old_compounds = json.load(fp)
        for d in old_compounds:
            cid = d['CID']
            kegg_dict[cid] = {'compound_id': cid,
                              'name': d['name'],
                              'names': d['names'],
                              'inchi': d['InChI']}

        # override some of the compounds or add new ones with 'fake' IDs,
        # i.e. C80000 or higher.
        kegg_additions_cids = []
        # newline='' is what the csv module asks for when reading files.
        with open(KEGG_ADDITIONS_TSV_FNAME, 'r', newline='') as tsv_file:
            for d in csv.DictReader(tsv_file, delimiter='\t'):
                cid = 'C%05d' % int(d['cid'])
                kegg_additions_cids.append(cid)
                kegg_dict[cid] = {'compound_id': cid,
                                  'name': d['name'],
                                  'names': [d['name']],
                                  'inchi': d['inchi']}

        compound_json = [kegg_dict[compound_id]
                         for compound_id in sorted(kegg_dict)]

        # Text mode is required here: json.dump() writes str, which a binary
        # gzip stream rejects with TypeError on Python 3.
        with gzip.open(KEGG_COMPOUND_JSON_FNAME, 'wt', encoding='utf-8') as new_json:
            json.dump(compound_json, new_json, sort_keys=True, indent=4)
        return kegg_additions_cids

    ###############################################################################
    @staticmethod
    def BuildCache(start_from_scratch=False, kegg_additions_cids=None):
        """
        Make sure every known compound is in the default cache file.

        Args:
            start_from_scratch: when True, an existing default cache file is
                deleted before the cacher is created.
            kegg_additions_cids: optional collection of compound IDs that
                must be rebuilt even if they are already cached.
        """
        if start_from_scratch and os.path.exists(DEFAULT_CACHE_FNAME):
            os.remove(DEFAULT_CACHE_FNAME)

        # NOTE(review): CompoundCacher is a singleton, so if an instance was
        # already created earlier in this process it is returned as-is and
        # keeps whatever it has in memory - confirm that callers are aware.
        ccache = CompoundCacher(cache_fname=DEFAULT_CACHE_FNAME)

        # A set gives constant-time membership tests in the loop below.
        if kegg_additions_cids is not None:
            kegg_additions_cids = set(kegg_additions_cids)

        for i, compound_id in enumerate(ccache.get_all_compound_ids(), start=1):
            logging.debug('Caching %s', compound_id)
            comp = ccache.get_compound(compound_id,
                                       kegg_additions_cids=kegg_additions_cids)
            logging.debug(str(comp))
            # Save intermediate results every 100 compounds, so that an
            # interrupted run does not lose all of its work.
            if i % 100 == 0:
                logging.debug('Dumping Cache ...')
                ccache.dump()
        ccache.dump()
###############################################################################
if __name__ == '__main__':
    # Show everything down to DEBUG level on the root logger
    # (use logging.WARNING instead for a quieter run).
    root_logger = logging.getLogger('')
    root_logger.setLevel(logging.DEBUG)

    # First regenerate the compound list, then (re)build the cache; the
    # compounds that came from the additions file are always refreshed.
    additions = CompoundCacher.RebuildCompoundJSON()
    CompoundCacher.BuildCache(start_from_scratch=False,
                              kegg_additions_cids=additions)