Source code for robotics_api.utils.mongo_dbs

import json
import os
import warnings
from typing import Union
from urllib import request

import jsonschema
import pandas as pd
import python_jsonschema_objects as pjs
from dotty_dict import dotty
from monty.json import jsanitize
from pymongo import MongoClient

from robotics_api.settings import DB_INFO_FILE


def db_info_generator(db_file=DB_INFO_FILE):
    """
    Generates information about database connections.

    The connection information is read from a JSON file whose path is taken from the `db_file` argument or,
    when that is empty, from the `DB_INFO_FILE` environment variable. The keys of the JSON file should be the
    database names such as `frontend`, `backend`, `expflow`, and `fireworks`.

    Args:
        db_file (str, optional): Path to the database info JSON file. Defaults to the `DB_INFO_FILE` setting.

    Returns:
        dict: JSON object containing information about database connections. If no file path can be
            determined, a warning is issued and a placeholder configuration for `frontend` and `backend`
            is returned instead.
    """
    # os.getenv is the same lookup as os.environ.get, so a single environment fallback is enough.
    db_file = db_file or os.environ.get('DB_INFO_FILE')
    if not db_file:
        warnings.warn("Environment variable DB_INFO_FILE not defined. Default database information is in use. ")
        # Placeholder hosts only: these must be replaced by real connection strings to be usable.
        return {
            "frontend": {
                "host": "mongodb://USERNAME:PASSWORD@DATABASE_IP:DATABASE_PORT/frontend",
                "database": "ui"
            },
            "backend": {
                "host": "mongodb://USERNAME:PASSWORD@DATABASE_IP:DATABASE_PORT/backend",
                "database": "backend"
            }
        }
    with open(db_file, "r") as f:
        return json.load(f)
# Connection settings for the configured databases, loaded once when this module is imported.
DB_INFO = db_info_generator()
# NOTE(review): this prints the whole connection dictionary on every import; host strings may embed
# credentials (see the USERNAME:PASSWORD placeholders above) - confirm this output is intended.
print(DB_INFO)
class Schema2Class:
    """
    Get D3TaLES schema from GitHub and load it to a class

    Copyright 2021, University of Kentucky
    """

    def __init__(self, database=None, schema_name=None, schema_directory=None, named_only=False,
                 schema_version=None):
        """
        Initializes the Schema2Class object and retrieves the schema.

        The schema is downloaded from the D3TaLES/schema GitHub repository, parsed as JSON, and turned into
        classes with `python_jsonschema_objects`. Every name listed by `dir()` on the built namespace is
        copied onto this object as an attribute.

        Args:
            database (str, optional): Database name. Defaults to None.
            schema_name (str, optional): Schema name. Defaults to None.
            schema_directory (str, optional): Directory of the schema. Defaults to None.
            named_only (bool, optional): If True, only properties with a title attribute will be included in the
                resulting namespace. Defaults to False.
            schema_version (str, optional): Schema version (repository branch). Defaults to None, meaning "main".
        """
        # fetch schema
        self.database = database
        self.branch = schema_version or "main"
        self.schema_name = '{}/{}'.format(schema_directory, schema_name) if schema_directory else schema_name
        # Every "robotics_" occurrence is stripped from the final URL, not only from the database name.
        schema_url = "https://raw.githubusercontent.com/D3TaLES/schema/{}/schema_{}/{}.schema.json".format(
            self.branch, self.database, self.schema_name).replace("robotics_", "")
        # The context manager closes the HTTP response once the body has been read.
        with request.urlopen(schema_url) as response:
            self.schema = json.loads(response.read().decode())

        # generating classes
        builder = pjs.ObjectBuilder(self.schema)
        ns = builder.build_classes(named_only=named_only)

        # get all name space
        for name_space in dir(ns):
            setattr(self, name_space, getattr(ns, name_space))

        # required values (None when the schema lists no required properties)
        self.required = self.schema.get("required") or None
