mnetonix.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. """
  2. mnetonix.py
  3. Usage:
  4. python mnetonix.py <datafile> <montage>
  5. datafile: Either an EDF file or a BrainVision header file (vhdr).
  6. montage: Any format montage file supported by MNE.
  7. (Requires Python 3)
  8. Command line script for reading EDF and BrainVision files using MNE
  9. (mne-python) and storing the data and metadata into a NIX file. Supports
  10. reading montage files for recording channel locations.
  11. NIX Format layout:
  12. Data:
  13. Raw Data are stored in either a single 2-dimensional DataArray or a collection
  14. of DataArrays (one per recording channel). The latter makes tagging easier
  15. since MultiTag positions and extents don't need to specify every channel they
  16. reference. However, creating multiple DataArrays makes file sizes much
  17. bigger.
  18. Stimuli:
  19. MNE provides stimulus information through the Raw.annotations dictionary.
  20. Onsets correspond to the 'positions' array and durations correspond to the
  21. 'extents' array of the "Stimuli" MultiTag.
  22. Metadata:
  23. MNE collects metadata into a (nested) dictionary (Raw.info). All non-empty
  24. keys are converted into Properties in NIX. The nested structure of the
  25. dictionary is replicated in NIX by creating child Sections, starting with one
  26. root section with name "Info".
  27. """
  28. import sys
  29. import os
  30. from collections.abc import Iterable, Mapping
  31. import mne
  32. import matplotlib.pyplot as plt
  33. import numpy as np
  34. import nixio as nix
  35. DATA_BLOCK_NAME = "EEG Data Block"
  36. DATA_BLOCK_TYPE = "Recording"
  37. RAW_DATA_GROUP_NAME = "Raw Data Group"
  38. RAW_DATA_GROUP_TYPE = "EEG Channels"
  39. RAW_DATA_TYPE = "Raw Data"
  40. def plot_channel(data_array, index):
  41. signal = data_array[index]
  42. tdim = data_array.dimensions[1]
  43. datadim = data_array.dimensions[0]
  44. plt.plot(tdim.ticks, signal, label=datadim.labels[index])
  45. xlabel = f"({tdim.unit})"
  46. plt.xlabel(xlabel)
  47. ylabel = f"{datadim.labels[index]} ({data_array.unit})"
  48. plt.ylabel(ylabel)
  49. plt.legend()
  50. plt.show()
  51. def create_md_tree(section, values, block):
  52. if values is None:
  53. return
  54. for k, v in values.items():
  55. if v is None:
  56. continue
  57. if isinstance(v, Iterable):
  58. if not len(v):
  59. continue
  60. ndim = np.ndim(v)
  61. if ndim > 1:
  62. da = block.create_data_array(k, "Multidimensional Metadata",
  63. data=v)
  64. for _ in range(ndim):
  65. da.append_set_dimension()
  66. prop = section.create_property(k, da.id)
  67. prop.type = str(v.__class__)
  68. da.metadata = section
  69. continue
  70. # check element type
  71. if isinstance(v, Mapping):
  72. # Create a new Section to hold the metadata found in the
  73. # dictionary
  74. subsec = section.create_section(k, str(v.__class__))
  75. create_md_tree(subsec, v, block)
  76. continue
  77. if isinstance(v[0], Mapping):
  78. # Create a new subsection to hold each nested dictionary as
  79. # sub-subsections
  80. subsec = section.create_section(k, str(v.__class__))
  81. for idx, subd in enumerate(v):
  82. subsubsec = subsec.create_section(f"{k}-{idx}",
  83. str(subd.__class__))
  84. create_md_tree(subsubsec, subd, block)
  85. continue
  86. try:
  87. prop = section.create_property(k, v)
  88. except TypeError:
  89. # inconsistent iterable types: upgrade to floats
  90. prop = section.create_property(k, [float(vi) for vi in v])
  91. prop.type = str(v.__class__)
  92. def write_single_da(mneraw, block):
  93. # data and times
  94. data = mneraw.get_data()
  95. time = mneraw.times
  96. nchan = mneraw.info["nchan"]
  97. print(f"Found {nchan} channels with {mneraw.n_times} samples per channel")
  98. da = block.create_data_array("EEG Data", RAW_DATA_TYPE, data=data)
  99. block.groups[RAW_DATA_GROUP_NAME].data_arrays.append(da)
  100. da.unit = "V"
  101. for dimlen in data.shape:
  102. if dimlen == nchan:
  103. # channel labels: SetDimension
  104. da.append_set_dimension(labels=mneraw.ch_names)
  105. elif dimlen == mneraw.n_times:
  106. # times: RangeDimension
  107. # NOTE: EDF always uses seconds
  108. da.append_range_dimension(ticks=time, label="time", unit="s")
  109. def write_multi_da(mneraw, block):
  110. data = mneraw.get_data()
  111. time = mneraw.times
  112. nchan = mneraw.info["nchan"]
  113. channames = mneraw.ch_names
  114. print(f"Found {nchan} channels with {mneraw.n_times} samples per channel")
  115. # find the channel dimension to iterate over it
  116. for idx, dimlen in enumerate(data.shape):
  117. if dimlen == nchan:
  118. chanidx = idx
  119. break
  120. else:
  121. raise RuntimeError("Could not find data dimension that matches number "
  122. "of channels")
  123. for idx, chandata in enumerate(np.rollaxis(data, chanidx)):
  124. chname = channames[idx]
  125. da = block.create_data_array(chname, RAW_DATA_TYPE, data=chandata)
  126. block.groups[RAW_DATA_GROUP_NAME].data_arrays.append(da)
  127. da.unit = "V"
  128. # times: RangeDimension
  129. # NOTE: EDF always uses seconds
  130. da.append_range_dimension(ticks=time, label="time", unit="s")
  131. def separate_stimulus_types(stimuli):
  132. # separate stimuli based on label
  133. stimdict = dict()
  134. for label, onset, duration in zip(stimuli.description,
  135. stimuli.onset,
  136. stimuli.duration):
  137. if label not in stimdict:
  138. stimdict[label] = [(label, onset, duration)]
  139. else:
  140. stimdict[label].append((label, onset, duration))
  141. return stimdict
  142. def write_stim_tags(mneraw, block, split):
  143. stimuli = mneraw.annotations
  144. if split:
  145. stimtuples = separate_stimulus_types(stimuli)
  146. for label, st in stimtuples.items():
  147. create_stimulus_multi_tag(st, block, mneraw, split,
  148. mtagname=label)
  149. else:
  150. stimtuples = [(l, o, d) for l, o, d in zip(stimuli.description,
  151. stimuli.onset,
  152. stimuli.duration)]
  153. create_stimulus_multi_tag(stimtuples, block, mneraw)
  154. def create_stimulus_multi_tag(stimtuples, block, mneraw, mtagname="Stimuli"):
  155. # check dimensionality of data
  156. datashape = block.groups[RAW_DATA_GROUP_NAME].data_arrays[0].shape
  157. labels = [st[0] for st in stimtuples]
  158. onsets = [st[1] for st in stimtuples]
  159. durations = [st[2] for st in stimtuples]
  160. ndim = len(datashape)
  161. if ndim == 1:
  162. positions = onsets
  163. extents = durations
  164. else:
  165. channelextent = mneraw.info["nchan"] - 1
  166. positions = [(0, p) for p in onsets]
  167. extents = [(channelextent, e) for e in durations]
  168. posda = block.create_data_array("Stimuli onset", "Stimuli Positions",
  169. data=positions)
  170. posda.append_set_dimension(labels=labels.tolist())
  171. extda = block.create_data_array("Stimuli Durations", "Stimuli Extents",
  172. data=extents)
  173. extda.append_set_dimension(labels=labels.tolist())
  174. for _ in range(ndim-1):
  175. # extra set dimensions for any extra data dimensions (beyond the first)
  176. posda.append_set_dimension()
  177. extda.append_set_dimension()
  178. stimmtag = block.create_multi_tag(mtagname, "EEG Stimuli",
  179. positions=posda)
  180. stimmtag.extents = extda
  181. block.groups[RAW_DATA_GROUP_NAME].multi_tags.append(stimmtag)
  182. for da in block.groups[RAW_DATA_GROUP_NAME].data_arrays:
  183. if da.type == RAW_DATA_TYPE:
  184. stimmtag.references.append(da)
  185. def write_raw_mne(nfname, mneraw,
  186. split_data_channels=False, split_stimuli=False):
  187. mneinfo = mneraw.info
  188. extrainfo = mneraw._raw_extras
  189. # Create NIX file
  190. nf = nix.File(nfname, nix.FileMode.Overwrite)
  191. # Write Data to NIX
  192. block = nf.create_block(DATA_BLOCK_NAME, DATA_BLOCK_TYPE,
  193. compression=nix.Compression.DeflateNormal)
  194. block.create_group(RAW_DATA_GROUP_NAME, RAW_DATA_GROUP_TYPE)
  195. if split_data_channels:
  196. write_multi_da(mneraw, block)
  197. else:
  198. write_single_da(mneraw, block)
  199. if mneraw.annotations:
  200. write_stim_tags(mneraw, block, split_stimuli)
  201. # Write metadata to NIX
  202. # info dictionary
  203. infomd = nf.create_section("Info", "File metadata")
  204. create_md_tree(infomd, mneinfo, block)
  205. # extras
  206. if len(extrainfo) > 1:
  207. for idx, emd_i in enumerate(extrainfo):
  208. extrasmd = nf.create_section(f"Extras-{idx}",
  209. "Raw Extras metadata")
  210. create_md_tree(extrasmd, emd_i, block)
  211. elif extrainfo:
  212. extrasmd = nf.create_section("Extras", "Raw Extras metadata")
  213. create_md_tree(extrasmd, extrainfo[0], block)
  214. # all done
  215. nf.close()
  216. print(f"Created NIX file at '{nfname}'")
  217. print("Done")
  218. def main():
  219. args = sys.argv
  220. if len(args) < 2:
  221. print("Please provide either a BrainVision vhdr or "
  222. "an EDF filename as the first argument")
  223. sys.exit(1)
  224. splitdata = False
  225. if "--split-data" in args:
  226. splitdata = True
  227. args.remove("--split-data")
  228. splitstim = False
  229. if "--split-stimuli" in args:
  230. splitstim = True
  231. args.remove("--split-stimuli")
  232. datafilename = args[1]
  233. montage = None
  234. if len(args) > 2:
  235. montage = args[2]
  236. montage = os.path.abspath(montage)
  237. root, ext = os.path.splitext(datafilename)
  238. nfname = root + os.path.extsep + "nix"
  239. if ext.casefold() == ".edf".casefold():
  240. mneraw = mne.io.read_raw_edf(datafilename, montage=montage,
  241. preload=True, stim_channel=False)
  242. elif ext.casefold() == ".vhdr".casefold():
  243. mneraw = mne.io.read_raw_brainvision(datafilename, montage=montage,
  244. preload=True, stim_channel=False)
  245. else:
  246. raise RuntimeError(f"Unknown extension '{ext}'")
  247. print(f"Converting '{datafilename}' to NIX")
  248. if splitdata:
  249. print(" Creating one DataArray per channel")
  250. if splitstim:
  251. print(" Creating one MultiTag for each stimulus type")
  252. write_raw_mne(nfname, mneraw, splitdata, splitstim)
  253. mneraw.close()
  254. if __name__ == "__main__":
  255. main()