Spaces:
Runtime error
Runtime error
Create tsne.py
Browse files
tsne.py
ADDED
@@ -0,0 +1,259 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import numpy as np
|
2 |
+
import pandas as pd
|
3 |
+
from openTSNE import TSNE
|
4 |
+
import plotly.graph_objs as go
|
5 |
+
import matplotlib.pyplot as plt
|
6 |
+
import matplotlib.colors as mcolors
|
7 |
+
from sklearn.decomposition import PCA
|
8 |
+
from scipy.optimize import linear_sum_assignment
|
9 |
+
|
10 |
+
class TSNE_Plot():
    """t-SNE scatter plot of sentence embeddings with side annotations.

    The embeddings are reduced with PCA, projected to 2-D with openTSNE and
    drawn with plotly.  A random subset of sentences is annotated; each
    annotation is placed in a slot on the left or right edge of the plot and
    connected to its point with a line.

    Attributes set by ``__init__``:
        N: number of KMeans clusters used when no labels are supplied.
        test_X: DataFrame with columns ``text`` and ``embed``.
        test_y: DataFrame with a single ``label`` column.
        embed: 2-D projection returned by ``calculate_tsne``.
        df: plotting DataFrame (``x``/``y`` after ``init_df``; the full set
            of columns after ``format_data``).
        n_annotation_positions: requested number of annotated sentences.
        show_sentence: row positions of the sentences to annotate.
        annotation_positions: list of ``(x, y)`` annotation slots.
        mapping: ``{row position: (x, y) slot}`` filled by ``map_points``.
    """

    def __init__(self, sentence, embed, label=None, n_clusters: int = 3, n_annotation_positions: int = 20):
        """Build the projection and pick the sentences to annotate.

        Args:
            sentence: sequence of texts, one per embedding.
            embed: sequence of embedding vectors (anything ``np.array`` accepts).
            label: optional sequence of labels; when ``None`` the labels come
                from ``cluster()``.
            n_clusters: number of KMeans clusters used when ``label`` is None.
            n_annotation_positions: how many sentences to annotate.
        """
        assert n_clusters > 0, "N must be greater than 0"
        self.N = n_clusters
        self.test_X = pd.DataFrame({"text": sentence, "embed": [np.array(i) for i in embed]})
        if label is not None:
            self.test_y = pd.DataFrame({"label": label})
        else:
            self.test_y = pd.DataFrame({"label": self.cluster()})
        self.embed = self.calculate_tsne()
        self.init_df()

        self.n_annotation_positions = n_annotation_positions
        self.show_sentence = []
        self.random_sentence()

        self.annotation_positions = []
        self.get_annotation_positions()
        self.mapping = {}

    def _compact_embedding(self):
        """Return the embeddings reduced with PCA to at most 50 components."""
        features = np.array(self.test_X["embed"].tolist())
        # PCA cannot extract more components than min(n_samples, n_features),
        # so the feature dimension has to be part of the cap as well.
        n_components = min(50, *features.shape)
        return PCA(n_components=n_components).fit_transform(features)

    def cluster(self):
        """Cluster the PCA-reduced embeddings with KMeans and return the labels."""
        from sklearn.cluster import KMeans

        kmeans = KMeans(n_clusters=self.N)
        kmeans.fit(self._compact_embedding())
        return kmeans.labels_

    def generate_colormap(self, n_labels):
        """Map every label in ``n_labels`` to a hex colour from the 'jet' colormap.

        Args:
            n_labels: sequence of distinct labels (not a count).

        Returns:
            dict ``{label: "#rrggbb"}``.
        """
        # Spread the labels evenly over the colormap range.
        color_norm = mcolors.Normalize(vmin=0, vmax=len(n_labels) - 1)
        # Any predefined colormap ("hsv", "hot", "cool", "viridis", ...) works here.
        scalar_map = plt.cm.ScalarMappable(norm=color_norm, cmap='jet')

        colormap = {}
        for position, label in enumerate(n_labels):
            # Store the colour as a hex string.
            colormap[label] = mcolors.to_hex(scalar_map.to_rgba(position))
        return colormap

    def divide_hex_color_by_half(self, hex_color):
        """Return a pale tint of ``hex_color`` for annotation backgrounds.

        Despite the name, each channel is mapped to ``value // 10 + 230``,
        which yields a value in 230..255.

        Args:
            hex_color: ``"#rrggbb"`` or ``"rrggbb"``.

        Returns:
            The tinted colour as ``"#rrggbb"``.
        """
        if hex_color.startswith("#"):
            hex_color = hex_color[1:]

        channels = (hex_color[0:2], hex_color[2:4], hex_color[4:6])
        tinted = [int(channel, 16) // 10 + (255 - 25) for channel in channels]
        return "#{:02x}{:02x}{:02x}".format(*tinted)

    def get_annotation_positions(self):
        """Compute the annotation slots on the left and right plot edges.

        Half of the slots sit at ``min(x) - 1`` and half at ``max(x) + 2``,
        evenly spaced between ``3 * min(y)`` and ``3 * max(y)``.  The result
        is stored in ``self.annotation_positions``.
        """
        min_x, max_x = self.df['x'].min() - 1, self.df['x'].max() + 2
        # random_sentence() may select more sentences than requested (one per
        # label is guaranteed), so provide a slot for every selected sentence.
        n = max(self.n_annotation_positions, len(self.show_sentence))

        y_min, y_max = self.df['y'].min() * 3, self.df['y'].max() * 3

        # Round up so an odd n still gets enough slots over the two sides.
        y_values = np.linspace(y_min, y_max, (n + 1) // 2)

        left_positions = [(min_x, y) for y in y_values]
        right_positions = [(max_x, y) for y in y_values]

        self.annotation_positions = left_positions + right_positions

    def euclidean_distance(self, p1, p2):
        """Return the Euclidean distance between two 2-D points."""
        return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)

    def map_points(self):
        """Assign every annotated sentence to its own annotation slot.

        Solves the assignment problem that minimises the total point-to-slot
        distance and stores the result in ``self.mapping``.
        """
        # Start from a clean mapping so repeated calls leave no stale entries.
        self.mapping = {}
        if not self.show_sentence or not self.annotation_positions:
            return

        rows = np.asarray(self.show_sentence, dtype=int)
        points = np.asarray(self.embed, dtype=float)[rows, :2]
        slots = np.asarray(self.annotation_positions, dtype=float).reshape(-1, 2)

        # Pairwise distances, shape (n_points, n_slots).
        distance_matrix = np.linalg.norm(points[:, None, :] - slots[None, :, :], axis=2)

        # Optimal one-to-one assignment of points to slots.
        row_ind, col_ind = linear_sum_assignment(distance_matrix)

        for i, j in zip(row_ind, col_ind):
            self.mapping[self.show_sentence[i]] = self.annotation_positions[j]

    def show_text(self, show_sentence, text):
        """Return the annotation text for every row.

        Args:
            show_sentence: row positions that get an annotation.
            text: sequence of texts, one per row.

        Returns:
            A list as long as ``text``: an abbreviated text for selected rows
            and ``""`` for the others.  Long texts are abbreviated to their
            first and last 10 characters joined by ``"..."``; texts that are
            not longer than that abbreviation are returned unchanged.
        """
        head, tail, ellipsis = 10, 10, "..."
        selected = set(show_sentence)

        sentence = []
        for i, value in enumerate(text):
            if i not in selected:
                sentence.append("")
            elif len(value) <= head + tail + len(ellipsis):
                # Abbreviating a short text would only repeat its content.
                sentence.append(value)
            else:
                sentence.append(value[:head] + ellipsis + value[-tail:])
        return sentence

    def init_df(self):
        """Create ``self.df`` holding only the projected ``x``/``y`` coordinates."""
        coords = np.asarray(self.embed)
        self.df = pd.DataFrame({
            "x": coords[:, 0],
            "y": coords[:, 1],
        })

    def format_data(self):
        """Rebuild ``self.df`` with everything ``plot`` needs.

        Columns: ``x``, ``y``, ``label``, ``sentence`` (annotation text or
        ``""``), ``size`` (marker size), ``pos`` (annotation slot as a dict
        with ``x_offset``/``y_offset``) and ``annotate`` (bool).
        """
        sentence = self.show_text(self.show_sentence, self.test_X["text"])
        coords = np.asarray(self.embed)
        n = len(self.test_X)
        selected = set(self.show_sentence)

        pos = []
        for i in range(n):
            x_offset, y_offset = self.mapping.get(i, (0, 0))
            pos.append({"x_offset": x_offset, "y_offset": y_offset})

        data = {
            "x": coords[:, 0],
            "y": coords[:, 1],
            # Plain values, so every column is aligned by row position.
            "label": self.test_y["label"].to_numpy(),
            "sentence": sentence,
            "size": [20 if i in selected else 10 for i in range(n)],
            "pos": pos,
            "annotate": [i in selected for i in range(n)],
        }
        self.df = pd.DataFrame(data)

    def calculate_tsne(self):
        """Project the PCA-reduced embeddings to 2-D with openTSNE.

        Returns:
            The embedding object returned by ``optimize``; it is used as an
            ``(n_samples, 2)`` array by the rest of the class.
        """
        tsne = TSNE(
            perplexity=30,
            metric="cosine",
            n_jobs=8,
            random_state=42,
            verbose=False,
        )
        embedding_train = tsne.fit(self._compact_embedding())
        embedding_train = embedding_train.optimize(n_iter=1000, momentum=0.8)
        return embedding_train

    def random_sentence(self):
        """Randomly choose the sentences to annotate.

        Picks ``n_annotation_positions`` distinct rows (or all rows when there
        are fewer) and then adds the first row of every label that is not
        represented yet, so the result may be slightly larger than requested.
        The sorted row positions are stored in ``self.show_sentence``.

        Note: calling this repeatedly changes the selection and therefore the
        resulting figure.
        """
        n_samples = len(self.test_y)
        if n_samples == 0:
            self.show_sentence = []
            return

        # Sampling without replacement always terminates, even when more
        # annotations are requested than there are samples.
        n_pick = max(0, min(self.n_annotation_positions, n_samples))
        chosen = set(np.random.choice(n_samples, size=n_pick, replace=False).tolist())

        # Make sure every label has at least one annotated sentence.
        labels = self.test_y["label"].reset_index(drop=True)
        represented = set(labels.iloc[sorted(chosen)])
        for position, label in labels.drop_duplicates().items():
            if label not in represented:
                chosen.add(int(position))

        self.show_sentence = sorted(chosen)

    def plot(self, return_fig=False):
        """Draw the scatter plot with annotations and connector lines.

        Expects ``format_data`` to have been called, because it reads the
        ``label``, ``size``, ``sentence``, ``pos`` and ``annotate`` columns.

        Args:
            return_fig: when True return the plotly figure instead of showing it.

        Returns:
            The figure when ``return_fig`` is True, otherwise ``None``.
        """
        min_x = self.df['x'].min() - 1
        fig = go.Figure(layout=go.Layout(
            autosize=False,  # keep the figure size fixed
            height=800,      # adjust as needed
            width=1500,      # adjust as needed
        ))

        label_colors = self.generate_colormap(self.df['label'].unique())

        line_legend_group = "lines"

        # One marker trace per label.
        for label, color in label_colors.items():
            mask = self.df["label"] == label
            fig.add_trace(go.Scatter(
                x=self.df[mask]['x'], y=self.df[mask]['y'], mode='markers',
                marker=dict(color=color, size=self.df[mask]['size']),
                showlegend=True, legendgroup=line_legend_group,
                name=str(label),
            ))

        # One annotation and one connector line per selected sentence.
        for x, y, label, sentence, pos, annotate in zip(
                self.df.x, self.df.y, self.df.label,
                self.df.sentence, self.df.pos, self.df.annotate):
            if not sentence or not annotate:
                continue

            # The fallback must be a hex colour because the background tint
            # is computed by parsing hex digits.
            color = label_colors.get(label, "#000000")
            # The slot is on the left edge when its x equals min_x.
            on_left = abs(pos["x_offset"] - min_x) < 1e-2

            sentence_annotation = dict(
                x=pos["x_offset"],
                y=pos["y_offset"],
                xref="x",
                yref="y",
                text=sentence,
                showarrow=False,
                xanchor="right" if on_left else 'left',
                yanchor='middle',
                font=dict(color="black"),
                bordercolor=color,
                borderpad=2,
                bgcolor=self.divide_hex_color_by_half(color),
            )
            fig.add_annotation(sentence_annotation)

            # Connector: horizontal from the slot, then a short bend into the point.
            x_start = x - 1 if on_left else x + 1
            x_turn = x - 0.5 if on_left else x + 0.5
            y_turn = y

            fig.add_trace(go.Scatter(
                x=[pos["x_offset"], x_start, x_turn, x],
                y=[pos["y_offset"], pos["y_offset"], y_turn, y],
                mode='lines',
                line=dict(color=color), showlegend=False, legendgroup=line_legend_group,
            ))

        # Hide the axis tick labels.
        fig.update_xaxes(tickvals=[])
        fig.update_yaxes(tickvals=[])

        if return_fig:
            return fig
        fig.show()

    def tsne_plot(self, n_sentence=20, return_fig=False):
        """Select sentences, place the annotations and draw the figure.

        The 2-D projection computed in ``__init__`` (``self.embed``) is
        reused; it is not recomputed here.

        Args:
            n_sentence: number of sentences to annotate (capped at the number
                of samples).
            return_fig: when True return the plotly figure instead of showing it.

        Returns:
            The figure when ``return_fig`` is True, otherwise ``None``.
        """
        # Re-sample the annotated sentences only when the requested number changed.
        wanted = min(n_sentence, len(self.test_y))
        if self.n_annotation_positions != wanted:
            self.n_annotation_positions = wanted
            self.random_sentence()
            self.get_annotation_positions()

        # Find the optimal slot for every annotated sentence.
        self.map_points()

        # Build the plotting DataFrame (x, y, label, sentence, size, pos, annotate).
        self.format_data()

        return self.plot(return_fig=return_fig)
|