SuooL's Blog

蛰伏于盛夏 藏华于当春

EDA Easy Data Augmentation 数据增强使用记录

介绍

EDA (Easy Data Augmentation) 是一种应用于文本分类的简单的数据增强技术,由4种方法组成,分别是:同义词替换、随机插入、随机替换与随机删除 4个数据增强操作:

  • 同义词替换(Synonym Replacement, SR):从句子中随机选取n个不属于停用词集的单词,并随机选择其同义词替换它们;
  • 随机插入(Random Insertion, RI):随机的找出句中某个不属于停用词集的词,并求出其随机的同义词,将该同义词插入句子的一个随机位置。重复n次;
  • 随机交换(Random Swap, RS):随机的选择句中两个单词并交换它们的位置。重复n次;
  • 随机删除(Random Deletion, RD):以 p的概率,随机的移除句中的每个单词;

使用EDA需要注意:控制样本数量,少量学习,不能扩充太多,因为EDA操作太过频繁可能会改变语义,从而降低模型性能。

作者给出了在实际使用EDA方法的建议,表格的左边是数据的规模 $N_{train}$ , 右边 $\alpha , n_{aug}$ 是概率、比率

比如同义词替换中,替换的单词数 $n=\alpha*l$ , $l$是句子长度。随机插入、随机替换类似。

随机删除的话 $p=\alpha * n_{aug}$ 代表使用EDA方法从每一个句子拓展出的句子数量。

英文增强实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import pickle
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
import numpy as np
import random
import nltk
from nltk.corpus import wordnet
from nltk.corpus import stopwords
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')


def plot_graphs(history, metric):
plt.plot(history.history[metric])
plt.plot(history.history['val_'+metric], '')
plt.xlabel("Epochs")
plt.ylabel(metric)
plt.legend([metric, 'val_'+metric])


def eda_SR(originalSentence, n):
"""
Paper Methodology -> Randomly choose n words from the sentence that are not stop words.
Replace each of these words with one of its synonyms chosen at random.
originalSentence -> The sentence on which EDA is to be applied
n -> The number of words to be chosen for random synonym replacement
"""
stops = set(stopwords.words('english'))
splitSentence = list(originalSentence.split(" "))
splitSentenceCopy = splitSentence.copy()
# Since We Make Changes to The Original Sentence List The Indexes Change and Hence an initial copy proves useful to get values
ls_nonStopWordIndexes = []
for i in range(len(splitSentence)):
if splitSentence[i].lower() not in stops:
ls_nonStopWordIndexes.append(i)
if (n > len(ls_nonStopWordIndexes)):
raise Exception(
"The number of replacements exceeds the number of non stop word words")
for i in range(n):
indexChosen = random.choice(ls_nonStopWordIndexes)
ls_nonStopWordIndexes.remove(indexChosen)
synonyms = []
originalWord = splitSentenceCopy[indexChosen]
for synset in wordnet.synsets(originalWord):
for lemma in synset.lemmas():
if lemma.name() != originalWord:
synonyms.append(lemma.name())
if (synonyms == []):
continue
splitSentence[indexChosen] = random.choice(synonyms).replace('_', ' ')
return " ".join(splitSentence)


def eda_RI(originalSentence, n):
"""
Paper Methodology -> Find a random synonym of a random word in the sentence that is not a stop word.
Insert that synonym into a random position in the sentence. Do this n times
originalSentence -> The sentence on which EDA is to be applied
n -> The number of times the process has to be repeated
"""
stops = set(stopwords.words('english'))
splitSentence = list(originalSentence.split(" "))
splitSentenceCopy = splitSentence.copy()
# Since We Make Changes to The Original Sentence List The Indexes Change and Hence an initial copy proves useful to get values
ls_nonStopWordIndexes = []
for i in range(len(splitSentence)):
if splitSentence[i].lower() not in stops:
ls_nonStopWordIndexes.append(i)
if (n > len(ls_nonStopWordIndexes)):
raise Exception("The number of replacements exceeds the number of non stop word words")
WordCount = len(splitSentence)
for i in range(n):
indexChosen = random.choice(ls_nonStopWordIndexes)
ls_nonStopWordIndexes.remove(indexChosen)
synonyms = []
originalWord = splitSentenceCopy[indexChosen]
for synset in wordnet.synsets(originalWord):
for lemma in synset.lemmas():
if lemma.name() != originalWord:
synonyms.append(lemma.name())
if (synonyms == []):
continue
splitSentence.insert(random.randint(0,WordCount-1), random.choice(synonyms).replace('_', ' '))
return " ".join(splitSentence)

