Source code for nexusLIMS.utils

# This file has been co-edited by both Euclid Techlabs and NIST.
# For LICENSING information, please refer to the LICENSE file in the root directory of NexusLIMS

from lxml import etree as _etree
import sqlite3 as sql3
import certifi as _certifi
import tempfile as _tempfile
import os as _os
import subprocess as _sp
from datetime import timedelta as _timedelta
from os.path import getmtime as _getmtime
import contextlib
import pytz
import logging as _logging
import sys as _sys
import ldap3

import nexusLIMS

_logger = _logging.getLogger(__name__)
_logger.setLevel(_logging.INFO)

# hours to add to datetime objects (hack for poole testing -- should be -2 if
# running tests from Mountain Time on files in Eastern Time)
tz_offset = _timedelta(hours=0)

CONFIG = nexusLIMS.get_config()

[docs]def local_datetime(dt, tz): """ Convert an UTC datetime to a local datetime. Parameters ---------- dt : :py:class:~datetime.datetime Datetime in the UTC timezone. tz : str Local timezone information. e.g. "America/Chicago". Returns ------- local_dt : :py:class:~datetime.datetime New datetime in the local timezone. """ local_tz = pytz.timezone(tz) iso_dt = dt.replace(tzinfo=pytz.utc) local_dt = iso_dt.astimezone(local_tz) return local_dt
[docs]def setup_loggers(log_level): """ Set logging level of all NexusLIMS loggers Parameters ---------- log_level : int The level of logging, such as ``logging.DEBUG`` """ _logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s: ' '%(message)s', level=log_level) loggers = [_logging.getLogger(name) for name in _logging.root.manager.loggerDict if 'nexusLIMS' in name] for logger in loggers: logger.setLevel(log_level)
[docs]def parse_xml(xml, xslt_file, **kwargs): """ Parse and translate an XML string from the API into a nicer format Parameters ---------- xml : str or bytes A string containing XML, such as that returned by :py:func:`~.fetch_xml` xslt_file : str or io.BytesIO Path to the XSLT file to use for transformation **kwargs : dict, optional Other keyword arguments are passed as parameters to the XSLT transformer. ``None`` values are converted to an empty string. Returns ------- simplified_dom : :py:class:`lxml.etree._XSLTResultTree` """ for key, value in kwargs.items(): kwargs[key] = "''" if value is None else f"'{value}'" parser = _etree.XMLParser(remove_blank_text=True, encoding='utf-8') # load XML structure from string root = _etree.fromstring(xml, parser) # use LXML to load XSLT stylesheet into xsl_transform # (note, etree.XSLT needs to be called on a root _Element # not an _ElementTree) xsl_dom = _etree.parse(xslt_file, parser).getroot() xsl_transform = _etree.XSLT(xsl_dom) # do XSLT transformation try: simplified_dom = xsl_transform(root, **kwargs) except _etree.XSLTApplyError: for error in xsl_transform.error_log: print(error.message, error.line) raise _etree.XSLTApplyError("Error in parse_xml") return simplified_dom
[docs]def nexus_req(url, fn, basic_auth=False, **kwargs): """ A helper method that wraps a function from :py:mod:`requests`, but adds a local certificate authority chain to validate the SharePoint server's certificates and authenticates using NTLM. Parameters ---------- url : str The URL to fetch fn : function The function from the ``requests`` library to use (e.g. :py:func:`~requests.get`, :py:func:`~requests.put`, :py:func:`~requests.post`, etc.) basic_auth : bool If True, use only username and password for authentication rather than NTLM (like what is used for CDCS access rather than for NIST network resources) **kwargs : dict, optional Other keyword arguments are passed along to the ``fn`` Returns ------- r : :py:class:`requests.Response` A requests response object """ from .harvester.sharepoint_calendar import CA_BUNDLE_PATH, get_auth with _tempfile.NamedTemporaryFile() as tmp: with open(_certifi.where(), 'rb') as sys_cert: lines = sys_cert.readlines() tmp.writelines(lines) with open(CA_BUNDLE_PATH, 'rb') as our_cert: lines = our_cert.readlines() tmp.writelines(lines) tmp.seek(0) r = fn(url, auth=get_auth(basic=basic_auth), verify=tmp.name, **kwargs) return r
[docs]def is_subpath(path, of_paths): """ Helper function to determine if a given path is a "subpath" of a set of paths. Useful to help determine which instrument a given file comes from, given the instruments ``filestore_path`` and the path of the file to test. Parameters ---------- path : str The path of the file (or directory) to test. This will usually be the absolute path to a file on the local filesystem (to be compared using the host-specific ``mmf_nexus_root_path``. of_paths : str or list The "higher-level" path to test against (or list thereof). In typical use, this will be a path joined of an instruments ``filestore_path`` with the root-level ``mmf_nexus_root_path`` Returns ------- result : bool Whether or not path is a subpath of one of the directories in of_paths Examples -------- >>> is_subpath('/mnt/**REMOVED**_mmfnexus/Titan/**REMOVED**/190628 - **REMOVED** ' + ... 'Training/6_28_2019 Box6 4S/4_330mm.dm3', ... os.path.join(CONFIG['mmfnexus_path'], ... titan.filestore_path)) True """ if isinstance(of_paths, str): of_paths = [of_paths] abs_of_paths = [_os.path.abspath(of_path) for of_path in of_paths] result = any(_os.path.abspath(path).startswith(subpath) for subpath in abs_of_paths) return result
[docs]def get_from_db(query): """ Get contents and column names from a table in the NexusLIMS database file. Parameters ---------- query: str Query for the database. e.g. "SELECT * from instruments Returns ------- results: list Fetched all (remaining) rows of a query result defined in the sqlite3's cursor. The list is empty if fetching failed. col_names: list A list of column names defined in the table. Note that the list is different from the 7-tuple defined in the original cursor.description. Only the first item in the 7-tuple is saved in the list. """ # use contextlib to auto-close the connection and database cursors with contextlib.closing(sql3.connect( CONFIG['nexusLIMS_db_path'])) as conn: with conn: # auto-commits with contextlib.closing(conn.cursor()) as cursor: # auto-closes results = cursor.execute(query).fetchall() col_names = list(map(lambda x: x[0], cursor.description)) return results, col_names
[docs]def get_nested_dict_value(nested_dict, value, prepath=()): """ Use a recursive method to find a value in a dictionary of dictionaries (such as the metadata dictionaries we receive from the file parsers). Cribbed from: https://stackoverflow.com/a/22171182/1435788 Parameters ---------- nested_dict : dict Dictionary to search value : object Value to search for prepath : tuple "path" to prepend to the search to limit the search to only part of the dictionary Returns ------- path : tuple or None The "path" through the dictionary (expressed as a tuple of keys) where value was found. If None, the value was not found in the dictionary. """ for k, v in nested_dict.items(): path = prepath + (k,) if v == value: # found value return path elif hasattr(v, 'items'): # v is a dict p = get_nested_dict_value(v, value, path) # recursive call if p is not None: return p
[docs]def get_nested_dict_key(nested_dict, key_to_find, prepath=()): """ Use a recursive method to find a key in a dictionary of dictionaries (such as the metadata dictionaries we receive from the file parsers). Cribbed from: https://stackoverflow.com/a/22171182/1435788 Parameters ---------- nested_dict : dict Dictionary to search key_to_find : object Value to search for prepath : tuple "path" to prepend to the search to limit the search to only part of the dictionary Returns ------- path : tuple or None The "path" through the dictionary (expressed as a tuple of keys) where value was found. If None, the value was not found in the dictionary. """ for k, v in nested_dict.items(): path = prepath + (k,) if k == key_to_find: # found key return path elif hasattr(v, 'items'): # v is a dict p = get_nested_dict_key(v, key_to_find, path) # recursive call if p is not None: return p
[docs]def get_nested_dict_value_by_path(nest_dict, path): """ Get the value from within a nested dictionary structure by traversing into the dictionary as deep as that path found and returning that value Parameters ---------- nest_dict : dict A dictionary of dictionaries that is to be queried path : tuple A tuple (or other iterable type) that specifies the subsequent keys needed to get to a a value within `nest_dict` Returns ------- value : object or str The value at the path within the nested dictionary; if there's no value there, return the string `"not found"` """ sub_dict = nest_dict for key in path: if key in sub_dict: sub_dict = sub_dict[key] else: sub_dict = 'not found' return sub_dict
[docs]def set_nested_dict_value(nest_dict, path, value): """ Set a value within a nested dictionary structure by traversing into the dictionary as deep as that path found and changing it to `value`. Cribbed from https://stackoverflow.com/a/13688108/1435788 Parameters ---------- nest_dict : dict A dictionary of dictionaries that is to be queried path : tuple A tuple (or other iterable type) that specifies the subsequent keys needed to get to a a value within `nest_dict` value : object The value which will be given to the path in the nested dictionary Returns ------- value : object The value at the path within the nested dictionary """ for key in path[:-1]: nest_dict = nest_dict.setdefault(key, {}) nest_dict[path[-1]] = value
[docs]def try_getting_dict_value(d, key): """ This method will try to get a value from a dictionary (potentially nested) and fail silently if the value is not found, returning None. Parameters ---------- d : dict The dictionary from which to get a value key : str or tuple The key to query, or if an iterable container type (tuple, list, etc.) is given, the path into a nested dictionary to follow Returns ------- val : object or str The value of the dictionary specified by `key`. If the dictionary does not have a key, returns the string `"not found"` without raising an error """ try: if isinstance(key, str): return d[key] elif hasattr(key, '__iter__'): return get_nested_dict_value_by_path(d, key) except (KeyError, TypeError) as e: return 'not found'
[docs]def find_dirs_by_mtime(path, dt_from, dt_to): """ Given two timestamps, find the directories under a path that were last modified between the two .. deprecated:: 0.0.9 `find_dirs_by_mtime` is not recommended for use to find files for record inclusion, because subsequent modifications to a directory (e.g. the user wrote a text file or did some analysis afterwards) means no files will be returned from that directory (because it is not searched) Parameters ---------- path : str The root path from which to start the search dt_from : datetime.datetime The "starting" point of the search timeframe dt_to : datetime.datetime The "ending" point of the search timeframe Returns ------- dirs : :obj:`list` of :obj:`str` A list of the directories that have modification times within the time range provided """ dirs = [] # adjust the datetime objects with the tz_offset (usually should be 0) dt_from += tz_offset dt_to += tz_offset # use os.walk and only inspect the directories for mtime (much fewer # comparisons than looking at every file): _logger.info(f'Finding directories modified between {dt_from.isoformat()} ' f'and {dt_to.isoformat()}') for dirpath, _, _ in _os.walk(path): if dt_from.timestamp() < _getmtime(dirpath) < dt_to.timestamp(): dirs.append(dirpath) return dirs
[docs]def find_files_by_mtime(path, dt_from, dt_to): """ Given two timestamps, find files under a path that were last modified between the two. Parameters ---------- path : str The root path from which to start the search dt_from : datetime.datetime The "starting" point of the search timeframe dt_to : datetime.datetime The "ending" point of the search timeframe Returns ------- files : list A list of the files that have modification times within the time range provided (sorted by modification time) """ # find only the directories that have been modified between these two # timestamps (should be much faster than inspecting all files) # Note: this doesn't work reliably, so just look in entire path... # dirs = find_dirs_by_mtime(path, dt_from, dt_to) dirs = [path] # adjust the datetime objects with the tz_offset (usually should be 0) dt_from += tz_offset dt_to += tz_offset files = set() # use a set here (faster and we won't have duplicates) # for each of those directories, walk the file tree and inspect the # actual files: for d in dirs: for dirpath, _, filenames in _os.walk(d): for f in filenames: fname = _os.path.abspath(_os.path.join(dirpath, f)) if dt_from.timestamp() < _getmtime(fname) < dt_to.timestamp(): files.add(fname) # convert the set to a list and sort my mtime files = list(files) files.sort(key=_getmtime) return files
[docs]def gnu_find_files_by_mtime(path, dt_from, dt_to, extensions): """ Given two timestamps, find files under a path that were last modified between the two. Uses the system-provided GNU ``find`` command. In basic testing, this method was found to be approximately 3 times faster than using :py:meth:`find_files_by_mtime` (which is implemented in pure Python). Parameters ---------- path : str The root path from which to start the search dt_from : datetime.datetime The "starting" point of the search timeframe dt_to : datetime.datetime The "ending" point of the search timeframe extensions : :obj:`list` of :obj:`str` A list of strings representing the extensions to find Returns ------- files : :obj:`list` of :obj:`str` A list of the files that have modification times within the time range provided (sorted by modification time) Raises ------ NotImplementedError If the system running this code is not Linux-based RuntimeError If the find command cannot be found, or running it results in output to `stderr` """ _logger.info(f'Using GNU `find` to search for files') # Verify we're running on Linux if not _sys.platform.startswith('linux'): raise NotImplementedError('gnu_find_files_by_mtime only implemented ' 'for Linux') def _which(fname): def _is_exec(f): return _os.path.isfile(f) and _os.access(f, _os.X_OK) # Check to see if find command is on PATH: exec_file = fname for p in _os.environ["PATH"].split(_os.pathsep): exe_file = _os.path.join(p, exec_file) if _is_exec(exe_file): return exe_file return False if not _which('find'): raise RuntimeError('find command was not found on the system PATH') # adjust the datetime objects with the tz_offset (usually should be 0) dt_from += tz_offset dt_to += tz_offset # Actually run find command (ignoring mib files if specified by # environment variable): filetype_regex = '|'.join(extensions) cmd = f'find {_os.path.join(CONFIG["mmfnexus_path"], path)} ' + \ f'-type f ' + \ f'-regextype posix-egrep ' + \ f'-regex ".*\\.({filetype_regex})$" ' + \ f'-newermt "{dt_from.isoformat()}" ' + \ f'\\! -newermt "{dt_to.isoformat()}" ' + \ (f'\\! -name "*.mib" ' if CONFIG.get("ignore_mib") else '') + \ f'-print0' _logger.info(f'Running via subprocess: "{cmd}"') out = _sp.Popen(cmd, shell=True, stdin=_sp.PIPE, stdout=_sp.PIPE, stderr=_sp.PIPE) (stdout, stderr) = out.communicate() if len(stderr) > 0: # find command returned an error raise RuntimeError(stderr) files = stdout.split(b'\x00') files = [f.decode() for f in files if len(f) > 0] # convert to set and back to remove duplicates and sort my mtime files = list(set(files)) files.sort(key=_getmtime) return files
def _sort_dict(item): return {k: _sort_dict(v) if isinstance(v, dict) else v for k, v in sorted(item.items(), key=lambda i: i[0].lower())} def _remove_dtb_element(tree, path): """ Helper method that uses exec to delete a specific leaf of a DictionaryTreeBrowser using a string Parameters ---------- tree : :py:class:`~hyperspy.misc.utils.DictionaryTreeBrowser` the ``DictionaryTreeBrowser`` object to remove the object from path : str period-delimited path to a DTB element Returns ------- tree : :py:class:`~hyperspy.misc.utils.DictionaryTreeBrowser` """ to_del = 'tree.{}'.format(path) try: exec('del {}'.format(to_del)) except AttributeError as _: # Log the failure and continue _logger.debug('_remove_dtb_element: Could not find {}'.format(to_del)) return tree def _zero_bytes(fname, bytes_from, bytes_to): """ A helper method to set certain byte locations within a file to zero, which can help for creating highly-compressible test files Parameters ---------- fname : str bytes_from : int or :obj:`list` of str The position of the file (in decimal) at which to start zeroing bytes_to : int or :obj:`list` of str The position of the file (in decimal) at which to stop zeroing. If list, must be the same length as list given in ``bytes_from`` Returns ------- new_fname : str The modified file that has it's bytes zeroed """ from shutil import copyfile filename, ext = _os.path.splitext(fname) if fname.endswith('.ser'): index = int(filename.split('_')[-1]) basename = '_'.join(filename.split('_')[:-1]) new_fname = f'{basename}_dataZeroed_{index}{ext}' else: new_fname = f'{filename}_dataZeroed{ext}' copyfile(fname, new_fname) if isinstance(bytes_from, int): bytes_from = [bytes_from] bytes_to = [bytes_to] with open(new_fname, 'r+b') as f: for bf, bt in zip(bytes_from, bytes_to): f.seek(bf) f.write(b'\0' * (bt - bf)) return new_fname def _get_timespan_overlap(range_1, range_2): """ Find the amount of overlap between two time spans. Adapted from https://stackoverflow.com/a/9044111 Parameters ---------- range_1 : :obj:`tuple` of :py:class:`~datetime.datetime` Tuple of length 2 of datetime objects: first is the start of the time range and the second is the end of the time range range_2 Tuple of length 2 of datetime objects: first is the start of the time range and the second is the end of the time range Returns ------- overlap : :py:class:`~datetime.timedelta` The amount of overlap between the time ranges """ latest_start = max(range_1[0], range_2[0]) earliest_end = min(range_1[1], range_2[1]) delta = earliest_end - latest_start overlap = max(_timedelta(0), delta) return overlap # # # NIST-specific area # #
[docs]def get_nist_div_and_group(username): """ Query the NIST active directory to get division and group information for a user. Parameters ---------- username : str a valid NIST username (the short format: e.g. "ear1" instead of ernst.august.ruska@nist.gov). Returns ------- div, group : str The division and group numbers for the user (as strings) """ server = ldap3.Server(nexusLIMS.ldap_url) with ldap3.Connection(server, auto_bind=True) as conn: conn.search('ou=people,dc=ndir,dc=nist,dc=gov', f'(otherMailbox={username}@email.nist.gov)', attributes=['*']) res = conn.entries[0] div = res.nistdivisionnumber.value group = res.nistgroupnumber.value return div, group