본문 바로가기

MLOps

A/B Testing with Seldon Core

셀던 코어 공식 문서 : https://docs.seldon.io/projects/seldon-core/en/latest/index.html

 

이번 포스트에서는 셀던 코어라는 머신러닝(ML) 모델 서빙 프레임워크를 소개하고자 한다. 셀던 코어는 ML 파이프라인의 각 컴포넌트를 gRPC/REST 마이크로서비스로 변환해준다. 요청이 들어왔을 때 각 컴포넌트가 순서대로 실행되도록 해주며 컴포넌트 간의 통신은 개발자가 별도로 손볼 필요가 없다. 그리고 Kubeflow를 이용하여 파이프라인을 Kubernetes에 손쉽게 배포할 수 있다.

 

셀던 코어로 ML 파이프라인을 Kubernetes에 배포하는 과정은 다음과 같이 5 단계로 나누어진다.

  1. 학습된 모델과 전처리기를 원격 저장소(ex. metakage, s3, Google Cloud, PV, etc.)에 저장한다.
  2. 각 컴포넌트의 로직을 구현하고 셀던 코어로 wrapping한다.
  3. 각 컴포넌트의 도커 이미지를 빌드한다.
  4. yaml 파일에 배포 스펙을 명시하고 파이프라인 DAG(Directed Acyclic Graph)를 정의한다. 
  5. Kubeflow를 이용하여 파이프라인을 Kubernetes에 배포한다.

셀던 코어로 A/B testing 파이프라인을 배포하는 방법을 한 번 알아보자. 참고로 seldon-core 버전 1.6.0 기준으로 작성하였다.

 

A/B Testing Pipeline 설계

A/B Testing 파이프라인

위 파이프라인이 아마 A/B Testing의 가장 일반적인 파이프라인일 것이다. 물론 모델 개수가 3개 이상인 case로 확장 가능하다. 각각의 네모(전처리, 라우터, Inference)가 파이프라인에서 하나의 컴포넌트(component)가 된다.

 

전처리 컴포넌트는 raw feature들로 이루어진 input array X를 받아 전처리(preprocess)한다. 이 전처리 과정에는 categorical feature들의 encoding(ex. label encoding, one-hot encoding, etc.)과 continuous feature들의 scaling 혹은 normalization, null값 처리 등이 포함된다. 전처리 컴포넌트는 전처리된 피처들로 이루어진 array를 반환한다. 그 다음 라우터는 어느 모델에게 inference를 요청할지 결정한다. 마지막으로 선택된 모델을 담당하는 inference 컴포넌트가 전처리된 array를 전달받아 모델의 예측값으로 응답한다.  

 

각 컴포넌트 구현 및 도커 이미지 빌드하기

전처리 컴포넌트

InputTransformer.py에 전처리 로직을 구현해보자. 셀던 코어에서 전처리 및 후처리를 담당하는 컴포넌트의 컴포넌트 타입은 TRANSFORMER이다.

 

from seldon_core.user_model import SeldonComponent
import numpy as np

class InputTransformer(SeldonComponent):
    def __init__(self, container_name: str, run_name: str):
    	super(InputTransformer, self).__init__()
    	self.one_hot_encoder = {학습된 OneHotEncoder 로드}
    	self.min_max_scaler = {학습된 MinMaxScaler 로드}
        
    def transform_input(self, X, features_names=None):
    	x_1 = self.one_hot_encoder.transform(X[:, 0:1])
    	x_2 = self.min_max_scaler.transform(X[:, 1:])
        return np.concatenate([x_1, x_2], axis=-1)

 

주의점

  1. 파이썬 파일명과 클래스 이름을 같게 해야한다. 이 점만 지키면 원하는 이름 아무거나 사용해도 된다. 다른 컴포넌트에서도 마찬가지로 적용되는 사항이다.
  2. 전처리 로직을 구현한 함수명은 꼭 transform_input이어야 한다. 셀던 코어가 이 이름의 함수를 TRANSFORMER의 기능을 구현한 함수로 인식하기 때문이다. 참고로 후처리를 구현할 경우에는 함수명을 transform_output이라고 해야한다.
  3. transform_input의 파라미터와 반환 형식을 지켜주어야 한다. 전처리할 데이터를 받는 파라미터가 X이며 numpy array 타입이다. 반환값 역시 numpy array 타입이다. 또한 파라미터에 features_names=None을 추가해주어야 에러가 안 난다.  

