Merged
1 change: 0 additions & 1 deletion configs/config_all.yaml
@@ -64,7 +64,6 @@ process:
   - extract_qa_mapper:                                    # mapper to extract question and answer pair from text.
       hf_model: 'alibaba-pai/pai-qwen1_5-7b-doc2qa'       # model name on huggingface to extract question and answer pair.
       pattern: null                                       # regular expression pattern to search for within text.
-      qa_format: 'chatml'                                 # Output format of question and answer pair.
       enable_vllm: true                                   # Whether to use vllm for inference acceleration.
       tensor_parallel_size: null                          # It is only valid when enable_vllm is True. The number of GPUs to use for distributed execution with tensor parallelism.
       max_model_len: null                                 # It is only valid when enable_vllm is True. Model context length. If unspecified, will be automatically derived from the model config.
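(For reference, a minimal Python sketch of constructing this op after the change; `qa_format` is gone, and extracted pairs now land in the query/response columns instead of a chatml JSON string. The argument values below just mirror the YAML defaults above.)

    from data_juicer.ops.mapper.extract_qa_mapper import ExtractQAMapper

    # qa_format is no longer accepted; values mirror the YAML defaults above
    op = ExtractQAMapper(
        hf_model='alibaba-pai/pai-qwen1_5-7b-doc2qa',
        pattern=None,                # fall back to the op's built-in QA regex
        enable_vllm=True,
        tensor_parallel_size=None,   # only meaningful when enable_vllm is True
        max_model_len=None,          # derived from the model config when None
    )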
5 changes: 5 additions & 0 deletions data_juicer/ops/base_op.py
@@ -133,6 +133,11 @@ def __init__(self, *args, **kwargs):
         self.image_key = kwargs.get('image_key', 'images')
         self.audio_key = kwargs.get('audio_key', 'audios')
         self.video_key = kwargs.get('video_key', 'videos')
+
+        self.query_key = kwargs.get('query_key', 'query')
+        self.response_key = kwargs.get('response_key', 'response')
+        self.history_key = kwargs.get('history_key', 'history')
+
         self.batch_size = kwargs.get('batch_size', 1000)
 
         # whether the model can be accelerated using cuda
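(The three new keys follow the same convention as the existing text/image/audio/video keys, so any op can remap them via kwargs from config. A minimal sketch, assuming the defaults set above; `SomeMapper` and the remapped name 'instruction' are illustrative stand-ins, not real identifiers from this PR.)

    # hypothetical remapping, analogous to existing text_key/image_key overrides
    op = SomeMapper(query_key='instruction')   # SomeMapper stands in for any op
    assert op.query_key == 'instruction'
    assert op.response_key == 'response'       # untouched kwargs keep defaults
    assert op.history_key == 'history'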
75 changes: 35 additions & 40 deletions data_juicer/ops/mapper/extract_qa_mapper.py
@@ -1,4 +1,3 @@
-import json
 import re
 from typing import Dict, Optional
 
@@ -33,12 +32,12 @@ class ExtractQAMapper(Mapper):
     """
 
     _accelerator = 'cuda'
+    _batched_op = True
 
     def __init__(self,
                  hf_model: str = 'alibaba-pai/pai-qwen1_5-7b-doc2qa',
                  trust_remote_code: bool = False,
                  pattern: Optional[str] = None,
-                 qa_format: str = 'chatml',
                  enable_vllm: bool = True,
                  tensor_parallel_size: Optional[int] = None,
                  max_model_len: Optional[int] = None,
@@ -51,7 +50,6 @@ def __init__(self,
         :param hf_model: Hugginface model id.
         :param trust_remote_code: passed to transformers
         :param pattern: regular expression pattern to search for within text.
-        :param qa_format: Output format of question and answer pair.
         :param enable_vllm: Whether to use vllm for inference acceleration.
         :param tensor_parallel_size: It is only valid when enable_vllm is True.
             The number of GPUs to use for distributed execution with tensor
@@ -88,7 +86,6 @@ def __init__(self,
         else:
             self.pattern = pattern
 
-        self.qa_format = qa_format
         self.enable_vllm = enable_vllm
 
         if enable_vllm:
@@ -126,41 +123,39 @@ def _extract_qa(self, output):
 
         return qa_list
 
-    def process_single(self, sample, rank=None):
+    def process_batched(self, samples, rank=None):
         model, processor = get_model(self.model_key, rank, self.use_cuda())
 
-        if self.enable_vllm:
-            response = model.generate([sample[self.text_key]],
-                                      self.sampling_params)
-            output = response[0].outputs[0].text
-        else:
-            inputs = processor(sample[self.text_key],
-                               return_tensors='pt').to(model.device)
-            response = model.generate(**inputs, **self.sampling_params)
-            output = processor.decode(response.cpu()[0],
-                                      skip_special_tokens=True)
-
-        qa_list = self._extract_qa(output)
-
-        if not len(qa_list):
-            logger.info(
-                'No question and answer data was extracted from this sample!')
-
-        dialogue_data = []
-        if self.qa_format == 'chatml':
-            for qa in qa_list:
-                dialogue_data.append({
-                    'messages': [{
-                        'role': 'user',
-                        'content': qa[0]
-                    }, {
-                        'role': 'assistant',
-                        'content': qa[1]
-                    }]
-                })
-        else:
-            raise ValueError(f'Not support {self.qa_format}!')
-
-        sample[self.text_key] = json.dumps(dialogue_data, ensure_ascii=False)
-
-        return sample
+        keys = samples.keys()
+        first_key = next(iter(keys))
+        num_samples = len(samples[first_key])
+        out_samples = {
+            key: []
+            for key in keys | {self.query_key, self.response_key}
+        }
+        for i in range(num_samples):
+            sample = {key: samples[key][i] for key in keys}
+            if self.enable_vllm:
+                response = model.generate([sample[self.text_key]],
+                                          self.sampling_params)
+                output = response[0].outputs[0].text
+            else:
+                inputs = processor(sample[self.text_key],
+                                   return_tensors='pt').to(model.device)
+                response = model.generate(**inputs, **self.sampling_params)
+                output = processor.decode(response.cpu()[0],
+                                          skip_special_tokens=True)
+
+            qa_list = self._extract_qa(output)
+
+            if len(qa_list) > 0:
+                for q, a in qa_list:
+                    for k, v in sample.items():
+                        out_samples[k].append(v)
+                    out_samples[self.query_key].append(q)
+                    out_samples[self.response_key].append(a)
+            else:
+                logger.info(
+                    'No question and answer was extracted from this sample!')
+
+        return out_samples
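(Since `_batched_op` is now True, `process_batched` consumes and returns columnar dicts, and one input document can fan out into several output rows, one per extracted QA pair. A rough sketch of the expected shapes, assuming the default 'text'/'query'/'response' keys; the values are invented.)

    # input batch, as HuggingFace datasets passes it when mapping with batched=True
    samples = {'text': ['doc A ...', 'doc B ...']}

    out = op.process_batched(samples)
    # plausible result, one row per extracted pair with 'text' carried over:
    # {
    #     'text':     ['doc A ...', 'doc A ...', 'doc B ...'],
    #     'query':    ['Q1 of A',   'Q2 of A',   'Q1 of B'],
    #     'response': ['A1 of A',   'A2 of A',   'A1 of B'],
    # }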
22 changes: 14 additions & 8 deletions data_juicer/ops/mapper/generate_instruction_mapper.py
@@ -211,11 +211,12 @@ def parse_chatml_str(self, input_str):
         return qa_pairs
 
     def parse_response(self, response_str):
+        logger.debug(response_str)
         pattern = self.qa_extraction_pattern
         matches = re.findall(pattern, response_str, re.DOTALL)
         response_str = ''
         out_qa_pairs = []
-        for i, match in enumerate(matches):
+        for match in matches:
             question, answer = match
             question = question.strip()
             answer = answer.strip()
@@ -257,11 +258,14 @@ def process_single(self, sample=None, rank=None):
             output_ids = output_ids[:, inputs.data['input_ids'].shape[1]:]
             response_str = processor.decode(output_ids.cpu()[0],
                                             skip_special_tokens=True)
-        message_list = []
         out_qa_pairs, response_str = self.parse_response(response_str)
 
         if not response_str:
-            return {self.text_key: json.dumps({'messages': message_list})}
+            return {
+                self.query_key: '',
+                self.response_key: '',
+                self.history_key: []
+            }
 
         if self.similarity_type == 'rouge_l':
             sim_score = self.max_rouge_l_score(response_str,
@@ -271,13 +275,15 @@
                 f'Not support similarity type "{self.similarity_type}"!')
 
         if sim_score <= self.similarity_threshold:
-            for question, answer in out_qa_pairs:
-                message_list.append({'role': 'user', 'content': question})
-                message_list.append({'role': 'assistant', 'content': answer})
+            query, response = out_qa_pairs[-1]
+            history = out_qa_pairs[:-1]
         else:
+            query = response = ''
+            history = []
             logger.info('Filter this generated sample due to similarity.')
 
         return {
-            self.text_key:
-            json.dumps({'messages': message_list}, ensure_ascii=False)
+            self.query_key: query,
+            self.response_key: response,
+            self.history_key: history
         }
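(With this change, the last parsed pair becomes the query/response and the earlier pairs become the history, replacing the old chatml JSON payload. A small sketch with invented values:)

    # suppose parse_response produced three pairs below the similarity threshold
    out_qa_pairs = [('q1', 'a1'), ('q2', 'a2'), ('q3', 'a3')]

    # process_single would then return:
    # {'query': 'q3', 'response': 'a3', 'history': [('q1', 'a1'), ('q2', 'a2')]}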
4 changes: 2 additions & 2 deletions data_juicer/ops/mapper/optimize_instruction_mapper.py
@@ -93,7 +93,7 @@ def process_single(self, sample=None, rank=None):
             'content': self.system_prompt
         }, {
             'role': 'user',
-            'content': sample[self.text_key]
+            'content': sample[self.query_key]
         }]
         input_prompt = processor.apply_chat_template(
             messages, tokenize=False, add_generation_prompt=True)
@@ -110,6 +110,6 @@
             output = processor.decode(response.cpu()[0],
                                       skip_special_tokens=True)
 
-        sample[self.text_key] = output
+        sample[self.query_key] = output
 
         return sample
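(The op now reads and rewrites the instruction under `query_key` instead of `text_key`, so samples only need a 'query' column. A minimal sketch mirroring the updated test below; the model default is assumed, not specified in this hunk.)

    op = OptimizeInstructionMapper()            # hf_model left at its default
    result = op.process_single({'query': '鱼香肉丝怎么做?'})
    # the optimized instruction overwrites result['query']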
25 changes: 23 additions & 2 deletions tests/config/test_config_funcs.py
@@ -45,6 +45,9 @@ def test_yaml_cfg_file(self):
                 'image_key': 'images',
                 'audio_key': 'audios',
                 'video_key': 'videos',
+                'query_key': 'query',
+                'response_key': 'response',
+                'history_key': 'history',
                 'accelerator': None,
                 'num_proc': 4,
                 'cpu_required': 1,
@@ -62,6 +65,9 @@
                 'image_key': 'images',
                 'audio_key': 'audios',
                 'video_key': 'videos',
+                'query_key': 'query',
+                'response_key': 'response',
+                'history_key': 'history',
                 'accelerator': None,
                 'num_proc': 4,
                 'stats_export_path': None,
@@ -128,6 +134,9 @@ def test_mixture_cfg(self):
                 'image_key': 'images',
                 'audio_key': 'audios',
                 'video_key': 'videos',
+                'query_key': 'query',
+                'response_key': 'response',
+                'history_key': 'history',
                 'accelerator': None,
                 'num_proc': 4,
                 'stats_export_path': None,
@@ -146,6 +155,9 @@
                 'image_key': 'images',
                 'audio_key': 'audios',
                 'video_key': 'videos',
+                'query_key': 'query',
+                'response_key': 'response',
+                'history_key': 'history',
                 'accelerator': None,
                 'num_proc': 4,
                 'stats_export_path': None,
@@ -164,6 +176,9 @@
                 'image_key': 'images',
                 'audio_key': 'audios',
                 'video_key': 'videos',
+                'query_key': 'query',
+                'response_key': 'response',
+                'history_key': 'history',
                 'accelerator': None,
                 'num_proc': 4,
                 'stats_export_path': None,
@@ -182,6 +197,9 @@
                 'image_key': 'images',
                 'audio_key': 'audios',
                 'video_key': 'videos',
+                'query_key': 'query',
+                'response_key': 'response',
+                'history_key': 'history',
                 'accelerator': None,
                 'num_proc': 4,
                 'stats_export_path': None,
@@ -200,6 +218,9 @@
                 'image_key': 'images',
                 'audio_key': 'audios',
                 'video_key': 'videos',
+                'query_key': 'query',
+                'response_key': 'response',
+                'history_key': 'history',
                 'accelerator': None,
                 'num_proc': 4,
                 'stats_export_path': None,
@@ -216,8 +237,8 @@ def test_op_params_parsing(self):
         from data_juicer.ops.base_op import OPERATORS
 
         base_class_params = {
-            'text_key', 'image_key', 'audio_key', 'video_key', 'accelerator',
-            'num_proc', 'cpu_required', 'mem_required',
+            'text_key', 'image_key', 'audio_key', 'video_key', 'query_key', 'response_key', 'history_key',
+            'accelerator', 'turbo', 'batch_size', 'num_proc', 'cpu_required', 'mem_required',
         }
 
         parser = ArgumentParser(default_env=True, default_config_files=None)
20 changes: 7 additions & 13 deletions tests/ops/mapper/test_extract_qa_mapper.py
@@ -1,5 +1,6 @@
 import unittest
-import json
+from loguru import logger
+from data_juicer.core.data import NestedDataset as Dataset
 from data_juicer.ops.mapper.extract_qa_mapper import ExtractQAMapper
 from data_juicer.utils.unittest_utils import (SKIPPED_TESTS,
                                               DataJuicerTestCaseBase)
@@ -14,20 +15,13 @@ def _run_extract_qa(self, samples, enable_vllm=False, sampling_params={}, **kwargs):
         op = ExtractQAMapper(
             hf_model='alibaba-pai/pai-qwen1_5-7b-doc2qa',
             trust_remote_code=True,
-            qa_format='chatml',
             enable_vllm=enable_vllm,
             sampling_params=sampling_params,
-            **kwargs
-        )
-        for sample in samples:
-            result = op.process(sample)
-            out_text = json.loads(result[self.text_key])
-            print(f'Output sample: {out_text}')
-
-            # test one output qa sample
-            qa_sample = out_text[0]
-            self.assertIn('role', qa_sample['messages'][0])
-            self.assertIn('content', qa_sample['messages'][0])
+            **kwargs)
+        dataset = Dataset.from_list(samples)
+        dataset = dataset.map(op.process, batch_size=2)
+        for row in dataset:
+            logger.info(row)
 
     def test_extract_qa(self):
         samples = [
11 changes: 4 additions & 7 deletions tests/ops/mapper/test_generate_instruction_mapper.py
@@ -1,5 +1,6 @@
 import unittest
-import json
+from loguru import logger
 from data_juicer.ops.mapper.generate_instruction_mapper import GenerateInstructionMapper
 from data_juicer.utils.unittest_utils import (SKIPPED_TESTS,
                                               DataJuicerTestCaseBase)
@@ -25,13 +26,9 @@ def _run_generate_instruction(self, enable_vllm=False):
 
         dataset = dataset.map(op.process)
 
-        for item in dataset:
-            out_sample = json.loads(item[self.text_key])
-            print(f'Output sample: {out_sample}')
-            # test one output qa sample
-            self.assertIn('role', out_sample['messages'][0])
-            self.assertIn('content', out_sample['messages'][0])
-
+        for row in dataset:
+            logger.info(row)
 
     def test_generate_instruction(self):
         self._run_generate_instruction()
10 changes: 5 additions & 5 deletions tests/ops/mapper/test_optimize_instruction_mapper.py
@@ -1,4 +1,5 @@
 import unittest
+from loguru import logger
 from data_juicer.ops.mapper.optimize_instruction_mapper import OptimizeInstructionMapper
 from data_juicer.utils.unittest_utils import (SKIPPED_TESTS,
                                               DataJuicerTestCaseBase)
@@ -7,8 +8,7 @@
 # These tests have been tested locally.
 @SKIPPED_TESTS.register_module()
 class OptimizeInstructionMapperTest(DataJuicerTestCaseBase):
-
-    text_key = 'text'
+    query_key = 'query'
 
     def _run_optimize_instruction(self, enable_vllm=False):
         op = OptimizeInstructionMapper(
@@ -17,13 +17,13 @@ def _run_optimize_instruction(self, enable_vllm=False):
         )
 
         samples = [
-            {self.text_key: '鱼香肉丝怎么做?'}
+            {self.query_key: '鱼香肉丝怎么做?'}
         ]
 
         for sample in samples:
             result = op.process(sample)
-            print(f'Output results: {result}')
-            self.assertIn(self.text_key, result)
+            logger.info(f'Output results: {result}')
+            self.assertIn(self.query_key, result)
 
     def test_optimize_instruction(self):
         self._run_optimize_instruction()