def eda_RS(originalSentence, n):
"""
Paper Methodology -> Find a random synonym of a random word in the sentence that is not a stop word.
Insert that synonym into a random position in the sentence. Do this n times
originalSentence -> The sentence on which EDA is to be applied
n -> The number of times the process has to be repeated
"""
splitSentence = list(originalSentence.split(" "))
WordCount = len(splitSentence)
for i in range(n):
firstIndex = random.randint(0,WordCount-1)
secondIndex = random.randint(0,WordCount-1)
while (secondIndex == firstIndex and WordCount != 1):
secondIndex = random.randint(0,WordCount-1)
splitSentence[firstIndex], splitSentence[secondIndex] = splitSentence[secondIndex], splitSentence[firstIndex]
return " ".join(splitSentence)

def eda_RD(originalSentence, p):
"""
Paper Methodology -> Randomly remove each word in the sentence with probability p.
originalSentence -> The sentence on which EDA is to be applied
p -> Probability of a Word Being Removed
"""
og = originalSentence
if (p == 1):
raise Exception("Always an Empty String Will Be Returned")
if (p > 1 or p < 0):
raise Exception("Improper Probability Value")
splitSentence = list(originalSentence.split(" "))
lsIndexesRemoved = []
WordCount = len(splitSentence)
for i in range(WordCount):
randomDraw = random.random()
if randomDraw <= p:
lsIndexesRemoved.append(i)
lsRetainingWords = []
for i in range(len(splitSentence)):
if i not in lsIndexesRemoved:
lsRetainingWords.append(splitSentence[i])
if (lsRetainingWords == []):
return og
return " ".join(lsRetainingWords)

# EDA函数
def eda(sentence, alpha_sr=0.1, alpha_ri=0.1, alpha_rs=0.1, p_rd=0.1, num_aug=9):

words = list(sentence.split())
num_words = len(words)

augmented_sentences = []
num_new_per_technique = int(num_aug / 4) + 1
n_sr = max(1, int(alpha_sr * num_words))
n_ri = max(1, int(alpha_ri * num_words))
n_rs = max(1, int(alpha_rs * num_words))

# print(words, "\n")

# 同义词替换sr
for _ in range(num_new_per_technique):
a_sentence = eda_SR(sentence, n_sr)
augmented_sentences.append(a_sentence)

# 随机插入ri
for _ in range(num_new_per_technique):
a_sentence = eda_RI(sentence, n_ri)
augmented_sentences.append(a_sentence)

# 随机交换rs
for _ in range(num_new_per_technique):
a_sentence = eda_RS(sentence, n_rs)
augmented_sentences.append(a_sentence)

# 随机删除rd
for _ in range(num_new_per_technique):
a_sentence = eda_RD(sentence, p_rd)
augmented_sentences.append(a_sentence)

# print('*'*20, len(augmented_sentences))
shuffle(augmented_sentences)

if num_aug >= 1:
augmented_sentences = augmented_sentences[:num_aug]
else:
keep_prob = num_aug / len(augmented_sentences)
augmented_sentences = [s for s in augmented_sentences if random.uniform(0, 1) < keep_prob]

augmented_sentences.append(sentence) # 加上original句子

return augmented_sentences

def get_eda_df(sentences, alpha=0.1, num_avg=9):
results = []
for i, sents in enumerate(sentences):
augmented_sentences = eda(sents, alpha_sr=alpha, alpha_ri=alpha, alpha_rs=alpha, p_rd=alpha,
num_aug=num_avg)
results.append(augmented_sentences)
return sum(results, [])

if __name__ == '__main__':
# 测试用例

seg_list = "microcontroller coprocessor bmc boot up device enabled asserted microcontroller bmc boot up device enabled asserted processor cpu1 status presence detected asserted processor cpu0 status presence detected asserted system acpi power state acpi pwr status ss one state soft off asserted button button pressed power button pressed asserted system acpi power state acpi pwr status sg one state working asserted power supply ps1 status presence detected asserted power supply ps2 status presence detected asserted"
augmented_sentences = eda(seg_list, alpha_sr=0.05, alpha_ri=0.05, alpha_rs=0.05, p_rd=0.05, num_aug=9)
print(len(augmented_sentences ))
print(augmented_sentences)

中文增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# 此部分代码源于:https://github.com/zhanlaoban/eda_nlp_for_Chinese
import synonyms
import jieba
import random
from random import shuffle
import pandas as pd


random.seed(2019)

DATA_PATH = '../data/'
STOPWORDS_FILENAME = 'zh_data/stopwords.txt'#停用词文件名


def load_stopwords_file(filename):
print('加载停用词...')
stopwords=pd.read_csv(filename, index_col=False, quoting=3, sep="\t", names=['stopword'], encoding='utf-8')
return set(stopwords['stopword'].values)


