# NIST Public License - 2019
#
# This software was developed by employees of the National Institute of
# Standards and Technology (NIST), an agency of the Federal Government
# and is being made available as a public service. Pursuant to title 17
# United States Code Section 105, works of NIST employees are not subject
# to copyright protection in the United States. This software may be
# subject to foreign copyright. Permission in the United States and in
# foreign countries, to the extent that NIST may hold copyright, to use,
# copy, modify, create derivative works, and distribute this software and
# its documentation without fee is hereby granted on a non-exclusive basis,
# provided that this notice and disclaimer of warranty appears in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' WITHOUT ANY WARRANTY OF ANY KIND,
# EITHER EXPRESSED, IMPLIED, OR STATUTORY, INCLUDING, BUT NOT LIMITED
# TO, ANY WARRANTY THAT THE SOFTWARE WILL CONFORM TO SPECIFICATIONS, ANY
# IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
# AND FREEDOM FROM INFRINGEMENT, AND ANY WARRANTY THAT THE DOCUMENTATION
# WILL CONFORM TO THE SOFTWARE, OR ANY WARRANTY THAT THE SOFTWARE WILL BE
# ERROR FREE. IN NO EVENT SHALL NIST BE LIABLE FOR ANY DAMAGES, INCLUDING,
# BUT NOT LIMITED TO, DIRECT, INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES,
# ARISING OUT OF, RESULTING FROM, OR IN ANY WAY CONNECTED WITH THIS SOFTWARE,
# WHETHER OR NOT BASED UPON WARRANTY, CONTRACT, TORT, OR OTHERWISE, WHETHER
# OR NOT INJURY WAS SUSTAINED BY PERSONS OR PROPERTY OR OTHERWISE, AND
# WHETHER OR NOT LOSS WAS SUSTAINED FROM, OR AROSE OUT OF THE RESULTS OF,
# OR USE OF, THE SOFTWARE OR SERVICES PROVIDED HEREUNDER.
#
"""
**Attributes**
Attributes
----------
XSLT_PATH : str
The path to ``cal_events_to_nx_record.xsl``, which is used to translate
the calender event response XML to a format compatible with the Nexus Schema
"""
import os as _os
import logging as _logging
import pathlib as _pathlib
import shutil as _shutil
import sys as _sys
import argparse as _ap
from uuid import uuid4 as _uuid4
from lxml import etree as _etree
from datetime import datetime as _datetime
from io import BytesIO as _bytesIO
import nexusLIMS
import nexusLIMS.schemas.activity as _activity
from nexusLIMS.schemas.activity import AcquisitionActivity as _AcqAc
from nexusLIMS.schemas.activity import cluster_filelist_mtimes
from nexusLIMS.harvester import sharepoint_calendar as _sp_cal
from nexusLIMS.utils import parse_xml as _parse_xml
from nexusLIMS.utils import find_files_by_mtime as _find_files
from nexusLIMS.utils import gnu_find_files_by_mtime as _gnu_find_files
from nexusLIMS.extractors import extension_reader_map as _ext
from nexusLIMS.db.session_handler import get_sessions_to_build as _get_sessions
from nexusLIMS.cdcs import upload_record_files as _upload_record_files
from timeit import default_timer as _timer
# module-level logger, named after this module's import path
_logger = _logging.getLogger(__name__)
# Path to the XSLT stylesheet (shipped alongside this module) that
# transforms calendar-event XML into the Nexus Experiment record format
XSLT_PATH = _os.path.join(_os.path.dirname(__file__),
                          "cal_events_to_nx_record.xsl")
# Path to the Nexus Experiment XML schema (shipped with the activity
# schema module) used by validate_record() below
XSD_PATH = _os.path.join(_os.path.dirname(_activity.__file__),
                         "nexus-experiment.xsd")
