#!/usr/bin/env python3
# Usage: xml2csv <basename> <path/to/subject>
#
# Rows are appended to the output CSV files; a header row is written
# only when an output file is newly created.
- import sys
- import shutil
- from xml.etree.ElementTree import parse as xmlparse
- import csv
- from pathlib import Path
- from tempfile import (
- TemporaryDirectory,
- TemporaryFile,
- )
- import re
# Leading columns of every CSV file written by this script.
# The order of this list is the column order of the output, so
# re-ordering entries here re-orders the CSV columns.
csv_fieldnames = [
    'SubjectID',
    'NCR',   # noise to contrast ratio
    'ICR',   # inhomogeneity to contrast ratio
    'IQR',   # image quality rating
    'TIV',   # total intracranial volume (GM+WM+CSF)
    'GM',    # total gray matter volume
    'WM',    # total white matter volume
    'CSF',   # total cerebrospinal fluid volume
    'WMH',   # total white matter hyperintensities volume
    'TSA',   # total surface area
]
def val2out(str_float):
    """Return a value as text with exactly four decimal places.

    The argument is passed through ``float()`` first, so it may be a
    number or any string that ``float()`` can convert.
    """
    number = float(str_float)
    return format(number, '.4f')
def get_basic_catlog(report_dir, sub):
    """Build the subject-level part of a CSV record from the report XML.

    Reads ``cat_<sub>_T1w.xml`` from *report_dir* and extracts the quality
    ratings and the global subject measures.

    Parameters
    ----------
    report_dir : pathlib.Path
        Directory that contains the report XML file.
    sub : str
        Subject identifier. It is used to build the report file name and
        is stored in the record under ``'SubjectID'``.

    Returns
    -------
    dict
        Record with ``SubjectID``, ``IQR``, ``NCR``, ``ICR``, ``TIV`` and
        ``TSA``. ``CSF``, ``GM``, ``WM`` and ``WMH`` are included only when
        the corresponding volume is greater than zero. All numeric values
        are strings formatted by ``val2out``.

    Raises
    ------
    AttributeError
        If one of the expected elements is missing from the report.
    """
    # function-scope import: avoids touching the file-level import block
    from xml.etree.ElementTree import fromstring

    catrep_file = report_dir / 'cat_{}_T1w.xml'.format(sub)

    # The report is patched before parsing: every literal "item..." is
    # turned into "item>...". Presumably this repairs a truncated tag that
    # would otherwise make the XML ill-formed -- TODO confirm.
    # A plain bytes replacement is used because the pattern is a literal.
    patched = catrep_file.read_bytes().replace(b'item...', b'item>...')
    catreport = fromstring(patched)

    # output column -> location of the value inside the report
    measures = (
        ('IQR', 'qualityratings/IQR'),
        ('NCR', 'qualityratings/NCR'),
        ('ICR', 'qualityratings/ICR'),
        ('TIV', 'subjectmeasures/vol_TIV'),
        ('TSA', 'subjectmeasures/surf_TSA'),
    )
    catlog = {'SubjectID': sub}
    for column, xpath in measures:
        catlog[column] = val2out(catreport.find(xpath).text)

    # vol_abs_CGW holds whitespace-separated volumes inside brackets; they
    # are taken in the order CSF, GM, WM, WMH
    abs_volumes = catreport.find('subjectmeasures/vol_abs_CGW').text.strip('[]')
    for tissue, volume in zip(('CSF', 'GM', 'WM', 'WMH'), abs_volumes.split()):
        # volumes that are not positive are left out of the record
        if float(volume) > 0:
            catlog[tissue] = val2out(volume)
    return catlog
def xml2csv(infile, outfilebase, catlog_templ, data_tag,
            additional_extractor=None):
    """Append one CSV row per atlas found in an ROI XML file.

    Every child element of the XML root is treated as one atlas. For each
    of them a row is appended to ``<outfilebase>_<atlas tag>.csv``. The
    header row is written only when that file did not exist beforehand.

    Parameters
    ----------
    infile : path or file object
        XML file to read; passed to ``xml.etree.ElementTree.parse``.
    outfilebase : str
        Prefix of the output file names.
    catlog_templ : dict
        Record with the leading columns. It is copied for every atlas and
        never modified.
    data_tag : str
        Name of the element below ``<atlas>/data`` that holds the
        ``;``-separated per-ROI values (e.g. ``'Vgm'``).
    additional_extractor : callable, optional
        Called as ``additional_extractor(root, atlas_tag, rois, record,
        columns)`` where ``rois`` is the sorted list of ROI names. It may
        add entries to ``record`` and extend ``columns`` in place.

    Raises
    ------
    AttributeError
        If an atlas has no ``data/<data_tag>`` element.
    """
    root_node = xmlparse(infile).getroot()

    # iterate over the atlases found in the XML
    for child in root_node:
        atlas = child.tag
        destfile = Path('{}_{}.csv'.format(outfilebase, atlas))

        # ROI names and values, both in document order
        names = [
            item.text
            for item in root_node.findall(atlas + '/names/item')
        ]
        values = root_node.find(
            atlas + '/data/' + data_tag).text.strip('[]').split(';')

        # Build the complete record BEFORE touching the output file, so a
        # malformed XML cannot leave an empty, header-less CSV behind.
        catlog = catlog_templ.copy()
        # Pair names and values before sorting: the values follow the
        # order of the names in the XML, not alphabetical order.
        for roi, value in zip(names, values):
            catlog[roi] = val2out(value)

        # the output columns are the alphabetically sorted ROI names
        rois = sorted(names)
        roi_names = list(rois)
        if additional_extractor:
            additional_extractor(root_node, atlas, rois, catlog, roi_names)

        need_header = not destfile.is_file()
        # newline='' is what the csv module expects; without it the row
        # terminator gets translated again on some platforms
        with destfile.open('a', newline='') as catlog_data:
            writer = csv.DictWriter(
                catlog_data,
                fieldnames=csv_fieldnames + roi_names,
            )
            # if there was no CSV, write the header
            if need_header:
                writer.writeheader()
            writer.writerow(catlog)