class DBconnector:
    """
    Class to retrieve a collection from a database and insert new entry.
    Requires a db_infos.json file with credentials

    Copyright 2021, University of Kentucky
    """

    def __init__(self, db_information: dict):
        """
        Initializes the DBconnector object with database information.

        Args:
            db_information (dict): Dictionary containing database information. The keys read are `host`,
                `admin_password`, `admin_user`, `port`, `database` and `collection`; all are optional.
        """
        self.host = db_information.get("host")
        self.password = db_information.get("admin_password")
        self.user = db_information.get("admin_user")
        # A missing, null or empty port falls back to 0 instead of failing in int().
        self.port = int(db_information.get("port") or 0)
        self.database = db_information.get("database")
        self.collection = db_information.get("collection")

    def get_database(self, **kwargs):
        """
        Returns a database object.

        Args:
            **kwargs: Extra keyword arguments forwarded to `MongoClient`. They are only used when the host
                does not contain "@"; a host containing "@" is passed to `MongoClient` on its own.

        Returns:
            pymongo.database.Database: A database object.

        Raises:
            ConnectionError: If the client or database object cannot be created. The original error is
                attached as the cause.
        """
        try:
            if "@" in self.host:
                conn = MongoClient(self.host)
            else:
                conn = MongoClient(host=self.host, port=self.port, username=self.user, password=self.password,
                                   **kwargs)
            db = conn[self.database]
        except Exception as err:
            # Keep the root cause attached; let KeyboardInterrupt/SystemExit propagate untouched.
            raise ConnectionError("Could not connect to database {}".format(self.database)) from err
        return db

    def get_collection(self, coll_name=None):
        """
        Returns a collection from the database.

        Args:
            coll_name (str, optional): Name of the collection. Defaults to None, in which case the collection
                named in the database information is used.

        Returns:
            pymongo.collection.Collection: The collection object.

        Raises:
            IOError: If no collection name is given and none was configured.
        """
        db = self.get_database()
        if not coll_name:
            coll_name = self.collection
        if not coll_name:
            raise IOError("No collection specified")
        return db[coll_name]