# NexusLIMS configuration mapping; this module reads the
# 'mmfnexus_path' and 'nexusLIMS_path' keys from it
CONFIG = nexusLIMS.get_config()
def build_record(instrument, dt_from, dt_to,
                 user=None,
                 sample_id=None,
                 generate_previews=True):
    """
    Construct an XML document conforming to the NexusLIMS schema from a
    directory containing microscopy data files. For calendar parsing,
    currently no logic is implemented for a query that returns multiple
    records.

    Parameters
    ----------
    instrument : :py:class:`~nexusLIMS.instruments.Instrument`
        One of the NexusLIMS instruments contained in the database.
        Controls what instrument calendar is used to get events.
    dt_from : datetime.datetime
        A :py:class:`~datetime.datetime` object representing the starting
        timestamp that will be used to determine which files go in this
        record, as in :py:func:`~.sharepoint_calendar.fetch_xml`.
    dt_to : datetime.datetime
        A :py:class:`~datetime.datetime` object representing the ending
        timestamp used to determine the last point in time for which
        files should be associated with this record, as in
        :py:func:`~.sharepoint_calendar.fetch_xml`.
    user : str or None
        A valid NIST username (the short format: e.g. "ear1"
        instead of ernst.august.ruska@nist.gov). Controls the results
        returned from the calendar - value is as specified in
        :py:func:`~.sharepoint_calendar.get_events`
    sample_id : str or None
        A unique identifier pointing to a sample identifier for data
        collected in this record. If None, a UUIDv4 will be generated
    generate_previews : bool
        Whether or not to create the preview thumbnail images

    Returns
    -------
    xml_record : str
        A formatted string containing a well-formed and valid XML document
        for the data contained in the provided path
    """
    if sample_id is None:
        sample_id = str(_uuid4())

    # XML prolog, XSLT reference, and namespace declarations for the root
    # element
    # TODO: Header elements may be changed once integration into CDCS
    #  determined
    header = [
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?> \n",
        "<?xml-stylesheet type=\"text/xsl\" href=\"\"?>\n",
        "<nx:Experiment xmlns=\"\"\n",
        "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n",
        "xmlns:nx=\""
        "https://data.nist.gov/od/dm/nexus/experiment/v1.0\">\n",
    ]
    xml_record = ''.join(header)

    _logger.info(f"Getting calendar events with instrument: {instrument.name}, "
                 f"from {dt_from.isoformat()} to {dt_to.isoformat()}, "
                 f"user: {user}")

    # events_str = _sp_cal.get_events(instrument=instrument, dt_from=dt_from,
    #                                 dt_to=dt_to, user=user, wrap=True)
    # Apply XSLT to transform calendar events to single record format:
    # output = _parse_xml(events_str, XSLT_PATH,
    #                     instrument_PID=instrument.name,
    #                     instrument_name=instrument.schema_name,
    #                     experiment_id=str(_uuid4()),
    #                     collaborator=None,
    #                     sample_id=sample_id)
    output = ''  # FIXME bypass calendar query for now.

    if str(output) == '':
        # No calendar events were found; build a minimal title/summary
        # section from the instrument and date instead
        output = ''.join([
            f'<title>Experiment on the {instrument.schema_name}',
            f' on {dt_from.strftime("%A %b. %d, %Y")}</title>\n',
            '<id/>\n',
            '<summary>\n',
            f' <instrument pid="{instrument.name}">',
            f'{instrument.schema_name}</instrument>\n',
            '</summary>\n',
        ])
    xml_record += str(output)

    _logger.info(f"Building acquisition activities for timespan from "
                 f"{dt_from.isoformat()} to {dt_to.isoformat()}")
    aa_str, activities = build_acq_activities(instrument,
                                              dt_from, dt_to, sample_id,
                                              generate_previews)
    xml_record += aa_str
    xml_record += "</nx:Experiment>"  # Add closing tag for root element.

    return xml_record
