Source code for synicix_ml_pipeline.datajoint_tables.BaseTable

import hashlib
import importlib
import json

import datajoint as dj
import numpy as np

schema = dj.schema('synicix_dev')

class BaseTable:
    """
    Parent class for all DataJoint tables in the framework. It holds the schema
    pointer and provides helper functions typically used in tables.

    All DataJoint tables that are part of the main framework inherit from this class.
    """

    @staticmethod
    def import_class_from_module(module_name, class_name):
        """
        Helper function to handle importing a class from a module.

        Parameters:
            module_name (str): name of the module containing the target class to import
            class_name (str): name of the class to import from the module

        Returns:
            user_defined_class: class imported from the module
        """
        if module_name != '' and class_name != '':
            module = importlib.import_module(module_name)
            return getattr(module, class_name)
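
    # Example (illustrative; the module and class names are hypothetical):
    # resolve a user-defined class by name at runtime, e.g. a model class
    # defined in a module called my_models:
    #
    #   model_cls = BaseTable.import_class_from_module('my_models', 'MyCNN')
    #   model = model_cls()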

    @staticmethod
    def compute_md5_hash(tuple_dict):
        """
        Utility helper function to compute the MD5 hash of the values in tuple_dict.

        Parameters:
            tuple_dict (dict): dictionary to hash

        Returns:
            str: 32-character hexadecimal MD5 hash string
        """
        string_to_hash = ""
        for _, data in tuple_dict.items():
            string_to_hash += str(data)

        return hashlib.md5(string_to_hash.encode()).hexdigest()
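
    # Example (illustrative): only the dictionary *values* are concatenated before
    # hashing, so two dicts whose values are equal and in the same order produce
    # the same digest even if their keys differ:
    #
    #   BaseTable.compute_md5_hash({'a': 1, 'b': 'x'})   # same digest as the line below
    #   BaseTable.compute_md5_hash({'c': 1, 'd': 'x'})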

    @classmethod
    def check_if_tuple_in_table(cls, tuple_dict):
        """
        Check if the table has an entry that matches the tuple_dict restriction.

        Parameters:
            tuple_dict (dict): dictionary containing the values of the table attributes

        Returns:
            bool: True if the table has an entry matching tuple_dict, False otherwise
        """
        # Check if the table is empty; if so, return False
        if len(cls()) == 0:
            return False

        # Check that tuple_dict has the right primary key and attributes
        assert set(tuple_dict.keys()).issubset(set(cls.heading.names)), \
            "tuple dict has either wrong primary key or attributes (or both)!"

        restriction_tuple_dict = None

        # Restrict only on non-blob columns
        if cls.heading.blobs:
            if not restriction_tuple_dict:
                restriction_tuple_dict = tuple_dict.copy()

            for attribute in cls.heading.blobs:
                del restriction_tuple_dict[attribute]

        # Collect float-valued attributes so they can be compared with np.isclose later
        float_attributes = []
        for key, value in tuple_dict.items():
            if type(value) == float:
                float_attributes.append(key)

        # Remove float attributes from the restriction
        if float_attributes:
            if not restriction_tuple_dict:
                restriction_tuple_dict = tuple_dict.copy()

            for attribute in float_attributes:
                del restriction_tuple_dict[attribute]

        # If there are no blobs or floats, restrict on the full tuple_dict
        if not (cls.heading.blobs or float_attributes):
            restriction_tuple_dict = tuple_dict

        # Query the table with the restriction
        restriction_rel = cls() & restriction_tuple_dict

        # If float attributes exist, compare them with np.isclose
        if float_attributes:
            qualified_keys = []
            for tuple_entry in restriction_rel.proj(*float_attributes):
                is_about_the_same_value = True
                for attribute_name in float_attributes:
                    if not np.isclose(tuple_dict[attribute_name], tuple_entry[attribute_name]):
                        is_about_the_same_value = False
                        break

                if is_about_the_same_value:
                    dict_to_insert = dict()
                    for primary_attribute_name in cls.primary_key:
                        dict_to_insert[primary_attribute_name] = tuple_entry[primary_attribute_name]
                    qualified_keys.append(dict_to_insert)

            # Redo the restriction with only the qualifying keys
            restriction_rel = restriction_rel & qualified_keys

        if len(restriction_rel) == 1:
            return True
        elif len(restriction_rel) == 0:
            return False
        else:
            # More than one entry meets the criteria
            raise ValueError("The given restriction gives more than 1 possible entry in the table!")
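
    # Example (hypothetical table and attribute names): blob and float attributes
    # are left out of the SQL restriction; floats are compared with np.isclose instead:
    #
    #   row = dict(model_name='resnet', learning_rate=1e-3, weights=np.zeros(10))
    #   if not ModelTable.check_if_tuple_in_table(row):
    #       ...  # safe to insert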

    @classmethod
    def insert_tuples(cls, tuple_dicts):
        """
        Insert tuples, assigning each new tuple the next value of the table's id
        primary-key column (current highest id incremented by 1). Tuples that
        already exist in the table are skipped.

        Parameters:
            tuple_dicts: either a dictionary or a list of dictionaries containing the values of the table attributes

        Returns:
            None
        """
        # If tuple_dicts is a single dict, wrap it in a list
        if isinstance(tuple_dicts, dict):
            tuple_dicts = [tuple_dicts]

        # Check if the table has blob columns
        blob_attributes = cls.heading.blobs

        if blob_attributes:
            for tuple_dict in tuple_dicts:
                blob_only_tuple_dict = dict()
                for attribute in blob_attributes:
                    blob_only_tuple_dict[attribute] = tuple_dict[attribute]

                # Hash the blobs
                tuple_dict[cls.table_name + '_blobs_md5_hash'] = cls.compute_md5_hash(blob_only_tuple_dict)

        # Variable to store the tuple dicts that should be inserted
        tuple_dicts_to_insert = []

        # Get the name of the primary key column that contains 'id'
        primary_key_column_id_name = cls.heading.primary_key[
            np.where(['id' in column_name for column_name in cls.heading.primary_key])[0][0]]

        # Variable to store the current highest id
        current_highest_id = cls.fetch(primary_key_column_id_name,
                                       order_by="{} DESC".format(primary_key_column_id_name),
                                       limit=1)

        if current_highest_id.size == 0:
            # Table has no entries yet; initialize so that the first id is 0
            current_highest_id = -1
        else:
            current_highest_id = current_highest_id[0]

        for tuple_dict in tuple_dicts:
            if not cls.check_if_tuple_in_table(tuple_dict):
                # Tuple is not in the table: increment current_highest_id and use it as the new id
                current_highest_id += 1
                tuple_dict[primary_key_column_id_name] = current_highest_id
                tuple_dicts_to_insert.append(tuple_dict)
            else:
                print('Tuple_dict:', tuple_dict, 'already exists! Skipping the insertion')

        # Insert all tuple_dicts in tuple_dicts_to_insert
        cls.insert(tuple_dicts_to_insert)

    @classmethod
    def get_remaining_keys_to_populate(cls):
        """
        Return the keys from key_source that are not already reserved in the schema's jobs table.
        """
        return (cls.key_source - (schema.jobs & dict(table_name=cls.table_name)).fetch('key')).fetch('KEY')
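
A minimal usage sketch follows. It is not part of the original module: the table name ModelConfig and its attributes are hypothetical. It assumes a concrete table inherits from both a DataJoint tier class (here dj.Manual) and BaseTable, as the class docstring describes, and uses insert_tuples to skip duplicate rows and assign the auto-incremented id primary key.

@schema
class ModelConfig(dj.Manual, BaseTable):
    definition = """
    model_config_id : int          # assigned automatically by insert_tuples
    ---
    optimizer       : varchar(64)
    learning_rate   : float
    """

ModelConfig.insert_tuples([
    dict(optimizer='adam', learning_rate=1e-3),
    dict(optimizer='adam', learning_rate=1e-3),   # duplicate, skipped with a message
    dict(optimizer='sgd', learning_rate=1e-2),
])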