class MongoDatabase:
    """
    Base class for connecting to a database

    Copyright 2021, University of Kentucky
    """

    def __init__(self, database=None, collection_name=None, instance=None, schema_layer="", schema_directory=None,
                 public=None, schema_db=None, default_id=None, validate_schema=True, verbose=1,
                 schema_version=None):
        """
        Initializes the MongoDatabase object.

        Connects to the collection and, when an instance is given and `validate_schema` is True, validates the
        instance against the schema fetched by `Schema2Class`.

        Args:
            database (str, optional): Name of the database (key in `DB_INFO`). Defaults to None.
            collection_name (str, optional): Name of the collection. Defaults to None.
            instance (dict, optional): Instance to insert or validate. Dot-notation keys are expanded into
                nested dictionaries. Defaults to None.
            schema_layer (str, optional): Schema layer; if set, the instance is wrapped under this key.
                Defaults to "".
            schema_directory (str, optional): Directory of the schema. Defaults to None.
            public (bool, optional): Marks instance as public if True. Defaults to None.
            schema_db (str, optional): Database containing the schema. Defaults to `database`.
            default_id (str, optional): ID used when the instance has no `_id`. Defaults to None.
            validate_schema (bool, optional): Validates schema if True. Defaults to True.
            verbose (int, optional): Verbosity level. Defaults to 1.
            schema_version (str, optional): Version of the schema. Defaults to None.
        """
        self.collection_name = collection_name
        self.instance = {schema_layer: self.dot2dict(instance)} if schema_layer else self.dot2dict(instance)
        self.database = database
        self.verbose = verbose
        self.public = public

        self.dbc = DBconnector(DB_INFO.get(self.database))
        self.coll = self.dbc.get_collection(self.collection_name)
        schema_db = schema_db or database

        # validate
        if instance and validate_schema:
            self.instance['_id'] = self.instance.get("_id") or default_id
            self.s2c = Schema2Class(schema_name=collection_name, database=schema_db,
                                    schema_directory=schema_directory, schema_version=schema_version)
            jsonschema.validate(schema=self.s2c.schema, instance=self.instance)

    def insert(self, _id, nested=False, update_public=True, instance=None, override_lists=True):
        """
        Upserts a dictionary into a MongoDB collection.

        Args:
            _id (str): ID for insertion.
            nested (bool, optional): If True, dictionary values are inserted key by key under their dotted
                path instead of replacing the whole sub-document. Defaults to False.
            update_public (bool, optional): Updates the public status if True. Defaults to True.
            instance (dict, optional): Instance to be inserted. Defaults to None, in which case the sanitized
                `self.instance` is inserted.
            override_lists (bool, optional): If True, lists replace existing lists; if False, list items are
                added to the existing list with `$addToSet`. Defaults to True.
        """
        if not instance:
            instance = jsanitize(self.instance, allow_bson=True)

        # Update public status
        if isinstance(self.public, bool) and update_public:
            self.coll.update_one({"_id": _id}, {"$set": {"public": self.public}}, upsert=True)

        for path, v in instance.items():
            if nested and isinstance(v, dict):
                for nest_k, nest_v in v.items():
                    new_path = ".".join(path.split(".") + [nest_k])
                    # Forward override_lists so nested lists follow the same policy as top-level lists.
                    self.insert(_id, nested=True, update_public=False, instance={new_path: nest_v},
                                override_lists=override_lists)
            elif isinstance(v, list) and not override_lists:
                self.array_checker(path, _id)
                self.coll.update_one({"_id": _id}, {"$addToSet": {path: {"$each": v}}}, upsert=True)
            else:
                self.coll.update_one({"_id": _id}, {"$set": {path: v}}, upsert=True)

        if self.verbose > 1:
            print("{} {}... inserted into the {} database.".format(_id, str(instance)[:15], self.database))

    def path_insert(self, _id, path, value):
        """
        Inserts a value at a specific path in the collection.

        Lists are added to the existing field with `$addToSet`; any other value replaces the field with `$set`.

        Args:
            _id (str): Instance ID.
            path (str): Path for insertion.
            value: Value to insert.
        """
        if isinstance(value, list):
            self.array_checker(path, _id)
            self.coll.update_one({"_id": _id}, {"$addToSet": {path: {"$each": value}}}, upsert=True)
        else:
            self.coll.update_one({"_id": _id}, {"$set": {path: value}}, upsert=True)

    def array_checker(self, field_path, _id):
        """
        Makes sure the field at the path exists before a set insertion.

        If the document with the given ID has no value at `field_path`, the field is created as an empty list.

        Args:
            field_path (str): Path to check.
            _id (str): Instance ID.
        """
        if not self.coll.count_documents({"_id": _id, field_path: {"$exists": True}}):
            self.coll.update_one({"_id": _id}, {"$set": {field_path: []}}, upsert=True)

    def field_checker(self, entry, field):
        """
        Looks up the first document whose field equals the given entry.

        Args:
            entry: Value to check.
            field (str): Name of the field.

        Returns:
            dict: The matching document, or an empty dictionary if there is no match.
        """
        result = self.coll.find_one({field: entry})
        return result if result else {}

    def make_query(self, query: dict = None, projection: dict = None, output="pandas", multi=True, limit=200):
        """
        Executes a MongoDB database query.

        Args:
            query (dict, optional): Query parameters. Defaults to an empty dictionary.
            projection (dict, optional): Fields to include or exclude. Defaults to an empty dictionary.
            output (str, optional): Output type, can be "pandas", "json", or anything else for a list.
                Defaults to "pandas".
            multi (bool, optional): If True, returns multiple query responses. If False, returns a single result.
                Defaults to True.
            limit (int, optional): Maximum number of responses to return when `multi` is True. Defaults to 200.

        Returns:
            A dataframe if output="pandas" (empty when nothing matches); the documents after a JSON
            round-trip if output="json"; otherwise a list of documents. With multi=False the list holds the
            single `find_one` result, which is None when nothing matches.
        """
        query = query or {}
        projection = projection or {}
        if multi:
            if projection:
                cursor = self.coll.find(query, projection).limit(limit)
            else:
                cursor = self.coll.find(query).limit(limit)
        else:
            if projection:
                cursor = [self.coll.find_one(query, projection)]
            else:
                cursor = [self.coll.find_one(query)]

        if output == "pandas":
            # find_one gives None when nothing matches; from_records cannot handle a None record.
            records = [doc for doc in cursor if doc is not None]
            return pd.DataFrame.from_records(records)
        elif output == "json":
            json_data = json.loads(json.dumps(list(cursor), ))
            return json_data
        else:
            return list(cursor)

    @staticmethod
    def dot2dict(dot_dict):
        """
        Converts dot notation keys to nested dictionary.

        Keys that share a prefix (e.g. "a.b" and "a.c") are merged into the same nested dictionary.

        Args:
            dot_dict (dict): Dictionary with dot notation keys.

        Returns:
            dict: Nested dictionary. A falsy input gives an empty dictionary and a non-dictionary input is
                returned unchanged.
        """
        dot_dict = dot_dict or {}
        if not isinstance(dot_dict, dict):
            return dot_dict

        final_dict = {}
        for k, v in dot_dict.items():
            if isinstance(k, str) and len(k.split(".")) > 1:
                dot = dotty()
                dot[k] = v
                # A shallow update would drop sibling keys that were already expanded under the same prefix.
                MongoDatabase._deep_merge(final_dict, dot.to_dict())
            else:
                final_dict[k] = v
        return final_dict

    @staticmethod
    def _deep_merge(target, addition):
        """Recursively merges `addition` into `target` and returns `target`; later values win on conflicts."""
        for key, value in addition.items():
            existing = target.get(key)
            if isinstance(value, dict) and isinstance(existing, dict):
                # Merge into a copy so dictionaries owned by the caller are never modified.
                target[key] = MongoDatabase._deep_merge(dict(existing), value)
            else:
                target[key] = value
        return target
