micromedio.py 7.2 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Class for reading/writing data from micromed (.trc).
  4. Inspired by the Matlab code for EEGLAB from Rami K. Niazy.
  5. Completed with Matlab code from Guillaume BECQ.
  6. Supported : Read
  7. Author: sgarcia
  8. """
  9. import datetime
  10. import os
  11. import struct
  12. from io import open, BufferedReader
  13. import numpy as np
  14. import quantities as pq
  15. from neo.io.baseio import BaseIO
  16. from neo.core import Segment, AnalogSignal, Epoch, Event
  17. class StructFile(BufferedReader):
  18. def read_f(self, fmt):
  19. return struct.unpack(fmt, self.read(struct.calcsize(fmt)))
  20. class MicromedIO(BaseIO):
  21. """
  22. Class for reading data from micromed (.trc).
  23. Usage:
  24. >>> from neo import io
  25. >>> r = io.MicromedIO(filename='File_micromed_1.TRC')
  26. >>> seg = r.read_segment(lazy=False, cascade=True)
  27. >>> print seg.analogsignals # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
  28. [<AnalogSignal(array([ -1.77246094e+02, -2.24707031e+02,
  29. -2.66015625e+02, ...
  30. """
  31. is_readable = True
  32. is_writable = False
  33. supported_objects = [Segment, AnalogSignal, Event, Epoch]
  34. readable_objects = [Segment]
  35. writeable_objects = []
  36. has_header = False
  37. is_streameable = False
  38. read_params = {Segment: []}
  39. write_params = None
  40. name = None
  41. extensions = ['TRC']
  42. mode = 'file'
  43. def __init__(self, filename=None):
  44. """
  45. This class read a micromed TRC file.
  46. Arguments:
  47. filename : the filename to read
  48. """
  49. BaseIO.__init__(self)
  50. self.filename = filename
  51. def read_segment(self, cascade=True, lazy=False, ):
  52. """
  53. Arguments:
  54. """
  55. f = StructFile(open(self.filename, 'rb'))
  56. # Name
  57. f.seek(64, 0)
  58. surname = f.read(22).decode('ascii')
  59. while surname[-1] == ' ':
  60. if len(surname) == 0:
  61. break
  62. surname = surname[:-1]
  63. firstname = f.read(20).decode('ascii')
  64. while firstname[-1] == ' ':
  65. if len(firstname) == 0:
  66. break
  67. firstname = firstname[:-1]
  68. #Date
  69. f.seek(128, 0)
  70. day, month, year, hour, minute, sec = f.read_f('bbbbbb')
  71. rec_datetime = datetime.datetime(year + 1900, month, day, hour, minute,
  72. sec)
  73. f.seek(138, 0)
  74. Data_Start_Offset, Num_Chan, Multiplexer, Rate_Min, Bytes = f.read_f(
  75. 'IHHHH')
  76. #~ print Num_Chan, Bytes
  77. #header version
  78. f.seek(175, 0)
  79. header_version, = f.read_f('b')
  80. assert header_version == 4
  81. seg = Segment(name=str(firstname + ' ' + surname),
  82. file_origin=os.path.basename(self.filename))
  83. seg.annotate(surname=surname)
  84. seg.annotate(firstname=firstname)
  85. seg.annotate(rec_datetime=rec_datetime)
  86. if not cascade:
  87. f.close()
  88. return seg
  89. # area
  90. f.seek(176, 0)
  91. zone_names = ['ORDER', 'LABCOD', 'NOTE', 'FLAGS', 'TRONCA', 'IMPED_B',
  92. 'IMPED_E', 'MONTAGE',
  93. 'COMPRESS', 'AVERAGE', 'HISTORY', 'DVIDEO', 'EVENT A',
  94. 'EVENT B', 'TRIGGER']
  95. zones = {}
  96. for zname in zone_names:
  97. zname2, pos, length = f.read_f('8sII')
  98. zones[zname] = zname2, pos, length
  99. #~ print zname2, pos, length
  100. # reading raw data
  101. if not lazy:
  102. f.seek(Data_Start_Offset, 0)
  103. rawdata = np.fromstring(f.read(), dtype='u' + str(Bytes))
  104. rawdata = rawdata.reshape((-1, Num_Chan))
  105. # Reading Code Info
  106. zname2, pos, length = zones['ORDER']
  107. f.seek(pos, 0)
  108. code = np.fromstring(f.read(Num_Chan*2), dtype='u2', count=Num_Chan)
  109. units = {-1: pq.nano * pq.V, 0: pq.uV, 1: pq.mV, 2: 1, 100: pq.percent,
  110. 101: pq.dimensionless, 102: pq.dimensionless}
  111. for c in range(Num_Chan):
  112. zname2, pos, length = zones['LABCOD']
  113. f.seek(pos + code[c] * 128 + 2, 0)
  114. label = f.read(6).strip(b"\x00").decode('ascii')
  115. ground = f.read(6).strip(b"\x00").decode('ascii')
  116. (logical_min, logical_max, logical_ground, physical_min,
  117. physical_max) = f.read_f('iiiii')
  118. k, = f.read_f('h')
  119. if k in units.keys():
  120. unit = units[k]
  121. else:
  122. unit = pq.uV
  123. f.seek(8, 1)
  124. sampling_rate, = f.read_f('H') * pq.Hz
  125. sampling_rate *= Rate_Min
  126. if lazy:
  127. signal = [] * unit
  128. else:
  129. factor = float(physical_max - physical_min) / float(
  130. logical_max - logical_min + 1)
  131. signal = (rawdata[:, c].astype(
  132. 'f') - logical_ground) * factor * unit
  133. ana_sig = AnalogSignal(signal, sampling_rate=sampling_rate,
  134. name=str(label), channel_index=c)
  135. if lazy:
  136. ana_sig.lazy_shape = None
  137. ana_sig.annotate(ground=ground)
  138. seg.analogsignals.append(ana_sig)
  139. sampling_rate = np.mean(
  140. [ana_sig.sampling_rate for ana_sig in seg.analogsignals]) * pq.Hz
  141. # Read trigger and notes
  142. for zname, label_dtype in [('TRIGGER', 'u2'), ('NOTE', 'S40')]:
  143. zname2, pos, length = zones[zname]
  144. f.seek(pos, 0)
  145. triggers = np.fromstring(f.read(length), dtype=[('pos', 'u4'), (
  146. 'label', label_dtype)])
  147. if not lazy:
  148. keep = (triggers['pos'] >= triggers['pos'][0]) & (
  149. triggers['pos'] < rawdata.shape[0]) & (
  150. triggers['pos'] != 0)
  151. triggers = triggers[keep]
  152. ea = Event(name=zname[0] + zname[1:].lower(),
  153. labels=triggers['label'].astype('S'),
  154. times=(triggers['pos'] / sampling_rate).rescale('s'))
  155. else:
  156. ea = Event(name=zname[0] + zname[1:].lower())
  157. ea.lazy_shape = triggers.size
  158. seg.events.append(ea)
  159. # Read Event A and B
  160. # Not so well tested
  161. for zname in ['EVENT A', 'EVENT B']:
  162. zname2, pos, length = zones[zname]
  163. f.seek(pos, 0)
  164. epochs = np.fromstring(f.read(length),
  165. dtype=[('label', 'u4'), ('start', 'u4'),
  166. ('stop', 'u4'), ])
  167. ep = Epoch(name=zname[0] + zname[1:].lower())
  168. if not lazy:
  169. keep = (epochs['start'] > 0) & (
  170. epochs['start'] < rawdata.shape[0]) & (
  171. epochs['stop'] < rawdata.shape[0])
  172. epochs = epochs[keep]
  173. ep = Epoch(name=zname[0] + zname[1:].lower(),
  174. labels=epochs['label'].astype('S'),
  175. times=(epochs['start'] / sampling_rate).rescale('s'),
  176. durations=((epochs['stop'] - epochs['start']) / sampling_rate).rescale('s'))
  177. else:
  178. ep = Epoch(name=zname[0] + zname[1:].lower())
  179. ep.lazy_shape = triggers.size
  180. seg.epochs.append(ep)
  181. seg.create_many_to_one_relationship()
  182. f.close()
  183. return seg