미리 학습되어 원격 저장소에 저장된 인코더나 scaler 등의 전처리기를 constructor에서 로드한다. 예시 코드에서 constructor의 파라미터 중 container_name에는 전처리기가 담긴 metakage 폴더 이름, run_name에는 전처리기의 학습 버전에 대한 정보가 들어오도록 설계하여 셀던 코어가 원격 저장소에서 알맞는 전처리기를 찾을 수 있도록 하였다.

 

이제 도커파일을 작성해보자.

 

FROM python:3.7
RUN mkdir /home/app && chmod -R 777 /home/app

COPY ./InputTransformer.py /home/app/InputTransformer.py
COPY ./requirements.txt /home/app/requirements.txt
WORKDIR /home/app

EXPOSE 9000

ENV GUNICORN_WORKERS {구니콘 워커 수}
ENV MODEL_NAME InputTransformer
ENV SERVICE_TYPE TRANSFORMER

RUN pip install -r requirements.txt

CMD exec seldon-core-microservice $MODEL_NAME --service-type $SERVICE_TYPE --workers $GUNICORN_WORKERS

 

GUNICORN_WORKERS에 희망하는 구니콘 워커 수를 적고 MODEL_NAME은 클래스 이름으로, SERVICE_TYPE은 TRANSFORMER로 지정한다. 이 글을 쓸 당시 파이썬 3.8 이상은 셀던 코어와 호환이 안 됐기 때문에 파이썬 3.7을 사용했다. 이제 도커 이미지를 빌드하고 도커 레포지토리에 푸쉬하면 된다. 

 

라우터 컴포넌트

이번에는 Router.py에 라우팅 로직을 구현해보자. 라우터는 A/B Test에서 트래픽을 나누는 역할을 한다. 매 요청마다 child 컴포넌트 중 하나를 선택하여 요청을 보낸다. 셀던 코어에서 라우팅을 담당하는 컴포넌트의 컴포넌트 타입은 ROUTER이다.

 

from seldon_core.user_model import SeldonComponent
import random

class Router(SeldonComponent):
    def __init__(self, n_branches):
    	super(Router, self).__init__()
        self.n_branches = int(n_branches)
        
    def route(self, features, features_names=None):
    	return random.randrange(self.n_branches)

 

라우팅 로직은 꼭 route라는 함수에 작성해야한다. Constructor가 받는 n_branches라는 파라미터는 라우터의 child 컴포넌트의 개수로 선택지에 있는 모델 개수이다. 우리는 2개의 모델로 테스트하고자 하기 때문에 나중에 2라는 값을 넣어줄 것이다.

 

예시 코드에서 route 함수는 동일한 확률로 0과 1 중 하나를 선택하여 반환하도록 되었다. 0을 반환했을 때는 첫 번째 모델이, 1을 반환했을 때는 두 번째 모델이 inference를 하게 되는 것이다. 확장해서 n-1을 반환했을 때는 n번째 모델에게 inference를 요청하게 된다. 물론 각 모델이 선택될 확률이 다르도록 설계할 수 있고 랜덤 선택 외의 다른 로직을 구현할 수도 있다.

 

마찬가지로 도커파일을 아래와 같이 작성하고 빌드, 푸쉬하면 된다.

 

FROM python:3.7

RUN mkdir /home/app && chmod -R 777 /home/app
COPY ./Router.py /home/app/Router.py
WORKDIR /home/app

EXPOSE 9000

ENV GUNICORN_WORKERS {구니콘 워커 수}
ENV MODEL_NAME Router
ENV SERVICE_TYPE ROUTER

RUN pip install seldon_core==1.6.0

CMD exec seldon-core-microservice $MODEL_NAME --service-type $SERVICE_TYPE --workers $GUNICORN_WORKERS

 

Inference 컴포넌트

