Compress large JSON text streams into one indexed zip file

This may help when processing multiple large JSON files. The script below packs every record from the input files into a single indexed zip file, which can then be read back record by record from Python.

'''
Compress json stream(s) into indexed compressed format (zip file)
for far more efficient processing

Example:

$ python3 json_to_zippack.py /tmp/jsonfile1.json /tmp/jsonfile2.json
    /tmp/jsonfile3.json

'''
import os
import math
import json
import psutil
import sys
import gzip
import bz2
import logging
import argparse
import zipfile

process = psutil.Process(os.getpid())

logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', \
    level=logging.INFO)

logger = logging.getLogger('JsonToZip')

parser = argparse.ArgumentParser(description='Add large json array(s) \
    as separate files in one large zip file for efficient indexing')

# Required positional argument
parser.add_argument('input_files', type=str, nargs='+',
                    help='Input json file(s) in order (bz2 or gz also \
                    possible)')
parser.add_argument('--encoding', type=str, default='UTF-8',
                    help='Input text file encoding (default: "UTF-8")')
#parser.add_argument('--compression', type=str, default='none',
#                    help='Zip compression: "none", "gzip", "lzf" \
#                    (default: "none")')

args = parser.parse_args()
assert len(args.input_files) > 0

#assert args.compression in ['none', 'gzip', 'lzf']

#if args.compression == 'none':
#    args.compression = None

outputFn = args.input_files[0] + '.zippack'

dataCount = 0

# nothing special: just return the data string as-is for now
processData = lambda doc: doc

logger.info('Creating zip file %s' % outputFn)

zf = zipfile.ZipFile(outputFn, 
                     mode='w',
                     compression=zipfile.ZIP_DEFLATED, 
                     )

file_count = len(args.input_files)

logger.info('Beginning input file iteration...')

for file_idx, input_file in enumerate(args.input_files):
    #logger.info('Open input file: %s' % input_file)

    if input_file.lower().endswith('.bz2'):
        infile = bz2.open(input_file, 'rt', encoding=args.encoding)
    elif input_file.lower().endswith('.gz'):
        infile = gzip.open(input_file, 'rt', encoding=args.encoding)
    else:
        infile = open(input_file, 'r', encoding=args.encoding)

    infilename = os.path.basename(input_file)

    #logger.info('Decoding JSON...')
    jsondata = json.loads(infile.read())
    infile.close()

    rss = process.memory_info().rss / 1048576.0
    sys.stderr.write( \
        '\rDump data: FILE=%s, IDX=%d, COUNT=%d/%d (%.2f%%), ' \
        'MEM=%.2fMB' % \
        (infilename, dataCount+1, file_idx+1, \
        file_count, 100.0 * (file_idx+1)/file_count, rss))

    for record in jsondata:
        # write each record as its own zip entry, named by running index
        data_to_save = processData(json.dumps(record))
        zf.writestr('%d.json' % dataCount, data_to_save)
        dataCount += 1

zf.writestr('datacount', str(dataCount))

sys.stderr.write('\n')
sys.stderr.flush()

logger.info('File iteration complete.')

zf.close()

logger.info('Output %d objects to %s' % (dataCount, outputFn))
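One caveat: json.loads(infile.read()) decodes each input file entirely in memory, which is what the MEM readout tracks. If the inputs are too large for that, the records can be streamed into the zip as they are parsed. Below is a minimal sketch of that idea using the third-party ijson package, which is not used by the script above; it assumes ijson is installed and that each input file is a top-level JSON array, and the pack_streaming name is made up for illustration.

import json
import zipfile

import ijson  # third-party streaming JSON parser (assumed installed)

def pack_streaming(input_path, zippack_path):
    '''Write each element of a top-level JSON array to its own zip entry
    without holding the whole decoded file in memory.'''
    count = 0
    with zipfile.ZipFile(zippack_path, mode='w',
                         compression=zipfile.ZIP_DEFLATED) as zf:
        with open(input_path, 'rb') as f:
            # 'item' is ijson's prefix for the elements of a top-level array
            for record in ijson.items(f, 'item'):
                # ijson yields Decimal for non-integer numbers, so let
                # json.dumps fall back to float for anything it cannot encode
                zf.writestr('%d.json' % count,
                            json.dumps(record, default=float))
                count += 1
        zf.writestr('datacount', str(count))
    return count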


Test the zip file:

import json
import zipfile

zf = zipfile.ZipFile('/tmp/jsonfile1.json.zippack', 'r')
print(zf.read('datacount'))            # total number of records written
print(json.loads(zf.read('0.json')))   # first record
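
Because each record is stored as its own entry ('0.json', '1.json', ...) and the total is kept under 'datacount', any record can be pulled out by index without decompressing the rest of the archive. A small reader sketch follows; the ZippackReader class name is made up for illustration, and the path simply follows the usage example at the top.

import json
import zipfile

class ZippackReader:
    '''Random-access reader for archives produced by the script above.'''

    def __init__(self, path):
        self.zf = zipfile.ZipFile(path, 'r')
        self.count = int(self.zf.read('datacount'))

    def __len__(self):
        return self.count

    def __getitem__(self, idx):
        # each record was stored under the entry name '<index>.json'
        return json.loads(self.zf.read('%d.json' % idx))

# e.g. print the total count and the first record
reader = ZippackReader('/tmp/jsonfile1.json.zippack')
print(len(reader), reader[0])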