def add_WM_CSF(root_node, tag, rois, catlog, roi_names):
    """Add per-ROI WM and CSF volumes of one atlas to a CSV record.

    For each of ``<tag>/data/Vwm`` and ``<tag>/data/Vcsf`` that is present
    and non-empty, the ``;``-separated values are stored in *catlog* under
    ``<ROI name>_WM`` / ``<ROI name>_CSF`` and the matching column names
    are appended to *roi_names*.

    Parameters
    ----------
    root_node : xml.etree.ElementTree.Element
        Root of the parsed ROI XML.
    tag : str
        Tag of the atlas element below the root.
    rois : list of str
        ROI names of the atlas; determines the order of the added columns.
    catlog : dict
        Record to update in place.
    roi_names : list of str
        Column list to extend in place.
    """
    # The values are stored in the order of the names in the XML, which is
    # not necessarily the order of `rois`; read the names again in
    # document order so that every value ends up under its own ROI.
    names = [item.text for item in root_node.findall(tag + '/names/item')]

    for data_tag, suffix in (('Vwm', '_WM'), ('Vcsf', '_CSF')):
        raw = root_node.findtext(tag + '/data/' + data_tag)
        if not raw:
            # atlas does not provide this tissue class
            continue
        for name, vol in zip(names, raw.strip('[]').split(';')):
            catlog[name + suffix] = val2out(vol)
        # new columns go at the end, in the order of `rois`
        roi_names.extend(name + suffix for name in rois)
# Command line: xml2csv <basename> <path/to/subject>
# Fail with a usage message instead of an IndexError traceback when
# arguments are missing (additional arguments are ignored).
if len(sys.argv) < 3:
    sys.exit('usage: {} <basename> <path/to/subject>'.format(sys.argv[0]))

# output base name
base_name = sys.argv[1]
# path to the subject's data; 'report' and 'label' are expected below it
path2data = Path(sys.argv[2])
if not path2data.parts:
    sys.exit('cannot derive a subject identifier from path {!r}'.format(
        sys.argv[2]))
# The subject identifier is the FIRST component of the given path.
# NOTE(review): this presumably expects a relative path that starts with
# the subject directory -- confirm against how the script is invoked.
sub = path2data.parts[0]

# subject-level record from the report XML; shared by all outputs below
catlog = get_basic_catlog(path2data / 'report', sub)

# atlas ROI volumes: GM values plus WM/CSF where the atlas provides them
xml2csv(
    path2data / 'label' / 'catROI_{}_T1w.xml'.format(sub),
    '{}_rois'.format(base_name),
    catlog,
    'Vgm',
    add_WM_CSF,
)

# surface ROI values: thickness
xml2csv(
    path2data / 'label' / 'catROIs_{}_T1w.xml'.format(sub),
    '{}_thickness'.format(base_name),
    catlog,
    'thickness',
)
- # xml2csv(
- # path2data / 'label' / 'catROIs_{}_T1w.xml'.format(sub),
- # '{}_gyrification'.format(base_name),
- # catlog,
- # 'gyrification',
- # )
- #
- # xml2csv(
- # path2data / 'label' / 'catROIs_{}_T1w.xml'.format(sub),
- # '{}_toroGI20mm'.format(base_name),
- # catlog,
- # 'toroGI20mm',
- # )
- #
- # xml2csv(
- # path2data / 'label' / 'catROIs_{}_T1w.xml'.format(sub),
- # '{}_surfarea'.format(base_name),
- # catlog,
- # 'area',
- # )
- #
- # xml2csv(
- # path2data / 'label' / 'catROIs_{}_T1w.xml'.format(sub),
- # '{}_surfgmvol'.format(base_name),
- # catlog,
- # 'gmv',
- # )
- #
- # xml2csv(
- # path2data / 'label' / 'catROIs_{}_T1w.xml'.format(sub),
- # '{}_sulcusdepth'.format(base_name),
- # catlog,
- # 'depth',
- # )
- #
- # xml2csv(
- # path2data / 'label' / 'catROIs_{}_T1w.xml'.format(sub),
- # '{}_fractaldim'.format(base_name),
- # catlog,
- # 'fractaldimension',
- # )
|