ModelInfer.py에 inference 로직을 구현해보자. 셀던 코어에서 model inference를 담당하는 컴포넌트의 컴포넌트 타입은 MODEL이다. 

 

from tensorflow.keras.models import load_model
from seldon_core.user_model import SeldonComponent

class ModelInfer(SeldonComponent):
    def __init__(self, container_name: str, run_name: str):
    	super(ModelInfer, self).__init__()
        
        self.container_name = container_name
        self.run_name = run_name
        {원격 저장소에서 모델을 받아 './models'에 저장}
        
    def load(self):
    	self.model = load_model('./models/{모델 이름}', compile=False)
        
    def predict(self, X, features_names=None):
        if self.model is None:
        	self.load()
        return self.model(X).numpy()

 

모델 inference는 predict라는 함수 안에 구현해야한다. 전처리 때와 마찬가지로 모델 입력 X와 함수 반환값 모두 numpy array이다.

 

Inference 컴포넌트 도커 이미지는 하나만 만들고 여러 모델이 공유하도록 하는 것이 효율적이다. 그러기 위해서는 위의 예시 코드에서처럼 어떤 모델이든 사용할 수 있게끔 만드는 것이 좋다. 예시 코드에서 constructor의 파라미터 중 container_name에 학습된 모델이 담긴 metakage 폴더 이름(모델 종류에 따라 다르다), run_name에는 모델 학습 버전과 관련된 정보가 들어오도록 하였다. 다만 모델마다 입력 피처가 다른 경우 문제가 생길 것으로 보인다. 그래서 각 모델을 학습시킬 때 모든 모델에게 필요한 피처들을 받아들이되 각자에게 불필요한 피처는 제외하도록 설계하고 save해야하는 번거로움이 있다. 

 

주의점

  1. 구니콘을 사용하여 텐서플로우 모델을 서빙할 경우 constructor에서 모델을 로드하면 안 된다. 구니콘의 master process만 constructor를 실행하기 때문이다. 텐서플로우 특성상 inference가 제대로 되려면 worker process들도 self.model = load_model(...) 을 실행해야한다. 그러므로 예시 코드에서처럼 constructor 밖에서 모델이 로드되도록 하자.

 

도커파일을 작성하고 빌드, 푸쉬하자.

 

FROM python:3.7

RUN mkdir /home/app && chmod -R 777 /home/app
COPY ./ModelInfer.py /home/app/ModelInfer.py
WORKDIR /home/app

EXPOSE 9000

ENV GUNICORN_WORKERS {구니콘 워커 수}
ENV MODEL_NAME ModelInfer
ENV SERVICE_TYPE MODEL

RUN pip install tensorflow==2.3.1 seldon_core==1.6.0

CMD exec seldon-core-microservice $MODEL_NAME --service-type $SERVICE_TYPE --workers $GUNICORN_WORKERS

 

YAML 파일 작성

각 컴포넌트의 스펙을 정하고 DAG를 정의하기 위해서 yaml 파일을 작성해야한다. seldon_deployment.yaml (파일명은 상관없다)에 한번 작성해보자.

 

apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
metadata:
  labels:
    app: seldon
  name: ab-test
