#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
author: Ewen Wang
email: wolfgangwong2012@gmail.com
license: Apache License 2.0
"""
import os
import json
import time
from time import gmtime, strftime
import pickle
import boto3
import joblib
import tarfile
import sagemaker
import subprocess
def save_model(model, model_name='model', model_suffix='.joblib'):
    """Serialize a trained model to disk with Joblib.

    The output file name is ``model_name`` immediately followed by
    ``model_suffix`` (for the defaults: ``model.joblib``).

    Args:
        model (object): The trained machine learning model to persist.
        model_name (str, optional): Base name of the output file. Defaults to 'model'.
        model_suffix (str, optional): File extension appended to ``model_name``.
            Defaults to '.joblib'.

    Returns:
        None
    """
    destination = model_name + model_suffix
    # The file is opened in binary mode here and the handle is passed to
    # Joblib, so the handle is closed when the ``with`` block exits.
    with open(destination, mode='wb') as sink:
        joblib.dump(model, sink)
def template_inference():
    """Write a template SageMaker inference script to ``inference.py``.

    The generated script is written to the current working directory
    (overwriting any existing ``inference.py``) and defines four functions:

    * ``model_fn(model_dir)`` - loads ``model.joblib`` from ``model_dir``.
    * ``input_fn(request_body, request_content_type)`` - parses an
      ``application/json`` body and returns its ``'Input'`` entry; any other
      content type raises ``ValueError``.
    * ``predict_fn(input_data, model)`` - returns ``model.predict(input_data)``.
    * ``output_fn(prediction, content_type)`` - wraps ``prediction.tolist()``
      in a dict under the ``'Output'`` key.

    Note that the script always loads a file named ``model.joblib``, so the
    model packaged alongside it must be saved under that name.

    Args:
        None

    Returns:
        None
    """
    # The template body must carry real indentation: it is written verbatim
    # and has to be importable Python on the serving side.
    template = \
"""#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import joblib
import os
import json
def model_fn(model_dir):
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    return model
def input_fn(request_body, request_content_type):
    if request_content_type == 'application/json':
        request_body = json.loads(request_body)
        inpVar = request_body['Input']
        return inpVar
    else:
        raise ValueError("This model only supports application/json input")
def predict_fn(input_data, model):
    return model.predict(input_data)
def output_fn(prediction, content_type):
    res = prediction.tolist()
    respJSON = {'Output': res}
    return respJSON
"""
    # Explicit encoding keeps the file consistent with its own coding cookie
    # regardless of the platform's default locale.
    with open('inference.py', 'w', encoding='utf-8') as writer:
        writer.write(template)
def upload_model_to_s3(model_name='model', model_suffix='.joblib', bucket='[bucket]'):
    """Package a saved model with ``inference.py`` and upload it to S3.

    A gzip-compressed tar archive named ``{model_name}.tar.gz`` is created in
    the current working directory. It contains the model file
    (``model_name + model_suffix``) and ``inference.py``, both of which must
    already exist. The archive is then uploaded to the bucket under the key
    ``model/{model_name}.tar.gz``.

    Args:
        model_name (str, optional): Base name of the saved model file; also used
            for the archive name and S3 key. Defaults to 'model'.
        model_suffix (str, optional): File extension of the saved model.
            Defaults to '.joblib'.
        bucket (str): The S3 bucket to upload the archive to.

    Returns:
        model_artifacts (str): The ``s3://`` URI of the uploaded archive.

    Raises:
        FileNotFoundError: If the model file or ``inference.py`` is missing.
    """
    model_path = model_name + model_suffix
    archive_path = f"{model_name}.tar.gz"

    # Build tar file with model data + inference code. Using ``tarfile``
    # instead of an external ``tar`` process means names containing spaces
    # work and a missing input raises instead of being silently ignored.
    with tarfile.open(archive_path, "w:gz") as archive:
        for member in (model_path, "inference.py"):
            archive.add(member)

    # Upload tar.gz to bucket
    key_model = f"model/{model_name}.tar.gz"
    model_artifacts = f"s3://{bucket}/{key_model}"
    s3 = boto3.session.Session().resource('s3')
    s3.meta.client.upload_file(archive_path, bucket, key_model)
    return model_artifacts
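# NOTE: stray "[docs]" Sphinx source-link marker removed here (as a bare expression it would raise NameError on import).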
def deploy_model(model,
                 model_name='model',
                 bucket='[bucket]'):
    """Deploy a scikit-learn model to a SageMaker endpoint.

    Steps performed:

    1. Save the model, generate ``inference.py`` and upload both to S3 as a
       ``tar.gz`` archive. The model is saved and uploaded under the helpers'
       default name (``model``), not ``model_name``, because the generated
       inference script loads ``model.joblib``.
    2. Retrieve the scikit-learn serving image URI for the session's region.
    3. Create a SageMaker model, an endpoint configuration (one
       ``ml.c5.large`` instance) and an endpoint. The versioned model name is
       ``model_name.upper()`` plus a UTC timestamp suffix; the endpoint
       configuration and endpoint are named ``EPC-<versioned name>`` and
       ``EP-<versioned name>``.
    4. Poll the endpoint every 15 seconds until it leaves the ``Creating``
       status, printing each status.

    Progress information is printed to stdout throughout.

    Args:
        model: A scikit-learn model.
        model_name (str): The model name.
        bucket (str): The bucket to store the model, which is also the project
            name in BatCat convention.

    Returns:
        response (dict): The model, endpoint configuration and endpoint
        information, with keys ``ModelName``, ``ModelArtifacts``,
        ``ModelVersion``, ``ModelArn``, ``EndpointConfigName``,
        ``EndpointConfigArn``, ``EndpointName``, ``EndpointArn`` and
        ``EndpointStatus`` (the last status observed when polling stopped;
        callers should check it, since no exception is raised here if the
        endpoint did not come up).
    """
    response = dict()
    response["ModelName"] = model_name
    print("Model Name: " + response["ModelName"])

    ## Model Setup
    save_model(model=model)
    template_inference()
    model_artifacts = upload_model_to_s3(bucket=bucket)
    response["ModelArtifacts"] = model_artifacts
    print("Model Artifacts: " + response["ModelArtifacts"])

    ## SKLearn Image Setup
    client = boto3.client(service_name="sagemaker")
    region = boto3.session.Session().region_name
    role = sagemaker.get_execution_role()

    # retrieve sklearn image
    image_uri = sagemaker.image_uris.retrieve(
        framework="sklearn",
        region=region,
        version="0.23-1",
        py_version="py3",
        instance_type="ml.m5.xlarge",
    )

    ## Deploy Endpoint
    # Step 1: Model Creation
    # The timestamp suffix gives every deployment a distinct resource name.
    model_name = model_name.upper() + strftime("-%Y%m%d-%H%M%S", gmtime())
    response["ModelVersion"] = model_name
    print("Model Name (Version): " + response["ModelVersion"])

    create_model_response = client.create_model(
        ModelName=model_name,
        Containers=[
            {
                "Image": image_uri,
                "Mode": "SingleModel",
                "ModelDataUrl": model_artifacts,
                "Environment": {'SAGEMAKER_SUBMIT_DIRECTORY': model_artifacts,
                                'SAGEMAKER_PROGRAM': 'inference.py'}
            }
        ],
        ExecutionRoleArn=role,
    )
    response["ModelArn"] = create_model_response["ModelArn"]
    print("Model Arn: " + response["ModelArn"])

    # Step 2: EPC Creation
    epc_name = "EPC-" + model_name
    endpoint_config_response = client.create_endpoint_config(
        EndpointConfigName=epc_name,
        ProductionVariants=[
            {
                "VariantName": "sklearnvariant",
                "ModelName": model_name,
                "InstanceType": "ml.c5.large",
                "InitialInstanceCount": 1
            },
        ],
    )
    response["EndpointConfigName"] = epc_name
    response["EndpointConfigArn"] = endpoint_config_response["EndpointConfigArn"]
    print("Endpoint Configuration Name: " + response["EndpointConfigName"])
    print("Endpoint Configuration Arn: " + response["EndpointConfigArn"])

    # Step 3: EP Creation
    endpoint_name = "EP-" + model_name
    create_endpoint_response = client.create_endpoint(
        EndpointName=endpoint_name,
        EndpointConfigName=epc_name,
    )
    response["EndpointName"] = endpoint_name
    response["EndpointArn"] = create_endpoint_response["EndpointArn"]
    print("Endpoint Name: " + response["EndpointName"])
    print("Endpoint Arn: " + response["EndpointArn"])

    # Monitor creation: poll until the status is no longer "Creating".
    # Sleeping only between polls avoids a pointless 15-second wait after
    # the endpoint has already reached its final status.
    while True:
        describe_endpoint_response = client.describe_endpoint(EndpointName=endpoint_name)
        endpoint_status = describe_endpoint_response["EndpointStatus"]
        print(endpoint_status)
        if endpoint_status != "Creating":
            break
        time.sleep(15)
    print(describe_endpoint_response)

    response["EndpointStatus"] = endpoint_status
    return response
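# NOTE: stray "[docs]" Sphinx source-link marker removed here (as a bare expression it would raise NameError on import).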
def invoke(endpoint_name, input_data):
    """Invoke a SageMaker endpoint with input data.

    The request body is the JSON document ``{"Input": input_data}`` sent with
    content type ``application/json``. The response body is parsed as JSON and
    its ``'Output'`` entry is returned.

    Args:
        endpoint_name (str): The name of the SageMaker endpoint.
        input_data (list): The input data to send to the endpoint. Objects
            exposing a ``tolist()`` method (e.g. NumPy arrays) are converted
            with it first so they can be JSON-serialized.

    Returns:
        result (list): The ``'Output'`` value from the endpoint's JSON response.

    Raises:
        TypeError: If ``input_data`` is not JSON-serializable.
        KeyError: If the response JSON has no ``'Output'`` entry.
    """
    # Array-like containers are not JSON-serializable as-is; convert them to
    # plain nested lists.
    if hasattr(input_data, "tolist"):
        input_data = input_data.tolist()

    # Serialize once; a dumps -> loads -> dumps round trip adds nothing.
    payload = json.dumps({"Input": input_data})

    runtime_client = boto3.client('sagemaker-runtime')
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType="application/json",
        Body=payload)
    result = json.loads(response['Body'].read().decode())['Output']
    return result
if __name__ == '__main__':
    # No ``main`` function is defined in this module, so calling it would
    # fail with NameError. The module is a library of helpers; exit with an
    # explanatory message (non-zero status) instead.
    raise SystemExit(
        "This module has no command-line entry point; "
        "import it and call deploy_model() or invoke() instead."
    )