Separate a large plain-text stream into HDF5 format

'''
Separate a text stream into HDF5 variable-length string array format for far
more efficient downstream processing.

Memory-efficient: the input is read line by line, so only the record currently
being assembled is held in memory at any time.

Example:

$ python3 text_to_hdf5.py /tmp/wikicomp-2014_arko.xml.bz2 "<articlePair id" --split-token-end "</articlePair>"

'''
import os
import psutil
import sys
import h5py
import gzip
import bz2
import logging
import argparse

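# handle to this process so the progress output can report resident memory (RSS)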
process = psutil.Process(os.getpid())

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', \
    level=logging.INFO)

logger = logging.getLogger('TextToHDF5')

parser = argparse.ArgumentParser(description='Separate large text stream \
    delimited by arbitrary tokens into hdf5 array format')

# Required positional argument
parser.add_argument('input_file', type=str,
                    help='Input text file (bz2 or gz also possible)')
parser.add_argument('--encoding', type=str, default='UTF-8',
                    help='Input text file encoding (default: "UTF-8")')
parser.add_argument('--dataset-name', type=str, default='vlen_dataset',
                    help='HDF5 dataset output name (default: "vlen_dataset")')
# for example: "<articlePair id"
parser.add_argument('split_token_start', type=str,
                    help='Start split token (can be an XML tag). '
                         'Example: "<articlePair id"')

# for example: "</articlePair>"
parser.add_argument('--split-token-end', type=str, default=None,
                    help='End split token (can be a closing XML tag, or '
                         'omitted if only splitting by the start token). '
                         'Example: "</articlePair>"')
parser.add_argument('--disable-progress', action='store_true', default=False,
                    help='Disable progress bar (helps when reading large \
                    compressed file where file needs to be decompressed \
                    to determine progress bar denominator)')

#parser.add_argument('output_file', type=str,
#                    help='Output filename for hdf5 file')

args = parser.parse_args()

outputFn = args.input_file + '.hdf5'

assert args.split_token_end is not None, 'splitting without an end token is not implemented'

lastData = ''            # text of the record currently being assembled ('' = between records)
lastPrintedProgress = 0
dataCount = 0            # number of records written to the dataset so far

# nothing special: just return the data string as-is for now;
# replace this hook with any per-record transformation
def processData(doc):
    return doc

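# open the input transparently, whether bz2-compressed, gzip-compressed, or plain text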
if args.input_file.lower().endswith('.bz2'):
    infile = bz2.open(args.input_file, 'rt', encoding=args.encoding)
elif args.input_file.lower().endswith('.gz'):
    infile = gzip.open(args.input_file, 'rt', encoding=args.encoding)
else:
    infile = open(args.input_file, 'r', encoding=args.encoding)

logger.info('Creating HDF5 file %s' % (outputFn))
f = h5py.File(outputFn, 'w')
logger.info('Creating HDF5 dataset %s:%s' % (outputFn, args.dataset_name))
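# resizable dataset of variable-length strings; chunked so rows can be appended one at a time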
dset = f.create_dataset(args.dataset_name, (0,), \
    dtype=h5py.special_dtype(vlen=str), chunks=True, maxshape=(2**32,))

if not args.disable_progress:
    logger.info('Computing input file length. If this takes too long or causes \
issues (in the case of a compressed input file), use \
--disable-progress')

    infile.seek(0, os.SEEK_END)
    file_len = infile.tell()
    infile.seek(0, os.SEEK_SET)

logger.info('Beginning input file iteration...')

# note: "for ln in infile:" would disable .tell(), so read with readline() instead

ln = infile.readline()
while ln:
    if dataCount % 100 == 0:
        rss = process.memory_info().rss / 1048576.0
        if not args.disable_progress:
            sys.stderr.write( \
                '\rDump data: IDX=%d, POS=%d/%d (%.2f%%), MEM=%.2fMB' % \
                (dataCount+1, infile.tell(), \
                file_len, 100.0 * infile.tell()/file_len, rss))
        else:
            sys.stderr.write('\rDump data: IDX=%d, MEM=%.2fMB' % \
                (dataCount+1, rss))

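    # two-state parser: either scanning for the next start token, or
    # accumulating lines into lastData until the end token is seen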
    if lastData=='':
        if args.split_token_start in ln:
            lastData = args.split_token_start + \
                ln.split(args.split_token_start)[1].split( \
                    args.split_token_end)[0]
            if args.split_token_end in ln:
                lastData += args.split_token_end
                results = processData(lastData)
                if results:
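                    # grow the dataset in blocks of 128 rows to avoid a resize on every record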
                    if dset.size <= dataCount:
                        dset.resize((dataCount+128, ))

                    dset[dataCount] = results
                    dataCount += 1
                lastData = ''
        # no start token on this line and not inside a record: skip the line
    else:
        if args.split_token_end in ln:
            lastData += ln.split(args.split_token_end)[0].strip() + \
                args.split_token_end
            results = processData(lastData)
            if results:
                if dset.size <= dataCount:
                    dset.resize((dataCount+128, ))

                dset[dataCount] = results
                dataCount += 1
            lastData = ''
        else:
            lastData += ln

    #if dataCount >= 100: # stop early for testing
    #    break

    ln = infile.readline()

sys.stderr.write('\n')
sys.stderr.flush()

logger.info('File iteration complete.')

# finally, shrink the dataset down to the actual number of records written
dset.resize((dataCount, ))

f.close()

logger.info('Output %d objects to %s:%s' % (dataCount, outputFn, \
    args.dataset_name))
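
Once the script finishes, every record sits in its own row of a resizable
variable-length string dataset. A minimal sketch of reading the result back
with h5py, assuming the default dataset name ("vlen_dataset") and the output
path the script derives for the example input above:

import h5py

# the script names its output <input file> + '.hdf5'
with h5py.File('/tmp/wikicomp-2014_arko.xml.bz2.hdf5', 'r') as f:
    dset = f['vlen_dataset']
    print('records:', dset.shape[0])
    # h5py >= 3.0 returns variable-length strings as bytes; older versions return str
    print(dset[0][:200])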

 
