Separate large JSON text streams into HDF5 format

'''
Separate JSON stream(s) into an HDF5 variable-length string dataset for far more efficient repeated processing

Example:

$ python3 json_to_hdf5.py /tmp/jsonfile1.json.gz /tmp/jsonfile2.json.gz \
    /tmp/jsonfile3.json.gz

'''
import os
import json
import psutil
import sys
import h5py
import gzip
import bz2
import logging
import argparse

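# handle to this process, used to report RSS memory usage in the progress line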
process = psutil.Process(os.getpid())

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s',
    level=logging.INFO)

logger = logging.getLogger('JsonToHDF5')

parser = argparse.ArgumentParser(
    description='Separate large JSON stream(s) into HDF5 array format')

# Required positional argument
parser.add_argument('input_files', type=str, nargs='+',
                    help='Input JSON file(s) in order (bz2 or gz also possible)')
parser.add_argument('--encoding', type=str, default='UTF-8',
                    help='Input text file encoding (default: "UTF-8")')
parser.add_argument('--dataset-name', type=str, default='vlen_dataset',
                    help='HDF5 dataset output name (default: "vlen_dataset")')

#parser.add_argument('output_file', type=str,
#                    help='Output filename for hdf5 file')

args = parser.parse_args()

assert len(args.input_files) > 0

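# derive the output filename from the first input file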
outputFn = args.input_files[0] + '.hdf5'

dataCount = 0

# nothing special: just return the JSON string unchanged for now
def processData(doc):
    return doc

logger.info('Creating HDF5 file %s' % (outputFn))
f = h5py.File(outputFn, 'w')
logger.info('Creating HDF5 dataset %s:%s' % (outputFn, args.dataset_name))
# chunked, resizable dataset of variable-length strings (one JSON object per row)
dset = f.create_dataset(args.dataset_name, (0,),
    dtype=h5py.special_dtype(vlen=str), chunks=True, maxshape=(None,))

file_count = len(args.input_files)

logger.info('Beginning input file iteration...')

for file_idx, input_file in enumerate(args.input_files):
    #logger.info('Open input file: %s' % input_file)
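    # transparently open plain, gzip-, or bz2-compressed input as text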

    if input_file.lower().endswith('.bz2'):
        infile = bz2.open(input_file, 'rt', encoding=args.encoding)
    elif input_file.lower().endswith('.gz'):
        infile = gzip.open(input_file, 'rt', encoding=args.encoding)
    else:
        infile = open(input_file, 'r', encoding=args.encoding)

    infilename = os.path.basename(input_file)

    #logger.info('Decoding JSON...')
    jsondata = json.loads(infile.read())
    infile.close()

    # report per-file progress along with current resident memory usage (MiB)
    rss = process.memory_info().rss / 1048576.0
    sys.stderr.write(
        '\rDump data: FILE=%s (%d/%d, %.2f%%), RECORDS=%d, MEM=%.2fMB' %
        (infilename, file_idx+1, file_count,
         100.0 * (file_idx+1)/file_count, dataCount, rss))

    for record in jsondata:
        # grow the dataset in blocks of 128 rows instead of resizing per record
        if dset.size <= dataCount:
            dset.resize((dataCount+128, ))

        dset[dataCount] = processData(json.dumps(record))
        dataCount += 1

        #if dataCount >= 100: # stop early for testing
        #    break

sys.stderr.write('\n')
sys.stderr.flush()

logger.info('File iteration complete.')

# finally, shrink the dataset down to the actual record count
dset.resize((dataCount, ))

f.close()

logger.info('Output %d objects to %s:%s' % (dataCount, outputFn,
    args.dataset_name))
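
To sanity-check the result, the dataset can be streamed back record by record without loading the whole file into memory. A minimal read-back sketch, assuming the default dataset name and the output path the script derives (the first input filename plus '.hdf5'):

import json
import h5py

# open the converted file read-only and iterate the variable-length dataset
with h5py.File('/tmp/jsonfile1.json.gz.hdf5', 'r') as f:
    dset = f['vlen_dataset']
    for raw in dset:
        record = json.loads(raw)  # raw may be bytes or str depending on h5py version
        # ... process each record here ...

Because the dataset is chunked, h5py only pulls the chunks it touches, so this should scale to files much larger than RAM.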
