This script aligns the English and Korean TED talk corpora available below into parallel train/test/dev sentence files, and also writes the vocabulary files used by the TensorFlow NMT training example.
https://wit3.fbk.eu/mono.php?release=XML_releases&tinfo=cleanedhtml_ted
#!/usr/bin/python3
import random
from normalizer import Normalizer
from tokenizer import Tokenizer
'''
Align the TED English and Korean XML corpora from:
https://wit3.fbk.eu/mono.php?release=XML_releases&tinfo=cleanedhtml_ted
'''
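# The string-splitting parser below assumes each downloaded file has roughly
# this shape (element names are taken from the parsing code; id values here are
# hypothetical and other attributes may vary between WIT3 releases):
#
#   <file id="1">
#     <url>http://www.ted.com/talks/some_talk.html</url>
#     ...
#     <transcription>
#       <seekvideo id="800">First caption of the talk</seekvideo>
#       <seekvideo id="5130">Second caption of the talk</seekvideo>
#       ...
#     </transcription>
#   </file>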
# character-count thresholds: sentence pairs where either side is shorter
# than this many characters are not output
ko_len_threshold = 10
en_len_threshold = 20
class TEDVideo(object):
    def __init__(self):
        self.url = None            # talk URL, lowercased
        self.transcriptions = []   # list of (timestamp, tokenized text) tuples
'''
Parse a list of TEDVideo objects from a WIT3 XML file
'''
def parse_ted_file(fn):
    all_videos = []
    with open(fn, 'r', encoding='utf-8') as fd:
        contents = fd.read()
    # each <file> element describes one talk
    for tmp in contents.split('<file')[1:]:
        ted_video = TEDVideo()
        ted_video.url = tmp.split('<url>')[1].split('</url>')[0].lower().strip()
        transcription_part = tmp.split('<transcription>')[1].split('</transcription>')[0]
        invalid_file = False
        for ln in transcription_part.split('\n'):
            ln = ln.strip()
            if not ln:
                continue
            #print(ln)
            if '" />' in ln:
                invalid_file = True
                continue  # erroneous self-closing line, e.g. <seekvideo id="551600" />
            seekvideo_time = int(ln.split(' id="')[1].split('"')[0])
            seekvideo_text = ln.split('">')[1].split('</seekvideo>')[0].strip()
            seekvideo_text_normalized = Normalizer.normalize(seekvideo_text)
            tk = Tokenizer.tokenize(seekvideo_text_normalized,
                                    keep_punctuation=True,
                                    keep_symbols=True)
            #print((seekvideo_time, seekvideo_text))
            ted_video.transcriptions.append((seekvideo_time, ' '.join(tk)))
        if not invalid_file:
            all_videos.append(ted_video)
    return all_videos
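# A parsed TEDVideo then looks something like this (values are hypothetical):
#   video.url            -> 'http://www.ted.com/talks/some_talk.html'
#   video.transcriptions -> [(800, 'first caption , tokenized'), (5130, 'second caption'), ...]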
parsed_en_videos = parse_ted_file('ted_en-20160408.xml')
parsed_ko_videos = parse_ted_file('ted_ko-20160408.xml')
url_set_en = {v.url for v in parsed_en_videos}
url_set_ko = {v.url for v in parsed_ko_videos}
monoling_set = url_set_en | url_set_ko   # every talk with a transcript in either language
biling_set = url_set_en & url_set_ko     # talks transcribed in both languages
url_to_tedvideo_en = {v.url: v for v in parsed_en_videos}
url_to_tedvideo_ko = {v.url: v for v in parsed_ko_videos}
print('Detected %d/%d bilingual videos total' % (len(biling_set), len(monoling_set)))
sent_ko = []
sent_en = []
vocab_ko = set()
vocab_en = set()
# shuffle the talks so that the train/test/dev split differs on every run
biling_set = list(biling_set)
random.shuffle(biling_set)
for url in biling_set:
    en_video = url_to_tedvideo_en[url]
    ko_video = url_to_tedvideo_ko[url]
    transcript_en = en_video.transcriptions
    transcript_ko = ko_video.transcriptions
    if not transcript_en or not transcript_ko:
        continue
    # keep only the timestamps present in both languages; timestamps that occur
    # in just one transcript cannot be aligned, so they are dropped
    biling_timestamps = set(list(zip(*transcript_en))[0]) & set(list(zip(*transcript_ko))[0])
    transcript_en_dict = dict(transcript_en)
    transcript_ko_dict = dict(transcript_ko)
    for ts in sorted(biling_timestamps):  # chronological order within each talk
        # drop pairs where either side is too short to be a useful sentence
        if len(transcript_ko_dict[ts]) >= ko_len_threshold and len(transcript_en_dict[ts]) >= en_len_threshold:
            sent_ko.append(transcript_ko_dict[ts])
            sent_en.append(transcript_en_dict[ts])
            for wd in transcript_ko_dict[ts].split():
                vocab_ko.add(wd)
            for wd in transcript_en_dict[ts].split():
                vocab_en.add(wd)
assert(len(sent_ko) == len(sent_en))
# train/test/dev sets
# 80/10/10
num_train = int(len(sent_ko) * 0.80)
num_test = int(len(sent_ko) * 0.10)
num_dev = len(sent_ko)-num_train-num_test
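# e.g. with 100,000 sentence pairs (hypothetical count): 80,000 train and
# 10,000 test, with the remaining 10,000 (plus any rounding remainder) as dev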
train_ko = sent_ko[:num_train]
test_ko = sent_ko[num_train:num_train+num_test]
dev_ko = sent_ko[num_train+num_test:]
train_en = sent_en[:num_train]
test_en = sent_en[num_train:num_train+num_test]
dev_en = sent_en[num_train+num_test:]
assert len(train_ko)==num_train
assert len(test_ko)==num_test
assert len(dev_ko)==num_dev
assert len(train_en)==num_train
assert len(test_en)==num_test
assert len(dev_en)==num_dev
for split_name, split_sents in [('train', train_ko), ('test', test_ko), ('dev', dev_ko)]:
    with open('ted_biling_%s.ko' % split_name, 'w', encoding='utf-8') as fd:
        for d in split_sents:
            fd.write(d + '\n')
for split_name, split_sents in [('train', train_en), ('test', test_en), ('dev', dev_en)]:
    with open('ted_biling_%s.en' % split_name, 'w', encoding='utf-8') as fd:
        for d in split_sents:
            fd.write(d + '\n')
# write vocab files for NMT, prepending the special tokens <unk>, <s>, </s>;
# the vocabularies are sorted so the output is deterministic across runs
std_vocab = ['<unk>', '<s>', '</s>']
with open('ted_biling_vocab.ko', 'w', encoding='utf-8') as fd:
    for v in std_vocab + sorted(vocab_ko):
        fd.write('%s\n' % v)
with open('ted_biling_vocab.en', 'w', encoding='utf-8') as fd:
    for v in std_vocab + sorted(vocab_en):
        fd.write('%s\n' % v)
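# The file naming above matches the prefix convention of the TensorFlow NMT
# example (https://github.com/tensorflow/nmt): one <prefix>.<lang> pair per
# split plus a vocab prefix. A training run would look roughly like the sketch
# below; flag names follow that tutorial's README and may change between
# versions, and the hyperparameter values are illustrative only.
#
#   python -m nmt.nmt \
#       --src=ko --tgt=en \
#       --vocab_prefix=ted_biling_vocab \
#       --train_prefix=ted_biling_train \
#       --dev_prefix=ted_biling_dev \
#       --test_prefix=ted_biling_test \
#       --out_dir=ted_ko_en_model \
#       --num_train_steps=12000 --num_layers=2 --num_units=128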