
Commit 62f0f79

refactor(models): accelerate evaluating using mp
1 parent 32e0651 commit 62f0f79
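
The pattern this commit adopts in both evaluators is the same: split the text pairs into one chunk per visible GPU, spawn one worker process per chunk that loads its own copy of the model and scores its slice, and collect the per-rank score lists through a multiprocessing Manager dict so they can be merged back in rank order. Below is a minimal, self-contained sketch of that pattern; the names score_chunk and parallel_scores are illustrative (not from the repo) and the per-item "score" is a placeholder.

import torch
import torch.multiprocessing as mp


def score_chunk(rank, chunk, return_dict):
    # Each worker pins itself to one GPU (when available) and stores its
    # partial results under its rank so the parent can merge them in order.
    if torch.cuda.is_available():
        torch.cuda.set_device(rank)
    return_dict[rank] = [float(len(text)) for text in chunk]  # placeholder score


def parallel_scores(texts):
    num_gpus = max(torch.cuda.device_count(), 1)
    chunk_size = len(texts) // num_gpus
    chunks = []
    for i in range(num_gpus):
        start = i * chunk_size
        end = len(texts) if i == num_gpus - 1 else start + chunk_size
        chunks.append(texts[start:end])

    manager = mp.Manager()
    return_dict = manager.dict()
    processes = [mp.Process(target=score_chunk, args=(rank, chunk, return_dict))
                 for rank, chunk in enumerate(chunks)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # Merge per-rank results back into input order.
    results = []
    for rank in range(len(chunks)):
        results.extend(return_dict[rank])
    return results


if __name__ == "__main__":
    mp.set_start_method("spawn")  # required before CUDA is used in child processes
    print(parallel_scores(["short", "a bit longer", "the longest example text"]))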

3 files changed, +204 -129 lines changed


evaluate.py

Lines changed: 8 additions & 8 deletions
@@ -3,9 +3,10 @@
 import os
 import json
 import argparse
-import torch
 import pandas as pd
 from dotenv import load_dotenv
+import torch
+import torch.multiprocessing as mp
 from models import LengthEvaluator, MTLDEvaluator, RewardEvaluator, TextPair, UniEvaluator
 from utils import logger, set_logger
 

@@ -51,15 +52,12 @@ def evaluate_uni(corpus, uni_model_name):
         model_name=uni_model_name
     )
     logger.info("Uni evaluator loaded with model %s", uni_model_name)
-    naturalness_scores = uni_evaluator.get_average_score(corpus, 'naturalness')
-    logger.info("Uni naturalness scores: %s", naturalness_scores)
-    coherence_scores = uni_evaluator.get_average_score(corpus, 'coherence')
-    logger.info("Uni coherence scores: %s", coherence_scores)
-    understandability_scores = uni_evaluator.get_average_score(corpus, 'understandability')
-    logger.info("Uni understandability scores: %s", understandability_scores)
+    uni_scores = uni_evaluator.get_average_score(corpus)
+    for key, value in uni_scores.items():
+        logger.info("Uni %s scores: %s", key, value)
     del uni_evaluator
     clean_gpu_cache()
-    return naturalness_scores, coherence_scores, understandability_scores
+    return uni_scores['naturalness'], uni_scores['coherence'], uni_scores['understandability']
 
 
 def clean_gpu_cache():

@@ -92,6 +90,8 @@ def clean_gpu_cache():
     results = []
 
     logger.info("Data loaded from %s", args.folder)
+    mp.set_start_method('spawn')
+
     for file in os.listdir(args.folder):
         if file.endswith('.json'):
             logger.info("Processing %s", file)
Lines changed: 71 additions & 18 deletions
@@ -1,37 +1,90 @@
 from dataclasses import dataclass
+from tqdm import tqdm
 from transformers import AutoModelForSequenceClassification, AutoTokenizer
+import torch
+import torch.multiprocessing as mp
 
-from models.evaluate.base_evaluator import BaseEvaluator
 from models.text.text_pair import TextPair
-from utils import create_event_loop
 
 
 @dataclass
-class RewardEvaluator(BaseEvaluator):
+class RewardEvaluator:
     """
     Reward Model Evaluator.
     OpenAssistant/reward-model-deberta-v3-large-v2: scores range over [-inf, inf]; higher is better.
     """
     reward_name: str = "OpenAssistant/reward-model-deberta-v3-large-v2"
-    max_length: int = 1024
+    max_length: int = 2560
 
     def __post_init__(self):
-        self.rank_model = AutoModelForSequenceClassification.from_pretrained(self.reward_name)
-        self.tokenizer = AutoTokenizer.from_pretrained(self.reward_name)
+        self.num_gpus = torch.cuda.device_count()
 
-        self.rank_model.eval()
-        self.rank_model.to("cuda")
+    @staticmethod
+    def process_chunk(rank, pairs, reward_name, max_length, return_dict):
+        device = f'cuda:{rank}'
+        torch.cuda.set_device(rank)
 
-    async def evaluate_single(self, pair: TextPair) -> float:
-        loop = create_event_loop()
-        return await loop.run_in_executor(None, self._tokenize_and_rank, pair)
+        rank_model = AutoModelForSequenceClassification.from_pretrained(reward_name)
+        tokenizer = AutoTokenizer.from_pretrained(reward_name)
+        rank_model.to(device)
+        rank_model.eval()
 
-    def _tokenize_and_rank(self, pair: TextPair) -> float:
-        question, answer = pair.question, pair.answer
+        results = []
+        with torch.no_grad():
+            for pair in tqdm(pairs):
+                inputs = tokenizer(
+                    pair.question,
+                    pair.answer,
+                    return_tensors="pt",
+                    max_length=max_length,
+                    truncation=True
+                )
+                inputs = {k: v.to(device) for k, v in inputs.items()}
+                score = rank_model(**inputs).logits[0].item()
+                results.append(score)
 
-        # concatenate the question and answer
-        inputs = self.tokenizer(question, answer, return_tensors="pt", max_length=self.max_length, truncation=True)
-        inputs = {k: v.to("cuda") for k, v in inputs.items()}
+        return_dict[rank] = results
 
-        score = self.rank_model(**inputs).logits[0].item()
-        return score
+    def evaluate(self, pairs: list[TextPair]) -> list[float]:
+        chunk_size = len(pairs) // self.num_gpus
+        chunks = []
+        for i in range(self.num_gpus):
+            start = i * chunk_size
+            end = start + chunk_size
+            if i == self.num_gpus - 1:
+                end = len(pairs)
+            chunks.append(pairs[start:end])
+
+        # multi-process
+        manager = mp.Manager()
+        return_dict = manager.dict()
+        processes = []
+
+        for rank, chunk in enumerate(chunks):
+            p = mp.Process(
+                target=self.process_chunk,
+                args=(rank, chunk, self.reward_name, self.max_length, return_dict)
+            )
+            p.start()
+            processes.append(p)
+
+        for p in processes:
+            p.join()
+
+        # merge results
+        results = []
+        for rank in range(len(chunks)):
+            results.extend(return_dict[rank])
+
+        for p in processes:
+            if p.is_alive():
+                p.terminate()
+                p.join()
+
+        return results
+
+    def get_average_score(self, pairs: list[TextPair]) -> float:
+        """
+        Get the average score of a batch of texts.
+        """
+        return sum(self.evaluate(pairs)) / len(pairs)
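
After this change RewardEvaluator is synchronous from the caller's point of view: evaluate() fans the pairs out to one process per GPU and returns the merged list of reward scores, and get_average_score() averages them. A rough usage sketch, assuming TextPair can be constructed with question/answer keywords and that at least one CUDA device is present:

import torch.multiprocessing as mp

from models import RewardEvaluator, TextPair  # as imported in evaluate.py

if __name__ == "__main__":
    mp.set_start_method("spawn")  # must be set before the per-GPU workers start

    pairs = [
        # TextPair(question=..., answer=...) is assumed here; only the
        # .question and .answer attributes are actually used by the evaluator.
        TextPair(question="What does this commit change?",
                 answer="It scores text pairs on multiple GPUs with torch.multiprocessing."),
    ]

    evaluator = RewardEvaluator()       # defaults: OpenAssistant reward model, max_length=2560
    scores = evaluator.evaluate(pairs)  # one reward score per pair, merged across workers
    print(scores)
    # evaluator.get_average_score(pairs) would rerun the workers and return the mean.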

models/evaluate/uni_evaluator.py

Lines changed: 125 additions & 103 deletions
@@ -1,124 +1,146 @@
 # https://github.com/maszhongming/UniEval/tree/main
 
+from dataclasses import dataclass, field
+from tqdm import tqdm
 import torch
 from torch import nn
-from dataclasses import dataclass, field
-import asyncio
-from tqdm.asyncio import tqdm as tqdm_async
+import torch.multiprocessing as mp
 
 from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
-from models.evaluate.base_evaluator import BaseEvaluator
-from utils import create_event_loop
-from models.text.text_pair import TextPair
-
+from models import TextPair
+
+
+def _add_questions(dimension: str, question: str, answer: str):
+    if dimension == "naturalness":
+        cur_input = 'question: Is this a natural response in the dialogue? </s> response: ' + answer
+    elif dimension == "coherence":
+        cur_input = 'question: Is this a coherent response given the dialogue history? </s> response: ' \
+                    + answer + ' </s> dialogue history: ' + question
+    elif dimension == "understandability":
+        cur_input = 'question: Is this an understandable response in the dialogue? </s> response: ' + answer
+    else:
+        raise NotImplementedError(
+            'The input format for this dimension is still undefined. Please customize it first.')
+    return cur_input
 
 @dataclass
-class UniEvaluator(BaseEvaluator):
+class UniEvaluator:
     model_name: str = "MingZhong/unieval-sum"
     dimensions: list = field(default_factory=lambda: ['naturalness', 'coherence', 'understandability'])
-    max_length: int = 1024
+    max_length: int = 2560
 
     def __post_init__(self):
-        self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name)
-        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
-
-        self.model.eval()
-        self.model.to("cuda")
+        self.num_gpus = torch.cuda.device_count()
 
-        self.softmax = nn.Softmax(dim=1)
+    @staticmethod
+    def process_chunk(rank, pairs, model_name, max_length, dimension, return_dict):
+        device = f'cuda:{rank}'
+        torch.cuda.set_device(rank)
 
-        self.pos_id = self.tokenizer("Yes")["input_ids"][0]
-        self.neg_id = self.tokenizer("No")["input_ids"][0]
+        rank_model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
+        tokenizer = AutoTokenizer.from_pretrained(model_name)
+        rank_model.to(device)
+        rank_model.eval()
 
-    def evaluate(self, pairs: list[TextPair], dimension: str) -> list[float]:
-        """
-        Evaluate the text and return a score.
-        """
-        return create_event_loop().run_until_complete(self.async_evaluate(pairs, dimension))
+        softmax = nn.Softmax(dim=1)
 
-    async def async_evaluate(self, pairs: list[TextPair], dimension: str) -> list[float]:
-        semaphore = asyncio.Semaphore(self.max_concurrent)
-
-        async def evaluate_with_semaphore(pair):
-            async with semaphore:
-                return await self.evaluate_single(pair, dimension)
+        pos_id = tokenizer("Yes")["input_ids"][0]
+        neg_id = tokenizer("No")["input_ids"][0]
 
         results = []
-        for result in tqdm_async(
-            asyncio.as_completed([evaluate_with_semaphore(pair) for pair in pairs]),
-            total=len(pairs),
-        ):
-            results.append(await result)
-        return results
-
-    async def evaluate_single(self, pair: TextPair, dimension: str) -> float:
-        text = self._add_questions(dimension, pair.question, pair.answer)
-        loop = create_event_loop()
-        return await loop.run_in_executor(None, self._score, text)
-
-    def get_average_score(self, pairs: list[TextPair], dimension: str) -> float:
+        with torch.no_grad():
+            for pair in tqdm(pairs):
+                text = _add_questions(dimension, pair.question, pair.answer)
+
+                tgt = "No"
+
+                encoded_src = tokenizer(
+                    text,
+                    max_length=max_length,
+                    truncation=True,
+                    padding=True,
+                    return_tensors='pt'
+                )
+                encoded_tgt = tokenizer(
+                    tgt,
+                    max_length=max_length,
+                    truncation=True,
+                    padding=True,
+                    return_tensors='pt'
+                )
+
+                src_tokens = encoded_src['input_ids'].to(device)
+                src_mask = encoded_src['attention_mask'].to(device)
+
+                tgt_tokens = encoded_tgt['input_ids'].to(device)[:, 0].unsqueeze(-1)
+
+                output = rank_model(
+                    input_ids=src_tokens,
+                    attention_mask=src_mask,
+                    labels=tgt_tokens,
+                    use_cache = False
+                )
+
+                logits = output.logits.view(-1, rank_model.config.vocab_size)
+
+                pos_score = softmax(logits)[:, pos_id]  # Yes
+                neg_score = softmax(logits)[:, neg_id]
+                score = pos_score / (pos_score + neg_score)
+
+                results.append(score.item())
+
+        return_dict[rank] = results
+
+    def evaluate(self, pairs: list[TextPair]) -> list[dict]:
+        final_results = []
+        for dimension in self.dimensions:
+            chunk_size = len(pairs) // self.num_gpus
+            chunks = []
+            for i in range(self.num_gpus):
+                start = i * chunk_size
+                end = start + chunk_size
+                if i == self.num_gpus - 1:
+                    end = len(pairs)
+                chunks.append(pairs[start:end])
+
+            # multi-process
+            manager = mp.Manager()
+            return_dict = manager.dict()
+            processes = []
+
+            for rank, chunk in enumerate(chunks):
+                p = mp.Process(
+                    target=self.process_chunk,
+                    args=(rank, chunk, self.model_name, self.max_length, dimension, return_dict)
+                )
+                p.start()
+                processes.append(p)
+
+            for p in processes:
+                p.join()
+
+            # merge results
+            results = []
+            for rank in range(len(chunks)):
+                results.extend(return_dict[rank])
+
+            for p in processes:
+                if p.is_alive():
+                    p.terminate()
+                    p.join()
+
+            final_results.append({
+                dimension: results
+            })
+        return final_results
+
+    def get_average_score(self, pairs: list[TextPair]) -> dict:
         """
         Get the average score of a batch of texts.
         """
-        return sum(self.evaluate(pairs, dimension)) / len(pairs)
-
-    def _score(self, text: str) -> float:
-        """
-        Get scores for the given samples.
-        final_score = postive_score / (postive_score + negative_score)
-        """
-
-        # The implementation of "forward" in T5 still requires decoder_input_ids.
-        # Therefore, we construct a random one-word target sequence.
-        # The content of the target has no effect on the final scores.
-
-        tgt = "No"
-
-        with torch.no_grad():
-            encoded_src = self.tokenizer(
-                text,
-                max_length=self.max_length,
-                truncation=True,
-                padding=True,
-                return_tensors='pt'
-            )
-            encoded_tgt = self.tokenizer(
-                tgt,
-                max_length=self.max_length,
-                truncation=True,
-                padding=True,
-                return_tensors='pt'
-            )
-
-            src_tokens = encoded_src['input_ids'].to("cuda")
-            src_mask = encoded_src['attention_mask'].to("cuda")
-
-            tgt_tokens = encoded_tgt['input_ids'].to("cuda")[:, 0].unsqueeze(-1)
-
-            output = self.model(
-                input_ids=src_tokens,
-                attention_mask=src_mask,
-                labels=tgt_tokens
-            )
-
-            logits = output.logits.view(-1, self.model.config.vocab_size)
-
-            pos_score = self.softmax(logits)[:, self.pos_id]  # Yes
-            neg_score = self.softmax(logits)[:, self.neg_id]
-
-            score = pos_score / (pos_score + neg_score)
-
-            return score.item()
-
-    def _add_questions(self, dimension: str, question: str, answer: str):
-        if dimension == "naturalness":
-            cur_input = 'question: Is this a natural response in the dialogue? </s> response: ' + answer
-        elif dimension == "coherence":
-            cur_input = 'question: Is this a coherent response given the dialogue history? </s> response: ' \
-                        + answer + ' </s> dialogue history: ' + question
-        elif dimension == "understandability":
-            cur_input = 'question: Is this an understandable response in the dialogue? </s> response: ' + answer
-        else:
-            raise NotImplementedError(
-                'The input format for this dimension is still undefined. Please customize it first.')
-        return cur_input
+        results = self.evaluate(pairs)
+        final_results = {}
+        for result in results:
+            for key, value in result.items():
+                final_results[key] = sum(value) / len(value)
+        return final_results
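
UniEvaluator follows the same shape, but iterates over its three dimensions itself and no longer takes a dimension argument: evaluate() returns a list of one-key dicts ({dimension: [scores...]}) and get_average_score() collapses that into a dict of per-dimension means, which is what the updated evaluate_uni() in evaluate.py unpacks. A hedged usage sketch under the same assumptions as above (keyword-constructed TextPair, spawn start method, at least one GPU):

import torch.multiprocessing as mp

from models import TextPair, UniEvaluator  # as imported in evaluate.py

if __name__ == "__main__":
    mp.set_start_method("spawn")

    corpus = [
        TextPair(question="How do I reset my password?",  # keyword construction assumed
                 answer="Open Settings, choose Security, then follow the reset link."),
    ]

    uni = UniEvaluator()                      # dimensions: naturalness, coherence, understandability
    averages = uni.get_average_score(corpus)  # dict mapping each dimension to its mean score
    for dimension, value in averages.items():
        print(dimension, value)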