def build_acq_activities(instrument, dt_from, dt_to,
                         sample_id, generate_previews):
    """
    Build an XML string representation of each AcquisitionActivity for a
    single microscopy session. This includes setup parameters and metadata
    associated with each dataset obtained during a microscopy session. Unique
    AcquisitionActivities are delimited via clustering of file collection
    time to detect "long" breaks during a session.

    Parameters
    ----------
    instrument : :py:class:`~nexusLIMS.instruments.Instrument`
        One of the NexusLIMS instruments contained in the database.
        Controls what instrument calendar is used to get events.
    dt_from : datetime.datetime
        The starting timestamp that will be used to determine which files go
        in this record
    dt_to : datetime.datetime
        The ending timestamp used to determine the last point in time for
        which files should be associated with this record
    sample_id : str
        An identifier for the sample from which data was collected
    generate_previews : bool
        Whether or not to create the preview thumbnail images

    Raises
    ------
    FileNotFoundError
        If no supported files are found in the given time range

    Returns
    -------
    acq_activities : str
        A string representing the XML output for each AcquisitionActivity
        associated with a given reservation/experiment on a microscope.
    activities : :obj:`list` of :obj:`~nexusLIMS.schemas.activity.AcquisitionActivity`:
        The list of :py:class:`~nexusLIMS.schemas.activity.AcquisitionActivity`
        objects generated for the record
    """
    # silence HyperSpy's digital_micrograph reader below WARNING, since it
    # would otherwise be triggered for every .dm3 file processed
    _logging.getLogger('hyperspy.io_plugins.digital_micrograph').setLevel(
        _logging.WARNING)
    start_timer = _timer()
    # root of this instrument's data directory on the central file store
    path = _os.path.abspath(_os.path.join(CONFIG['mmfnexus_path'],
                                          instrument.filestore_path))
    # find the files to be included (sorted by modification time;
    # see get_files)
    files = get_files(path, dt_from, dt_to)
    # remove all files but those supported by nexusLIMS.extractors
    files = [f for f in files if _os.path.splitext(f)[1].strip('.') in
             _ext.keys()]
    end_timer = _timer()
    _logger.info(f'Found {len(files)} files in'
                 f' {end_timer - start_timer:.2f} seconds')
    # raise an error if no supported files were found in the time range
    if len(files) == 0:
        raise FileNotFoundError('No files found in this time range')
    # get the timestamp boundaries of acquisition activities
    aa_bounds = cluster_filelist_mtimes(files)
    # add the last file's modification time to the boundaries list to make
    # the loop below easier to process
    aa_bounds.append(_os.path.getmtime(files[-1]))
    # one (initially empty) slot per boundary; filled lazily below
    activities = [None] * len(aa_bounds)
    i = 0
    aa_idx = 0
    # single pass over the mtime-sorted file list, assigning each file to
    # the first activity whose upper time boundary it falls under
    while i < len(files):
        f = files[i]
        mtime = _os.path.getmtime(f)
        # check this file's mtime, if it is less than this iteration's value
        # in the AA bounds, then it belongs to this iteration's AA
        # if not, then we should move to the next activity
        if mtime <= aa_bounds[aa_idx]:
            # if current activity index is None, we need to start a new AA:
            if activities[aa_idx] is None:
                start_time = _datetime.fromtimestamp(mtime)
                activities[aa_idx] = _AcqAc(start=start_time)
            # add this file to the AA
            _logger.info(
                f'Adding file {i}/{len(files)} '
                f'{f.replace(CONFIG["mmfnexus_path"], "").strip("/")} '
                f'to activity {aa_idx}')
            activities[aa_idx].add_file(f, generate_previews)
            # assume this file is the last one in the activity (this will be
            # true on the last iteration where mtime is <= to the
            # aa_bounds value)
            activities[aa_idx].end = _datetime.fromtimestamp(mtime)
            i += 1
        else:
            # this file's mtime is after the boundary and is thus part of the
            # next activity, so increment AA counter and reprocess file (do
            # not increment i)
            aa_idx += 1
    acq_activities_str = ''
    _logger.info('Finished detecting activities')
    # serialize each activity: extract its setup parameters and unique
    # per-file metadata, then append its XML (indented one level under root)
    for i, a in enumerate(activities):
        # aa_logger = _logging.getLogger('nexusLIMS.schemas.activity')
        # aa_logger.setLevel(_logging.ERROR)
        _logger.info(f'Activity {i}: storing setup parameters')
        a.store_setup_params()
        _logger.info(f'Activity {i}: storing unique metadata values')
        a.store_unique_metadata()
        acq_activities_str += a.as_xml(i, sample_id,
                                       indent_level=1, print_xml=False)
    return acq_activities_str, activities
