test_blackrockio.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. # -*- coding: utf-8 -*-
  2. """
  3. Tests of neo.io.blackrockio
  4. """
  5. # needed for python 3 compatibility
  6. from __future__ import absolute_import
  7. import unittest
  8. from numpy.testing import assert_equal
  9. import numpy as np
  10. import quantities as pq
  11. from neo.io.blackrockio import BlackrockIO
  12. from neo.test.iotest.common_io_test import BaseTestIO
  13. from neo.test.iotest.tools import get_test_file_full_path
  14. # check scipy
  15. try:
  16. from distutils import version
  17. import scipy.io
  18. import scipy.version
  19. except ImportError as err:
  20. HAVE_SCIPY = False
  21. SCIPY_ERR = err
  22. else:
  23. if version.LooseVersion(scipy.version.version) < '0.8':
  24. HAVE_SCIPY = False
  25. SCIPY_ERR = ImportError("your scipy version is too old to support " +
  26. "MatlabIO, you need at least 0.8. " +
  27. "You have %s" % scipy.version.version)
  28. else:
  29. HAVE_SCIPY = True
  30. SCIPY_ERR = None
  31. class CommonTests(BaseTestIO, unittest.TestCase):
  32. ioclass = BlackrockIO
  33. files_to_test = ['FileSpec2.3001']
  34. files_to_download = [
  35. 'FileSpec2.3001.nev',
  36. 'FileSpec2.3001.ns5',
  37. 'FileSpec2.3001.ccf',
  38. 'FileSpec2.3001.mat']
  39. ioclass = BlackrockIO
  40. def test_inputs_V23(self):
  41. """
  42. Test various inputs to BlackrockIO.read_block with version 2.3 file
  43. to check for parsing errors.
  44. """
  45. try:
  46. b = BlackrockIO(
  47. get_test_file_full_path(
  48. ioclass=BlackrockIO,
  49. filename='FileSpec2.3001',
  50. directory=self.local_test_dir, clean=False),
  51. verbose=False)
  52. except:
  53. self.fail()
  54. # Load data to maximum extent, one None is not given as list
  55. block = b.read_block(
  56. n_starts=[None], n_stops=None, channels=range(1, 9),
  57. nsx_to_load=5, units='all', load_events=True,
  58. load_waveforms=False)
  59. lena = len(block.segments[0].analogsignals[0])
  60. numspa = len(block.segments[0].spiketrains[0])
  61. # Load data using a negative time and a time exceeding the end of the
  62. # recording
  63. too_large_tstop = block.segments[0].analogsignals[0].t_stop + 1 * pq.s
  64. block = b.read_block(
  65. n_starts=[-100 * pq.ms], n_stops=[too_large_tstop],
  66. channels=range(1, 9), nsx_to_load=[5], units='all',
  67. load_events=False, load_waveforms=False)
  68. lenb = len(block.segments[0].analogsignals[0])
  69. numspb = len(block.segments[0].spiketrains[0])
  70. # Same length of analog signal?
  71. # Both should have read the complete data set!
  72. self.assertEqual(lena, lenb)
  73. # Same length of spike train?
  74. # Both should have read the complete data set!
  75. self.assertEqual(numspa, numspb)
  76. # n_starts and n_stops not given as list
  77. # verifies identical length of returned signals given equal durations
  78. # as input
  79. ns5_unit = block.segments[0].analogsignals[0].sampling_period
  80. block = b.read_block(
  81. n_starts=100 * ns5_unit, n_stops=200 * ns5_unit,
  82. channels=range(1, 9), nsx_to_load=5, units='all',
  83. load_events=False, load_waveforms=False)
  84. lena = len(block.segments[0].analogsignals[0])
  85. block = b.read_block(
  86. n_starts=301 * ns5_unit, n_stops=401 * ns5_unit,
  87. channels=range(1, 9), nsx_to_load=5, units='all',
  88. load_events=False, load_waveforms=False)
  89. lenb = len(block.segments[0].analogsignals[0])
  90. # Same length?
  91. self.assertEqual(lena, lenb)
  92. # Length should be 100 samples exactly
  93. self.assertEqual(lena, 100)
  94. # Load partial data types and check if this is selection is made
  95. block = b.read_block(
  96. n_starts=None, n_stops=None, channels=range(1, 9),
  97. nsx_to_load=5, units='none', load_events=False,
  98. load_waveforms=True)
  99. self.assertEqual(len(block.segments), 1)
  100. self.assertEqual(len(block.segments[0].analogsignals), 8)
  101. self.assertEqual(len(block.channel_indexes), 8)
  102. self.assertEqual(len(block.channel_indexes[0].units), 0)
  103. self.assertEqual(len(block.segments[0].events), 0)
  104. self.assertEqual(len(block.segments[0].spiketrains), 0)
  105. # NOTE: channel 6 does not contain any unit
  106. block = b.read_block(
  107. n_starts=[None, 3000 * pq.ms], n_stops=[1000 * pq.ms, None],
  108. channels=range(1, 9), nsx_to_load='none',
  109. units={1: 0, 5: 0, 6: 0}, load_events=True,
  110. load_waveforms=True)
  111. self.assertEqual(len(block.segments), 2)
  112. self.assertEqual(len(block.segments[0].analogsignals), 0)
  113. self.assertEqual(len(block.channel_indexes), 8)
  114. self.assertEqual(len(block.channel_indexes[0].units), 1)
  115. self.assertEqual(len(block.segments[0].events), 0)
  116. self.assertEqual(len(block.segments[0].spiketrains), 2)
  117. @unittest.skipUnless(HAVE_SCIPY, "requires scipy")
  118. def test_compare_blackrockio_with_matlabloader(self):
  119. """
  120. This test compares the output of BlackRockIO.read_block() with the
  121. output generated by a Matlab implementation of a Blackrock file reader
  122. provided by the company. The output for comparison is provided in a
  123. .mat file created by the script create_data_matlab_blackrock.m.
  124. The function tests LFPs, spike times, and digital events on channels
  125. 80-83 and spike waveforms on channel 82, unit 1.
  126. For details on the file contents, refer to FileSpec2.3.txt
  127. """
  128. # Load data from Matlab generated files
  129. ml = scipy.io.loadmat(
  130. get_test_file_full_path(
  131. ioclass=BlackrockIO,
  132. filename='FileSpec2.3001.mat',
  133. directory=self.local_test_dir, clean=False))
  134. lfp_ml = ml['lfp'] # (channel x time) LFP matrix
  135. ts_ml = ml['ts'] # spike time stamps
  136. elec_ml = ml['el'] # spike electrodes
  137. unit_ml = ml['un'] # spike unit IDs
  138. wf_ml = ml['wf'] # waveform unit 1 channel 1
  139. mts_ml = ml['mts'] # marker time stamps
  140. mid_ml = ml['mid'] # marker IDs
  141. # Load data in channels 1-3 from original data files using the Neo
  142. # BlackrockIO
  143. session = BlackrockIO(
  144. get_test_file_full_path(
  145. ioclass=BlackrockIO,
  146. filename='FileSpec2.3001',
  147. directory=self.local_test_dir, clean=False),
  148. verbose=False)
  149. block = session.read_block(
  150. channels=range(1, 9), units='all', nsx_to_load='all',
  151. scaling='raw', load_waveforms=True, load_events=True)
  152. # Check if analog data on channels 1-8 are equal
  153. self.assertGreater(len(block.channel_indexes), 0)
  154. for chidx in block.channel_indexes:
  155. # Should only have one AnalogSignal per ChannelIndex
  156. self.assertEqual(len(chidx.analogsignals), 1)
  157. idx = chidx.analogsignals[0].annotations['channel_id']
  158. if idx in range(1, 9):
  159. # We ignore the last sample of the Analogsignal returned by the
  160. # Python implementation, since due to an error in the
  161. # corresponding matlab loader the last sample was ignored and
  162. # not saved to the test file
  163. assert_equal(np.squeeze(
  164. chidx.analogsignals[0].base[:-1]), lfp_ml[idx - 1, :])
  165. # Check if spikes in channels 1,3,5,7 are equal
  166. self.assertEqual(len(block.segments), 1)
  167. for st_i in block.segments[0].spiketrains:
  168. channelid = st_i.annotations['channel_id']
  169. if channelid in range(1, 7, 2):
  170. unitid = st_i.annotations['unit_id']
  171. matlab_spikes = ts_ml[np.nonzero(
  172. np.logical_and(elec_ml == channelid, unit_ml == unitid))]
  173. assert_equal(st_i.base, matlab_spikes)
  174. # Check waveforms of channel 1, unit 0
  175. if channelid == 1 and unitid == 0:
  176. assert_equal(np.squeeze(st_i.waveforms), wf_ml)
  177. # Check if digital input port events are equal
  178. self.assertGreater(len(block.segments[0].events), 0)
  179. for ea_i in block.segments[0].events:
  180. if ea_i.name == 'digital_input_port':
  181. # Get all digital event IDs in this recording
  182. marker_ids = set(ea_i.labels)
  183. for marker_id in marker_ids:
  184. python_digievents = ea_i.times.base[
  185. ea_i.labels == marker_id]
  186. matlab_digievents = mts_ml[
  187. np.nonzero(mid_ml == int(marker_id))]
  188. assert_equal(python_digievents, matlab_digievents)
  189. # Note: analog input events are not yet supported
  190. if __name__ == '__main__':
  191. unittest.main()