Convert Sejong POS-tagged corpus format to CoNLL-U format

This script converts the Sejong POS-tagged corpus format to CoNLL-U, which is useful for training Google SyntaxNet.
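
As a rough sketch of the mapping (the input line below is taken from the script's own comments, shown in isolation as a one-word sentence), each morpheme of a Sejong eojeol becomes one 10-column CoNLL-U row, with unannotated fields left as '_':

Input (Sejong):
BTAA0001-00000013	세계적인	세계/NNG + 적/XSN + 이/VCP + ᆫ/ETM

Output (CoNLL-U):
1	세계	세계	NNG	NNG	_	_	_	_	_
2	적	적	XSN	XSN	_	_	_	_	_
3	이	이	VCP	VCP	_	_	_	_	_
4	ᆫ	ᆫ	ETM	ETM	_	_	_	_	_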

#!/usr/bin/python3
# -*- coding: utf-8 -*-
'''
Convert Sejong POS-tagged corpus format to CoNLL-U format for use with Google
SyntaxNet

http://universaldependencies.org/docs/format.html

Outputs training, tuning, and dev (evaluation) sets in a 60-20-20 ratio,
interleaved deterministically by sentence index

Arguments: ./corpus_to_conll.py <sejong_corpus_dir> <output_dir>

sejong_corpus_dir should contain UTF-16 text files
(BTAA0001.txt, etc...)

output_dir will contain:
- tagged-training-corpus.conllu
- tagged-tuning-corpus.conllu
- tagged-dev-corpus.conllu (evaluation set)
'''

import os
import sys
from os.path import isfile, join

if len(sys.argv) < 3:
    print('Arguments: ./corpus_to_conll.py <sejong_corpus_dir> <output_dir>')
    sys.exit(1)

SEJONG_PATH = sys.argv[1]
output_dir = sys.argv[2]

SEJONG_FILES = [join(SEJONG_PATH, f) for f in os.listdir(SEJONG_PATH)
                if isfile(join(SEJONG_PATH, f))]
# Process in sensible order
SEJONG_FILES.sort()

if output_dir.endswith('/'):
    output_dir = output_dir[:-1]

try:
    os.mkdir(output_dir)
except FileExistsError:
    pass

print('Processing corpus and outputting to %s...\n' % output_dir)

class CoNLLSentence(object):
    def __init__(self):
        # Example eojeol analysis: 장식품/NNG + 으로/JKB + …/SE
        # FORM holds the morpheme strings: [장식품, 으로, …]
        self.FORM = []
        # XPOSTAG holds the corresponding Sejong tags: ['NNG', 'JKB', 'SE']
        self.XPOSTAG = []

    def toString(self):
        assert len(self.FORM) == len(self.XPOSTAG)

        n = len(self.FORM)
        self.ID = [i + 1 for i in range(n)]
        # No lemmatization or universal tag mapping is done, so reuse
        # FORM and XPOSTAG for LEMMA and UPOSTAG
        self.LEMMA = self.FORM
        self.UPOSTAG = self.XPOSTAG
        # A CoNLL-U row has 10 tab-separated fields (ID..MISC); leave the
        # unannotated ones unspecified ('_')
        self.FEATS = ['_'] * n
        self.HEAD = ['_'] * n
        self.DEPREL = ['_'] * n
        self.DEPS = ['_'] * n
        self.MISC = ['_'] * n

        s = []

        for i in range(n):
            s.append('%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s' % (self.ID[i],
                self.FORM[i],
                self.LEMMA[i],
                self.UPOSTAG[i],
                self.XPOSTAG[i],
                self.FEATS[i],
                self.HEAD[i],
                self.DEPREL[i],
                self.DEPS[i],
                self.MISC[i]))

        return '\n'.join(s)

def processFile(path):
    '''Process one corpus file; return (tagset, sentences), where tagset is the
    set of Sejong tags seen and sentences is a list of CoNLLSentence objects.'''
    # BTAE0201.txt has an encoding error, so be lenient
    f = open(path, 'r', encoding='utf-16', errors='ignore')
    contents = f.read()
    f.close()
    
    # Some of the files don't have <group>!
    # strings -f -el *.txt |grep "<group>"
    # For example, BTAZ0223.txt
    
    contents = contents.replace('<head>', '<p>')
    contents = contents.replace('</head>', '</p>')
    
    # Detected Sejong tagset
    tagset = set()

    # CoNLL sentences
    sentences = []
    
    # Some files even have a <p> before </teiHeader>, so strip the header first
    paras = contents.split('</teiHeader>')[1].split('<p>')[1:]
    for p in paras:
        p = p.split('</p>')[0].strip()
        
        sentence = CoNLLSentence()

        try:
            lines = p.split('\n')
            #skipPara = False
            for ln in lines:
                if not ln:
                    continue
                
                #BTAA0001-00000013	세계적인	세계/NNG + 적/XSN + 이/VCP + ᆫ/ETM
                cols = ln.split('\t')
                
                if '<' in cols[0]:
                    # Do we want to process dates? Not sure yet...
                    print('Ignoring tag: ' + cols[0])
                    #skipPara = True
                    break
                
                if len(cols) != 3:
                    #print('Parsing error in %s! Column may contain tab or unexpected tag' % (path))
                    raise ValueError('Parsing error in %s! Column may contain tab or unexpected tag' % (path))
                
                #print(cols)
                lineid = cols[0].strip()
                # Don't strip the other columns: spaces are sometimes tagged
                # as morphemes, and they can appear anywhere, e.g.:
                # BTHO0432-00032162	주장자( 杖子)를	주장자/NNG + (/SS +  /SW + 杖子/SH + )/SS + 를/JKO
                word = cols[1]
                # BTBA0233-00016517	2(남북한)+4(미	2/SN + (/SS + 남/NNP + 북한/NNP + )/SS + +/SW + 4/SN + (/SS + 미/NNP
                # Morphemes can even contain '+' themselves, so split on
                # ' + ' (with surrounding spaces) rather than on '+'
                tags = cols[2].split(' + ')
                
                for tagelem in tags:
                    slashes = tagelem.split('/')
                    # The morpheme text itself may contain a slash, so only
                    # the part after the last '/' is the tag, e.g.:
                    # >>> '////////SP'.split('/')
                    wordcomponent = '/'.join(slashes[:-1])
                    tag = slashes[-1]
                    
                    if (not wordcomponent) or (not tag):
                        #print('Parsing error in %s! Word or tag empty: %s' % (path, str(tags)))
                        raise ValueError('Parsing error in %s! Word or tag empty: %s' % (path, str(tags)))
                    
                    #print(wordcomponent, tag)
                    assert '\t' not in wordcomponent
                    assert '\n' not in wordcomponent
                    assert '\t' not in tag
                    assert '\n' not in tag
                    sentence.FORM.append(wordcomponent)
                    sentence.XPOSTAG.append(tag)
                    tagset.add(tag)

            #print(sentence.toString() + '\n')
            # Skip sentences that ended up empty (e.g. paragraphs containing
            # only an ignored tag), which would emit blank rows
            if sentence.FORM:
                sentences.append(sentence)
        except Exception as e:
            print(e)
            print('Processing paragraph failed.')

    return (tagset, sentences)

tagset = set()
sentences = list()

print('Found %d files to process.' % len(SEJONG_FILES))

m = 0
#SEJONG_FILES = ['../data/sejong_entire/BTHO0136.txt']
for f in SEJONG_FILES:
    m += 1
    print('Processing %s (%0.2f%%)' % (f, 100.0 * float(m) / float(len(SEJONG_FILES))))
    thisFileTagset, thisFileSentences = processFile(f)
    tagset = tagset.union(thisFileTagset)
    sentences += thisFileSentences

# index of sentences within sentences list for each set
training_set_indices = []
tuning_set_indices = []
testing_set_indices = []

# split to 60-20-20 (training, tuning, testing sets)
for i in range(len(sentences)):
    if i % 10 == 0 or i % 10 == 1:     # ~20%
        tuning_set_indices.append(i)
    elif i % 10 == 2 or i % 10 == 3:   # ~20%
        testing_set_indices.append(i)
    else:                               # ~60%
        training_set_indices.append(i)
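
# Note that the modulo-based split is deterministic: re-running the script on
# the same corpus reproduces exactly the same training/tuning/dev partition.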

print()
print('Found %d sentences (%d-%d-%d).' % (len(sentences),
    len(training_set_indices), len(tuning_set_indices), len(testing_set_indices)))
print('Complete tagset found: ' + str(tagset))
print()

def writeConllu(filename, indices):
    '''Write the selected sentences to output_dir/filename, with a blank line
    between sentences and no trailing blank line.'''
    path = '%s/%s' % (output_dir, filename)
    print('Writing %s (%d sentences)...' % (path, len(indices)))
    with open(path, 'w', encoding='utf-8') as fd:
        for i, idx in enumerate(indices):
            fd.write(sentences[idx].toString() + '\n')
            # no extra newline after the last sentence
            if i < len(indices) - 1:
                fd.write('\n')

writeConllu('tagged-training-corpus.conllu', training_set_indices)
writeConllu('tagged-tuning-corpus.conllu', tuning_set_indices)
writeConllu('tagged-dev-corpus.conllu', testing_set_indices)
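
To run the conversion (the directory names below are just placeholders; the input directory must contain the UTF-16 Sejong .txt files such as BTAA0001.txt):

./corpus_to_conll.py ./sejong_corpus ./conllu_output

The three .conllu files are then written into ./conllu_output.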
