This script aligns the English and Korean TED transcript corpora available from the link below, and also writes vocabulary files for the TensorFlow NMT training example.
https://wit3.fbk.eu/mono.php?release=XML_releases&tinfo=cleanedhtml_ted
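It expects ted_en-20160408.xml and ted_ko-20160408.xml in the working directory, and writes ted_biling_train/test/dev sentence files (.ko and .en) along with ted_biling_vocab.ko and ted_biling_vocab.en.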
#!/usr/bin/python3
import random
from normalizer import Normalizer
from tokenizer import Tokenizer
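# Normalizer and Tokenizer are local helper modules; they are used below to
# clean up and tokenize each caption line before alignment.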
'''
Align the TED English and Korean XML corpora:
https://wit3.fbk.eu/mono.php?release=XML_releases&tinfo=cleanedhtml_ted
'''
# character-count thresholds: a sentence pair is written only if the Korean
# side has at least ko_len_threshold characters and the English side at least
# en_len_threshold (measured on the tokenized, space-joined text)
ko_len_threshold = 10
en_len_threshold = 20
class TEDVideo(object):
def __init__(self):
self.url = None
self.transcriptions = []
'''
Parse a list of TEDVideo objects from a TED XML dump
'''
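# Each <file> block in the dump holds a <url> element and a <transcription>
# element whose lines look roughly like:
#   <seekvideo id="551600">caption text</seekvideo>
# The id is treated as a timestamp (seekvideo_time), and the same id is assumed
# to mark the corresponding caption in the other language, which is what the
# alignment below relies on.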
def parse_ted_file(fn):
all_videos = []
contents = None
with open(fn, 'r', encoding='utf-8') as fd:
contents = fd.read()
for tmp in contents.split('<file')[1:]:
ted_video = TEDVideo()
ted_video.url = tmp.split('<url>')[1].split('</url>')[0].lower().strip()
transcription_part = tmp.split('<transcription>')[1].split('</transcription')[0]
invalid_file = False
for ln in transcription_part.split('\n'):
ln = ln.strip()
if not ln:
continue
#print(ln)
if '" />' in ln:
invalid_file = True
continue # erroneous line <seekvideo id="551600" />
seekvideo_time = int(ln.split(' id="')[1].split('"')[0])
seekvideo_text = ln.split('">')[1].split('</seekvideo>')[0].strip()
seekvideo_text_normalized = Normalizer.normalize(seekvideo_text)
tk = Tokenizer.tokenize(seekvideo_text_normalized,
keep_punctuation=True,
keep_symbols=True)
#print((seekvideo_time, seekvideo_text))
ted_video.transcriptions.append((seekvideo_time, ' '.join(tk)))
if not invalid_file:
all_videos.append(ted_video)
return all_videos
parsed_en_videos = parse_ted_file('ted_en-20160408.xml')
parsed_ko_videos = parse_ted_file('ted_ko-20160408.xml')
url_set_en = set([v.url for v in parsed_en_videos])
url_set_ko = set([v.url for v in parsed_ko_videos])
monoling_set = url_set_en | url_set_ko
biling_set = url_set_en & url_set_ko
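# biling_set holds the talks (keyed by URL) present in both dumps, i.e. the
# ones that can be sentence-aligned; monoling_set is only used for the count
# printed below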
url_to_tedvideo_en = {}
for v in parsed_en_videos:
url_to_tedvideo_en[v.url] = v
url_to_tedvideo_ko = {}
for v in parsed_ko_videos:
url_to_tedvideo_ko[v.url] = v
print('Detected %d/%d bilingual videos total' % (len(biling_set), len(monoling_set)))
sent_ko = []
sent_en = []
vocab_ko = set()
vocab_en = set()
# shuffle the order of the talks on each run; since the splits below are
# contiguous slices, each talk's sentences land almost entirely in one split
biling_set = list(biling_set)
random.shuffle(biling_set)
for url in biling_set:
en_video = url_to_tedvideo_en[url]
ko_video = url_to_tedvideo_ko[url]
transcript_en = en_video.transcriptions
transcript_ko = ko_video.transcriptions
biling_timestamps = set(list(zip(*transcript_en))[0]) & set(list(zip(*transcript_ko))[0])
transcript_en_dict = dict(transcript_en)
transcript_ko_dict = dict(transcript_ko)
# The English and Korean caption timestamps are not always identical for a
# given talk, so rather than skipping a talk on any mismatch we only keep the
# timestamps present in both transcripts (the intersection taken above).
for ts in biling_timestamps:
if len(transcript_ko_dict[ts]) >= ko_len_threshold and len(transcript_en_dict[ts]) >= en_len_threshold:
sent_ko.append(transcript_ko_dict[ts])
sent_en.append(transcript_en_dict[ts])
for wd in transcript_ko_dict[ts].split():
vocab_ko.add(wd)
for wd in transcript_en_dict[ts].split():
vocab_en.add(wd)
assert len(sent_ko) == len(sent_en)
# train/test/dev sets
# 80/10/10
num_train = int(len(sent_ko) * 0.80)
num_test = int(len(sent_ko) * 0.10)
num_dev = len(sent_ko)-num_train-num_test
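# e.g. with 100,000 aligned pairs: 80,000 train, 10,000 test, and the
# remaining 10,000 dev (dev absorbs any rounding leftover)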
train_ko = sent_ko[:num_train]
test_ko = sent_ko[num_train:num_train+num_test]
dev_ko = sent_ko[num_train+num_test:]
train_en = sent_en[:num_train]
test_en = sent_en[num_train:num_train+num_test]
dev_en = sent_en[num_train+num_test:]
assert len(train_ko)==num_train
assert len(test_ko)==num_test
assert len(dev_ko)==num_dev
assert len(train_en)==num_train
assert len(test_en)==num_test
assert len(dev_en)==num_dev
for datasets in [('train', train_ko), ('test', test_ko), ('dev', dev_ko)]:
with open('ted_biling_%s.ko' % datasets[0], 'w', encoding='utf-8') as fd:
for d in datasets[1]:
fd.write(d + '\n')
for datasets in [('train', train_en), ('test', test_en), ('dev', dev_en)]:
with open('ted_biling_%s.en' % datasets[0], 'w', encoding='utf-8') as fd:
for d in datasets[1]:
fd.write(d + '\n')
# write vocab files for NMT
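# one token per line, with the special tokens <unk>, <s>, </s> written first
# (the convention the TensorFlow NMT example uses); the remaining tokens come
# from a set, so their order differs from run to run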
std_vocab = ['<unk>', '<s>', '</s>']
with open('ted_biling_vocab.ko', 'w', encoding='utf-8') as fd:
for v in std_vocab + list(vocab_ko):
fd.write('%s\n' % v)
with open('ted_biling_vocab.en', 'w', encoding='utf-8') as fd:
for v in std_vocab + list(vocab_en):
fd.write('%s\n' % v)
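# The output names follow the prefix convention of the TensorFlow NMT tutorial
# (https://github.com/tensorflow/nmt). As a rough, untested sketch (flag names
# and out_dir are assumptions to check against the version of the example you
# actually use), training could then look something like:
#
#   python -m nmt.nmt \
#       --src=ko --tgt=en \
#       --vocab_prefix=ted_biling_vocab \
#       --train_prefix=ted_biling_train \
#       --dev_prefix=ted_biling_dev \
#       --test_prefix=ted_biling_test \
#       --out_dir=ted_ko_en_model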