spec:
  name: ab-test-deployment
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - image: {전처리 컴포넌트 이미지 URL}
          imagePullPolicy: Always
          name: {전처리 컨테이너 이름}
          resources:
            requests:
              cpu: "2"
              memory: 1Gi
            limits:
              cpu: "4"
              memory: 2Gi
        - image: {라우터 컴포넌트 이미지 URL}
          imagePullPolicy: Always
          name: {라우터 컨테이너 이름}
          resources:
            requests:
              cpu: "2" 
              memory: 1Gi
            limits:
              cpu: "4"
              memory: 2Gi
        - image: {Inference 컴포넌트 이미지 URL}
          imagePullPolicy: Always
          name: {모델 A inference 컨테이너 이름}
          resources:
            requests:
              cpu: "1" 
              memory: 1Gi
            limits:
              cpu: "2"
              memory: 2Gi         
        - image: {Inference 컴포넌트 이미지 URL}
          imagePullPolicy: Always
          name: {모델 B inference 컨테이너 이름}
          resources:
            requests:
              cpu: "1" 
              memory: 1Gi
            limits:
              cpu: "2"
              memory: 2Gi   
    graph:
      name: {전처리 컨테이너 이름}
      endpoint:
        type: REST
      type: TRANSFORMER
      parameters:
        - name: container_name
          type: STRING
          value: 
        - name: run_name
          type: STRING
          value:
      children:
        - name: {라우터 컨테이너 이름}
          endpoint:
            type: REST
          type: ROUTER
          parameters:
            - name: n_branches
              type: INT
              value: '2'
          children:
            - name: {모델 A inference 컨테이너 이름}
              endpoint:
                type: REST
              type: MODEL
              parameters:
                - name: container_name
                  type: STRING
                  value:
                - name: run_name
                  type: STRING
                  value:
              children: []
            - name: {모델 B inference 컨테이너 이름}
              endpoint:
                type: REST
              type: MODEL
              parameters:
                - name: container_name
                  type: STRING
                  value:
                - name: run_name
                  type: STRING
                  value:
              children: []            
    svcOrchSpec:
      env:
        - name: SELDON_ENABLE_ROUTING_INJECTION
          value: 'true'
    name: ab-test-deployment
    replicas: {Pod 개수}

 

spec > predictors > componentSpecs > spec > containers 아래에 각 컨테이너의 이미지 URL과 자원할당량을 명시하고 컨테이너의 이름을 지정해준다. 한 컴포넌트마다 하나의 컨테이너를 만든다. 모델 A와 모델 B 컨테이너들이 동일한 이미지를 사용하는 것을 확인할 수 있다. 

 

graph 밑에 DAG를 정의한다. 각 컴포넌트마다 children 아래에 child node가 되는 컴포넌트를 추가한다. 라우터는 병렬적으로 두 개의 child node를 갖고 있다.

 

새로운 모델을 추가하고 싶으면 spec > predictors > componentSpecs > spec > containers에 컨테이너를 추가하고 ('image: {Inference 컴포넌트 이미지 URL}') graph에서 라우터 컨테이너의 children에 새로 추가해주면 된다.

 

각 컴포넌트의 parameters에는 constructor의 파라미터에 들어갈 값들을 적어준다. 

 

Kubeflow로 Kubernetes에 배포

마지막으로 아래와 같은 코드로 pipeline.yaml을 생성해서 Kubeflow로 쿠버네티스에 파이프라인을 배포하면 된다.

 

import kfp.dsl as dsl
import kfp.compiler as compiler
import yaml

@dsl.pipeline(name='Inference Pipeline')
def inference_ppl():
	sdep_yaml = yaml.load(
    	open('seldon_deployment.yaml'),
        Loader=yaml.FullLoader
    )
    
    return dsl.ResourceOp(
    	name='seldon_deployment',
        k8s_resource=sdep_yaml,
        action='create',
        attribute_outputs={'name': '{.metadata.name}'}
    )
    
filename = 'pipeline.yaml'
compiler.Compiler().compile(inference_ppl, filename)

 

요청 보내기

배포를 성공했으면 요청을 한번 보내보자.

 

import requests, json

session = requests.Session()

url = 'http://{ingress URL}/seldon/{kubeflow namespace}/{seldon deployment name}/api/v1.0/predictions'
headers = {'Content-Type': 'application/json'}
data = {'data': {'ndarray': [['dog', 0, 4, 0.2, 0.79, ...], ...]}}

res = session.post(url, data=json.dumps(data), headers=headers)
print(res.text)

 

참고로 배포명(seldon deployment name)은 k9s에서 :sdep를 입력하거나 터미널에 kubectl get sdep -n {kubeflow namespace}를 입력하면 찾을 수 있다.

 

 

 

'MLOps' 카테고리의 다른 글

AWS Serverless 1편  (0) 2023.01.02
Amazon SageMaker  (0) 2022.04.25
Multi-Armed Bandit with Seldon Core  (0) 2022.02.20
Ensemble with Seldon Core  (0) 2022.02.19
셀던 코어와 텐서플로우 서빙  (0) 2022.02.07