"""Generate a NIX test file containing a synthetic neo block.

Running this module as a script builds a block filled with seeded random
data, attaches spike train containers using whichever container class the
installed neo version provides (``Group`` or ``Unit``/``ChannelIndex``),
and writes the result to ``generated_file_neo<version>.nix``.
"""
from packaging.version import Version

import numpy as np
import neo
import quantities as pq

# Number of objects of each kind created per container.
n_segments = 2
n_spiketrains = 8
# NOTE(review): n_waveform_samples is not referenced anywhere in this file;
# the waveform shape in generate_basic_block() is hard-coded -- confirm
# which of the two is intended.
n_waveform_samples = 10
n_analogsignals = 4
n_irregularlysampledsignals = 1
n_events = 2
n_epochs = 3

# Fixed seed so that every run draws the same sequence of random numbers.
random_generator = np.random.default_rng(seed=42)


def get_rand(shape=None, min=0, max=1, sorted=False):
    """Return random numbers drawn from the module-level generator.

    Parameters
    ----------
    shape : int, tuple of int or None
        Passed to ``random_generator.random``; ``None`` yields a scalar.
    min, max : number
        The raw draws are rescaled as ``data * (max - min) + min``.
    sorted : bool
        If true, the result is passed through ``np.sort``.

    Returns
    -------
    float or numpy.ndarray
        The rescaled (and optionally sorted) random data.
    """
    data = random_generator.random(shape)
    # rescaling random numbers to min-max range
    data = data * (max - min) + min
    # np.sort raises on 0-d input; a scalar is trivially sorted already.
    if sorted and np.ndim(data) > 0:
        data = np.sort(data)
    return data


def generate_basic_block():
    """Build and return a neo block populated with random test data.

    The block contains ``n_segments`` segments; each segment receives
    ``n_spiketrains`` spike trains, ``n_analogsignals`` analog signals,
    ``n_irregularlysampledsignals`` irregularly sampled signals,
    ``n_events`` events and ``n_epochs`` epochs.

    The order of the ``get_rand`` calls determines the generated values,
    because all of them draw from the same seeded generator.
    """
    block = neo.Block(name='my_block')

    for seg_idx in range(n_segments):
        seg = neo.Segment(name=f'my_segment_{seg_idx}')
        block.segments.append(seg)

        for spiketrain_idx in range(n_spiketrains):
            waveforms = get_rand((10, 14)) * pq.V
            st = neo.SpikeTrain(
                times=get_rand((10), max=10, sorted=True) * pq.s,
                t_stop=10 * pq.s,
                name=f'my_spiketrain_{spiketrain_idx}',
                waveforms=waveforms,
                left_sweep=4,
            )
            st.segment = seg
            seg.spiketrains.append(st)

        for anasig_idx in range(n_analogsignals):
            # The column count and the sampling rate both grow with the
            # signal index.
            anasig = neo.AnalogSignal(
                signal=get_rand((100, anasig_idx + 2), max=1000) * pq.V,
                t_start=0 * pq.s,
                sampling_rate=(anasig_idx + 1) * pq.Hz,
                name=f'my_analogsignal_{anasig_idx}',
            )
            anasig.segment = seg
            seg.analogsignals.append(anasig)

        # The original code sized this signal with ``anasig_idx + 2``, using
        # the index leaked from the loop above (NameError when there are no
        # analog signals). ``n_analogsignals + 1`` is the same value whenever
        # that loop ran, so the generated data is unchanged.
        n_irr_columns = n_analogsignals + 1
        for irr_idx in range(n_irregularlysampledsignals):
            irrsig = neo.IrregularlySampledSignal(
                times=get_rand((100), sorted=True, max=100) * pq.s,
                signal=get_rand((100, n_irr_columns), max=1000) * pq.V,
                t_start=0 * pq.s,
                t_stop=100 * pq.s,
                name=f'my_irregularlysampledsignal_{irr_idx}',
            )
            irrsig.segment = seg
            seg.irregularlysampledsignals.append(irrsig)

        for ev_idx in range(n_events):
            event = neo.Event(
                times=get_rand((10), max=10, sorted=True) * pq.s,
                labels=np.array([f'my_event_timestamp_{i}' for i in range(10)]),
                name=f'my_event_{ev_idx}',
            )
            event.segment = seg
            seg.events.append(event)

        for ep_idx in range(n_epochs):
            epoch = neo.Epoch(
                times=get_rand((3), max=6, sorted=True) * pq.s,
                durations=[1.1, 2.2, 3.3] * pq.s,
                labels=np.array([f'my_epoch_timestamp_{i}' for i in range(3)]),
                name=f'my_epoch_{ep_idx}',
            )
            epoch.segment = seg
            seg.epochs.append(epoch)

    return block


