import os
import sys
import json
import uuid
import hashlib
from datetime import datetime
import pandas as pd
from articledownloader.articledownloader import ArticleDownloader
from d3tales_api.database_info import db_info
from d3tales_api.Processors.parser_cv import *
from d3tales_api.Processors.parser_dft import *
from d3tales_api.Processors.parser_uvvis import *
from d3tales_api.D3database.d3database import DBconnector
from d3tales_api.D3database.schema2class import Schema2Class
try:
from chemdataextractor2 import Document
from chemdataextractor2.doc.text import *
except ImportError:
print("WARNING. ChemDataExtractor2 is not installed! Install ChemDataExtractor2 if you plan on performing NLP parsing.")
try:
from elsapy.elsdoc import AbsDoc
from elsapy.elsclient import ElsClient
except Exception:
print("WARNING. Elsevier's elsapy has not imported correctly! If you plan on performing NLP parsing, please resolve this issue.")
def float_from_str(text):
"""Convert a numeric string that may contain spaces, scientific notation, subscripts, or superscripts to a float."""
normal_chars = "0123456789"
# replace subscripts
subscript_chars = "₀₁₂₃₄₅₆₇₈₉"
mapping = str.maketrans(subscript_chars, normal_chars)
text = text.translate(mapping)
# replace superscripts
superscript_chars = "⁰¹²³⁴⁵⁶⁷⁸⁹"
mapping = str.maketrans(superscript_chars, normal_chars)
text = text.translate(mapping)
text = text.replace(" ", "").replace("x10^", "e").replace("×10", "e").replace("⁻", "-")
return float(text.encode("ascii", "ignore").decode())
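# For illustration, the normalizations above give, e.g.:
# float_from_str("1.2x10^-3")  -> 0.0012
# float_from_str("5.0×10⁻⁴")   -> 0.0005
# float_from_str("1 200")      -> 1200.0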
class ProcessDFT:
"""
Class to process molecular DFT output files.
Copyright 2021, University of Kentucky
"""
def __init__(self, _id: str = None, filepath: str = None, submission_info: dict = None, metadata: dict = None,
parsing_class=ProcessGausLog):
"""
:param _id: str, molecule ID
:param filepath: str, path to the file to be processed (takes precedence over `mol_file` in metadata)
:param submission_info: dict, submission information
:param metadata: dict, metadata (`mol_file` should be a key to the filepath of the file to be processed)
:param parsing_class: class, class to use for file parsing (EX: ProcessGausLog, ProcessCCLIB, or ProcessPsi4Log)
"""
submission_info = submission_info or {}
metadata = metadata or {}
self.data_path = filepath or metadata.get("mol_file")
self.id = _id
self.parsing_class = parsing_class
self.submission_info = json.loads(json.dumps(submission_info))
self.DFTData = parsing_class(metadata=metadata)
@property
def data_dict(self):
"""
Dictionary of processed data (in accordance with D3TaLES backend schema)
"""
if self.DFTData.calculation_type == 'wtuning':
if not hasattr(self.DFTData, 'omega'):
raise TypeError("{} parsing class is not a WTuning parsing class.".format(self.parsing_class))
all_data_dict = {
"_id": self.hash_id,
"mol_id": self.id,
"submission_info": self.submission_info,
"calculation_type": self.DFTData.calculation_type,
"data": {
"conditions": self.conditions,
"omega": self.DFTData.omega
}
}
json_data = json.dumps(all_data_dict)
return json.loads(json_data)
data_dict = {
"conditions": self.conditions,
"charge": self.DFTData.charge,
"spin_multiplicity": self.DFTData.spin_multiplicity,
"number_of_electrons": sum(self.DFTData.electrons)
}
try:
data_dict.update({"is_groundState": self.is_groundState})
except ConnectionError:
print(
"Warning. Could not connect to the database, so no 'is_groundState' property was specified. DB_INFO_FILE may not be defined.")
if 'freq' in self.DFTData.calculation_type:
data_dict.update({
"gibbs_correction": {
"value": self.DFTData.gibbs_correction * 27.2114, # convert to eV
"unit": "eV"
},
"frequency_dict": self.DFTData.frequency_dicts
})
elif 'opt' in self.DFTData.calculation_type or 'energy' in self.DFTData.calculation_type:
data_dict.update({
"scf_total_energy": {
"value": self.DFTData.final_energy,
"unit": "eV"
},
"scf_dipole_moment": {
"value": self.DFTData.dipole_moments[-1],
"unit": "Debye"
},
"homo": {
"value": self.DFTData.homo,
"unit": "eV"
},
"lumo": {
"value": self.DFTData.lumo,
"unit": "eV"
},
"homo_1": {
"value": self.DFTData.homo_1,
"unit": "eV"
},
"lumo_1": {
"value": self.DFTData.lumo_1,
"unit": "eV"
},
})
if 'opt' in self.DFTData.calculation_type:
if (int(self.DFTData.spin_multiplicity) % 2) == 0: # even multiplicity implies an odd electron count (radical species)
rss_dict = self.DFTData.get_radical_stability_score(spin_type="mulliken")
if rss_dict:
data_dict.update(rss_dict)
data_dict.update({"geometry": self.DFTData.final_structure})
elif 'tddft' in self.DFTData.calculation_type:
data_dict.update({
"excitations": self.DFTData.tddft_excitations,
"singlet_plotting": self.DFTData.tddft_spectra_data,
"scf_dipole_moment": {
"value": self.DFTData.dipole_moments[-1],
"unit": "Debye"
},
})
all_data_dict = {
"_id": self.hash_id,
"mol_id": self.id,
"submission_info": self.submission_info,
"calculation_type": self.DFTData.calculation_type,
"runtime": self.DFTData.runtime,
"data": data_dict
}
json_data = json.dumps(all_data_dict)
return json.loads(json_data)
@property
def hash_id(self):
"""
Hash ID
"""
hash_dict = {
"_id": self.id,
"calculation_type": self.DFTData.calculation_type,
"conditions": self.conditions,
}
dhash = hashlib.md5()
encoded = json.dumps(hash_dict, sort_keys=True).encode()
dhash.update(encoded)
return dhash.hexdigest()
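# Note: the hash depends only on the molecule ID, calculation type, and
# conditions, so re-processing the same calculation reproduces the same _id.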
@property
def conditions(self):
"""
Dictionary of conditions (in accordance with D3TaLES backend schema)
"""
data_dict = {
"data_source": 'dft',
"code_name": self.DFTData.code_name,
"code_version": self.DFTData.code_version,
"functional": self.DFTData.functional,
"basis_set": self.DFTData.basis_set,
}
if self.DFTData.tuning_parameter:
data_dict['tuning_parameter'] = self.DFTData.tuning_parameter
if getattr(self.DFTData, "solvent", None):
data_dict['solvent'] = {
'name': self.DFTData.solvent,
'model': 'implicit_solvent',
'dielectric_constant': self.DFTData.dielectric_constant
}
return data_dict
@property
def mol_info(self):
"""
Dictionary containing basic molecule information using the ID from the D3TaLES database
"""
base_coll = DBconnector(db_info.get("frontend")).get_collection("base")
document = base_coll.find_one({"_id": self.id})
if document:
return document['mol_info']
else:
raise IOError("No molecule with id {} exists in the frontend database. Create an instance in the frontend "
"database first.".format(self.id))
@property
def is_groundState(self):
"""
True if current species is the ground state, else False
"""
mol_info = self.mol_info
if 'groundState_charge' in mol_info.keys():
gs_charge = mol_info['groundState_charge']
return gs_charge == self.DFTData.charge
else:
raise IOError("The molecule does not have a specified groundState_charge.")
class ProcessPotBase:
"""
Base class for processing potentiostat files
Copyright 2021, University of Kentucky
"""
def __init__(self, filepath, _id: str = None, submission_info: dict = None, metadata: dict = None,
parsing_class=ParseChiCV, **kwargs):
"""
:param filepath: str, filepath to data file
:param _id: str, molecule ID
:param submission_info: dict, submission info
:param metadata: dict, metadata
:param parsing_class: class, class to use for file parsing (EX: ParseChiCV)
"""
self.id = _id
self.hash_id = str(uuid.uuid4())
metadata = metadata or {}
self.data_path = filepath or metadata.get("mol_file")
self.submission_info = submission_info or {}
self.instrument = metadata.get("instrument", '')
self.experiment_run_id = metadata.get("experiment_run_id", '')
self.working_electrode = metadata.get("electrode_working", '')
self.counter_electrode = metadata.get("electrode_counter", '')
self.reference_electrode = metadata.get("electrode_reference", '')
self.temperature = metadata.get("temperature", '')
self.redox_mol_concentration = metadata.get("redox_mol_concentration", '')
self.working_electrode_surface_area = metadata.get("working_electrode_surface_area", '')
def _listify(value):  # normalize metadata entries that may be a single value or a list
return value if isinstance(value, list) else [value] if value else []
self.solvents = _listify(metadata.get("solvent"))
self.electrolytes = _listify(metadata.get("electrolyte"))
self.ionic_liquids = _listify(metadata.get("ionic_liquid"))
self.ParsedData = parsing_class(filepath, **kwargs)
@property
def data_dict(self):
"""
Dictionary of processed data (in accordance with D3TaLES backend schema)
"""
pot_data = self.ParsedData.parsed_data
pot_data.update({"conditions": self.pot_conditions})
all_data_dict = {
"_id": self.hash_id,
"mol_id": self.id,
"submission_info": self.submission_info,
"data": pot_data
}
json_data = json.dumps(all_data_dict)
return json.loads(json_data)
@property
def mol_info(self):
"""
Dictionary containing basic molecule information using the ID from the D3TaLES database
"""
base_coll = DBconnector(db_info.get("frontend")).get_collection("base")
document = base_coll.find_one({"_id": self.id})
if document:
return document['mol_info']
else:
raise IOError(
"No molecule with id {} exists in the frontend database. Create an instance in the frontend database "
"first.".format(id))
@property
def pot_conditions(self):
"""
Dictionary of conditions (in accordance with D3TaLES backend schema)
"""
conditions_data = self.ParsedData.conditions_data
conditions_data.update({
"working_electrode": self.working_electrode,
"counter_electrode": self.counter_electrode,
"reference_electrode": self.reference_electrode,
"solvent": self.reagent_format(self.solvents),
"electrolyte": self.reagent_format(self.electrolytes),
"ionic_liquid": self.reagent_format(self.ionic_liquids),
"instrument": self.instrument,
"working_electrode_surface_area": self.data_format(self.working_electrode_surface_area),
"redox_mol_concentration": self.data_format(self.redox_mol_concentration),
"temperature": self.data_format(self.temperature),
"experiment_run_id": self.experiment_run_id
})
return {k: v for k, v in conditions_data.items() if v}
class ProcessCV(ProcessPotBase):
"""
Class to process CV data
Copyright 2021, University of Kentucky
"""
def __init__(self, filepath, _id: str = None, submission_info: dict = None, metadata: dict = None,
parsing_class=ParseChiCV, **kwargs):
"""
:param filepath: str, filepath to data file
:param _id: str, molecule ID
:param submission_info: dict, submission info
:param metadata: dict, metadata
:param parsing_class: class, class to use for file parsing (EX: ParseChiCV)
"""
super().__init__(filepath, _id, submission_info, metadata, parsing_class, **kwargs)
@property
def data_dict(self):
"""
Dictionary of processed data (in accordance with D3TaLES backend schema)
"""
cv_data = self.ParsedData.parsed_data
cv_data.update(self.ParsedData.calculate_prop("peaks"))
cv_data.update(dict(conditions=self.pot_conditions,
plot_data=self.ParsedData.calculate_plotting("plot_data").get("abs_plot"),
reversibility=self.ParsedData.calculate_prop("reversibility", return_type=list),
e_half=self.ParsedData.calculate_prop("e_half", return_type=list),
peak_splittings=self.ParsedData.calculate_prop("peak_splittings", return_type=list),
middle_sweep=self.ParsedData.calculate_prop("middle_sweep", return_type=list),
))
all_data_dict = {
"_id": self.hash_id,
"mol_id": self.id,
"submission_info": self.submission_info,
"data": cv_data
}
json_data = json.dumps(all_data_dict)
return json.loads(json_data)
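# Example usage, as a minimal sketch (the file path, ID, and metadata values are hypothetical):
# cv = ProcessCV("cv_scan1.csv", _id="05XICU", parsing_class=ParseChiCV,
#                metadata={"electrode_working": "glassy carbon", "solvent": "acetonitrile"})
# record = cv.data_dict  # adds e_half, peak_splittings, reversibility, and plot_data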
class ProcessCA(ProcessPotBase):
"""
Class to process CA data
Copyright 2021, University of Kentucky
"""
def __init__(self, filepath, _id: str = None, submission_info: dict = None, metadata: dict = None,
parsing_class=ParseChiCA, **kwargs):
"""
:param filepath: str, filepath to data file
:param _id: str, molecule ID
:param submission_info: dict, submission info
:param metadata: dict, metadata
:param parsing_class: class, class to use for file parsing (EX: ParseChiCA)
"""
super().__init__(filepath, _id, submission_info, metadata, parsing_class, **kwargs)
@property
def data_dict(self):
"""
Dictionary of processed data (in accordance with D3TaLES backend schema)
"""
ca_data = self.ParsedData.parsed_data
ca_data.update({"conditions": self.pot_conditions})  # TODO add conductivity
all_data_dict = {
"_id": self.hash_id,
"mol_id": self.id,
"submission_info": self.submission_info,
"data": ca_data
}
json_data = json.dumps(all_data_dict)
return json.loads(json_data)
class ProcessUvVis:
"""
Class to process UV-Vis data files.
Copyright 2021, University of Kentucky
"""
def __init__(self, filepath, mol_id, metadata=None, parsing_class=ParseExcel):
"""
:param filepath: str, filepath to data file
:param mol_id: str, molecule ID
:param metadata: dict, dictionary containing any metadata for this molecule, e.g., {"solvent": "acetonitrile"}
:param parsing_class: class, a UV-Vis parsing class with which to parse the data
"""
self.mol_id = mol_id
self.uuid = str(uuid.uuid4())
metadata = metadata or {}
self.instrument = metadata.get("instrument", '')
self.solvent = metadata.get("solvent", '')
self.UvVisData = parsing_class(filepath)
@property
def no_sql_data(self):
"""
UV-Vis information in a dictionary that matches the No-SQL schema
"""
all_data_dict = {
"date_recorded": self.UvVisData.date_recorded,
"solvent": self.solvent,
"instrument": self.instrument,
"integration_time": self.UvVisData.integration_time,
"absorbance_data": self.UvVisData.absorbance_data,
}
json_data = json.dumps(all_data_dict, default=str)
return json.loads(json_data)
@property
def sql_absorbance_data(self):
"""
UV-Vis information in a dictionary that matches the SQL AbsorbanceData Table schema
"""
data = self.UvVisData.absorbance_data
return [{"uvvis_id": self.uuid,
"mol_id": self.mol_id,
"wavelength": wavelength,
"absorbance": absorbance}
for wavelength, absorbance in zip(data["wavelength"], data["absorbance"])]
@property
def sql_data(self):
"""
UV-Vis information in a dictionary that matches the SQL UVVisData Table schema
"""
data_dict = {
"uvvis_id": self.uuid,
"mol_id": self.mol_id,
"date_recorded": self.UvVisData.date_recorded,
"solvent": self.solvent,
"instrument": self.instrument,
"integration_time": self.UvVisData.integration_time,
"absorbance_data": self.UvVisData.absorbance_data,
}
json_data = json.dumps(data_dict, default=str)
return json.loads(json_data)
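# Example usage, as a minimal sketch (the file name and metadata are hypothetical):
# uvvis = ProcessUvVis("sample_abs.xlsx", "05XICU", metadata={"solvent": "acetonitrile"})
# uvvis.no_sql_data           # one nested document for the No-SQL store
# uvvis.sql_absorbance_data   # long-format rows, one dict per (wavelength, absorbance) pair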
class ProcessNlp:
"""
Class to process NLP data for backend database
Copyright 2021, University of Kentucky
"""
def __init__(self, doi=None, instance=None, els_apikey="3a5b9e26201041e25eee70953c65782c", article_download=False,
download_dir="temp/"):
"""
:param doi: str, DOI or scopus id
:param instance: dict, NLP data. Instance data will override web-scraped data.
:param els_apikey: str, API access key for api.elsevier.com
:param article_download: bool, download article and insert article main text if True
:param download_dir: str, directory path into which to download the article
"""
instance = instance or {}
self.doi = (doi or instance.get("doi", instance.get("_id")) or "").replace("https://doi.org/", "")
if not self.doi:
raise ValueError(
"ProcessNlp requires a DOI. Either include doi as an argument or include doi as a key in the instance data.")
self.els_apikey = els_apikey
self.instance = instance
self.publisher, self.main_text = "", ""
self.article_download = article_download
self.download_path = "{}/{}.html".format(download_dir, self.doi.replace("/", "_"))
self.s2c = Schema2Class(database="backend", schema_name="nlp")
# Get basic article info
self.basic_info = self.get_basic_info()
for key in self.basic_info:
setattr(self, key, self.basic_info[key])
# Download full article
if self.article_download:
os.makedirs(download_dir, exist_ok=True)
self.download_article()
self.main_text = self.get_main_text()
@property
def data_dict(self):
"""
Dictionary of processed data (in accordance with D3TaLES backend schema)
"""
data_dict = dict(self.basic_info)  # copy so repeated access does not mutate basic_info
data_dict.update(self.instance)
data_dict.update({"_id": self.doi})
if self.article_download and self.main_text:
data_dict.update({"main_text": self.main_text, "pdf_location": os.path.abspath(self.download_path)})
json_data = json.dumps(data_dict)
return json.loads(json_data)
def get_main_text(self):
"""
Extract the article main text (abstract through references) from the downloaded HTML file
"""
try:
doc = Document.from_file(self.download_path)
elements = doc.elements
paragraphs, refs, i = [], False, 0
while i < len(elements) and not refs:
elem = elements[i]
if elem.text in ["Abstract"]:
i += 1
while i < len(elements):
elem = elements[i]
try:
if elem.text in ["References"]:
refs = True
break
except Exception:  # some elements (e.g., tables, figures) have no usable text
pass
if isinstance(elem, Paragraph):
paragraphs.append(elem.text)
i += 1
break
i += 1
data_obj = self.s2c.Data()
data_obj.text = paragraphs
return data_obj.as_dict()
except Exception:
print('Failed to extract main text from {}'.format(self.doi))
def get_basic_info(self):
"""
Dictionary of processed data (in accordance with D3TaLES backend schema)
"""
nlp_obj = self.s2c.Nlp()
author_obj = self.s2c.Author()
client = ElsClient(self.els_apikey)
ab = AbsDoc(uri="https://api.elsevier.com/content/abstract/doi/{}".format(self.doi))
if ab.read(client):
print("Gathering data from the web...")
# put the data into nlp raw data schema
nlp_obj.publisher = ab.data["coredata"].get('dc:publisher', "")
nlp_obj.journal = ab.data["coredata"].get('prism:publicationName', "")
nlp_obj.publish_date = ab.data["coredata"].get('prism:coverDate', "")
nlp_obj.authors = [author_obj.from_json(json.dumps(self.make_author_entry(author=author)))
for author in ab.data["authors"]["author"]]
nlp_obj._id = ab.data["coredata"].get('prism:doi', "")
nlp_obj.date_accessed = datetime.now().isoformat()
nlp_obj.abstract = ab.data["coredata"].get('dc:description', "")
nlp_obj.title = ab.title
else:
raise ConnectionError("Not able to retrieve abstract for {}.".format(self.doi))
return nlp_obj.as_dict()
def make_author_entry(self, author):
return {
"auid": author.get("@auid", ""),
"indexed_name": author.get("ce:indexed-name", ""),
"surname": author.get('ce:surname', ""),
"given_name": author.get("ce:given-name", ""),
"affiliations": self.get_affiliations(author.get("affiliation", ""))
}
def download_article(self):
ad = ArticleDownloader(els_api_key=self.els_apikey)
publisher_modes = {
"American Chemical Society": "acs",
"Royal Society of Chemistry": "rsc",
"Elsevier B.V.": "elsevier",
"Nature Research": "nature",
"IOP Publishing Ltd": "ecs",
"Wiley-VCH Verlag": "wiley",
"Korean Electrochemical Society": "ecs",
"Institute of Physics Publishing": "ecs",
}
mode = publisher_modes.get(self.publisher, "elsevier")
with open(self.download_path, "wb") as f:
ad.get_html_from_doi(doi=self.doi, writefile=f, mode=mode)
@staticmethod
def get_affiliations(affiliation):
if isinstance(affiliation, list):
affiliations = [x.get("@id", "") for x in affiliation]
elif isinstance(affiliation, dict):
affiliations = [affiliation.get("@id", "")]
else:
affiliations = []
return affiliations
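# For illustration (the affiliation IDs below are hypothetical):
# get_affiliations([{"@id": "60015941"}, {"@id": "60025371"}]) returns ["60015941", "60025371"]
# get_affiliations({"@id": "60015941"}) returns ["60015941"]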
@classmethod
def from_parsed_json(cls, json_path, **kwargs):
"""
Generate data class from JSON file
:param json_path: str, path to JSON file
:return: data class
"""
with open(json_path) as fn:
processing_data = json.load(fn)
return cls(instance=processing_data, **kwargs)
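# Example usage, as a minimal sketch (the JSON path is hypothetical):
# nlp = ProcessNlp.from_parsed_json("parsed_nlp_data.json", article_download=False)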
@classmethod
def from_dataframe(cls, nlp_df, nlp_model, doi=None, base_instance=None, date_generated=None, **kwargs):
"""
Translate pandas DataFrame from NLP extraction to JSON data conforming to the D3TaLES backend NLP
schema: https://github.com/D3TaLES/schema/blob/main/schema_backend/nlp.schema.json. Note that the DataFrame
should contain all of the columns listed in the expected_columns variable, and no other columns.
:param nlp_df: pandas DataFrame, extracted NLP data
:param nlp_model: str, name of NLP model used for data extraction
:param doi: str, DOI associated with extracted data
:param base_instance: dict, base instance containing additional info for backend data insertion
:param date_generated: datetime obj, date and time the data was generated
:return: dict, JSON formatted data conforming to the D3TaLES backend NLP schema
"""
# Check for DOI
base_instance = base_instance or {}
doi_str = (doi or base_instance.get("doi", base_instance.get("_id")) or "").replace("https://doi.org/", "")
if not doi_str:
raise ValueError(
"ProcessNlp.from_dataframe requires a DOI. Either include doi as an argument or include doi as a key in the base_instance data.")
# Check DF columns
expected_columns = ["molecule", "property", "value", "unit", "line_number", "parent_sentence", "notes"]
unexpected_columns = [c for c in nlp_df.columns if c not in expected_columns]
if unexpected_columns:
raise SyntaxError(
"The column titles for the NLP DataFrame contain unexpected columns: " + ", ".join(unexpected_columns))
missing_columns = [c for c in expected_columns if c not in nlp_df.columns]
if missing_columns:
raise SyntaxError(
"The column titles for the NLP DataFrame do not contain columns: " + ", ".join(missing_columns))
# Check values in "properties" column
nlp_df.property = nlp_df.property.str.replace(' ', '_')
nlp_df.property = nlp_df.property.str.lower()
expected_properties = ["oxidation_potential", "reduction_potential", "solubility", "stability", "conductivity",
"diffusion_coefficient", "charge_transfer_rate"]
unexpected_properties = [p for p in set(nlp_df.property.tolist()) if p not in expected_properties]
if unexpected_properties:
raise ValueError("There were unexpected properties in the NLP DataFrame 'properties' column: ",
", ".join(unexpected_properties))
# Generate output JSON
extracted_molecules = []
date_generated = date_generated or datetime.now()
date_generated = date_generated.isoformat() if isinstance(date_generated, datetime) else date_generated
mol_names = set(nlp_df.molecule.tolist())
for mol in mol_names:
extracted_properties = {}
mol_df = nlp_df[nlp_df.molecule == mol]
for prop in set(mol_df.property.tolist()):
props = []
prop_df = mol_df[mol_df.property == prop]
for i, row in prop_df.iterrows():
row.fillna("", inplace=True)
prop_data = {
"conditions": {
"data_source": "nlp",
"nlp_model": nlp_model,
"date_generated": date_generated,
"doi": doi_str,
},
"value": float_from_str(row.value),
"unit": row.unit
}
prop_data.update({p: row.get(p) for p in ["line_number", "parent_sentence", "notes"] if row.get(p)})
props.append(prop_data)
extracted_properties[prop] = props
extracted_molecules.append({"molecule_name": mol, "extracted_properties": extracted_properties})
instance = base_instance or {}
instance.update({"_id": doi_str, "extracted_molecules": extracted_molecules})
return cls(instance=instance, **kwargs)
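# Example usage, as a minimal sketch (the molecule, values, DOI, and model name are hypothetical):
# df = pd.DataFrame([{"molecule": "TEMPO", "property": "oxidation potential", "value": "0.63",
#                     "unit": "V", "line_number": 12, "parent_sentence": "", "notes": ""}])
# nlp = ProcessNlp.from_dataframe(df, "nlp-model-v1", doi="10.xxxx/hypothetical")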
@classmethod
def from_html(cls, html_txt, nlp_model, doi=None, base_instance=None, date_generated=None, **kwargs):
"""Generate data class from an HTML string containing exactly one table of extracted NLP data"""
nlp_dfs = pd.read_html(html_txt)
if len(nlp_dfs) > 1:
raise ValueError(
"ProcessNlp.from_html found {} tables in the HTML string. There should be only one.".format(
len(nlp_dfs)))
if len(nlp_dfs) < 1:
raise ValueError("ProcessNlp.from_html no tables in the HTML string. There should be one.")
return cls.from_dataframe(nlp_dfs[0], nlp_model, doi=doi, base_instance=base_instance, date_generated=date_generated, **kwargs)
@classmethod
def from_prop_list(cls, prop_list, nlp_model, doi=None, base_instance=None, date_generated=None, **kwargs):
"""Generate data class from a list of property dictionaries (one dict per extracted data point)"""
if len(prop_list) < 1:
raise ValueError("ProcessNlp.from_prop_list no propreties in prop_list. There should be at least one.")
nlp_df = pd.DataFrame(prop_list)
return cls.from_dataframe(nlp_df, nlp_model, doi=doi, base_instance=base_instance, date_generated=date_generated, **kwargs)
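# Example usage, as a minimal sketch (the DOI is hypothetical; requires a valid Elsevier API key):
# nlp = ProcessNlp("10.xxxx/hypothetical", article_download=False)
# record = nlp.data_dict  # basic article info gathered from the Elsevier abstract API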
if __name__ == "__main__":
pass
# data = ProcessDFT(sys.argv[1], parsing_class=ProcessGausLog).data_dict
# data = ProcessCV(sys.argv[1], parsing_class=ParseChiCV).data_dict
# data = ProcessNlp(sys.argv[1], article_download=False).data_dict
# print(data)