import datajoint as datajoint
import numpy as np
import json
import hashlib
import datajoint as dj
import importlib
schema = dj.schema('synicix_dev')
[docs]class BaseTable():
"""
Parent table for all datajoint classes which contains some useful function typcially used in tables and the schema pointer
All datajoint tables part of the main framework will inherite from this table.
"""
[docs] @staticmethod
def import_class_from_module(module_name, class_name):
"""
Helper function to handle import of classes from modules
Parameters:
module_name (str): name of the module containing the target class to import
model_class_name (str): name of class to import from module
Returns:
user_defined_class: class imported from module
"""
if module_name is not '' and class_name is not '':
module = importlib.import_module(module_name)
return getattr(module, class_name)
[docs] @staticmethod
def compute_md5_hash(tuple_dict):
"""
Utility helper function to compute the md5 hash given the tuple_dict
Parameters:
tuple_dict (dict): dictionary to hash
Returns:
str: 128 byte md5 hash string
"""
string_to_hash = ""
for _, data in tuple_dict.items():
string_to_hash += (str(data))
return hashlib.md5(string_to_hash.encode()).hexdigest()
[docs] @classmethod
def check_if_tuple_in_table(cls, tuple_dict):
"""
Check if a given table has an entry that meets the tuple_dict condition.
If exists, return True else False
Parameters:
table: datajoint table reference
tuple_dict: dictionary containing the values of the table attributes
Returns:
bool: whether the table has the entry that meets tuple_dict or not
"""
# Check if the table is empty if so return False
if len(cls()) == 0:
return False
# Check if the tuple_dict has the right primary key and attributes
assert(set(tuple_dict.keys()).issubset(set(cls.heading.names))),\
"tuple dict has either wrong primary key or attributes (or both)!"
restriction_tuple_dict = None
# Restrict only non blob columns
if cls.heading.blobs:
if not restriction_tuple_dict:
restriction_tuple_dict = tuple_dict.copy()
for attribute in cls.heading.blobs:
del restriction_tuple_dict[attribute]
# Find float values and compare them
float_attributes = []
for key, value in tuple_dict.items():
if type(value) == float:
float_attributes.append(key)
# Remove float attributes from restriction
if float_attributes:
if not restriction_tuple_dict:
restriction_tuple_dict = tuple_dict.copy()
for attribute in float_attributes:
del restriction_tuple_dict[attribute]
# Check if there is any blobs or floats if not not then set restriction_tuple_dict to tuple_dict
if not (cls.heading.blobs or float_attributes):
restriction_tuple_dict = tuple_dict
# Query the table with the restriction
restriction_rel = cls() & restriction_tuple_dict
# If float attributes exists compare them
if float_attributes:
qualifed_keys = []
for tuple_entry in restriction_rel.proj(*float_attributes):
is_about_the_same_value = True
for attribute_name in float_attributes:
if not np.isclose(tuple_dict[attribute_name], tuple_entry[attribute_name]):
is_about_the_same_value = False
break
if is_about_the_same_value:
dict_to_insert = dict()
for primary_attribute_name in cls.primary_key:
dict_to_insert[primary_attribute_name] = tuple_entry[primary_attribute_name]
qualifed_keys.append(dict_to_insert)
# Redo the restriction
restriction_rel = restriction_rel & qualifed_keys
if len(restriction_rel) == 1:
return True
elif len(restriction_rel) == 0:
return False
else: # there are more than 1 entries that meet the criteria
raise ValueError("The given restriction gives more than 1 possible entry in the table!")
[docs] @classmethod
def insert_tuples(cls, tuple_dicts):
"""
Insert a tuple by checking the highest id number based on "primary_key_id"
When inserting, increment "primary_key_id" by 1
Parameters:
tuple_dict: either a dictionary or a list of dictionaries containing
the values of the class attributes
Returns:
None
"""
# If tuple_dicts is a single dict then put it in a list
if isinstance(tuple_dicts, dict):
tuple_dicts = [tuple_dicts]
# Check if table has blobs columns
blob_attributes = cls.heading.blobs
if blob_attributes:
for tuple_dict in tuple_dicts:
blob_only_tuple_dict = dict()
for attribute in blob_attributes:
blob_only_tuple_dict[attribute] = tuple_dict[attribute]
# Hash the blobs
tuple_dict[cls.table_name + '_blobs_md5_hash'] = cls.compute_md5_hash(blob_only_tuple_dict)
# Variable to store tuple dicts that should be inserted
tuple_dicts_to_insert = []
# Get the primary key column name where id is in the name
primary_key_column_id_name = cls.heading.primary_key[np.where('id' in column_name for column_name in cls.heading.primary_key)[0][0]]
# Variable to store current highest id
current_highest_id = cls.fetch(primary_key_column_id_name, order_by="{} DESC".format(primary_key_column_id_name), limit=1)
if current_highest_id.size == 0:
# Table has no entry in it yet. Initialize to 0
current_highest_id = -1
else:
current_highest_id = current_highest_id[0]
for tuple_dict in tuple_dicts:
if not cls.check_if_tuple_in_table(tuple_dict):
# Tuple is not in table, thus increment current_highest_id by 1 and add it to the current tuple_dict as the new id number
current_highest_id += 1
tuple_dict[primary_key_column_id_name] = current_highest_id
tuple_dicts_to_insert.append(tuple_dict)
else:
print('Tuple_dict:', tuple_dict, 'already exists! skipping the insertion')
# Insert all tuple_dicts in tuple_dicts_to_insert
cls.insert(tuple_dicts_to_insert)
@classmethod
def get_remaining_keys_to_populate(cls):
return (cls.key_source - (schema.jobs & dict(table_name=cls.table_name)).fetch('key')).fetch('KEY')