Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
""" | |
Author: Philipp Seidl | |
ELLIS Unit Linz, LIT AI Lab, Institute for Machine Learning | |
Johannes Kepler University Linz | |
Contact: [email protected] | |
File contains functions that help prepare and download USPTO-related datasets | |
""" | |
import os | |
import gzip | |
import pickle | |
import requests | |
import subprocess | |
import pandas as pd | |
import numpy as np | |
from scipy import sparse | |
import json | |
def download_temprel_repo(save_path='data/temprel-fortunato', chunk_size=128): | |
"downloads the template-relevance master branch" | |
url = "https://gitlab.com/mefortunato/template-relevance/-/archive/master/template-relevance-master.zip" | |
r = requests.get(url, stream=True) | |
with open(save_path, 'wb') as fd: | |
for chunk in r.iter_content(chunk_size=chunk_size): | |
fd.write(chunk) | |
def unzip(path): | |
"unzips a file given a path" | |
import zipfile | |
with zipfile.ZipFile(path, 'r') as zip_ref: | |
zip_ref.extractall(path.replace('.zip','')) | |
def download_file(url, output_path=None): | |
""" | |
# code from fortunato | |
# could also import from temprel.data.download import get_uspto_50k but slightly altered ;) | |
""" | |
if not output_path: | |
output_path = url.split('/')[-1] | |
with requests.get(url, stream=True) as r: | |
r.raise_for_status() | |
with open(output_path, 'wb') as f: | |
for chunk in r.iter_content(chunk_size=8192): | |
if chunk: | |
f.write(chunk) | |
def get_uspto_480k(): | |
if not os.path.exists('data'): | |
os.mkdir('data') | |
if not os.path.exists('data/raw'): | |
os.mkdir('data/raw') | |
os.chdir('data/raw') | |
download_file( | |
'https://github.com/connorcoley/rexgen_direct/raw/master/rexgen_direct/data/train.txt.tar.gz', | |
'train.txt.tar.gz' | |
) | |
subprocess.run(['tar', 'zxf', 'train.txt.tar.gz']) | |
download_file( | |
'https://github.com/connorcoley/rexgen_direct/raw/master/rexgen_direct/data/valid.txt.tar.gz', | |
'valid.txt.tar.gz' | |
) | |
subprocess.run(['tar', 'zxf', 'valid.txt.tar.gz']) | |
download_file( | |
'https://github.com/connorcoley/rexgen_direct/raw/master/rexgen_direct/data/test.txt.tar.gz', | |
'test.txt.tar.gz' | |
) | |
subprocess.run(['tar', 'zxf', 'test.txt.tar.gz']) | |
with open('train.txt') as f: | |
train = [ | |
{ | |
'reaction_smiles': line.strip(), | |
'split': 'train' | |
} | |
for line in f.readlines() | |
] | |
with open('valid.txt') as f: | |
valid = [ | |
{ | |
'reaction_smiles': line.strip(), | |
'split': 'valid' | |
} | |
for line in f.readlines() | |
] | |
with open('test.txt') as f: | |
test = [ | |
{ | |
'reaction_smiles': line.strip(), | |
'split': 'test' | |
} | |
for line in f.readlines() | |
] | |
df = pd.concat([ | |
pd.DataFrame(train), | |
pd.DataFrame(valid), | |
pd.DataFrame(test) | |
]).reset_index() | |
df.to_json('uspto_lg_reactions.json.gz', compression='gzip') | |
os.chdir('..') | |
os.chdir('..') | |
return df | |
def get_uspto_50k(): | |
''' | |
get SI from: | |
Nadine Schneider; Daniel M. Lowe; Roger A. Sayle; Gregory A. Landrum. J. Chem. Inf. Model.201555139-53 | |
''' | |
if not os.path.exists('data'): | |
os.mkdir('data') | |
if not os.path.exists('data/raw'): | |
os.mkdir('data/raw') | |
os.chdir('data/raw') | |
subprocess.run(['wget', 'https://pubs.acs.org/doi/suppl/10.1021/ci5006614/suppl_file/ci5006614_si_002.zip']) | |
subprocess.run(['unzip', '-o', 'ci5006614_si_002.zip']) | |
data = [] | |
with gzip.open('ChemReactionClassification/data/training_test_set_patent_data.pkl.gz') as f: | |
while True: | |
try: | |
data.append(pickle.load(f)) | |
except EOFError: | |
break | |
reaction_smiles = [d[0] for d in data] | |
reaction_reference = [d[1] for d in data] | |
reaction_class = [d[2] for d in data] | |
df = pd.DataFrame() | |
df['reaction_smiles'] = reaction_smiles | |
df['reaction_reference'] = reaction_reference | |
df['reaction_class'] = reaction_class | |
df.to_json('uspto_sm_reactions.json.gz', compression='gzip') | |
os.chdir('..') | |
os.chdir('..') | |
return df | |
def get_uspto_golden(): | |
""" get uspto golden and convert it to smiles dataframe from | |
Lin, Arkadii; Dyubankova, Natalia; Madzhidov, Timur; Nugmanov, Ramil; | |
Rakhimbekova, Assima; Ibragimova, Zarina; Akhmetshin, Tagir; Gimadiev, | |
Timur; Suleymanov, Rail; Verhoeven, Jonas; Wegner, Jörg Kurt; | |
Ceulemans, Hugo; Varnek, Alexandre (2020): | |
Atom-to-Atom Mapping: A Benchmarking Study of Popular Mapping Algorithms and Consensus Strategies. | |
ChemRxiv. Preprint. https://doi.org/10.26434/chemrxiv.13012679.v1 | |
""" | |
if os.path.exists('data/raw/uspto_golden.json.gz'): | |
print('loading precomputed') | |
return pd.read_json('data/raw/uspto_golden.json.gz', compression='gzip') | |
if not os.path.exists('data'): | |
os.mkdir('data') | |
if not os.path.exists('data/raw'): | |
os.mkdir('data/raw') | |
os.chdir('data/raw') | |
subprocess.run(['wget', 'https://github.com/Laboratoire-de-Chemoinformatique/Reaction_Data_Cleaning/raw/master/data/golden_dataset.zip']) | |
subprocess.run(['unzip', '-o', 'golden_dataset.zip']) #return golden_dataset.rdf | |
from CGRtools.files import RDFRead | |
import CGRtools | |
from rdkit.Chem import AllChem | |
def cgr2rxnsmiles(cgr_rx): | |
smiles_rx = '.'.join([AllChem.MolToSmiles(CGRtools.to_rdkit_molecule(m)) for m in cgr_rx.reactants]) | |
smiles_rx += '>>'+'.'.join([AllChem.MolToSmiles(CGRtools.to_rdkit_molecule(m)) for m in cgr_rx.products]) | |
return smiles_rx | |
data = {} | |
input_file = 'golden_dataset.rdf' | |
do_basic_standardization=True | |
print('reading and converting the rdf-file') | |
with RDFRead(input_file) as f: | |
while True: | |
try: | |
r = next(f) | |
key = r.meta['Reaction_ID'] | |
if do_basic_standardization: | |
r.thiele() | |
r.standardize() | |
data[key] = cgr2rxnsmiles(r) | |
except StopIteration: | |
break | |
print('saving as a dataframe to data/uspto_golden.json.gz') | |
df = pd.DataFrame([data],index=['reaction_smiles']).T | |
df['reaction_reference'] = df.index | |
df.index = range(len(df)) #reindex | |
df.to_json('uspto_golden.json.gz', compression='gzip') | |
os.chdir('..') | |
os.chdir('..') | |
return df | |
def load_USPTO_fortu(path='data/processed', which='uspto_sm_', is_appl_matrix=False): | |
""" | |
loads the fortunato preprocessed data as | |
dict X containing X['train'], X['valid'], and X['test'] | |
as well as the labels containing the corresponding splits | |
returns X, y | |
""" | |
X = {} | |
y = {} | |
for split in ['train','valid', 'test']: | |
tmp = np.load(f'{path}/{which}{split}.input.smiles.npy', allow_pickle=True) | |
X[split] = [] | |
for ii in range(len(tmp)): | |
X[split].append( tmp[ii].split('.')) | |
if is_appl_matrix: | |
y[split] = sparse.load_npz(f'{path}/{which}{split}.appl_matrix.npz') | |
else: | |
y[split] = np.load(f'{path}/{which}{split}.labels.classes.npy', allow_pickle=True) | |
print(split, y[split].shape[0], 'samples (', y[split].max() if not is_appl_matrix else y[split].shape[1],'max label)') | |
return X, y | |
#TODO one should load in this file pd.read_json('uspto_R_retro.templates.uspto_R_.json.gz') | |
# this only holds the templates.. the other holds everything | |
def load_templates_sm(path = 'data/processed/uspto_sm_templates.df.json.gz', get_complete_df=False): | |
"returns a dict mapping from class index to mapped reaction_smarts from the templates_df" | |
df = pd.read_json(path) | |
if get_complete_df: return df | |
template_dict = {} | |
for row in range(len(df)): | |
template_dict[df.iloc[row]['index']] = df.iloc[row].reaction_smarts | |
return template_dict | |
def load_templates_lg(path = 'data/processed/uspto_lg_templates.df.json.gz', get_complete_df=False): | |
return load_templates_sm(path=path, get_complete_df=get_complete_df) | |
def load_USPTO_sm(): | |
"loads the default dataset" | |
return load_USPTO_fortu(which='uspto_sm_') | |
def load_USPTO_lg(): | |
"loads the default dataset" | |
return load_USPTO_fortu(which='uspto_lg_') | |
def load_USPTO_sm_pretraining(): | |
"loads the default application matrix label and dataset" | |
return load_USPTO_fortu(which='uspto_sm_', is_appl_matrix=True) | |
def load_USPTO_lg_pretraining(): | |
"loads the default application matrix label and dataset" | |
return load_USPTO_fortu(which='uspto_lg_', is_appl_matrix=True) | |
def load_USPTO_df_sm(): | |
"loads the USPTO small Sm dataset dataframe" | |
return pd.read_json('data/raw/uspto_sm_reactions.json.gz') | |
def load_USPTO_df_lg(): | |
"loads the USPTO large Lg dataset dataframe" | |
return pd.read_json('data/raw/uspto_sm_reactions.json.gz') | |
def load_USPTO_golden(): | |
"loads the golden USPTO dataset" | |
return load_USPTO_fortu(which=f'uspto_golden_', is_appl_matrix=False) | |
def load_USPTO(which = 'sm', is_appl_matrix=False): | |
return load_USPTO_fortu(which=f'uspto_{which}_', is_appl_matrix=is_appl_matrix) | |
def load_templates(which = 'sm',fdir='data/processed', get_complete_df=False): | |
return load_templates_sm(path=f'{fdir}/uspto_{which}_templates.df.json.gz', get_complete_df=get_complete_df) | |
def load_data(dataset, path): | |
splits = ['train', 'valid', 'test'] | |
split2smiles = {} | |
split2label = {} | |
split2reactants = {} | |
split2appl = {} | |
split2prod_idx_reactants = {} | |
for split in splits: | |
label_fn = os.path.join(path, f'{dataset}_{split}.labels.classes.npy') | |
split2label[split] = np.load(label_fn, allow_pickle=True) | |
smiles_fn = os.path.join(path, f'{dataset}_{split}.input.smiles.npy') | |
split2smiles[split] = np.load(smiles_fn, allow_pickle=True) | |
reactants_fn = os.path.join(path, f'uspto_R_{split}.reactants.canonical.npy') | |
split2reactants[split] = np.load(reactants_fn, allow_pickle=True) | |
split2appl[split] = np.load(os.path.join(path, f'{dataset}_{split}.applicability.npy')) | |
pir_fn = os.path.join(path, f'{dataset}_{split}.prod.idx.reactants.p') | |
if os.path.isfile(pir_fn): | |
with open(pir_fn, 'rb') as f: | |
split2prod_idx_reactants[split] = pickle.load(f) | |
if len(split2prod_idx_reactants) == 0: | |
split2prod_idx_reactants = None | |
with open(os.path.join(path, f'{dataset}_templates.json'), 'r') as f: | |
label2template = json.load(f) | |
label2template = {int(k): v for k,v in label2template.items()} | |
return split2smiles, split2label, split2reactants, split2appl, split2prod_idx_reactants, label2template | |
def load_dataset_from_csv(csv_path='', split_col='split', input_col='prod_smiles', ssretroeval=False, reactants_col='reactants_can', ret_df=False, **kwargs): | |
"""loads the dataset from a CSV file containing a split-column, and input-column which can be defined, | |
as well as a 'reaction_smarts' column containing the extracted template, a 'label' column (the index of the template) | |
:returns | |
""" | |
print('loading X, y from csv') | |
df = pd.read_csv(csv_path) | |
X = {} | |
y = {} | |
for spli in set(df[split_col]): | |
#X[spli] = list(df[df[split_col]==spli]['prod_smiles'].apply(lambda k: [k])) | |
X[spli] = list(df[df[split_col]==spli][input_col].apply(lambda k: [k])) | |
y[spli] = (df[df[split_col]==spli]['label']).values | |
print(spli, len(X[spli]), 'samples') | |
# template to dict | |
tmp = df[['reaction_smarts','label']].drop_duplicates(subset=['reaction_smarts','label']).sort_values('label') | |
tmp.index= tmp.label | |
template_list = tmp['reaction_smarts'].to_dict() | |
print(len(template_list),'templates') | |
if ssretroeval: | |
# setup for ttest | |
test_reactants_can = list(df[df[split_col]=='test'][reactants_col]) | |
only_in_test = set(y['test']) - set(y['train']).union(set(y['valid'])) | |
print('obfuscating', len(only_in_test), 'templates because they are only in test') | |
for ii in only_in_test: | |
template_list[ii] = 'CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC.CCCCCCCCCCCCCCCCCCCCCCCCCCC.CCCCCCCCCCCCCCCCCCCCCC>>CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC.CCCCCCCCCCCCCCCCCCCCC' #obfuscate them | |
if ret_df: | |
return X, y, template_list, test_reactants_can, df | |
return X, y, template_list, test_reactants_can | |
if ret_df: | |
return X, y, template_list, None, df | |
return X, y, template_list, None |