''' Separate a text stream into HDF5 array format for far more efficient,
memory-efficient processing.

Example:
    $ python3 text_to_hdf5.py /tmp/wikicomp-2014_arko.xml.bz2 "<articlePair id" --split-token-end "</articlePair>"
'''

import os
import sys
import gzip
import bz2
import logging
import argparse

import h5py
import psutil

process = psutil.Process(os.getpid())

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s',
                    level=logging.INFO)
logger = logging.getLogger('TextToHDF5')

parser = argparse.ArgumentParser(
    description='Separate a large text stream delimited by arbitrary tokens '
                'into HDF5 array format')

# Required positional arguments
parser.add_argument('input_file', type=str,
                    help='Input text file (bz2 or gz also possible)')
# for example: "<articlePair id"
parser.add_argument('split_token_start', type=str,
                    help='Start split token (can be an XML tag). '
                         'Example: "<articlePair id"')
parser.add_argument('--encoding', type=str, default='UTF-8',
                    help='Input text file encoding (default: "UTF-8")')
parser.add_argument('--dataset-name', type=str, default='vlen_dataset',
                    help='HDF5 dataset output name (default: "vlen_dataset")')
# for example: "</articlePair>"
parser.add_argument('--split-token-end', type=str, default=None,
                    help='End split token (can be a closing XML tag, or omitted '
                         'to split by the start token only). '
                         'Example: "</articlePair>"')
parser.add_argument('--disable-progress', action='store_true', default=False,
                    help='Disable the progress bar (helps when reading a large '
                         'compressed file, which would otherwise have to be '
                         'fully decompressed just to determine the progress '
                         'bar denominator)')
#parser.add_argument('output_file', type=str,
#                    help='Output filename for hdf5 file')

args = parser.parse_args()
outputFn = args.input_file + '.hdf5'
assert args.split_token_end is not None, 'blank end token not implemented'

lastData = ''
dataCount = 0

# Nothing special: just return the data string as-is for now.
processData = lambda doc: doc

# Open the input transparently, whether plain text or compressed.
if args.input_file.lower().endswith('.bz2'):
    infile = bz2.open(args.input_file, 'rt', encoding=args.encoding)
elif args.input_file.lower().endswith('.gz'):
    infile = gzip.open(args.input_file, 'rt', encoding=args.encoding)
else:
    infile = open(args.input_file, 'r', encoding=args.encoding)

logger.info('Creating HDF5 file %s' % (outputFn))
f = h5py.File(outputFn, 'w')
logger.info('Creating HDF5 dataset %s:%s' % (outputFn, args.dataset_name))
# Resizable, chunked dataset of variable-length strings.
dset = f.create_dataset(args.dataset_name, (0,),
                        dtype=h5py.special_dtype(vlen=str),
                        chunks=True, maxshape=(2**32,))
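# Illustration (not in the original) of what the extraction loop below stores:
# with the docstring's example tokens, an input line such as
#
#   ...preamble... <articlePair id="7"> text </articlePair> ...trailer...
#
# yields the single record
#
#   <articlePair id="7"> text </articlePair>
#
# i.e. everything from the start token through the end token inclusive; text
# outside the token pair on the same line is discarded. Records spanning
# multiple lines are accumulated until the end token appears.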
if not args.disable_progress:
    logger.info('Computing input file length. If this takes too long or '
                'causes issues (in the case of a compressed input file), '
                'use --disable-progress')
    infile.seek(0, os.SEEK_END)
    file_len = infile.tell()
    infile.seek(0, os.SEEK_SET)

logger.info('Beginning input file iteration...')

# Read with readline() rather than "for ln in infile", which breaks .tell().
ln = infile.readline()
while ln:
    if dataCount % 100 == 0:
        rss = process.memory_info().rss / 1048576.0
        if not args.disable_progress:
            sys.stderr.write(
                '\rDump data: IDX=%d, POS=%d/%d (%.2f%%), MEM=%.2fMB' %
                (dataCount + 1, infile.tell(), file_len,
                 100.0 * infile.tell() / file_len, rss))
        else:
            sys.stderr.write('\rDump data: IDX=%d, MEM=%.2fMB' %
                             (dataCount + 1, rss))

    if lastData == '':
        # Not inside a record yet: look for the start token.
        if args.split_token_start in ln:
            lastData = args.split_token_start + \
                ln.split(args.split_token_start)[1].split(
                    args.split_token_end)[0]
            if args.split_token_end in ln:
                # The record starts and ends on the same line.
                lastData += args.split_token_end
                results = processData(lastData)
                if results:
                    if dset.size <= dataCount:
                        # Grow in blocks of 128 to amortize resize cost.
                        dset.resize((dataCount + 128,))
                    dset[dataCount] = results
                    dataCount += 1
                lastData = ''
    else:
        # Inside a record: accumulate lines until the end token appears.
        if args.split_token_end in ln:
            lastData += ln.split(args.split_token_end)[0].strip() + \
                args.split_token_end
            results = processData(lastData)
            if results:
                if dset.size <= dataCount:
                    dset.resize((dataCount + 128,))
                dset[dataCount] = results
                dataCount += 1
            lastData = ''
        else:
            lastData += ln

    #if dataCount >= 100:  # stop early for testing
    #    break
    ln = infile.readline()

sys.stderr.write('\n')
sys.stderr.flush()
logger.info('File iteration complete.')

# Finally, resize down to the actual data count.
dset.resize((dataCount,))
f.close()
logger.info('Output %d objects to %s:%s' % (dataCount, outputFn,
                                            args.dataset_name))
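# A minimal read-back sketch (not part of the original script); the file path
# and dataset name here assume the docstring example and the --dataset-name
# default:
#
#   import h5py
#   with h5py.File('/tmp/wikicomp-2014_arko.xml.bz2.hdf5', 'r') as h5:
#       dset = h5['vlen_dataset']
#       print(len(dset))  # number of extracted records
#       print(dset[0])    # first record (bytes under h5py 3.x unless decoded)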