# NIST Public License - 2019
#
# This software was developed by employees of the National Institute of
# Standards and Technology (NIST), an agency of the Federal Government
# and is being made available as a public service. Pursuant to title 17
# United States Code Section 105, works of NIST employees are not subject
# to copyright protection in the United States. This software may be
# subject to foreign copyright. Permission in the United States and in
# foreign countries, to the extent that NIST may hold copyright, to use,
# copy, modify, create derivative works, and distribute this software and
# its documentation without fee is hereby granted on a non-exclusive basis,
# provided that this notice and disclaimer of warranty appears in all copies.
#
# THE SOFTWARE IS PROVIDED 'AS IS' WITHOUT ANY WARRANTY OF ANY KIND,
# EITHER EXPRESSED, IMPLIED, OR STATUTORY, INCLUDING, BUT NOT LIMITED
# TO, ANY WARRANTY THAT THE SOFTWARE WILL CONFORM TO SPECIFICATIONS, ANY
# IMPLIED WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
# AND FREEDOM FROM INFRINGEMENT, AND ANY WARRANTY THAT THE DOCUMENTATION
# WILL CONFORM TO THE SOFTWARE, OR ANY WARRANTY THAT THE SOFTWARE WILL BE
# ERROR FREE. IN NO EVENT SHALL NIST BE LIABLE FOR ANY DAMAGES, INCLUDING,
# BUT NOT LIMITED TO, DIRECT, INDIRECT, SPECIAL OR CONSEQUENTIAL DAMAGES,
# ARISING OUT OF, RESULTING FROM, OR IN ANY WAY CONNECTED WITH THIS SOFTWARE,
# WHETHER OR NOT BASED UPON WARRANTY, CONTRACT, TORT, OR OTHERWISE, WHETHER
# OR NOT INJURY WAS SUSTAINED BY PERSONS OR PROPERTY OR OTHERWISE, AND
# WHETHER OR NOT LOSS WAS SUSTAINED FROM, OR AROSE OUT OF THE RESULTS OF,
# OR USE OF, THE SOFTWARE OR SERVICES PROVIDED HEREUNDER.
#
# Code must be able to work under Python 3.4 (32-bit) due to limitations of
# the Windows XP-based microscope PCs. Using this version of Python with
# pyinstaller 3.5 seems to work on the 642 Titan.
import contextlib
import os
import pathlib
import platform
import queue
# import random
# import shutil
import socket
import sqlite3
import string
import subprocess
import sys
from datetime import datetime
from uuid import uuid4
def get_drives():
"""
Get the drive letters (uppercase) in current use by Windows
Adapted from https://stackoverflow.com/a/827398/1435788
Returns
-------
drives : :obj:`list` of str
A list of drive letters currently in use
"""
drives = []
from ctypes import windll
bitmask = windll.kernel32.GetLogicalDrives()
for letter in string.ascii_uppercase:
if bitmask & 1:
drives.append(letter)
bitmask >>= 1
return drives
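# Illustrative note (not from the original source): GetLogicalDrives()
# returns a bitmask with one bit per drive letter, starting at bit 0 for
# 'A'. On a hypothetical PC where only C: and D: are mapped, the bitmask
# would be 0b1100 and get_drives() would return ['C', 'D'].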
def get_free_drives():
"""
Get currently unused drive letters, leaving out A through G and M for
safety (since those are often reserved for Windows drives and the M drive
is used for ``mmfnexus``).
Returns
-------
not_in_use : :obj:`list` of str
A list of "safe" drive letters not currently in use
"""
in_use = get_drives()
not_in_use = [lett for lett in string.ascii_uppercase if lett not in in_use]
not_in_use = [lett for lett in not_in_use if lett not in 'ABCDEFGM']
return not_in_use
def get_first_free_drive():
"""
Get the first available drive letter that is not being used on this computer
Returns
-------
first_free : str
On Windows, the first free drive letter (with a colon appended) that
should be safe to use. On other platforms, the path to a local mount
directory (``~/nexuslims/mnt``), which is created if it does not exist.
"""
if sys.platform == "win32":
first_free = get_free_drives()[0]
return first_free + ':'
else:
res = os.path.expanduser("~/nexuslims/mnt")
if not os.path.isdir(res):
os.makedirs(res)
return res
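# Hedged usage sketch: on a hypothetical Windows PC where only C: and D:
# are in use, get_first_free_drive() skips A through G (and M) and returns
# 'H:'. On macOS/Linux it instead returns (creating it if necessary) the
# local directory ~/nexuslims/mnt to serve as the mount point.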
class DBSessionLogger:
def __init__(self, config, verbosity=0, user=None):
"""
Parameters
----------
config : dict
Configuration dictionary with the database name and path and the
network drive connection settings (hostname, credentials, etc.)
verbosity : int
-1: 'ERROR', 0: ' WARN', 1: ' INFO', 2: 'DEBUG'
user : str
The user to attach to this record
"""
self.log_text = ""
self.config = config
self.verbosity = verbosity
self.db_name = config["database_name"]
self.drive_letter = get_first_free_drive()
self.user = user
self.hostname = config["networkdrive_hostname"]
self.filestore_path = None
self.session_started = False
self.session_start_time = None
self.last_entry_type = None
self.last_session_id = None
self.last_session_row_number = None
self.last_session_ts = None
self.progress_num = 0
self.session_note = ""
self.db_path = str(pathlib.Path(config["database_relpath"]))
self.password = config["networkdrive_password"] if config["networkdrive_password"] else None
self.full_path = os.path.join(self.drive_letter, self.db_name)
self.cpu_name = platform.node().split('.')[0]
self.session_id = str(uuid4())
self.instr_pid = None
self.instr_schema_name = None
if sys.platform == 'win32':
self.log('Used drives are: {}'.format(get_drives()), 2)
self.log('Unused drives are: {}'.format(get_free_drives()), 2)
self.log('First available drive letter is {}'.format(
self.drive_letter), 2)
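# Illustrative example (assumed values, not the canonical configuration) of
# the config dict this class consumes, based on the keys accessed in
# __init__() and mount_network_share():
#
# config = {
#     "database_name": "nexuslims_db.sqlite",
#     "database_relpath": "nexuslims/db",
#     "networkdrive_hostname": "files.example.gov",
#     "networkdrive_workgroup": "EXAMPLE",
#     "networkdrive_username": "svc_nexuslims",
#     "networkdrive_password": "********",
# }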
def log(self, to_print, this_verbosity):
"""
Log a message to the console, printing it only if the given verbosity is
less than or equal to this instance's verbosity threshold. The message is
always appended to this instance's ``log_text`` attribute, regardless of
verbosity.
Parameters
----------
to_print : str
The message to log
this_verbosity : int
The verbosity level (higher is more verbose)
"""
level_dict = {-1: 'ERROR', 0: ' WARN', 1: ' INFO', 2: 'DEBUG'}
str_to_log = '{}'.format(datetime.now().isoformat()) + \
':{}'.format(level_dict[this_verbosity]) + \
': {}'.format(to_print)
if this_verbosity <= self.verbosity:
print(str_to_log)
self.log_text += str_to_log + '\n'
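# Example of the resulting log line format (timestamp and message are
# illustrative); note the single-space padding in ' WARN' and ' INFO' keeps
# the levels aligned:
# 2021-06-01T13:45:12.345678: INFO: username is jdoe
# 2021-06-01T13:45:12.350012:DEBUG: running `mount_network_share()`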
def log_exception(self, e):
"""
Log an exception to the console and the ``log_text``
Parameters
----------
e : Exception
The exception whose type and arguments will be logged
"""
indent = " " * 34
template = indent + "Exception of type {0} occurred. Arguments:\n" + \
indent + "{1!r}"
message = template.format(type(e).__name__, e.args)
print(message)
self.log_text += message + '\n'
def check_exit_queue(self, thread_queue, exit_queue):
"""
Check to see if a queue (``exit_queue``) has anything in it. If so, put a
``ChildProcessError`` onto ``thread_queue`` and exit immediately.
Parameters
----------
thread_queue : queue.Queue
exit_queue : queue.Queue
"""
if exit_queue is not None:
try:
res = exit_queue.get(0)
if res:
self.log("Received termination signal from GUI thread", 0)
thread_queue.put(ChildProcessError("Terminated from GUI "
"thread"))
sys.exit("Saw termination queue entry")
except queue.Empty:
pass
def run_cmd(self, cmd):
"""
Run a command using the subprocess module and return the output. Note
that because we want to run the eventual logger without a console
visible, we do not have access to the standard stdin, stdout,
and stderr, and these need to be redirected to ``subprocess`` pipes
accordingly.
Parameters
----------
cmd : str
The command to run (in a new shell; Windows ``cmd`` or the platform's
default shell). ``stderr`` will be redirected to ``stdout`` and
included in the returned output
Returns
-------
output : str
The output of ``cmd``
"""
try:
# Redirect stderr to stdout, and then stdout and stdin to
# subprocess.PIPE
p = subprocess.Popen(cmd,
shell=True,
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE)
p.stdin.close()
p.wait()
output = p.stdout.read().decode()
except subprocess.CalledProcessError as e:
output = e.output.decode()
cmd_shown = e.cmd.replace(self.password, '**************') if self.password else e.cmd
self.log('command {} returned with error (code {}): {}'.format(
cmd_shown,
e.returncode,
output), 0)
return output
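# Hedged usage sketch: run_cmd() is used below in calls such as
# self.run_cmd('net use') to list current Windows network mappings; the
# decoded stdout (with stderr folded in) comes back as one string for the
# caller to parse.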
def mount_network_share(self, mount_point=None):
"""
Mount the path containing the database to the first free drive letter
found using Windows `cmd`. Due to some Windows limitations,
this requires looking up the server's IP address
and mounting using the IP rather than the actual domain name
Parameters
----------
mount_point : str
The path on the network drive to mount. Defaults to ``self.db_path``
if not given.
"""
if mount_point is None:
mount_point = self.db_path
mount_point = str(pathlib.Path(mount_point))
# we should not have to disconnect anything because we're using free
# letter:
# self.log('unmounting existing N:', 2)
# _ = self.run_cmd(r'net use N: /delete /y')
ip = socket.gethostbyname(self.hostname)
self.log('found network drive at {}'.format(ip), 2)
do_mount = True
if sys.platform == "win32":
current_mounts = str(self.run_cmd('net use')).split('\r\n')
self.log('Currently mounted: ', 2)
self.log('Looking for '
r'{}\{}'.format(ip,
self.db_path).replace(r'\\', '\\'), 2)
for m in current_mounts:
self.log(m, 2)
if r'{}\{}'.format(ip, self.db_path).replace(r'\\', '\\') in m:
old_drive_letter = self.drive_letter
for item in m.split():
if len(item) == 2 and item[1] == ':' \
and item[0] in string.ascii_uppercase:
self.drive_letter = item
break
self.full_path = '{}\\{}'.format(self.drive_letter, self.db_name)
self.log('{} is already mounted'.format(self.drive_letter), 0)
do_mount = False
else:
if os.listdir(self.drive_letter):
# to reuse the same mount point safely, unmount anything there first
self.umount_network_share()
if do_mount:
workgroup = self.config.get("networkdrive_workgroup")
username = self.config.get("networkdrive_username")
password = self.config.get("networkdrive_password")
if sys.platform == "win32":
mount_command = 'net use {} \\\\{}\\{} '.format(self.drive_letter,
ip,
mount_point)
credential_part = ""
if username:
if workgroup:
credential_part = "/user:%s\\%s" % (workgroup, username)
else:
credential_part = "/user:%s" % username
if password:
credential_part += ' ' + password
if credential_part:
mount_command += credential_part
elif sys.platform == "darwin":
credential_part = ""
if workgroup:
credential_part += workgroup + ';'
if username:
credential_part += username
if password:
credential_part += ':' + password
if credential_part:
credential_part += '@'
# Here assuming network drive is SMB drive
mount_command = "mount -t smbfs //%s%s/%s %s" % (
credential_part, ip, mount_point, self.drive_letter)
else:
raise NotImplementedError("Current OS -- %s not supported." % sys.platform)
self.log('mounting {}'.format(self.drive_letter), 2)
# mounting requires a security policy:
# https://support.microsoft.com/en-us/help/968264/error-message-when-
# you-try-to-map-to-a-network-drive-of-a-dfs-share-by
command_shown = mount_command
if self.password is not None:
command_shown = command_shown.replace(self.password, '********')
self.log('using {}'.format(command_shown), 2)
p = self.run_cmd(mount_command)
if 'error' in str(p):
if '1312' in str(p):
self.log('Visit https://bit.ly/38DvqVh\n'
'to see how to allow mounting network drives as '
'another user.\n'
'(You\'ll need to change HKLM\\System\\'
'CurrentControlSet\\Control\\Lsa\\'
'DisableDomainCreds '
'to 0 in the registry)', 0)
raise ConnectionError('Could not mount network share to access '
'database' +
(' ("DisableDomainCreds" error)'
if '1312' in str(p) else ''))
else:
self.log('Using existing mount point {}'.format(
self.drive_letter), 1)
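# Illustrative examples of the mount commands built above (drive letter,
# IP, share path, and credentials are hypothetical placeholders):
#   Windows: net use H: \\10.0.0.5\nexuslims\db /user:EXAMPLE\svc_user <password>
#   macOS:   mount -t smbfs //EXAMPLE;svc_user:<password>@10.0.0.5/nexuslims/db ~/nexuslims/mnt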
def umount_network_share(self):
"""
Unmount the network share (via ``net use`` on Windows or ``umount`` on macOS)
"""
self.log('unmounting {}'.format(self.drive_letter), 2)
if sys.platform == 'win32':
p = self.run_cmd(r'net use {} /del /y'.format(self.drive_letter))
elif sys.platform == "darwin":
p = self.run_cmd("umount %s" % self.drive_letter)
else:
raise NotImplementedError("Current OS -- %s not supported." % sys.platform)
if str(p):
self.log(str(p).strip(), 0)
def get_instr_pid(self):
"""
Using the name of this computer, get the matching instrument PID from
the database
Returns
-------
instrument_pid : str
The PID for the instrument corresponding to this computer
instrument_schema_name : str
The schema name for the instrument corresponding to this computer
filestore_path : str
The filestore path for the instrument corresponding to this computer
"""
# Get the instrument pid from the computer name of this computer
with contextlib.closing(sqlite3.connect(self.full_path)) as con:
self.log('Looking in database for computer name matching '
'{}'.format(self.cpu_name), 1)
with con as cur:
res = cur.execute('SELECT instrument_pid, schema_name, filestore_path '
'from instruments '
'WHERE '
'computer_name is '
'\'{}\''.format(self.cpu_name))
one_result = res.fetchone()
self.log('Database result is {}'.format(one_result), 2)
if one_result is not None:
instrument_pid, instrument_schema_name, filestore_path = one_result
else:
instrument_pid, instrument_schema_name, filestore_path = (None, None, None)
self.log('instrument_pid: {}, instrument_schema_name: {}, filestore_path: {}'.format(
instrument_pid, instrument_schema_name, filestore_path), 2)
if instrument_pid is None:
raise sqlite3.DataError('Could not find an instrument matching '
'this computer\'s name '
'({}) '.format(self.cpu_name) +
'in the database!\n\n'
'This should not happen. Please '
'contact miclims@nist.gov as soon as '
'possible.')
else:
self.log('Found instrument ID: '
'{} using '.format(instrument_pid) +
'{}'.format(self.cpu_name), 1)
return instrument_pid, instrument_schema_name, filestore_path
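# Hedged example: for a hypothetical `instruments` row with
# computer_name = 'TITAN-PC1', this would return something like
# ('instr-pid-001', 'Titan TEM', 'Titan'), i.e.
# (instrument_pid, instrument_schema_name, filestore_path).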
def last_session_ended(self, thread_queue=None, exit_queue=None):
"""
Check the database for this instrument to make sure that the last
entry in the db was an "END" (properly ended). If it's not, return
False so the GUI can query the user for additional input on how to
proceed.
Parameters
----------
thread_queue : queue.Queue
Main queue for communication back to the GUI (progress and errors)
exit_queue : queue.Queue
Queue the GUI uses to signal that this thread should terminate
Returns
-------
state_is_consistent : bool
If the database is consistent (i.e. the last log for this
instrument is an "END" log), return True. If not (it's a "START"
log), return False
"""
try:
self.check_exit_queue(thread_queue, exit_queue)
if self.instr_pid is None:
raise AttributeError(
"Instrument PID must be set before checking "
"the database for any related sessions")
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Error encountered while checking that last record for "
"this instrument was an \"END\" log", -1)
return False
# Get last inserted line for this instrument that is not a record
# generation (should be either a START or END)
query_statement = 'SELECT event_type, session_identifier, ' \
'id_session_log, timestamp FROM session_log WHERE ' \
'instrument = "{}" '.format(self.instr_pid) + \
'AND NOT event_type = "RECORD_GENERATION" ' + \
'ORDER BY timestamp DESC LIMIT 1'
self.log('last_session_ended query: {}'.format(query_statement), 2)
self.check_exit_queue(thread_queue, exit_queue)
with contextlib.closing(sqlite3.connect(self.full_path)) as con:
with con as cur:
try:
self.check_exit_queue(thread_queue, exit_queue)
res = cur.execute(query_statement)
row = res.fetchone()
if row is None:
# If there is no result, this must be the first time
# we're connecting to the database with this
# instrument, so pretend the last session was "END"
self.last_entry_type = "END"
else:
self.last_entry_type, self.last_session_id, \
self.last_session_row_number, self.last_session_ts = row
if self.last_entry_type == "END":
self.log('Verified database consistency for the '
'{}'.format(self.instr_schema_name), 1)
if thread_queue:
thread_queue.put(('Verified database consistency '
'for the {}'.format(
self.instr_schema_name),
self.progress_num))
self.progress_num += 1
return True
elif self.last_entry_type == "START":
self.log('Database is inconsistent for the '
'{} '.format(self.instr_schema_name) +
'(last entry [id_session_log = '
'{}]'.format(self.last_session_row_number) +
' was a "START")', 0)
if thread_queue:
thread_queue.put(('Database is inconsistent!',
self.progress_num))
self.progress_num += 1
return False
else:
raise sqlite3.IntegrityError(
"Last entry for the "
"{} ".format(self.instr_schema_name) +
"was neither \"START\" nor \"END\" (value was "
"\"{}\")".format(self.last_entry_type))
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Error encountered while verifying "
"database consistency for the "
"{}".format(self.instr_schema_name), -1)
self.log_exception(e)
return False
def process_start(self, thread_queue=None, exit_queue=None):
"""
Insert a session `'START'` log for this computer's instrument
Returns True if successful, False if not
"""
insert_statement = "INSERT INTO session_log (instrument, " \
" event_type, session_identifier, session_note" + \
(", user) " if self.user else ") ") + \
"VALUES ('{}', 'START', ".format(self.instr_pid) + \
"'{}'".format(self.session_id) + \
", '{}'".format(self.session_note) + \
(", '{}');".format(self.user) if self.user else ");")
self.log('insert_statement: {}'.format(insert_statement), 2)
self.check_exit_queue(thread_queue, exit_queue)
# Get last entered row with this session_id (to make sure it's correct)
with contextlib.closing(sqlite3.connect(self.full_path)) as con:
with con as cur:
try:
self.check_exit_queue(thread_queue, exit_queue)
_ = cur.execute(insert_statement)
self.session_started = True
if thread_queue:
thread_queue.put(('"START" session inserted into db',
self.progress_num))
self.progress_num += 1
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Error encountered while inserting \"START\" "
"entry into database", -1)
return False
with con as cur:
try:
self.check_exit_queue(thread_queue, exit_queue)
r = cur.execute("SELECT * FROM session_log WHERE "
"session_identifier="
"'{}' ".format(self.session_id) +
"AND event_type = 'START' "
"ORDER BY timestamp DESC " +
"LIMIT 1;")
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Error encountered while verifying that session "
"was started", -1)
return False
id_session_log = r.fetchone()
self.check_exit_queue(thread_queue, exit_queue)
self.log('Verified insertion of row {}'.format(id_session_log), 1)
self.session_start_time = datetime.strptime(
id_session_log[3], "%Y-%m-%dT%H:%M:%S.%f")
if thread_queue:
thread_queue.put(('Verified "START" session inserted into db',
self.progress_num))
self.progress_num += 1
return True
def process_end(self, thread_queue=None, exit_queue=None):
"""
Insert a session `'END'` log for this computer's instrument,
and change the status of the corresponding `'START'` entry from
`'WAITING_FOR_END'` to `'TO_BE_BUILT'`
"""
user_string = "AND user='{}'".format(self.user) if self.user else ''
insert_statement = "INSERT INTO session_log " \
"(instrument, event_type, " \
"record_status, session_identifier, session_note" + \
(", user) " if self.user else ") ") + \
"VALUES ('{}',".format(self.instr_pid) + \
"'END', 'TO_BE_BUILT', " + \
"'{}'".format(self.session_id) + \
", '{}'".format(self.session_note) + \
(", '{}');".format(self.user) if self.user else ");")
# Get the most recent 'START' entry for this instrument and session id
get_last_start_id_query = "SELECT id_session_log FROM session_log " + \
"WHERE instrument = " + \
"'{}' ".format(self.instr_pid) + \
"AND event_type = 'START' " + \
"{} ".format(user_string) + \
"AND session_identifier = " + \
"'{}' ".format(self.session_id) + \
"AND record_status = 'WAITING_FOR_END';"
self.log('query: {}'.format(get_last_start_id_query), 2)
with sqlite3.connect(self.full_path) as con:
self.log('Inserting END; insert_statement: {}'.format(
insert_statement), 2)
try:
self.check_exit_queue(thread_queue, exit_queue)
_ = con.execute(insert_statement)
if thread_queue:
thread_queue.put(('"END" session log inserted into db',
self.progress_num))
self.progress_num += 1
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Error encountered while inserting \"END\" log for "
"session", -1)
return False
try:
self.check_exit_queue(thread_queue, exit_queue)
res = con.execute("SELECT * FROM session_log WHERE "
"session_identifier="
"'{}' ".format(self.session_id) +
"AND event_type = 'END' "
"ORDER BY timestamp DESC " +
"LIMIT 1;")
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Error encountered while verifying that session "
"was ended", -1)
return False
id_session_log = res.fetchone()
self.log('Inserted row {}'.format(id_session_log), 1)
if thread_queue:
thread_queue.put(('Verified "END" session inserted into db',
self.progress_num))
self.progress_num += 1
try:
self.check_exit_queue(thread_queue, exit_queue)
res = con.execute(get_last_start_id_query)
results = res.fetchall()
if len(results) == 0:
raise LookupError("No matching 'START' event found")
elif len(results) > 1:
raise LookupError("More than one 'START' event found with "
"session_identifier = "
"'{}'".format(self.session_id))
last_start_id = results[-1][0]
self.log('SELECT instrument results: {}'.format(last_start_id),
2)
if thread_queue:
thread_queue.put(('Matching "START" session log found',
self.progress_num))
self.progress_num += 1
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Error encountered while getting matching \"START\" "
"log", -1)
return False
try:
# Update previous START event record status
self.check_exit_queue(thread_queue, exit_queue)
res = con.execute("SELECT * FROM session_log WHERE " +
"id_session_log = {}".format(last_start_id))
self.log('Row to be updated: {}'.format(res.fetchone()), 1)
if thread_queue:
thread_queue.put(('Matching "START" session log found',
self.progress_num))
self.progress_num += 1
update_statement = "UPDATE session_log SET " + \
"record_status = 'TO_BE_BUILT' WHERE " + \
"id_session_log = {}".format(last_start_id)
self.check_exit_queue(thread_queue, exit_queue)
_ = con.execute(update_statement)
if thread_queue:
thread_queue.put(('Matching "START" session log\'s status '
'updated',
self.progress_num))
self.progress_num += 1
self.check_exit_queue(thread_queue, exit_queue)
res = con.execute("SELECT * FROM session_log WHERE " +
"id_session_log = {}".format(last_start_id))
if thread_queue:
thread_queue.put(('Verified updated row',
self.progress_num))
self.progress_num += 1
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Error encountered while updating matching \"START\" "
"log's status", -1)
return False
self.log('Row after updating: {}'.format(res.fetchone()), 1)
self.log('Finished ending session {}'.format(self.session_id), 1)
return True
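# Reading of the record_status lifecycle implied by the statements above
# (an interpretation of this code, not an authoritative schema description):
# a 'START' log sits at 'WAITING_FOR_END' (presumably the table's default)
# until the session finishes; process_end() then inserts an 'END' log with
# status 'TO_BE_BUILT' and flips the matching 'START' row from
# 'WAITING_FOR_END' to 'TO_BE_BUILT', signalling that the session is ready
# for downstream record building.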
def db_logger_setup(self, thread_queue=None, exit_queue=None):
"""
Setup routine:
1) mount the network share
2) check that the database exists
3) get instrument info (PID, schema name)
"""
self.log('username is {}'.format(self.user), 1)
self.log('computer name is {}'.format(self.cpu_name), 1)
try:
self.check_exit_queue(thread_queue, exit_queue)
self.log('running `mount_network_share()`', 2)
self.mount_network_share()
if not os.path.isfile(self.full_path):
raise FileNotFoundError('Could not find NexusLIMS database at '
'{}'.format(self.full_path))
else:
self.log('Path to database is {}'.format(self.full_path), 1)
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Could not mount the network share holding the "
"database. Details:", -1)
self.log_exception(e)
return False
if thread_queue:
self.progress_num = 1
thread_queue.put(('Mounted network share', self.progress_num))
self.progress_num += 1
self.log('running `get_instr_pid()`', 2)
try:
self.check_exit_queue(thread_queue, exit_queue)
self.instr_pid, self.instr_schema_name, self.filestore_path = self.get_instr_pid()
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Could not fetch instrument PID and name from database. "
"Details:", -1)
self.log_exception(e)
return False
self.log('Found PID: {} and name: {}'.format(self.instr_pid,
self.instr_schema_name), 2)
if thread_queue:
thread_queue.put(('Instrument PID found', self.progress_num))
self.progress_num += 1
return True
# def _copydata_setup(self, thread_queue=None, exit_queue=None):
# """
# copydata routine:
# 1) mount network share.
# """
# try:
# self.check_exit_queue(thread_queue, exit_queue)
# self.log('running `mount_network_share()`', 2)
# self.mount_network_share(mount_point=self.config["daq_relpath"])
# except Exception as e:
# if thread_queue:
# thread_queue.put(e)
# self.log("Could not mount the network share holding the "
# "database. Details:", -1)
# self.log_exception(e)
# return False
# if thread_queue:
# self.progress_num = 1
# thread_queue.put(('Mounted network share', self.progress_num))
# self.progress_num += 1
# return True
def db_logger_teardown(self, thread_queue=None, exit_queue=None):
"""
Teardown routine:
1) unmount the network share
"""
try:
if thread_queue:
thread_queue.put(('Unmounting the database network share',
self.progress_num))
self.progress_num += 1
self.check_exit_queue(thread_queue, exit_queue)
self.log('running `umount_network_share()`', 2)
self.umount_network_share()
except Exception as e:
if thread_queue:
thread_queue.put(e)
self.log("Could not unmount the network share holding the "
"database. Details:", -1)
self.log_exception(e)
return False
if thread_queue:
thread_queue.put(('Unmounted network share', self.progress_num))
self.progress_num += 1
self.log('Finished unmounting network share', 2)
return True
# def _copydata(self, srcdir='mock'):
# """ Take a data file randomly from **mock** data folder,
# copy it to ``filestore_path`` of this instrument, to mock the
# behavior of generating experiment data.
# """
# src_dir = os.path.join(self.drive_letter, srcdir)
# dst_dir = os.path.join(self.drive_letter, self.filestore_path)
# if not os.path.isdir(dst_dir):
# os.makedirs(dst_dir)
# datafiles = [f for f in os.listdir(src_dir) if not f.startswith('.')]
# src_file = random.choice(datafiles)
# suffix = src_file.split('.')[-1]
# timestamp = datetime.strftime(datetime.now(), "%y%m%d_%H%M%S")
# dst_file = '%s.%s' % (timestamp, suffix)
# logstr = 'COPY {} --> {}'.format(
# os.path.join(src_dir, src_file),
# os.path.join(dst_dir, dst_file)
# )
# try:
# shutil.copy(os.path.join(src_dir, src_file),
# os.path.join(dst_dir, dst_file))
# self.log(logstr, 2)
# except Exception as e:
# self.log('Failed to ' + logstr, -1)
# self.log_exception(e)
# def copydata(self, thread_queue=None, exit_queue=None):
# """copy a data file from mock folder to instument ``filestore_path``.
# Returns True if successful, False if not
# """
# self.check_exit_queue(thread_queue, exit_queue)
# if self._copydata_setup(thread_queue, exit_queue):
# self._copydata()
# self.db_logger_teardown(thread_queue, exit_queue)
# return True
def cmdline_args():
# Make parser object
import argparse
p = argparse.ArgumentParser(
description="""This program will mount the nexuslims directory
on CFS2E, connect to the nexuslims_db.sqlite
database, and insert an entry into the session log.""",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
p.add_argument("event_type", type=str,
help="the type of event")
p.add_argument("user", type=str, nargs='?',
help="NIST username associated with this session (current "
"windows logon name will be used if not provided)",
default=None)
p.add_argument("-v", "--verbosity", type=int, choices=[0, 1, 2], default=0,
help="increase output verbosity")
return p.parse_args()
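# Hedged usage sketch (the script/module name is a placeholder): invoked
# from the command line, the parser above expects an event type, an
# optional username, and an optional verbosity flag, e.g.:
#   python db_session_logger.py START jdoe -v 2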
def gui_start_callback(config, verbosity=2):
"""
Process the start of a session when the GUI is opened
Parameters
----------
config : dict
Configuration dictionary (same format as for :class:`DBSessionLogger`)
verbosity : int
Logging verbosity level (default 2, i.e. 'DEBUG')
Returns
-------
db_logger : DBSessionLogger
The session logger instance for this session (contains all the
information about instrument, computer, session_id, etc.)
"""
db_logger = DBSessionLogger(config, verbosity=verbosity)
db_logger.db_logger_setup()
db_logger.process_start()
db_logger.db_logger_teardown()
return db_logger
def gui_end_callback(db_logger):
"""
Process the end of a session when the button is clicked or the GUI window
is closed.
Parameters
----------
db_logger : DBSessionLogger
The session logger instance for this session (contains all the
information about instrument, computer, session_id, etc.)
"""
db_logger.db_logger_setup()
db_logger.process_end()
db_logger.db_logger_teardown()
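# Minimal usage sketch for the GUI callbacks above (the config contents are
# assumptions; see the illustrative config example near __init__):
#
# if __name__ == '__main__':
#     config = {...}  # loaded from the application's configuration source
#     db_logger = gui_start_callback(config, verbosity=2)
#     # ... the user performs their session on the instrument ...
#     gui_end_callback(db_logger)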