def add_spiketrain_groups(block):
    """Attach ``n_spiketrains`` spike train groups to ``block``.

    The spike train at position ``idx`` of every segment is appended to the
    group at position ``idx``; the groups are then added to
    ``block.groups``. The block is modified in place.
    """
    spiketrain_groups = [
        neo.Group(name=f'my_spiketrain_group_{group_idx}',
                  allowed_types=[neo.SpikeTrain])
        for group_idx in range(n_spiketrains)
    ]

    # assign spiketrains to groups
    for seg in block.segments:
        for idx, st in enumerate(seg.spiketrains):
            st.group = spiketrain_groups[idx]
            spiketrain_groups[idx].spiketrains.append(st)

    # attach groups to block
    block.groups.extend(spiketrain_groups)


def add_spiketrain_units_channel_indexes(block):
    """Attach units and channel indexes to ``block``.

    Linking pattern as implemented below:

    * one channel index per analog signal position, holding the analog
      signal at that position from every segment;
    * one unit per spike train position, holding the spike train at that
      position from every segment;
    * units ``2 * k`` and ``2 * k + 1`` are linked to channel index ``k``.

    The last step indexes ``channel_indexes[idx // 2]`` for every unit, so
    it requires at least ``n_spiketrains / 2`` (rounded up) analog signals.
    The block is modified in place.
    """
    spiketrain_units = [neo.Unit(f'my_unit_{unit_idx}')
                        for unit_idx in range(n_spiketrains)]

    channel_indexes = []
    for asig_idx in range(n_analogsignals):
        signals = [seg.analogsignals[asig_idx] for seg in block.segments]
        channel_index = neo.ChannelIndex([asig_idx],
                                         name=f'my_channel_index_{asig_idx}')
        channel_index.analogsignals.extend(signals)
        channel_indexes.append(channel_index)
        channel_index.block = block

    # assign spiketrain to units
    for seg in block.segments:
        for idx, st in enumerate(seg.spiketrains):
            st.unit = spiketrain_units[idx]
            spiketrain_units[idx].spiketrains.append(st)

    # link channel_index to block
    block.channel_indexes.extend(channel_indexes)

    # attach units to channel_indexes
    for idx in range(n_spiketrains):
        # link two spiketrains to a single channel_index
        channel_indexes[idx // 2].units.append(spiketrain_units[idx])
        spiketrain_units[idx].channel_index = channel_indexes[idx // 2]


def main():
    """Generate the test block and write it to a NIX file.

    Raises
    ------
    ValueError
        If the installed neo provides neither ``Group`` nor
        ``ChannelIndex``.
    """
    neo_version = Version(neo.__version__)
    # Flush so the progress message is visible before the work starts,
    # even though the line is not yet terminated.
    print(f'Generating test file for neo version {neo_version} ...',
          end='', flush=True)
    block = generate_basic_block()

    # adding spiketrain units to neo block depending on container objects
    # available
    if hasattr(neo, 'Group'):
        add_spiketrain_groups(block)
    elif hasattr(neo, 'ChannelIndex'):
        add_spiketrain_units_channel_indexes(block)
    else:
        raise ValueError(f'No mechanism to represent spike units found for neo version: {neo_version}')

    io = neo.NixIO(f'generated_file_neo{neo_version}.nix')
    try:
        io.write_block(block)
    finally:
        # Close the file handle even when writing fails.
        io.close()
    print('done')


if __name__ == '__main__':
    main()