"""
This module contains the code used to harvest metadata from various file types
generated from instruments in the Electron Microscopy Nexus facility.
Extractors should return a dictionary containing the values to be displayed
in NexusLIMS as a sub-dictionary under the key ``nx_meta``. The remaining keys
will be for the metadata as extracted. Under ``nx_meta``, a few keys are
expected (although not enforced):
* ``'Creation Time'`` - ISO format date and time as a string
* ``'Data Type'`` - a human-readable description of the data type separated by
underscores - e.g "STEM_Imaging", "TEM_EDS", etc.
* ``'DatasetType'`` - determines the value of the Type attribute for the dataset
(defined in the schema)
* ``'Data Dimensions'`` - dimensions of the dataset, surrounded by parentheses,
separated by commas as a string- e.g. '(12, 1024, 1024)'
* ``'Instrument ID'`` - instrument PID pulled from the instrument database
"""
import collections.abc as _collections_abc
import json as _json
import logging as _logging
import os as _os
import pathlib as _pathlib
import shutil as _sh

import hyperspy.api_nogui as _hs
import numpy as _np

import nexusLIMS
from nexusLIMS.instruments import get_instr_from_filepath as _get_instr

from .digital_micrograph import get_dm3_metadata
from .fei_emi import get_ser_metadata
from .quanta_tif import get_quanta_metadata
from .thumbnail_generator import down_sample_image as _down_sample
from .thumbnail_generator import sig_to_thumbnail as _s2thumb

_logger = _logging.getLogger(__name__)
PLACEHOLDER_PREVIEW = _os.path.join(_os.path.dirname(__file__),
                                    'extractor_error.png')

CONFIG = nexusLIMS.get_config()

extension_reader_map = {
    'dm3': get_dm3_metadata,
    'dm4': get_dm3_metadata,
    'tif': get_quanta_metadata,
    'ser': get_ser_metadata,
}
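
# A sketch of how the map above drives extractor dispatch by file extension
# (the same pattern ``parse_metadata`` uses below; the filename is
# hypothetical):
#
#     extension = _os.path.splitext('data/image.dm3')[1][1:]   # -> 'dm3'
#     metadata = extension_reader_map[extension]('data/image.dm3')
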


def parse_metadata(fname, write_output=True, generate_preview=True,
                   overwrite=True):
"""
Given an input filename, read the file, determine what "type" of file (i.e.
what instrument it came from) it is, filter the metadata (if necessary) to
what we are interested in, and return it as a dictionary (writing to the
NexusLIMS directory as JSON by default). Also calls the preview
generation method, if desired.
Parameters
----------
fname : str
The filename from which to read data
write_output : bool
Whether to write the metadata dictionary as a json file in the NexusLIMS
folder structure
generate_preview : bool
Whether to generate the thumbnail preview of this dataset (that
operation is not done in this method, it is just called from here so
it can be done at the same time)
overwrite : bool
Whether or not to overwrite the .json metadata file and thumbnail
image if either exists
Returns
-------
nx_meta : dict or None
The "relevant" metadata that is of use for NexusLIMS. If None,
the file could not be opened
preview_fname : str or None
The file path of the generated preview image, or `None` if it was not
requested
"""
    extension = _os.path.splitext(fname)[1][1:]
    nx_meta = extension_reader_map[extension](fname)
    preview_fname = None

    # nx_meta should never be None in practice, because the extractors are
    # defensive and will always return _something_, but check just in case
    if nx_meta is not None:
        # Set the dataset type to Misc if it was not set by the file reader
        if 'DatasetType' not in nx_meta['nx_meta']:
            nx_meta['nx_meta']['DatasetType'] = 'Misc'
            nx_meta['nx_meta']['Data Type'] = 'Miscellaneous'

        if write_output:
            out_fname = fname.replace(CONFIG["mmfnexus_path"],
                                      CONFIG["nexusLIMS_path"]) + '.json'
            if not _os.path.isfile(out_fname) or overwrite:
                # Create the directory for the metadata file, if needed
                _pathlib.Path(_os.path.dirname(out_fname)).mkdir(
                    parents=True, exist_ok=True)
                # Make sure that the nx_meta dict comes first in the json
                # output
                out_dict = {'nx_meta': nx_meta['nx_meta']}
                for k, v in nx_meta.items():
                    if k != 'nx_meta':
                        out_dict[k] = v
                with open(out_fname, 'w') as f:
                    _logger.debug(f'Dumping metadata to {out_fname}')
                    _json.dump(out_dict, f, sort_keys=False,
                               indent=2, cls=_CustomEncoder)
        if generate_preview:
            preview_fname = fname.replace(CONFIG["mmfnexus_path"],
                                          CONFIG["nexusLIMS_path"]) + \
                '.thumb.png'
            if extension == 'tif':
                instr = _get_instr(fname)
                instr_name = instr.name if instr is not None else None
                if instr_name == '**REMOVED**':
                    # we know the output size we want for the Quanta
                    output_size = (512, 471)
                    _down_sample(fname,
                                 out_path=preview_fname,
                                 output_size=output_size)
                else:
                    factor = 2
                    _down_sample(fname,
                                 out_path=preview_fname,
                                 factor=factor)
            else:
                load_options = {'lazy': True}
                if extension == 'ser':
                    load_options['only_valid_data'] = True
                try:
                    s = _hs.load(fname, **load_options)
                except Exception:
                    _logger.warning('Signal could not be loaded by HyperSpy. '
                                    'Using placeholder image for preview.')
                    preview_fname = fname.replace(
                        CONFIG["mmfnexus_path"],
                        CONFIG["nexusLIMS_path"]) + '.thumb.png'
                    _sh.copyfile(PLACEHOLDER_PREVIEW, preview_fname)
                    return nx_meta, preview_fname
                # If s is a list of signals, use just the first one for
                # our purposes
                if isinstance(s, list):
                    num_sigs = len(s)
                    fname = s[0].metadata.General.original_filename
                    s = s[0]
                    s.metadata.General.title = \
                        s.metadata.General.title + \
                        f' (1 of {num_sigs} total signals in file "{fname}")'
                elif s.metadata.General.title == '':
                    s.metadata.General.title = \
                        s.metadata.General.original_filename.replace(
                            extension, '').strip('.')

                # only generate the preview if it doesn't already exist, or
                # if the overwrite parameter was explicitly set
                if not _os.path.isfile(preview_fname) or overwrite:
                    _logger.info(f'Generating preview: {preview_fname}')
                    # Create the directory for the thumbnail, if needed
                    _pathlib.Path(_os.path.dirname(preview_fname)).mkdir(
                        parents=True, exist_ok=True)
                    # Generate the thumbnail
                    s.compute(show_progressbar=False)
                    _s2thumb(s, out_path=preview_fname)
                else:
                    _logger.info(f'Preview already exists: {preview_fname}')

    return nx_meta, preview_fname
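
# A hedged usage sketch for ``parse_metadata`` (the path is hypothetical and
# assumes the file lives under CONFIG["mmfnexus_path"] so the output paths
# can be derived from it):
#
#     meta, preview = parse_metadata('/path/to/mmfnexus/image.dm3')
#     meta['nx_meta']['DatasetType']   # e.g. 'Image'
#     preview   # e.g. '/path/to/nexusLIMS/image.dm3.thumb.png'
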

def flatten_dict(d, parent_key='', separator=' '):
"""
Utility method to take a nested dictionary structure and flatten it into a
single level, separating the levels by a string as specified by
``separator``
Cribbed from: https://stackoverflow.com/a/6027615/1435788
Parameters
----------
d : dict
The dictionary to flatten
parent_key : str
The "root" key to add to add to the existing keys
separator : str
The string to use to separate values in the flattened keys (i.e.
{'a': {'b': 'c'}} would become {'a' + sep + 'b': 'c'})
Returns
-------
flattened_dict : str
The dictionary with depth one, with nested dictionaries flattened
into root-level keys
"""
    items = []
    for k, v in d.items():
        new_key = parent_key + separator + k if parent_key else k
        if isinstance(v, _collections_abc.MutableMapping):
            items.extend(flatten_dict(v, new_key, separator=separator).items())
        else:
            items.append((new_key, v))
    flattened_dict = dict(items)
    return flattened_dict

class _CustomEncoder(_json.JSONEncoder):
    """
    A custom JSON encoder class that allows serialization of certain types
    that are not serializable by default (taken from
    https://stackoverflow.com/a/27050186)
    """
    def default(self, obj):
        if isinstance(obj, _np.integer):
            return int(obj)
        elif isinstance(obj, _np.floating):
            return float(obj)
        elif isinstance(obj, _np.ndarray):
            return obj.tolist()
        elif isinstance(obj, _np.bytes_):
            return obj.decode()
        else:
            return super().default(obj)
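
# Example: _CustomEncoder lets NumPy scalars and arrays (which the stock
# JSONEncoder rejects) serialize cleanly:
#
#     >>> _json.dumps({'n': _np.int64(5), 'arr': _np.arange(3)},
#     ...             cls=_CustomEncoder)
#     '{"n": 5, "arr": [0, 1, 2]}'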