def get_files(path, dt_from, dt_to):
    """
    Get list of files under a path that were last modified between the two
    given timestamps.

    Parameters
    ----------
    path : str
        The file path in which to search for files
    dt_from : datetime.datetime
        The starting timestamp that will be used to determine which files go
        in this record
    dt_to : datetime.datetime
        The ending timestamp used to determine the last point in time for
        which files should be associated with this record

    Returns
    -------
    files : :obj:`list` of :obj:`str`
        A list of the files that have modification times within the
        time range provided (sorted by modification time)
    """
    _logger.info(f'Starting new file-finding in {path}')
    try:
        # prefer the GNU find implementation when it is available
        return _gnu_find_files(path, dt_from, dt_to, _ext.keys())
    except (NotImplementedError, RuntimeError) as err:
        _logger.warning(f'GNU find returned error: {err}\nFalling back to pure '
                        f'Python implementation')
        return _find_files(path, dt_from, dt_to)
def dump_record(instrument,
                dt_from,
                dt_to,
                filename=None,
                user=None,
                generate_previews=True):
    """
    Write an XML record composed of information pulled from the Sharepoint
    calendar as well as metadata extracted from the microscope data (e.g. dm3
    files).

    Parameters
    ----------
    instrument : :py:class:`~nexusLIMS.instruments.Instrument`
        One of the NexusLIMS instruments contained in the database.
        Controls what instrument calendar is used to get events.
    dt_from : datetime.datetime
        The starting timestamp that will be used to determine which files go
        in this record
    dt_to : datetime.datetime
        The ending timestamp used to determine the last point in time for
        which files should be associated with this record
    filename : None or str
        The filename of the dumped xml file to write. If None, a default name
        will be generated from the other parameters
    user : str
        A string which corresponds to the NIST user who performed the
        microscopy experiment
    generate_previews : bool
        Whether or not to create the preview thumbnail images

    Returns
    -------
    filename : str
        The name of the created record that was returned
    """
    if filename is None:
        # e.g. "compiled_record_<instrument>_YYYY-MM-DD_<user>.xml"; the
        # instrument and user segments are omitted when those args are falsy
        filename = 'compiled_record' + \
                   (f'_{instrument.name}' if instrument else '') + \
                   dt_from.strftime('_%Y-%m-%d') + \
                   (f'_{user}' if user else '') + '.xml'
    # make sure the directory that will hold the record exists
    _pathlib.Path(_os.path.dirname(filename)).mkdir(parents=True,
                                                    exist_ok=True)
    # write explicitly as UTF-8: the record's XML prolog declares
    # encoding="UTF-8", so relying on the platform default encoding could
    # produce a file that contradicts its own declaration
    with open(filename, 'w', encoding='utf-8') as f:
        text = build_record(instrument, dt_from, dt_to,
                            user=user,
                            generate_previews=generate_previews)
        f.write(text)
    return filename
def validate_record(xml_filename):
    """
    Validate an .xml record against the Nexus schema.

    Parameters
    ----------
    xml_filename : str or io.StringIO or io.BytesIO
        The path to the xml file to be validated (can also be a file-like
        object like StringIO or BytesIO)

    Returns
    -------
    validates : bool
        Whether or not the record validates against the Nexus schema
    """
    # parse the schema document once, then check the record against it
    schema = _etree.XMLSchema(_etree.parse(XSD_PATH))
    return schema.validate(_etree.parse(xml_filename))