# 加载停用词表
stop_words = load_stopwords_file(DATA_PATH+STOPWORDS_FILENAME)#加载停用词表
print('停用词表大小:', len(stop_words))




########################################################################
# 同义词替换
# 替换一个语句中的n个单词为其同义词
########################################################################
def synonym_replacement(words, n):
new_words = words.copy()
random_word_list = list(set([word for word in words if word not in stop_words]))
random.shuffle(random_word_list)
num_replaced = 0
for random_word in random_word_list:
synonyms = get_synonyms(random_word)
if len(synonyms) >= 1:
synonym = random.choice(synonyms)
new_words = [synonym if word == random_word else word for word in new_words]
num_replaced += 1
if num_replaced >= n:
break

sentence = ' '.join(new_words)
new_words = sentence.split(' ')

return new_words


def get_synonyms(word):
return synonyms.nearby(word)[0]


########################################################################
# 随机插入
# 随机在语句中插入n个词
########################################################################
def random_insertion(words, n):
new_words = words.copy()
for _ in range(n):
add_word(new_words)
return new_words


def add_word(new_words):
synonyms = []
counter = 0
while len(synonyms) < 1:
random_word = new_words[random.randint(0, len(new_words) - 1)]
synonyms = get_synonyms(random_word)
counter += 1
if counter >= 10:
return
random_synonym = random.choice(synonyms)
random_idx = random.randint(0, len(new_words) - 1)
new_words.insert(random_idx, random_synonym)


########################################################################
# Random swap
# Randomly swap two words in the sentence n times
########################################################################

def random_swap(words, n):
new_words = words.copy()
for _ in range(n):
new_words = swap_word(new_words)
return new_words


def swap_word(new_words):
random_idx_1 = random.randint(0, len(new_words) - 1)
random_idx_2 = random_idx_1
counter = 0
while random_idx_2 == random_idx_1:
random_idx_2 = random.randint(0, len(new_words) - 1)
counter += 1
if counter > 3:
return new_words
new_words[random_idx_1], new_words[random_idx_2] = new_words[random_idx_2], new_words[random_idx_1]
return new_words


########################################################################
# 随机删除
# 以概率p删除语句中的词
########################################################################
def random_deletion(words, p):
if len(words) == 1:
return words

new_words = []
for word in words:
r = random.uniform(0, 1)
if r > p:
new_words.append(word)

if len(new_words) == 0:
rand_int = random.randint(0, len(words) - 1)
return [words[rand_int]]

return new_words


########################################################################
# EDA函数
def eda(sentence, segged=False, alpha_sr=0.1, alpha_ri=0.1, alpha_rs=0.1, p_rd=0.1, num_aug=9):
if segged is False: # 还没有分词
seg_list = jieba.cut(sentence) # hide by Jane
seg_list = " ".join(seg_list) # hide by Jane
else:
seg_list = sentence # add by Jane
words = list(seg_list.split())
num_words = len(words)

augmented_sentences = []
num_new_per_technique = int(num_aug / 4) + 1
n_sr = max(1, int(alpha_sr * num_words))
n_ri = max(1, int(alpha_ri * num_words))
n_rs = max(1, int(alpha_rs * num_words))

# print(words, "\n")

# 同义词替换sr
for _ in range(num_new_per_technique):
a_words = synonym_replacement(words, n_sr)
augmented_sentences.append(' '.join(a_words))

# 随机插入ri
for _ in range(num_new_per_technique):
a_words = random_insertion(words, n_ri)
augmented_sentences.append(' '.join(a_words))

# 随机交换rs
for _ in range(num_new_per_technique):
a_words = random_swap(words, n_rs)
augmented_sentences.append(' '.join(a_words))

# 随机删除rd
for _ in range(num_new_per_technique):
a_words = random_deletion(words, p_rd)
augmented_sentences.append(' '.join(a_words))

# print('*'*20, len(augmented_sentences))
shuffle(augmented_sentences)

if num_aug >= 1:
augmented_sentences = augmented_sentences[:num_aug]
else:
keep_prob = num_aug / len(augmented_sentences)
augmented_sentences = [s for s in augmented_sentences if random.uniform(0, 1) < keep_prob]

augmented_sentences.append(seg_list) # 加上original句子

return augmented_sentences


if __name__ == '__main__':
# 测试用例
"""
sentence = "我们就像蒲公英,我也祈祷着能和你飞去同一片土地"
augmented_sentences = eda(sentence)
print(augmented_sentences)
"""

seg_list = "我们 就 像 蒲公英 , 我 也 祈祷 着 能 和 你 飞去 同 一片 土地"
augmented_sentences = eda(seg_list, segged=True, alpha_sr=0.05, alpha_ri=0.05, alpha_rs=0.05, p_rd=0.05, num_aug=9)
print(augmented_sentences)
泡面一杯