diff --git a/docs/model_serving_framework/run.ipynb b/docs/model_serving_framework/run.ipynb
new file mode 100644
index 0000000000..81cab5c644
--- /dev/null
+++ b/docs/model_serving_framework/run.ipynb
@@ -0,0 +1,590 @@
+{
+ "cells": [
+  {
+   "cell_type": "raw",
+   "id": "b32dbef8",
+   "metadata": {
+    "vscode": {
+     "languageId": "raw"
+    }
+   },
+   "source": [
+    "/*\n",
+    " * Copyright OpenSearch Contributors\n",
+    " * SPDX-License-Identifier: Apache-2.0\n",
+    " */"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "f6ff4e2d",
+   "metadata": {},
+   "source": [
+    "> **_NOTE:_** **This script is supposed to be executed on a SageMaker Notebook!**\n",
+    "\n",
+    "## Prerequisites\n",
+    "- We have set up a **SageMaker Notebook** instance and an **S3 bucket** to store the bundle, and configured their permissions\n",
+    "\n",
+    "## Step 1\n",
+    "Use git to clone this repository to your SageMaker Notebook instance, and open run.ipynb in your SageMaker Notebook\n",
+    "\n",
+    "## Step 2\n",
+    "Prepare the model file for SageMaker. Run the code blocks below in sequence."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b6bc7a23",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!mkdir handler\n",
+    "!mkdir handler/code\n",
+    "!mkdir handler/MAR-INF"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e4ca2c0c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%writefile handler/code/requirements.txt\n",
+    "transformers==4.44.1\n",
+    "sentencepiece==0.1.99"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "071ff5c9",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%writefile handler/MAR-INF/MANIFEST.json\n",
+    "{\n",
+    "  \"runtime\": \"python\",\n",
+    "  \"model\": {\n",
+    "    \"modelName\": \"neuralsparse\",\n",
+    "    \"handler\": \"neural_sparse_handler.py\",\n",
+    "    \"modelVersion\": \"1.0\",\n",
+    "    \"configFile\": \"neural_sparse_config.yaml\"\n",
+    "  },\n",
+    "  \"archiverVersion\": \"0.9.0\"\n",
+    "}"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9c7d23ac",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%writefile handler/neural_sparse_config.yaml\n",
+    "## configs for dynamic batch inference\n",
+    "batchSize: 16\n",
+    "maxBatchDelay: 5\n",
+    "responseTimeout: 300"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3ee8d2bc",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%%writefile handler/neural_sparse_handler.py\n",
+    "import os\n",
+    "import re\n",
+    "import itertools\n",
+    "import json\n",
+    "import torch\n",
+    "import transformers\n",
+    "from ts.torch_handler.base_handler import BaseHandler\n",
+    "\n",
+    "model_id = os.environ.get(\n",
+    "    \"MODEL_ID\", \"opensearch-project/opensearch-neural-sparse-encoding-doc-v2-distill\"\n",
+    ")\n",
+    "max_bs = int(os.environ.get(\"MAX_BS\", 32))\n",
+    "prune_ratio = float(os.environ.get(\"PRUNE_RATIO\", 0.1))\n",
+    "\n",
+    "version_match = re.search(r\"v(\\\\d+)\", model_id)\n",
+    "use_l0 = False\n",
+    "if version_match:\n",
+    "    version = int(version_match.group(1))\n",
+    "    use_l0 = version >= 3\n",
+    "\n",
+    "\n",
+    "class SparseEncodingModelHandler(BaseHandler):\n",
+    "    class SparseModel(torch.nn.Module):\n",
+    "        @staticmethod\n",
+    "        def from_pretrained(path):\n",
+    "            return SparseEncodingModelHandler.SparseModel(path)\n",
+    "\n",
+    "        def __init__(self, model_id):\n",
+    "            super().__init__()\n",
+    "            self.backbone = transformers.AutoModelForMaskedLM.from_pretrained(model_id)\n",
+    "            self.special_token_ids = []\n",
+    "\n",
+    "        def set_special_token_ids(self, special_token_ids):\n",
+    "            self.special_token_ids = special_token_ids\n",
+    "\n",
+    "        def forward(self, **kwargs):\n",
+    "            output = self.backbone(**kwargs)[0]\n",
+    "            values, _ = torch.max(\n",
+    "                output * kwargs.get(\"attention_mask\").unsqueeze(-1), dim=1\n",
+    "            )\n",
+    "            values = torch.log1p(torch.relu(values))\n",
+    "            if use_l0:\n",
+    "                values = torch.log1p(values)\n",
+    "            values[:, self.special_token_ids] = 0\n",
+    "            max_values = values.max(dim=-1)[0].unsqueeze(1) * prune_ratio\n",
+    "            return values * (values > max_values)\n",
+    "\n",
+    "    class SparsePostProcessor(object):\n",
+    "        def __init__(self, tokenizer):\n",
+    "            self.tokenizer = tokenizer\n",
+    "            self.id_to_token = [\"\" for i in range(tokenizer.vocab_size)]\n",
+    "            for token, _id in tokenizer.vocab.items():\n",
+    "                self.id_to_token[_id] = token\n",
+    "\n",
+    "        def __call__(self, sparse_vector):\n",
+    "            sample_indices, token_indices = torch.nonzero(sparse_vector, as_tuple=True)\n",
+    "            non_zero_values = sparse_vector[(sample_indices, token_indices)].tolist()\n",
+    "            number_of_tokens_for_each_sample = (\n",
+    "                torch.bincount(sample_indices).cpu().tolist()\n",
+    "            )\n",
+    "            tokens = [self.id_to_token[_id] for _id in token_indices.tolist()]\n",
+    "\n",
+    "            output = []\n",
+    "            end_idxs = list(\n",
+    "                itertools.accumulate([0] + number_of_tokens_for_each_sample)\n",
+    "            )\n",
+    "            for i in range(len(end_idxs) - 1):\n",
+    "                token_strings = tokens[end_idxs[i] : end_idxs[i + 1]]\n",
+    "                weights = non_zero_values[end_idxs[i] : end_idxs[i + 1]]\n",
+    "                output.append(dict(zip(token_strings, weights)))\n",
+    "            return output\n",
+    "\n",
+    "    def __init__(self):\n",
+    "        super().__init__()\n",
+    "        self.special_token_ids = None\n",
+    "        self.tokenizer = None\n",
+    "        self.all_tokens = None\n",
+    "        self.initialized = False\n",
+    "\n",
+    "    def initialize(self, context):\n",
+    "        self.manifest = context.manifest\n",
+    "        properties = context.system_properties\n",
+    "\n",
+    "        # Print initialization parameters\n",
+    "        print(f\"Initializing SparseEncodingModelHandler with model_id: {model_id}\")\n",
+    "        print(\n",
+    "            f\"Configuration parameters - use_l0: {use_l0}, max_bs: {max_bs}, prune_ratio: {prune_ratio}\"\n",
+    "        )\n",
+    "\n",
+    "        # load model and tokenizer\n",
+    "        self.device = torch.device(\n",
+    "            \"cuda:\" + str(properties.get(\"gpu_id\"))\n",
+    "            if torch.cuda.is_available()\n",
+    "            else \"cpu\"\n",
+    "        )\n",
+    "        print(f\"Using device: {self.device}\")\n",
+    "        self.model = SparseEncodingModelHandler.SparseModel.from_pretrained(model_id)\n",
+    "        self.model.to(self.device)\n",
+    "        self.tokenizer = transformers.AutoTokenizer.from_pretrained(model_id)\n",
+    "\n",
+    "        self.post_processor = SparseEncodingModelHandler.SparsePostProcessor(\n",
+    "            tokenizer=self.tokenizer\n",
+    "        )\n",
+    "        self.special_token_ids = [\n",
+    "            self.tokenizer.vocab[token]\n",
+    "            for token in self.tokenizer.special_tokens_map.values()\n",
+    "        ]\n",
+    "        self.model.set_special_token_ids(self.special_token_ids)\n",
+    "\n",
+    "        self.initialized = True\n",
+    "\n",
+    "    def preprocess(self, requests):\n",
+    "        inputSentence = []\n",
+    "        batch_idx = []\n",
+    "        for request in requests:\n",
+    "            request_body = request.get(\"body\")\n",
+    "            if isinstance(request_body, bytearray):\n",
+    "                request_body = request_body.decode(\"utf-8\")\n",
+    "            request_body = json.loads(request_body)\n",
+    "\n",
+    "            if isinstance(request_body, list):\n",
+    "                inputSentence += request_body\n",
+    "                batch_idx.append(len(request_body))\n",
+    "            else:\n",
+    "                inputSentence.append(request_body)\n",
+    "                batch_idx.append(1)\n",
+    "\n",
+    "        input_data = self.tokenizer(\n",
+    "            inputSentence,\n",
+    "            padding=True,\n",
+    "            truncation=True,\n",
+    "            max_length=self.tokenizer.model_max_length,\n",
+    "            return_tensors=\"pt\",\n",
+    "            return_attention_mask=True,\n",
+    "            return_token_type_ids=False,\n",
+    "        )\n",
+    "\n",
+    "        tokens = input_data[\"attention_mask\"].sum(dim=-1).numpy().tolist()\n",
+    "        input_data = input_data.to(self.device)\n",
+    "        return {\"input\": input_data, \"batch_l\": batch_idx, \"tokens\": tokens}\n",
+    "\n",
+    "    def inference(self, data, *args, **kwargs):\n",
+    "        batch_idx = data[\"batch_l\"]\n",
+    "        tokens = data[\"tokens\"]\n",
+    "        data_input = data[\"input\"]\n",
+    "\n",
+    "        total_samples = len(tokens)\n",
+    "        outputs = []\n",
+    "\n",
+    "        # split the (possibly large) dynamic batch into chunks of at most max_bs\n",
+    "        for start_idx in range(0, total_samples, max_bs):\n",
+    "            end_idx = min(start_idx + max_bs, total_samples)\n",
+    "\n",
+    "            batch_data = {\n",
+    "                \"input_ids\": data_input[\"input_ids\"][start_idx:end_idx],\n",
+    "                \"attention_mask\": data_input[\"attention_mask\"][start_idx:end_idx],\n",
+    "            }\n",
+    "\n",
+    "            with torch.cuda.amp.autocast(), torch.no_grad():\n",
+    "                output = self.model(**batch_data)\n",
+    "            outputs.append(output)\n",
+    "\n",
+    "        output = torch.cat(outputs, dim=0)\n",
+    "        return {\"pred\": output, \"batch_l\": batch_idx, \"tokens\": tokens}\n",
+    "\n",
+    "    def postprocess(self, prediction):\n",
+    "        batch_idx = prediction[\"batch_l\"]\n",
+    "        output = prediction[\"pred\"]\n",
+    "        tokens = prediction[\"tokens\"]\n",
+    "        output = self.post_processor(output)\n",
+    "        usage = [\n",
+    "            {\"inputTokens\": tokens[i], \"outputTokens\": 0} for i in range(len(output))\n",
+    "        ]\n",
+    "\n",
+    "        # return the inference results to each request according to batch size\n",
+    "        outputs = []\n",
+    "        index = 0\n",
+    "        for b in batch_idx:\n",
+    "            outputs.append(\n",
+    "                {\n",
+    "                    \"response\": output[index : index + b],\n",
+    "                    \"tokens\": usage[index : index + b],\n",
+    "                }\n",
+    "            )\n",
+    "            index += b\n",
+    "        return outputs\n",
+    "\n",
+    "    def handle(self, data, context):\n",
+    "        model_input = self.preprocess(data)\n",
+    "        model_output = self.inference(model_input)\n",
+    "        model_output = self.postprocess(model_output)\n",
+    "        return model_output"
+   ]
+  },
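+  {
+   "cell_type": "markdown",
+   "id": "a1f20c01",
+   "metadata": {},
+   "source": [
+    "(Optional) Before packaging, you can sanity-check the encoding logic locally. The cell below is a minimal sketch that mirrors the handler's forward pass, without the special-token masking and pruning done above; it downloads the model from Hugging Face, so it may take a few minutes."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "a1f20c02",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# optional local sanity check (sketch): encode one sentence and show top tokens\n",
+    "import torch\n",
+    "import transformers\n",
+    "\n",
+    "model_id = \"opensearch-project/opensearch-neural-sparse-encoding-doc-v2-distill\"\n",
+    "tokenizer = transformers.AutoTokenizer.from_pretrained(model_id)\n",
+    "backbone = transformers.AutoModelForMaskedLM.from_pretrained(model_id)\n",
+    "\n",
+    "features = tokenizer(\n",
+    "    [\"Currently New York is rainy.\"],\n",
+    "    padding=True,\n",
+    "    truncation=True,\n",
+    "    return_tensors=\"pt\",\n",
+    "    return_token_type_ids=False,\n",
+    ")\n",
+    "with torch.no_grad():\n",
+    "    output = backbone(**features)[0]\n",
+    "values, _ = torch.max(output * features[\"attention_mask\"].unsqueeze(-1), dim=1)\n",
+    "values = torch.log1p(torch.relu(values))\n",
+    "\n",
+    "# print the ten highest-weighted tokens\n",
+    "top = torch.topk(values[0], 10)\n",
+    "for weight, idx in zip(top.values.tolist(), top.indices.tolist()):\n",
+    "    print(tokenizer.convert_ids_to_tokens(idx), round(weight, 3))"
+   ]
+  },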
+  {
+   "cell_type": "markdown",
+   "id": "0de74dbf",
+   "metadata": {},
+   "source": [
+    "Wrap the handler folder into a tarball and upload it to your S3 bucket.\n",
+    "\n",
+    "In handler/neural_sparse_handler.py, we define the model loading, pre-processing, inference, and post-processing. We use mixed precision to accelerate the inference.\n",
+    "\n",
+    "In handler/neural_sparse_config.yaml, we define some configs for TorchServe (including dynamic micro-batching)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ebf38b1b",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "\n",
+    "bucket_name = \"your_bucket_name\"\n",
+    "os.system(\"tar -czvf neural-sparse-handler.tar.gz -C handler/ .\")\n",
+    "os.system(\n",
+    "    f\"aws s3 cp neural-sparse-handler.tar.gz s3://{bucket_name}/neural-sparse-handler.tar.gz\"\n",
+    ")"
+   ]
+  },
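+  {
+   "cell_type": "markdown",
+   "id": "b2e31d01",
+   "metadata": {},
+   "source": [
+    "You can verify the archive layout before deploying: MAR-INF/MANIFEST.json, neural_sparse_handler.py, neural_sparse_config.yaml, and code/requirements.txt should all sit at the archive root (that is why the tar command above uses `-C handler/ .`)."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b2e31d02",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!tar -tzf neural-sparse-handler.tar.gz"
+   ]
+  },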
+  {
+   "cell_type": "markdown",
+   "id": "161796c1",
+   "metadata": {},
+   "source": [
+    "## Step 3\n",
+    "Use the SageMaker Python SDK to deploy the tarball to a real-time inference endpoint.\n",
+    "\n",
+    "Here we use ml.g5.xlarge, a GPU instance with good price-performance.\n",
+    "\n",
+    "Please modify the region according to your settings."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "5d16be94",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# constants that can be customized for models\n",
+    "model_id = \"opensearch-project/opensearch-neural-sparse-encoding-doc-v2-distill\"\n",
+    "max_batch_size = \"32\"\n",
+    "prune_ratio = \"0.1\"\n",
+    "\n",
+    "# constants related to deployment\n",
+    "model_name = \"ns-handler\"\n",
+    "endpoint_name = \"ns-handler\"\n",
+    "instance_type = \"ml.g5.xlarge\"\n",
+    "initial_instance_count = 1\n",
+    "\n",
+    "# run this cell\n",
+    "import boto3\n",
+    "import sagemaker\n",
+    "from sagemaker.model import Model\n",
+    "from sagemaker.predictor import Predictor\n",
+    "from sagemaker.serializers import JSONSerializer\n",
+    "from sagemaker.deserializers import JSONDeserializer\n",
+    "\n",
+    "role = sagemaker.get_execution_role()\n",
+    "sess = boto3.Session()\n",
+    "region = sess.region_name\n",
+    "smsess = sagemaker.Session(boto_session=sess)\n",
+    "\n",
+    "envs = {\n",
+    "    \"TS_ASYNC_LOGGING\": \"true\",\n",
+    "    \"MODEL_ID\": model_id,\n",
+    "    \"MAX_BS\": max_batch_size,\n",
+    "    \"PRUNE_RATIO\": prune_ratio,\n",
+    "}\n",
+    "\n",
+    "baseimage = sagemaker.image_uris.retrieve(\n",
+    "    framework=\"pytorch\",\n",
+    "    region=region,\n",
+    "    py_version=\"py312\",\n",
+    "    image_scope=\"inference\",\n",
+    "    version=\"2.6\",\n",
+    "    instance_type=instance_type,\n",
+    ")\n",
+    "\n",
+    "model = Model(\n",
+    "    model_data=f\"s3://{bucket_name}/neural-sparse-handler.tar.gz\",\n",
+    "    image_uri=baseimage,\n",
+    "    role=role,\n",
+    "    predictor_cls=Predictor,\n",
+    "    name=model_name,\n",
+    "    sagemaker_session=smsess,\n",
+    "    env=envs,\n",
+    ")\n",
+    "\n",
+    "# ml.g5 instances come with local NVMe storage, so no EBS volume_size is set here\n",
+    "predictor = model.deploy(\n",
+    "    instance_type=instance_type,\n",
+    "    initial_instance_count=initial_instance_count,\n",
+    "    endpoint_name=endpoint_name,\n",
+    "    serializer=JSONSerializer(),\n",
+    "    deserializer=JSONDeserializer(),\n",
+    "    model_data_download_timeout=3600,\n",
+    "    container_startup_health_check_timeout=3600,\n",
+    ")\n",
+    "\n",
+    "print(predictor.endpoint_name)"
+   ]
+  },
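+  {
+   "cell_type": "markdown",
+   "id": "c3d42e01",
+   "metadata": {},
+   "source": [
+    "Because the predictor was created with a JSON serializer and deserializer, you can also invoke the endpoint directly through the SageMaker SDK. This is equivalent to the boto3 call in Step 4 below."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "c3d42e02",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# quick check through the SageMaker SDK predictor\n",
+    "predictor.predict([\"Currently New York is rainy.\"])"
+   ]
+  },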
+  {
+   "cell_type": "markdown",
+   "id": "0863ed26",
+   "metadata": {},
+   "source": [
+    "## Step 4\n",
+    "\n",
+    "After we create the endpoint, send a sample request to see how it works"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e1a7918c",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# run this cell\n",
+    "import json\n",
+    "\n",
+    "body = [\"Currently New York is rainy.\"]\n",
+    "amz = boto3.client(\"sagemaker-runtime\")\n",
+    "\n",
+    "response = amz.invoke_endpoint(\n",
+    "    EndpointName=predictor.endpoint_name,\n",
+    "    Body=json.dumps(body),\n",
+    "    ContentType=\"application/json\",\n",
+    ")\n",
+    "\n",
+    "res = response[\"Body\"].read()\n",
+    "results = json.loads(res.decode(\"utf8\"))\n",
+    "results"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "ae30128b",
+   "metadata": {
+    "vscode": {
+     "languageId": "raw"
+    }
+   },
+   "source": [
+    "response:\n",
+    "```python\n",
+    "{'response': [{'has': 0.19832642376422882,\n",
+    "   'new': 0.9849710464477539,\n",
+    "   'like': 0.20112557709217072,\n",
+    "   'now': 0.7473171949386597,\n",
+    "   'state': 0.20818853378295898,\n",
+    "   'still': 0.26296505331993103,\n",
+    "   'going': 0.17759032547473907,\n",
+    "   'york': 1.5465646982192993,\n",
+    "   'water': 0.5180262327194214,\n",
+    "   'present': 0.24726435542106628,\n",
+    "   'today': 0.5316043496131897,\n",
+    "   'currently': 0.6706798672676086,\n",
+    "   'current': 0.9104140996932983,\n",
+    "   'dry': 0.2999960780143738,\n",
+    "   'rain': 1.3858059644699097,\n",
+    "   'weather': 1.4669378995895386,\n",
+    "   'climate': 0.392688512802124,\n",
+    "   'wet': 1.070887804031372,\n",
+    "   'happening': 0.3875649571418762,\n",
+    "   'ny': 1.4108916521072388,\n",
+    "   'brooklyn': 0.2983669638633728,\n",
+    "   'yorkshire': 0.15651951730251312,\n",
+    "   'manhattan': 0.969535231590271,\n",
+    "   'flood': 0.2403770089149475,\n",
+    "   'flooding': 0.4161500036716461,\n",
+    "   'rainfall': 0.9889746904373169,\n",
+    "   'damp': 0.38938602805137634,\n",
+    "   'moist': 0.32199856638908386,\n",
+    "   'mist': 0.2026219218969345,\n",
+    "   'precipitation': 0.5729197263717651,\n",
+    "   'drought': 0.41227778792381287,\n",
+    "   'rains': 0.8187123537063599,\n",
+    "   'rainy': 1.4709837436676025,\n",
+    "   'nyc': 1.308121681213379,\n",
+    "   'yorker': 0.6350979804992676,\n",
+    "   'monsoon': 0.6218147873878479,\n",
+    "   'raining': 0.9827804565429688,\n",
+    "   'cloudy': 0.6314691305160522,\n",
+    "   'nyu': 0.7196483612060547}],\n",
+    " 'tokens': [{'inputTokens': 8, 'outputTokens': 0}]}\n",
+    "```"
+   ]
+  },
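+  {
+   "cell_type": "markdown",
+   "id": "d4e53f01",
+   "metadata": {},
+   "source": [
+    "The handler also accepts a list of several sentences in one request (preprocess flattens list bodies and postprocess splits the results back per request). A batch call looks like this; the second sentence is just a made-up example:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "d4e53f02",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# batch request: one sparse vector is returned per input sentence\n",
+    "body = [\n",
+    "    \"Currently New York is rainy.\",\n",
+    "    \"The weather in Seattle is cloudy.\",\n",
+    "]\n",
+    "\n",
+    "response = amz.invoke_endpoint(\n",
+    "    EndpointName=predictor.endpoint_name,\n",
+    "    Body=json.dumps(body),\n",
+    "    ContentType=\"application/json\",\n",
+    ")\n",
+    "\n",
+    "results = json.loads(response[\"Body\"].read().decode(\"utf8\"))\n",
+    "len(results[\"response\"])  # 2, one entry per sentence"
+   ]
+  },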
+  {
+   "cell_type": "markdown",
+   "id": "33035889",
+   "metadata": {},
+   "source": [
+    "## Step 5\n",
+    "> **_NOTE:_** **This step is supposed to be executed on an instance that has access to the OpenSearch cluster!**\n",
+    "\n",
+    "Register this SageMaker endpoint with your OpenSearch cluster.\n",
+    "\n",
+    "Please check the OpenSearch documentation for more information. Here we provide one demo request body using access_key and secret_key. Please choose the authentication method according to your use case.\n",
+    "\n",
+    "### create connector\n",
+    "\n",
+    "(Fill in the region and predictor.endpoint_name in the request body.)\n",
+    "```json\n",
+    "POST /_plugins/_ml/connectors/_create\n",
+    "{\n",
+    "  \"name\": \"test\",\n",
+    "  \"description\": \"Test connector for Sagemaker model\",\n",
+    "  \"version\": 1,\n",
+    "  \"protocol\": \"aws_sigv4\",\n",
+    "  \"credential\": {\n",
+    "    \"access_key\": \"your access key\",\n",
+    "    \"secret_key\": \"your secret key\"\n",
+    "  },\n",
+    "  \"parameters\": {\n",
+    "    \"region\": \"{region}\",\n",
+    "    \"service_name\": \"sagemaker\",\n",
+    "    \"input_docs_processed_step_size\": 2\n",
+    "  },\n",
+    "  \"actions\": [\n",
+    "    {\n",
+    "      \"action_type\": \"predict\",\n",
+    "      \"method\": \"POST\",\n",
+    "      \"headers\": {\n",
+    "        \"content-type\": \"application/json\"\n",
+    "      },\n",
+    "      \"url\": \"https://runtime.sagemaker.{region}.amazonaws.com/endpoints/{predictor.endpoint_name}/invocations\",\n",
+    "      \"request_body\": \"${parameters.input}\"\n",
+    "    }\n",
+    "  ],\n",
+    "  \"client_config\": {\n",
+    "    \"max_retry_times\": -1,\n",
+    "    \"max_connection\": 60,\n",
+    "    \"retry_backoff_millis\": 10\n",
+    "  }\n",
+    "}\n",
+    "```\n",
+    "\n",
+    "### register model\n",
+    "```json\n",
+    "POST /_plugins/_ml/models/_register?deploy=true\n",
+    "{\n",
+    "  \"name\": \"test\",\n",
+    "  \"function_name\": \"remote\",\n",
+    "  \"version\": \"1.0.0\",\n",
+    "  \"connector_id\": \"{connector id}\",\n",
+    "  \"description\": \"Test connector for Sagemaker model\"\n",
+    "}\n",
+    "```"
+   ]
+  },
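+  {
+   "cell_type": "markdown",
+   "id": "e5f64a01",
+   "metadata": {},
+   "source": [
+    "### test the model\n",
+    "Once the register call returns a model ID, you can smoke-test the remote model with the ML Commons Predict API. The request below is a sketch that mirrors the payload used in Step 4; fill in the model ID from the register response.\n",
+    "```json\n",
+    "POST /_plugins/_ml/models/{model id}/_predict\n",
+    "{\n",
+    "  \"parameters\": {\n",
+    "    \"input\": [\"Currently New York is rainy.\"]\n",
+    "  }\n",
+    "}\n",
+    "```"
+   ]
+  },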
"mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}