def build_new_session_records():
    """
    Fetches new records that need to be built from the database (using
    :py:func:`~nexusLIMS.db.session_handler.get_sessions_to_build`), builds
    those records using
    :py:func:`build_record` (saving to the NexusLIMS folder), and returns a
    list of resulting .xml files to be uploaded to CDCS.

    Returns
    -------
    xml_files : list of str
        A list of record files that were successfully built and saved to
        centralized storage
    """
    # get the list of sessions with 'WAITING_TO_BE_BUILT' status
    sessions = _get_sessions()
    if not sessions:
        _sys.exit("No 'TO_BE_BUILT' sessions were found. Exiting.")
    xml_files = []
    # loop through the sessions
    for s in sessions:
        try:
            s.insert_record_generation_event()
            record_text = build_record(instrument=s.instrument,
                                       dt_from=s.dt_from, dt_to=s.dt_to)
        except FileNotFoundError:
            # no files were found for this session log, so mark it as such
            # in the database rather than treating it as an error
            path = _os.path.join(CONFIG['mmfnexus_path'],
                                 s.instrument.filestore_path)
            _logger.warning(f'No files found in '
                            f'{_os.path.abspath(path)} between '
                            f'{s.dt_from.isoformat()} and '
                            f'{s.dt_to.isoformat()}')
            _logger.warning(f'Marking {s.session_identifier} as '
                            f'"NO_FILES_FOUND"')
            s.update_session_status('NO_FILES_FOUND')
        except Exception as e:
            # any other failure during record generation marks the session
            # as errored so it can be inspected/retried later
            _logger.error(f'Could not generate record text: {e}')
            _logger.error(f'Marking {s.session_identifier} as "ERROR"')
            s.update_session_status('ERROR')
        else:
            if validate_record(_bytesIO(bytes(record_text, 'UTF-8'))):
                _logger.info('Validated newly generated record')
                # generate filename for saved record and make sure path exists
                basename = f'{s.dt_from.strftime("%Y-%m-%d")}_' \
                           f'{s.instrument.name}_' \
                           f'{s.session_identifier.split("-")[0]}.xml'
                filename = _os.path.join(CONFIG['nexusLIMS_path'],
                                         'records', basename)
                filename = _os.path.abspath(filename)
                _pathlib.Path(_os.path.dirname(filename)).mkdir(
                    parents=True, exist_ok=True)
                # write the record to disk (as UTF-8, matching the record's
                # XML prolog) and append to list of files generated
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(record_text)
                _logger.info(f'Wrote record to {filename}')
                xml_files.append(filename)
                # Mark this session as completed in the database
                _logger.info(f'Marking {s.session_identifier} as "COMPLETED"')
                s.update_session_status('COMPLETED')
            else:
                _logger.error(f'Marking {s.session_identifier} as "ERROR"')
                _logger.error(f'Could not validate record, did not write to '
                              f'disk')
                s.update_session_status('ERROR')
    return xml_files
def process_new_records(dry_run=False):
    """
    Using :py:meth:`build_new_session_records()`, process new records,
    save them to disk, and upload them to the NexusLIMS CDCS instance.
    """
    if dry_run:
        # only report what would be done: show the calendar event and the
        # files that would be included for each pending session
        _logger.info("!!DRY RUN!! Only finding files, not building records")
        sessions = _get_sessions()
        if not sessions:
            _logger.warning("No 'TO_BE_BUILT' sessions were found. Exiting.")
            return None
        for session in sessions:
            _logger.info('')
            _logger.info('')
            dry_run_get_calendar_event(session)
            dry_run_file_find(session)
        return None

    xml_files = build_new_session_records()
    # noinspection PyTypeChecker
    if not xml_files:
        _logger.warning("No XML files built, so no files uploaded")
        return None

    files_uploaded, record_ids = _upload_record_files(xml_files)
    # move each successfully uploaded record into an "uploaded" subfolder
    for uploaded in files_uploaded:
        dest_dir = _os.path.abspath(_os.path.join(
            _os.path.dirname(uploaded), 'uploaded'))
        _pathlib.Path(dest_dir).mkdir(parents=True, exist_ok=True)
        _shutil.copy2(uploaded, dest_dir)
        _os.remove(uploaded)
    leftover = [f for f in xml_files if f not in files_uploaded]
    if leftover:
        _logger.error(f'Some record files were not uploaded: '
                      f'{leftover}')
