# Source code for msl.io.readers.detector_responsivity_system

"""
Reader for the Detector Responsivity System from Light Standards at MSL.
"""
import re
from datetime import datetime

import numpy as np

from ..base import Reader
from ..utils import register


@register
class DRSReader(Reader):
    """Reader for the Detector Responsivity System from Light Standards at MSL.

    A measurement produces a pair of files: a *.DAT* file with the scan data
    and a *.LOG* file (same stem) with the run metadata and per-device logs.
    :meth:`read` parses both files into runs (``run1``, ``run2``, ...), each
    containing scans (``scan1``, ``scan2``, ...).
    """

    @staticmethod
    def can_read(file, **kwargs):
        """Checks if the first line starts with ``DRS`` and ends with ``Shindo``.

        Parameters
        ----------
        file
            The file to check.
        **kwargs
            All key-value pairs are ignored.

        Returns
        -------
        :class:`bool`
            Whether `file` looks like a DRS *.DAT* file.
        """
        if Reader.get_extension(file).lower() != '.dat':
            return False
        line = Reader.get_lines(file, 1)[0]
        if line.startswith('DRS') and line.endswith('Shindo'):
            return True
        # some files wrap the header line in double quotes
        if line.startswith('"DRS') and line.endswith('Shindo"'):
            return True
        return False

    def read(self, **kwargs):
        """Reads the *.DAT* and corresponding *.LOG* file.

        Parameters
        ----------
        **kwargs
            All key-value pairs are ignored.
        """
        self._default_alias = {'l(nm)': 'wavelength'}
        self._lines_dat = self.get_lines(self.file, remove_empty_lines=True)
        self._num_lines_dat = len(self._lines_dat)
        # the LOG file shares the DAT file's stem; assumes an upper-case
        # 3-character extension so that replacing the last 3 chars yields it
        self._lines_log = self.get_lines(self.file[:-3] + 'LOG', remove_empty_lines=True)
        self._num_lines_log = len(self._lines_log)
        num_runs = 0
        # _index_dat/_index_log are cursors into the two files that the
        # private helpers advance in lock-step as they consume lines
        self._index_dat = 0
        self._index_log = 0
        while self._index_dat < self._num_lines_dat:
            if 'DRS' in self._lines_dat[self._index_dat]:
                # a 'DRS' header line marks the start of a new run
                num_runs += 1
                group = self.create_group('run%d' % num_runs)
                self._read_run(group)
            else:
                self._index_dat += 1
                self._index_log += 1

    def _read_run(self, group):
        """Parse one run (metadata plus all of its scans) into `group`."""
        group.add_metadata(**self._get_run_metadata())
        # include all Scans
        scan_number = 0
        while self._index_dat < self._num_lines_dat and \
                self._lines_dat[self._index_dat].startswith('Scan'):
            scan_number += 1
            sub = group.create_group('scan%d' % scan_number)
            self._get_scan_dat(sub)
            self._get_scan_log(sub)
            self._index_dat += 1
            self._index_log += 1

    def _get_run_metadata(self):
        """Parse the run-level metadata from the LOG file.

        Advances ``_index_log`` through the LOG header and moves
        ``_index_dat`` forward to the first ``Scan`` line of the DAT file.

        Returns
        -------
        :class:`dict`
            The metadata of the run.
        """
        # add the metadata from the LOG file
        assert 'DRS' in self._lines_log[self._index_log], self._lines_log[self._index_log]
        meta = dict()
        meta['version'] = re.search(r'version (\w+\s+\d{4})', self._lines_log[self._index_log]).group(1)
        meta['author'] = re.search(r'by\s(.*)', self._lines_log[self._index_log]).group(1)

        # skip lines until we reach the info about the wavelength scan values
        while not self._lines_log[self._index_log].startswith('Wavelength start'):
            self._index_log += 1

        # the info about the wavelength range
        assert self._lines_log[self._index_log].startswith('Wavelength start(nm)'), self._lines_log[self._index_log]
        meta['wavelength_units'] = 'nm'
        meta['wavelength_start'] = float(self._lines_log[self._index_log].split(':')[1])
        self._index_log += 1
        meta['wavelength_end'] = float(self._lines_log[self._index_log].split(':')[1])
        self._index_log += 1
        meta['wavelength_increment'] = float(self._lines_log[self._index_log].split(':')[1])
        self._index_log += 1
        # some LOG files list additional, non-uniformly-spaced wavelengths
        if self._lines_log[self._index_log].startswith('Extra'):
            line_split = self._lines_log[self._index_log][len('Extra wavelength (nm):'):].split(';')
            extras = [val for val in line_split if val.strip()]
            meta['extra_wavelengths_nm'] = list(map(float, extras))
            self._index_log += 1

        # the info about the grating used and the bandwidth
        grating = re.search(r'at: (\d+) nm', self._lines_log[self._index_log])
        bw = re.search(r'correction: ([\d.]+) nm', self._lines_log[self._index_log])
        if grating and bw:
            meta['grating_nm'] = int(grating.group(1))
            meta['bandwidth_nm'] = float(bw.group(1))
        else:
            self._index_log -= 1  # go back a line

        # get the number of devices
        self._index_log += 1
        assert self._lines_log[self._index_log].startswith('Numbers of'), self._lines_log[self._index_log]

        # the info about the devices under test
        self._index_log += 1
        meta['devices'] = dict()
        self._names = [dev.strip().upper() for dev in self._lines_log[self._index_log].split('\t') if dev]
        # a first column ending in 'I' is a row-label column, not a device;
        # remember to skip that column when reading the rows below
        if self._names[0].endswith('I'):
            skip = 1
            self._names = self._names[1:]
        else:
            skip = 0
        for name in self._names:
            meta['devices'][name] = dict()
        for label in ['Name', 'Tprobe', 'Channel']:
            self._index_log += 1
            line_split = [item.strip() for item in self._lines_log[self._index_log].split('\t') if item]
            # when a row-label column exists, the file provides the key itself
            device_key = label if skip == 0 else line_split[0]
            for index, name in enumerate(self._names):
                meta['devices'][name][device_key] = line_split[index + skip]

        # some LOG files have info about the lab temperature
        self._index_log += 1
        if self._lines_log[self._index_log].startswith('Laboratory Temperature'):
            line_split = [item.strip() for item in self._lines_log[self._index_log].split(';')]
            meta['Laboratory Temperature {}'.format(line_split[1])] = float(line_split[2])
            self._index_log += 1

        # some LOG files have info about the PRT calibration coefficients
        while 'R0=' in self._lines_log[self._index_log]:
            prt = self._lines_log[self._index_log].split('\t')
            prt_dict = {
                'R0': float(prt[1][3:]),
                'A': float(prt[3][2:]),
                'B': float(prt[5][2:]),
                'value': float(prt[7]),
            }
            meta[prt[0][:-1].strip()] = prt_dict
            self._index_log += 1

        if self._lines_log[self._index_log].startswith('Integration time'):
            meta['Integration time (s)'] = float(self._lines_log[self._index_log].split(':')[1])
            self._index_log += 1

        assert self._lines_log[self._index_log].startswith('Source Monitoring'), self._lines_log[self._index_log]
        meta['Source Monitoring'] = self._lines_log[self._index_log][len('Source Monitoring'):].strip()

        # the DVM configuration for each device
        self._index_log += 1
        while 'DVM RANGE' in self._lines_log[self._index_log]:
            dvm = re.search(
                r'(\w+)::DVM RANGE\(V\)=\s+(.*\d+)\s+;\s+DVM NPLC=\s+(\d+)',
                self._lines_log[self._index_log]
            )
            meta['{}_dvm_range'.format(dvm.group(1))] = float(dvm.group(2))
            meta['{}_dvm_nplc'.format(dvm.group(1))] = float(dvm.group(3))
            self._index_log += 1

        assert self._lines_log[self._index_log].startswith('Stray light filters'), self._lines_log[self._index_log]
        meta['Stray light filters'] = self._lines_log[self._index_log][len('Stray light filters'):].strip()

        # get the comment (all lines up to the first 'Scan' line)
        self._index_log += 1
        assert self._lines_log[self._index_log] == 'Comment:', self._lines_log[self._index_log]
        self._index_log += 1
        comment = []
        while not self._lines_log[self._index_log].startswith('Scan'):
            comment.append(self._lines_log[self._index_log])
            self._index_log += 1
        meta['comment'] = '\n'.join(comment)

        # move the DAT index to the start of the Scan
        while not self._lines_dat[self._index_dat].startswith('Scan'):
            self._index_dat += 1

        return meta

    def _get_scan_dat(self, group):
        """Parse one scan from the DAT file into a ``dat`` dataset in `group`."""
        meta = dict()
        assert self._lines_dat[self._index_dat].startswith('Scan'), self._lines_dat[self._index_dat]
        meta['start_time'] = self._to_datetime(self._lines_dat[self._index_dat])

        # get the field names to use for the numpy array
        aliases = self._default_alias.copy()
        for name in self._names:
            aliases[name] = group.parent.metadata.devices[name]['Name']
        self._index_dat += 1
        header = [item.strip() for item in self._lines_dat[self._index_dat].split('\t') if item]
        fieldnames = []
        for h in header:
            name = h
            for key, value in aliases.items():
                if key in h:
                    name = h.replace(key, value)
                    if name in fieldnames:
                        # disambiguate duplicate aliases with the key's suffix
                        name += '-({})'.format(key[-1])
                    break
            fieldnames.append(name)

        # get the DAT data
        self._index_dat += 1
        data = []
        while self._index_dat < self._num_lines_dat and \
                not self._lines_dat[self._index_dat].startswith('End'):
            data.append(tuple([float(item) for item in self._lines_dat[self._index_dat].split('\t') if item]))
            self._index_dat += 1

        if self._index_dat < self._num_lines_dat:  # otherwise the scan was aborted early
            assert self._lines_dat[self._index_dat].startswith('End of scan'), self._lines_dat[self._index_dat]
            meta['end_time'] = self._to_datetime(self._lines_dat[self._index_dat])

        # sometimes there was more data columns than header columns
        if data:
            i = 1
            while len(data[0]) > len(fieldnames):
                fieldnames.append('UNKNOWN-%d' % i)
                i += 1

        group.create_dataset(
            'dat',
            data=data,
            dtype=[(name, float) for name in fieldnames],
            **meta
        )

    def _get_scan_log(self, group):
        """Parse one scan from the LOG file into a ``log-*`` dataset per device."""
        meta = dict()
        assert self._lines_log[self._index_log].startswith('Scan'), self._lines_log[self._index_log]
        meta['start_time'] = self._to_datetime(self._lines_log[self._index_log])

        # get the field names to use for the numpy array
        aliases = self._default_alias.copy()
        for name in self._names:
            aliases[name] = group.parent.metadata.devices[name]['Name']
        self._index_log += 1
        header = [item.strip() for item in self._lines_log[self._index_log].split('\t') if item.strip()]
        assert header[0] == 'l(nm)', self._lines_log[self._index_log]
        fieldnames = []
        for h in header:
            if h == 'Name':
                continue
            name = h
            for key, value in aliases.items():
                if key in h:
                    name = h.replace(key, value)
                    break
            if name not in fieldnames:
                fieldnames.append(name)

        # get the LOG data -- the rows cycle through the devices in order
        self._index_log += 1
        n = len(header)
        data = [[] for _ in self._names]
        while self._index_log < self._num_lines_log and \
                not self._lines_log[self._index_log].startswith('End'):
            for i, name in enumerate(self._names):
                values = []
                items = [item for item in self._lines_log[self._index_log].split('\t') if item]
                for j, item in enumerate(items):
                    if j == 0:
                        # the wavelength value; strip the trailing units
                        values.append(float(item[:-2]))
                    elif j == 1:
                        # the device-name column; only sanity-check it
                        assert item == aliases[name], 'expect Name={}, got {}'.format(aliases[name], item)
                    elif j >= n:
                        break
                    else:
                        values.append(float(item))
                # fill in the blank columns with NaN
                while len(values) < len(fieldnames):
                    # np.nan (np.NaN was removed in NumPy 2.0)
                    values.append(np.nan)
                data[i].append(tuple(values))
                self._index_log += 1

        if self._index_log < self._num_lines_log:  # otherwise the scan was aborted early
            assert self._lines_log[self._index_log].startswith('End of scan'), self._lines_log[self._index_log]
            meta['end_time'] = self._to_datetime(self._lines_log[self._index_log])

        for i, name in enumerate(self._names):
            vertex_name = 'log-' + aliases[name]
            vertex_name = vertex_name.replace('.', '')  # cannot contain a "."
            if vertex_name in group:
                # disambiguate duplicate device names with the key's suffix
                vertex_name += '-({})'.format(name[-1])
            group.create_dataset(
                vertex_name,
                data=data[i],
                dtype=[(name, float) for name in fieldnames],
                **meta
            )

    def _to_datetime(self, line):
        """Convert a ``Scan``/``End of scan`` line to a :class:`~datetime.datetime`.

        The timestamp follows the first comma; ``p.m.``/``a.m.`` are normalised
        to ``PM``/``AM`` for :func:`~datetime.datetime.strptime`. Two layouts
        exist in the files: ``time date`` and ``date time``.
        """
        string = line.split(',')[1].replace('p.m.', 'PM').replace('a.m.', 'AM').strip()
        try:
            return datetime.strptime(string, '%I:%M %p %d/%m/%Y')
        except ValueError:
            return datetime.strptime(string, '%d/%m/%Y %I:%M %p')