'''Separate json stream(s) into hdf5 array format for far more efficient
processing.

Example:
    $ python3 json_to_hdf5.py /tmp/jsonfile1.json.gz /tmp/jsonfile2.json.gz /tmp/jsonfile3.json.gz
'''
import os
import json
import psutil
import sys
import h5py
import gzip
import bz2
import logging
import argparse

process = psutil.Process(os.getpid())

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s',
                    level=logging.INFO)
logger = logging.getLogger('JsonToHDF5')

parser = argparse.ArgumentParser(description='Separate large json stream(s) '
                                 'into hdf5 array format')
# Required positional argument
parser.add_argument('input_files', type=str, nargs='+',
                    help='Input json file(s) in order (bz2 or gz also possible)')
parser.add_argument('--encoding', type=str, default='UTF-8',
                    help='Input text file encoding (default: "UTF-8")')
parser.add_argument('--dataset-name', type=str, default='vlen_dataset',
                    help='HDF5 dataset output name (default: "vlen_dataset")')
#parser.add_argument('output_file', type=str,
#                    help='Output filename for hdf5 file')

args = parser.parse_args()

assert len(args.input_files) > 0
outputFn = args.input_files[0] + '.hdf5'

lastData = ''
lastPrintedProgress = 0
dataCount = 0

# nothing special: just return the data string as-is for now
processData = lambda doc: doc

logger.info('Creating HDF5 file %s' % (outputFn))
f = h5py.File(outputFn, 'w')

logger.info('Creating HDF5 dataset %s:%s' % (outputFn, args.dataset_name))
# variable-length string dataset, grown in chunks as records arrive
dset = f.create_dataset(args.dataset_name, (0,),
                        dtype=h5py.special_dtype(vlen=str),
                        chunks=True, maxshape=(2**32,))

file_count = len(args.input_files)

logger.info('Beginning input file iteration...')
for file_idx, input_file in enumerate(args.input_files):
    #logger.info('Open input file: %s' % input_file)
    if input_file.lower().endswith('.bz2'):
        infile = bz2.open(input_file, 'rt', encoding=args.encoding)
    elif input_file.lower().endswith('.gz'):
        infile = gzip.open(input_file, 'rt', encoding=args.encoding)
    else:
        infile = open(input_file, 'r', encoding=args.encoding)

    # short display name for progress output (handles / and \ separators)
    infilename = input_file.split('/')[-1].split('\\')[-1]

    #logger.info('Decoding JSON...')
    jsondata = json.loads(infile.read())
    infile.close()

    rss = process.memory_info().rss / 1048576.0
    sys.stderr.write(
        '\rDump data: FILE=%s, IDX=%d/%d (%.2f%%), COUNT=%d, MEM=%.2fMB' %
        (infilename, file_idx + 1, file_count,
         100.0 * (file_idx + 1) / file_count, dataCount, rss))

    for record_idx, record in enumerate(jsondata):
        # grow the dataset in blocks of 128 to avoid resizing on every record
        if dset.size <= dataCount:
            dset.resize((dataCount + 128,))
        dset[dataCount] = json.dumps(record)
        dataCount += 1
        #if dataCount >= 100:  # stop early for testing
        #    break

sys.stderr.write('\n')
sys.stderr.flush()
logger.info('File iteration complete.')

# finally, resize down to the actual data count
dset.resize((dataCount,))
f.close()

logger.info('Output %d objects to %s:%s' % (dataCount, outputFn,
                                            args.dataset_name))
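
# Optional sanity check -- a minimal sketch, not part of the conversion itself,
# assuming the output file and dataset name produced above. It re-opens the
# HDF5 file read-only and decodes the first stored record to confirm the JSON
# round-trips through the variable-length string dataset.
if dataCount > 0:
    with h5py.File(outputFn, 'r') as verify:
        first_record = json.loads(verify[args.dataset_name][0])
        logger.info('Sanity check: first stored record decodes as %s' %
                    type(first_record).__name__)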