def dry_run_get_calendar_event(s):
    """
    Get the calendar event that would be used to create a record based off
    the supplied session.

    Parameters
    ----------
    s : ~nexusLIMS.db.session_handler.Session
        A session read from the database

    Returns
    -------
    cal_event : ~nexusLIMS.harvester.sharepoint_calendar.CalendarEvent
        The calendar event matching this session's instrument and time
        range (it is also logged at INFO level before being returned)
    """
    # fetch the raw XML for this session's instrument/time range and parse
    # it into a CalendarEvent object
    xml = _sp_cal.fetch_xml(s.instrument, s.dt_from, s.dt_to)
    cal_event = _sp_cal.CalendarEvent.from_xml(xml)
    _logger.info(cal_event)
    return cal_event
def dry_run_file_find(s):
    """
    Get the files that would be included for any records to be created based
    off the supplied session.

    Parameters
    ----------
    s : ~nexusLIMS.db.session_handler.Session
        A session read from the database

    Returns
    -------
    files : list of str
        A list of strings containing the files that would be included for the
        record of this session (if it were not a dry run)
    """
    # search this session's instrument directory on the central file store
    search_path = _os.path.abspath(_os.path.join(CONFIG['mmfnexus_path'],
                                                 s.instrument.filestore_path))
    _logger.info(f'Searching for files in '
                 f'{_os.path.abspath(search_path)} between '
                 f'{s.dt_from.isoformat()} and '
                 f'{s.dt_to.isoformat()}')
    files = get_files(search_path, s.dt_from, s.dt_to)
    _logger.info(f'Results for {s.session_identifier} on {s.instrument}:')
    if files:
        _logger.info(f'Found {len(files)} files for this session')
        for fname in files:
            mtime = _datetime.fromtimestamp(
                _os.path.getmtime(fname)).isoformat()
            _logger.info(f'*mtime* {mtime} - {fname}')
    else:
        _logger.warning('No files found for this session')
    return files
if __name__ == '__main__':  # pragma: no cover
    # If running as a module, process new records (with some control flags)
    from nexusLIMS.utils import setup_loggers

    parser = _ap.ArgumentParser()
    # Optional argument flag which defaults to False
    parser.add_argument("-n", "--dry-run",
                        action="store_true",
                        dest='dry_run',
                        default=False)
    # Optional verbosity counter (eg. -v, -vv, -vvv, etc.)
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Verbosity (-v, -vv); corresponds to python logging level. "
             "0 is WARN, 1 (-v) is INFO, 2 (-vv) is DEBUG. ERROR and "
             "CRITICAL are always shown.")
    # Specify output of "--version"
    parser.add_argument(
        "--version",
        action="version",
        version=f"%(prog)s (version {nexusLIMS.__version__})")
    args = parser.parse_args()

    # set up logging
    logging_levels = {0: _logging.WARNING,
                      1: _logging.INFO,
                      2: _logging.DEBUG}
    if args.dry_run and args.verbose <= 0:
        _logger.warning('Increasing verbosity so output of "dry-run" '
                        'will be shown')
        args.verbose = 1
    # clamp the count so "-vvv" (or more) maps to DEBUG instead of raising
    # a KeyError on the lookup below
    verbosity = min(args.verbose, 2)
    setup_loggers(logging_levels[verbosity])
    # when running as script, __name__ is "__main__", so we need to set level
    # explicitly since the setup_loggers function won't find it
    _logger.setLevel(logging_levels[verbosity])

    process_new_records(args.dry_run)