test_neuralynxio.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests of neo.io.blackrockio
  4. """
  5. # needed for python 3 compatibility
  6. from __future__ import absolute_import
  7. import os
  8. import sys
  9. import re
  10. import warnings
  11. import unittest
  12. import numpy as np
  13. import quantities as pq
  14. from neo import NeuralynxIO, AnalogSignal, SpikeTrain, Event
  15. from neo.test.iotest.common_io_test import BaseTestIO
  16. from neo.core import Segment
  17. class CommonTests(BaseTestIO):
  18. ioclass = NeuralynxIO
  19. files_to_test = []
  20. files_to_download = [
  21. 'Cheetah_v5.5.1/original_data/CheetahLogFile.txt',
  22. 'Cheetah_v5.5.1/original_data/CheetahLostADRecords.txt',
  23. 'Cheetah_v5.5.1/original_data/Events.nev',
  24. 'Cheetah_v5.5.1/original_data/STet3a.nse',
  25. 'Cheetah_v5.5.1/original_data/STet3b.nse',
  26. 'Cheetah_v5.5.1/original_data/Tet3a.ncs',
  27. 'Cheetah_v5.5.1/original_data/Tet3b.ncs',
  28. 'Cheetah_v5.5.1/plain_data/STet3a.txt',
  29. 'Cheetah_v5.5.1/plain_data/STet3b.txt',
  30. 'Cheetah_v5.5.1/plain_data/Tet3a.txt',
  31. 'Cheetah_v5.5.1/plain_data/Tet3b.txt',
  32. 'Cheetah_v5.5.1/plain_data/Events.txt',
  33. 'Cheetah_v5.5.1/README.txt',
  34. 'Cheetah_v5.7.4/original_data/CSC1.ncs',
  35. 'Cheetah_v5.7.4/original_data/CSC2.ncs',
  36. 'Cheetah_v5.7.4/original_data/CSC3.ncs',
  37. 'Cheetah_v5.7.4/original_data/CSC4.ncs',
  38. 'Cheetah_v5.7.4/original_data/CSC5.ncs',
  39. 'Cheetah_v5.7.4/original_data/Events.nev',
  40. 'Cheetah_v5.7.4/plain_data/CSC1.txt',
  41. 'Cheetah_v5.7.4/plain_data/CSC2.txt',
  42. 'Cheetah_v5.7.4/plain_data/CSC3.txt',
  43. 'Cheetah_v5.7.4/plain_data/CSC4.txt',
  44. 'Cheetah_v5.7.4/plain_data/CSC5.txt',
  45. 'Cheetah_v5.7.4/plain_data/Events.txt',
  46. 'Cheetah_v5.7.4/README.txt']
  47. def setUp(self):
  48. super(CommonTests, self).setUp()
  49. data_dir = os.path.join(self.local_test_dir,
  50. 'Cheetah_v{}'.format(self.cheetah_version))
  51. self.sn = os.path.join(data_dir, 'original_data')
  52. self.pd = os.path.join(data_dir, 'plain_data')
  53. if not os.path.exists(self.sn):
  54. raise unittest.SkipTest('data file does not exist:' + self.sn)
  55. class TestCheetah_v551(CommonTests, unittest.TestCase):
  56. cheetah_version = '5.5.1'
  57. def test_read_block(self):
  58. """Read data in a certain time range into one block"""
  59. t_start, t_stop = 3 * pq.s, 4 * pq.s
  60. nio = NeuralynxIO(self.sn, use_cache='never')
  61. block = nio.read_block(t_starts=[t_start], t_stops=[t_stop])
  62. self.assertEqual(len(nio.parameters_ncs), 2)
  63. self.assertTrue(
  64. {'event_id': 11, 'name': 'Starting Recording', 'nttl': 0} in
  65. nio.parameters_nev['Events.nev']['event_types'])
  66. # Everything put in one segment
  67. self.assertEqual(len(block.segments), 1)
  68. seg = block.segments[0]
  69. self.assertEqual(len(seg.analogsignals), 1)
  70. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  71. self.assertEqual(seg.analogsignals[0].sampling_rate.units,
  72. pq.CompoundUnit('32*kHz'))
  73. self.assertEqual(seg.analogsignals[0].t_start, t_start)
  74. self.assertEqual(seg.analogsignals[0].t_stop, t_stop)
  75. self.assertEqual(len(seg.spiketrains), 2)
  76. # Testing different parameter combinations
  77. block = nio.read_block(lazy=True)
  78. self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
  79. self.assertEqual(len(block.segments[0].spiketrains[0]), 0)
  80. block = nio.read_block(cascade=False)
  81. self.assertEqual(len(block.segments), 0)
  82. block = nio.read_block(electrode_list=[0])
  83. self.assertEqual(len(block.segments[0].analogsignals), 1)
  84. self.assertEqual(len(block.channel_indexes[-1].units), 1)
  85. block = nio.read_block(t_starts=None, t_stops=None, events=True,
  86. waveforms=True)
  87. self.assertEqual(len(block.segments[0].analogsignals), 1)
  88. self.assertEqual(len(block.segments[0].spiketrains), 2)
  89. self.assertEqual(len(block.segments[0].spiketrains[0].waveforms),
  90. len(block.segments[0].spiketrains[0]))
  91. self.assertGreater(len(block.segments[0].events), 0)
  92. self.assertEqual(len(block.channel_indexes[-1].units), 2)
  93. block = nio.read_block(t_starts=[t_start], t_stops=[t_stop],
  94. unit_list=[0], electrode_list=[0])
  95. self.assertEqual(len(block.channel_indexes[-1].units), 1)
  96. block = nio.read_block(t_starts=[t_start], t_stops=[t_stop],
  97. unit_list=False)
  98. self.assertEqual(len(block.channel_indexes[-1].units), 0)
  99. def test_read_segment(self):
  100. """Read data in a certain time range into one block"""
  101. nio = NeuralynxIO(self.sn, use_cache='never')
  102. seg = nio.read_segment(t_start=None, t_stop=None)
  103. self.assertEqual(len(seg.analogsignals), 1)
  104. self.assertEqual(seg.analogsignals[0].shape[-1], 2)
  105. self.assertEqual(seg.analogsignals[0].sampling_rate.units,
  106. pq.CompoundUnit('32*kHz'))
  107. self.assertEqual(len(seg.spiketrains), 2)
  108. # Testing different parameter combinations
  109. seg = nio.read_segment(lazy=True)
  110. self.assertEqual(len(seg.analogsignals[0]), 0)
  111. self.assertEqual(len(seg.spiketrains[0]), 0)
  112. seg = nio.read_segment(cascade=False)
  113. self.assertEqual(len(seg.analogsignals), 0)
  114. self.assertEqual(len(seg.spiketrains), 0)
  115. seg = nio.read_segment(electrode_list=[0])
  116. self.assertEqual(len(seg.analogsignals), 1)
  117. seg = nio.read_segment(t_start=None, t_stop=None, events=True,
  118. waveforms=True)
  119. self.assertEqual(len(seg.analogsignals), 1)
  120. self.assertEqual(len(seg.spiketrains), 2)
  121. self.assertTrue(len(seg.spiketrains[0].waveforms) > 0)
  122. self.assertTrue(len(seg.events) > 0)
  123. def test_read_ncs_data(self):
  124. t_start, t_stop = 0, 500 * 512 # in samples
  125. nio = NeuralynxIO(self.sn, use_cache='never')
  126. seg = Segment('testsegment')
  127. for el_id, el_dict in nio.parameters_ncs.items():
  128. filepath = nio.parameters_ncs[el_id]['recording_file_name']
  129. filename = filepath.split('/')[-1].split('\\')[-1].split('.')[0]
  130. nio.read_ncs(filename, seg, t_start=t_start, t_stop=t_stop)
  131. anasig = seg.filter({'electrode_id': el_id},
  132. objects=AnalogSignal)[0]
  133. target_data = np.zeros((16679, 512))
  134. with open(self.pd + '/%s.txt' % filename) as datafile:
  135. for i, line in enumerate(datafile):
  136. line = line.strip('\xef\xbb\xbf')
  137. entries = line.split()
  138. target_data[i, :] = np.asarray(entries[4:])
  139. target_data = target_data.reshape((-1, 1)) * el_dict['ADBitVolts']
  140. np.testing.assert_array_equal(target_data[:len(anasig)],
  141. anasig.magnitude)
  142. def test_read_nse_data(self):
  143. t_start, t_stop = None, None # in samples
  144. nio = NeuralynxIO(self.sn, use_cache='never')
  145. seg = Segment('testsegment')
  146. for el_id, el_dict in nio.parameters_nse.items():
  147. filepath = nio.parameters_nse[el_id]['recording_file_name']
  148. filename = filepath.split('/')[-1].split('\\')[-1].split('.')[0]
  149. nio.read_nse(filename, seg, t_start=t_start, t_stop=t_stop,
  150. waveforms=True)
  151. spiketrain = seg.filter({'electrode_id': el_id},
  152. objects=SpikeTrain)[0]
  153. # target_data = np.zeros((500, 32))
  154. # timestamps = np.zeros(500)
  155. entries = []
  156. with open(self.pd + '/%s.txt' % filename) as datafile:
  157. for i, line in enumerate(datafile):
  158. line = line.strip('\xef\xbb\xbf')
  159. entries.append(line.split())
  160. entries = np.asarray(entries, dtype=float)
  161. target_data = entries[:-1, 11:]
  162. timestamps = entries[:-1, 0]
  163. timestamps = (timestamps * pq.microsecond -
  164. nio.parameters_global['t_start'])
  165. np.testing.assert_array_equal(timestamps.magnitude,
  166. spiketrain.magnitude)
  167. np.testing.assert_array_equal(target_data,
  168. spiketrain.waveforms)
  169. def test_read_nev_data(self):
  170. t_start, t_stop = 0 * pq.s, 1000 * pq.s
  171. nio = NeuralynxIO(self.sn, use_cache='never')
  172. seg = Segment('testsegment')
  173. filename = 'Events'
  174. nio.read_nev(filename + '.nev', seg, t_start=t_start, t_stop=t_stop)
  175. timestamps = []
  176. nttls = []
  177. names = []
  178. event_ids = []
  179. with open(self.pd + '/%s.txt' % filename) as datafile:
  180. for i, line in enumerate(datafile):
  181. line = line.strip('\xef\xbb\xbf')
  182. entries = line.split('\t')
  183. nttls.append(int(entries[5]))
  184. timestamps.append(int(entries[3]))
  185. names.append(entries[10].rstrip('\r\n'))
  186. event_ids.append(int(entries[4]))
  187. timestamps = (np.array(timestamps) * pq.microsecond -
  188. nio.parameters_global['t_start'])
  189. # masking only requested spikes
  190. mask = np.where(timestamps < t_stop)[0]
  191. # return if no event fits criteria
  192. if len(mask) == 0:
  193. return
  194. timestamps = timestamps[mask]
  195. nttls = np.asarray(nttls)[mask]
  196. names = np.asarray(names)[mask]
  197. event_ids = np.asarray(event_ids)[mask]
  198. for i in range(len(timestamps)):
  199. events = seg.filter({'nttl': nttls[i]}, objects=Event)
  200. events = [e for e in events
  201. if (e.annotations['marker_id'] == event_ids[i] and
  202. e.labels == names[i])]
  203. self.assertTrue(len(events) == 1)
  204. self.assertTrue(timestamps[i] in events[0].times)
  205. def test_read_ntt_data(self):
  206. pass
  207. # TODO: Implement test_read_ntt_data once ntt files are available
  208. class TestCheetah_v574(TestCheetah_v551, CommonTests, unittest.TestCase):
  209. cheetah_version = '5.7.4'
  210. def test_read_block(self):
  211. """Read data in a certain time range into one block"""
  212. t_start, t_stop = 3 * pq.s, 4 * pq.s
  213. nio = NeuralynxIO(self.sn, use_cache='never')
  214. block = nio.read_block(t_starts=[t_start], t_stops=[t_stop])
  215. self.assertEqual(len(nio.parameters_ncs), 5)
  216. self.assertTrue(
  217. {'event_id': 19, 'name': 'Starting Recording', 'nttl': 0} in
  218. nio.parameters_nev['Events.nev']['event_types'])
  219. self.assertTrue(
  220. {'event_id': 19, 'name': 'Stopping Recording', 'nttl': 0} in
  221. nio.parameters_nev['Events.nev']['event_types'])
  222. # Everything put in one segment
  223. self.assertEqual(len(block.segments), 1)
  224. seg = block.segments[0]
  225. self.assertEqual(len(seg.analogsignals), 1)
  226. self.assertEqual(seg.analogsignals[0].shape[-1], 5)
  227. self.assertEqual(seg.analogsignals[0].sampling_rate.units,
  228. pq.CompoundUnit('32*kHz'))
  229. self.assertAlmostEqual(seg.analogsignals[0].t_start, t_start, places=4)
  230. self.assertAlmostEqual(seg.analogsignals[0].t_stop, t_stop, places=4)
  231. self.assertEqual(len(seg.spiketrains), 0) # no nse files available
  232. # Testing different parameter combinations
  233. block = nio.read_block(lazy=True)
  234. self.assertEqual(len(block.segments[0].analogsignals[0]), 0)
  235. block = nio.read_block(cascade=False)
  236. self.assertEqual(len(block.segments), 0)
  237. block = nio.read_block(electrode_list=[0])
  238. self.assertEqual(len(block.segments[0].analogsignals), 1)
  239. block = nio.read_block(t_starts=None, t_stops=None, events=True,
  240. waveforms=True)
  241. self.assertEqual(len(block.segments[0].analogsignals), 1)
  242. self.assertEqual(len(block.segments[0].spiketrains), 0)
  243. self.assertGreater(len(block.segments[0].events), 0)
  244. self.assertEqual(len(block.channel_indexes), 5)
  245. def test_read_segment(self):
  246. """Read data in a certain time range into one block"""
  247. nio = NeuralynxIO(self.sn, use_cache='never')
  248. seg = nio.read_segment(t_start=None, t_stop=None)
  249. self.assertEqual(len(seg.analogsignals), 1)
  250. self.assertEqual(seg.analogsignals[0].shape[-1], 5)
  251. self.assertEqual(seg.analogsignals[0].sampling_rate.units,
  252. pq.CompoundUnit('32*kHz'))
  253. self.assertEqual(len(seg.spiketrains), 0)
  254. # Testing different parameter combinations
  255. seg = nio.read_segment(lazy=True)
  256. self.assertEqual(len(seg.analogsignals[0]), 0)
  257. self.assertEqual(len(seg.spiketrains), 0)
  258. seg = nio.read_segment(cascade=False)
  259. self.assertEqual(len(seg.analogsignals), 0)
  260. self.assertEqual(len(seg.spiketrains), 0)
  261. seg = nio.read_segment(electrode_list=[0])
  262. self.assertEqual(len(seg.analogsignals), 1)
  263. seg = nio.read_segment(t_start=None, t_stop=None, events=True,
  264. waveforms=True)
  265. self.assertEqual(len(seg.analogsignals), 1)
  266. self.assertEqual(len(seg.spiketrains), 0)
  267. self.assertTrue(len(seg.events) > 0)
  268. class TestGaps(CommonTests, unittest.TestCase):
  269. cheetah_version = '5.5.1'
  270. def test_gap_handling(self):
  271. nio = NeuralynxIO(self.sn, use_cache='never')
  272. block = nio.read_block(t_starts=None, t_stops=None)
  273. # known gap values
  274. n_gaps = 1
  275. self.assertEqual(len(block.segments), n_gaps + 1)
  276. # one channel index for analogsignals for each of the 3 segments and
  277. # one for spiketrains
  278. self.assertEqual(len(block.channel_indexes), len(block.segments) + 1)
  279. self.assertEqual(len(block.channel_indexes[-1].units), 2)
  280. for unit in block.channel_indexes[-1].units:
  281. self.assertEqual(len(unit.spiketrains), n_gaps + 1)
  282. anasig_channels = [i for i in block.channel_indexes
  283. if 'analogsignal' in i.name]
  284. self.assertEqual(len(anasig_channels), n_gaps + 1)
  285. def test_gap_warning(self):
  286. nio = NeuralynxIO(self.sn, use_cache='never')
  287. with reset_warning_registry():
  288. with warnings.catch_warnings(record=True) as w:
  289. warnings.simplefilter('always')
  290. nio.read_block(t_starts=None, t_stops=None)
  291. self.assertGreater(len(w), 0)
  292. self.assertTrue(issubclass(w[0].category, UserWarning))
  293. self.assertEqual('Substituted t_starts and t_stops in order to'
  294. ' skip gap in recording session.',
  295. str(w[0].message))
  296. def test_analogsignal_shortening_warning(self):
  297. nio = NeuralynxIO(self.sn, use_cache='never')
  298. with reset_warning_registry():
  299. with warnings.catch_warnings(record=True) as w:
  300. seg = Segment('testsegment')
  301. nio.read_ncs(os.path.join(self.sn, 'Tet3a.ncs'), seg)
  302. self.assertGreater(len(w), 0)
  303. self.assertTrue(issubclass(w[0].category, UserWarning))
  304. self.assertTrue('Analogsignalarray was shortened due to gap in'
  305. ' recorded data of file'
  306. in str(w[0].message))
  307. # This class is copied from
  308. # 'http://bugs.python.org/file40031/reset_warning_registry.py' by Eli Collins
  309. # and is related to http://bugs.python.org/issue21724 Python<3.4
  310. class reset_warning_registry(object):
  311. """
  312. context manager which archives & clears warning registry for duration of
  313. context.
  314. :param pattern:
  315. optional regex pattern, causes manager to only reset modules whose
  316. names match this pattern. defaults to ``".*"``.
  317. """
  318. #: regexp for filtering which modules are reset
  319. _pattern = None
  320. #: dict mapping module name -> old registry contents
  321. _backup = None
  322. def __init__(self, pattern=None):
  323. self._pattern = re.compile(pattern or ".*")
  324. def __enter__(self):
  325. # archive and clear the __warningregistry__ key for all modules
  326. # that match the 'reset' pattern.
  327. pattern = self._pattern
  328. backup = self._backup = {}
  329. for name, mod in list(sys.modules.items()):
  330. if pattern.match(name):
  331. reg = getattr(mod, "__warningregistry__", None)
  332. if reg:
  333. backup[name] = reg.copy()
  334. reg.clear()
  335. return self
  336. def __exit__(self, *exc_info):
  337. # restore warning registry from backup
  338. modules = sys.modules
  339. backup = self._backup
  340. for name, content in backup.items():
  341. mod = modules.get(name)
  342. if mod is None:
  343. continue
  344. reg = getattr(mod, "__warningregistry__", None)
  345. if reg is None:
  346. setattr(mod, "__warningregistry__", content)
  347. else:
  348. reg.clear()
  349. reg.update(content)
  350. # clear all registry entries that we didn't archive
  351. pattern = self._pattern
  352. for name, mod in list(modules.items()):
  353. if pattern.match(name) and name not in backup:
  354. reg = getattr(mod, "__warningregistry__", None)
  355. if reg:
  356. reg.clear()
  357. if __name__ == '__main__':
  358. unittest.main()