# mnetonix.py
"""
mnetonix.py

Usage:
  python mnetonix.py <datafile> [<montage>]

    datafile: Either an EDF file or a BrainVision header file (vhdr).
    montage: (optional) Any format montage file supported by MNE.

(Requires Python 3)

Command line script for reading EDF and BrainVision files using MNE
(mne-python) and storing the data and metadata into a NIX file.  Supports
reading montage files for recording channel locations.

NIX Format layout:

Data:
Raw Data are stored in either a single 2-dimensional DataArray or a collection
of DataArrays (one per recording channel).  The latter makes tagging easier
since MultiTag positions and extents don't need to specify every channel they
reference.  However, creating multiple DataArrays makes file sizes much
bigger.

Stimuli:
MNE provides stimulus information through the Raw.annotations object.
Onsets correspond to the 'positions' array and durations correspond to the
'extents' array of the "Stimuli" MultiTag.

Metadata:
MNE collects metadata into a (nested) dictionary (Raw.info).  All non-empty
keys are converted into Properties in NIX.  The nested structure of the
dictionary is replicated in NIX by creating child Sections, starting with one
root section with name "Info".
"""
import os
import sys
from collections.abc import Iterable, Mapping

import matplotlib.pyplot as plt
import mne
import nixio as nix
import numpy as np
  35. DATA_BLOCK_NAME = "EEG Data Block"
  36. DATA_BLOCK_TYPE = "Recording"
  37. RAW_DATA_GROUP_NAME = "Raw Data Group"
  38. RAW_DATA_GROUP_TYPE = "EEG Channels"
  39. RAW_DATA_TYPE = "Raw Data"
  40. def plot_channel(data_array, index):
  41. signal = data_array[index]
  42. tdim = data_array.dimensions[1]
  43. datadim = data_array.dimensions[0]
  44. plt.plot(tdim.ticks, signal, label=datadim.labels[index])
  45. xlabel = f"({tdim.unit})"
  46. plt.xlabel(xlabel)
  47. ylabel = f"{datadim.labels[index]} ({data_array.unit})"
  48. plt.ylabel(ylabel)
  49. plt.legend()
  50. plt.show()
  51. def create_md_tree(section, values, block):
  52. if values is None:
  53. return
  54. for k, v in values.items():
  55. if v is None:
  56. continue
  57. if isinstance(v, Iterable):
  58. if not len(v):
  59. continue
  60. ndim = np.ndim(v)
  61. if ndim > 1:
  62. da = block.create_data_array(k, "Multidimensional Metadata",
  63. data=v)
  64. for _ in range(ndim):
  65. da.append_set_dimension()
  66. section.create_property(k, da.id)
  67. da.metadata = section
  68. continue
  69. # check element type
  70. if isinstance(v, Mapping):
  71. # Create a new Section to hold the metadata found in the
  72. # dictionary
  73. subsec = section.create_section(k, str(v.__class__))
  74. create_md_tree(subsec, v, block)
  75. continue
  76. elif isinstance(v[0], Mapping):
  77. # Create multiple new Sections to hold the metadata found in
  78. # each nested dictionary
  79. for idx, subd in enumerate(v):
  80. secname = f"{k}-{idx}"
  81. subsec = section.create_section(secname, str(v.__class__))
  82. create_md_tree(subsec, subd, block)
  83. continue
  84. try:
  85. section.create_property(k, v)
  86. except TypeError:
  87. # inconsistent iterable types: upgrade to floats
  88. section.create_property(k, [float(vi) for vi in v])
  89. def write_single_da(mneraw, block):
  90. # data and times
  91. data = mneraw.get_data()
  92. time = mneraw.times
  93. nchan = mneraw.info["nchan"]
  94. print(f"Found {nchan} channels with {mneraw.n_times} samples per channel")
  95. da = block.create_data_array("EEG Data", RAW_DATA_TYPE, data=data)
  96. block.groups[RAW_DATA_GROUP_NAME].data_arrays.append(da)
  97. da.unit = "V"
  98. for dimlen in data.shape:
  99. if dimlen == nchan:
  100. # channel labels: SetDimension
  101. da.append_set_dimension(labels=mneraw.ch_names)
  102. elif dimlen == mneraw.n_times:
  103. # times: RangeDimension
  104. # NOTE: EDF always uses seconds
  105. da.append_range_dimension(ticks=time, label="time", unit="s")
  106. def write_multi_da(mneraw, block):
  107. data = mneraw.get_data()
  108. time = mneraw.times
  109. nchan = mneraw.info["nchan"]
  110. channames = mneraw.ch_names
  111. print(f"Found {nchan} channels with {mneraw.n_times} samples per channel")
  112. # find the channel dimension to iterate over it
  113. for idx, dimlen in enumerate(data.shape):
  114. if dimlen == nchan:
  115. chanidx = idx
  116. break
  117. else:
  118. raise RuntimeError("Could not find data dimension that matches number "
  119. "of channels")
  120. for idx, chandata in enumerate(np.rollaxis(data, chanidx)):
  121. chname = channames[idx]
  122. da = block.create_data_array(chname, RAW_DATA_TYPE, data=chandata)
  123. block.groups[RAW_DATA_GROUP_NAME].data_arrays.append(da)
  124. da.unit = "V"
  125. # times: RangeDimension
  126. # NOTE: EDF always uses seconds
  127. da.append_range_dimension(ticks=time, label="time", unit="s")
  128. def write_stim_tags(mneraw, block):
  129. # check dimensionality of data
  130. datashape = block.groups[RAW_DATA_GROUP_NAME].data_arrays[0].shape
  131. stimuli = mneraw.annotations
  132. labels = stimuli.description
  133. ndim = len(datashape)
  134. if ndim == 1:
  135. positions = stimuli.onset
  136. extents = stimuli.duration
  137. else:
  138. channelextent = mneraw.info["nchan"] - 1
  139. positions = [(0, p) for p in stimuli.onset]
  140. extents = [(channelextent, e) for e in stimuli.duration]
  141. posda = block.create_data_array("Stimuli onset", "Stimuli Positions",
  142. data=positions)
  143. posda.append_set_dimension(labels=labels.tolist())
  144. extda = block.create_data_array("Stimuli Durations", "Stimuli Extents",
  145. data=extents)
  146. extda.append_set_dimension(labels=labels.tolist())
  147. for _ in range(ndim-1):
  148. # extra set dimensions for any extra data dimensions (beyond the first)
  149. posda.append_set_dimension()
  150. extda.append_set_dimension()
  151. stimmtag = block.create_multi_tag("Stimuli", "EEG Stimuli",
  152. positions=posda)
  153. stimmtag.extents = extda
  154. block.groups[RAW_DATA_GROUP_NAME].multi_tags.append(stimmtag)
  155. for da in block.groups[RAW_DATA_GROUP_NAME].data_arrays:
  156. if da.type == RAW_DATA_TYPE:
  157. stimmtag.references.append(da)
  158. def write_raw_mne(nfname, mneraw, split_data_channels=False):
  159. mneinfo = mneraw.info
  160. extrainfo = mneraw._raw_extras
  161. # Create NIX file
  162. nf = nix.File(nfname, nix.FileMode.Overwrite)
  163. # Write Data to NIX
  164. block = nf.create_block(DATA_BLOCK_NAME, DATA_BLOCK_TYPE,
  165. compression=nix.Compression.DeflateNormal)
  166. block.create_group(RAW_DATA_GROUP_NAME, RAW_DATA_GROUP_TYPE)
  167. if split_data_channels:
  168. write_multi_da(mneraw, block)
  169. else:
  170. write_single_da(mneraw, block)
  171. if mneraw.annotations:
  172. write_stim_tags(mneraw, block)
  173. # Write metadata to NIX
  174. # info dictionary
  175. infomd = nf.create_section("Info", "File metadata")
  176. create_md_tree(infomd, mneinfo, block)
  177. # extras
  178. if len(extrainfo) > 1:
  179. for idx, emd_i in enumerate(extrainfo):
  180. extrasmd = nf.create_section(f"Extras-{idx}",
  181. "Raw Extras metadata")
  182. create_md_tree(extrasmd, emd_i, block)
  183. elif extrainfo:
  184. extrasmd = nf.create_section("Extras", "Raw Extras metadata")
  185. create_md_tree(extrasmd, extrainfo[0], block)
  186. # all done
  187. nf.close()
  188. print(f"Created NIX file at '{nfname}'")
  189. print("Done")
  190. def main():
  191. if len(sys.argv) < 2:
  192. print("Please provide either a BrainVision vhdr or "
  193. "an EDF filename as the first argument")
  194. sys.exit(1)
  195. datafilename = sys.argv[1]
  196. montage = None
  197. if len(sys.argv) > 2:
  198. montage = sys.argv[2]
  199. montage = os.path.abspath(montage)
  200. root, ext = os.path.splitext(datafilename)
  201. nfname = root + os.path.extsep + "nix"
  202. if ext.casefold() == ".edf".casefold():
  203. mneraw = mne.io.read_raw_edf(datafilename, montage=montage,
  204. preload=True, stim_channel=False)
  205. elif ext.casefold() == ".vhdr".casefold():
  206. mneraw = mne.io.read_raw_brainvision(datafilename, montage=montage,
  207. preload=True, stim_channel=False)
  208. else:
  209. raise RuntimeError(f"Unknown extension '{ext}'")
  210. print(f"Converting '{datafilename}' to NIX")
  211. write_raw_mne(nfname, mneraw, True)
  212. nfname = root + "-oneda" + os.path.extsep + "nix"
  213. write_raw_mne(nfname, mneraw, False)
  214. mneraw.close()
  215. if __name__ == "__main__":
  216. main()