class RobotStatusDB(MongoDatabase):
    """
    Provides access to the Robot Status database.

    This class is used to interact with the status database for a specific type of apparatus.

    Copyright 2021, University of Kentucky.
    """

    def __init__(self, apparatus_type: str, _id: str = None, instance: dict = None, override_lists: bool = True,
                 wflow_name: str = None, validate_schema=False):
        """
        Initializes the RobotStatusDB class.

        Connects to the `status_<apparatus_type>` collection of the `robotics` database. If an instance is
        given, it is inserted immediately under the resolved ID.

        Args:
            apparatus_type (str): Name of the apparatus.
            _id (str, optional): Unique identifier for the status entry. Defaults to None, in which case the
                `_id` of the instance is used.
            instance (dict, optional): Instance to insert or validate in the database. Its `_id` key is set
                to the resolved ID. Defaults to None.
            override_lists (bool, optional): Whether to override existing lists in the database. Defaults to True.
            wflow_name (str, optional): Name of the active workflow. If set, verifies that the instance has the
                appropriate workflow name. Defaults to None.
            validate_schema (bool, optional): If True, validates the instance against the schema. Defaults to False.

        Raises:
            IOError: If an instance is given but no ID is provided for the status database insertion.
            NameError: If `wflow_name` is given and does not match the stored workflow name.
        """
        super().__init__("robotics", 'status_' + apparatus_type, instance, schema_db='robot',
                         validate_schema=validate_schema)
        self.id = _id or self.instance.get("_id")
        self.wflow_name = wflow_name
        if instance:
            instance["_id"] = self.id
            if not self.id:
                raise IOError("ID is required for {} status database insertion.".format(apparatus_type))
            self.insert(self.id, override_lists=override_lists)
        if wflow_name and self.id:
            self.check_wflow_name()

    def __str__(self):
        # __str__ must return a string even when the ID is None or not a string.
        return str(self.id)

    @property
    def exists(self):
        """Checks if an entry with the given ID exists in the database.

        Returns:
            bool: True if the entry exists, False otherwise.
        """
        return bool(self.coll.find_one({"_id": self.id}))

    def check_wflow_name(self):
        """Checks if the workflow name matches the instance's current workflow name.

        Returns:
            bool: True if the stored `current_wflow_name` equals `wflow_name`.

        Raises:
            NameError: If the workflow name does not match the current instance's workflow name, including
                when no entry with this ID exists.
        """
        print("ID", self.id)
        # A missing document is treated as having no workflow name instead of failing on None.get().
        current_wflow = (self.coll.find_one({"_id": self.id}) or {}).get("current_wflow_name")
        if current_wflow == self.wflow_name:
            return True
        raise NameError("Argument wflow_name ({}) does not match instance current_wflow_name {}. "
                        "Make sure that only one workflow is initialized.".format(self.wflow_name, current_wflow))

    def get_prop(self, prop: str):
        """
        Retrieves a property from the database for the instance with the given ID.

        Args:
            prop (str): Name of the property to retrieve.

        Returns:
            Any: The value of the property, or None if the entry or the property does not exist.
        """
        return (self.coll.find_one({"_id": self.id}) or {}).get(prop)

    def update_status(self, new_status: Union[str, float], status_name: str = "location"):
        """
        Updates the status for a vial location or station vial.

        The value stored in `current_<status_name>` is appended to `<status_name>_history` before being
        replaced; if no current value is stored yet, None is appended.

        Args:
            new_status (str or float): New status, such as the new vial location or new vial in the station.
            status_name (str, optional): Name of the status property. Defaults to "location".
        """
        current_status = self.get_prop("current_" + status_name)
        history = self.get_prop(status_name + "_history") or []
        history.append(current_status)
        self.insert(self.id, override_lists=True, instance={
            "current_" + status_name: new_status,
            status_name + "_history": history
        })