나는야 데이터사이언티스트/PYTHON

[Python] sklearn.pipeline, 파이프라인(Pipeline)이란 ?

우주먼지의하루 2020. 2. 23. 14:42
728x90

대부분의 기계 학습 데이터는 최종 모델을 만드는데 있어서 이상적인 형식이 아니다. 범주형 변수를 조작하거나 스케일링 및 정규화와 같은 많은 데이터 변환이 수행되어야 한다. Scikit-Learn은 전처리 기능에 일반적으로 사용되는 대부분의 기능을 내장하고 있다. 그러나 일반적인 기계 학습 워크플로우에서는 이러한 모든 변환을 두 번 이상 적용해야 한다. 모델을 교육할 때 한 번 그리고 예측하고자 하는 모든 새로운 데이터에 대해 다시 한 번하기 때문이다. 물론 재사용하는 기능을 쓸 수 있지만 먼저 실행하고 나서 모델을 따로 불러야 할 것이다. Scikit-Learn pipeline은 이 과정을 단순화하는 도구로써 다음과 같은 몇 가지 주요 이점이 있다.

 

파이프라인을 사용하면 데이터 사전 처리 및 분류의 모든 단계를 포함하는 단일 개체를 만들 수 있다.

 

  • train과 test 데이터 손실을 피할 수 있다.
  • 교차 검증 및 기타 모델 선택 유형을 쉽게 만든다.
  • 재현성 증가

Simplest possible Pipeline

- classifier와 전처리 단계(데이터 표준화)

 

from sklearn.datasets import make_regression,make_classification

from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

X, y = make_classification(n_samples=100,n_features=10,n_informative=2)

X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.33, random_state=42)

X_train.shape, X_test.shape, y_train.shape, y_test.shape

# it takes a list of tuples as parameter
pipeline = Pipeline([
    ('scaler',StandardScaler()),
    ('clf', LogisticRegression())
	])

# use the pipeline object as you would
# a regular classifier
pipeline.fit(X_train,y_train)


y_preds = pipeline.predict(X_test)

accuracy_score(y_test,y_preds)

Pipeline for Text Classification/NLP , Pipeline with Cross-Validation

교차 검증을 수행하는 것은 모델 단계를 파이프라인을 사용해야 하는 주요 이유 중 하나이다.

 

from sklearn.feature_extraction.text import CountVectorizer,TfidfTransformer
from sklearn.metrics import f1_score
from sklearn.model_selection import cross_val_score
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline
from sklearn.datasets import fetch_20newsgroups

cats = ['alt.atheism', 'sci.space']
newsgroups_train = fetch_20newsgroups(subset='train', categories=cats)
newsgroups_test = fetch_20newsgroups(subset='test', categories=cats)

X_train = newsgroups_train.data
X_test = newsgroups_test.data
y_train = newsgroups_train.target
y_test = newsgroups_test.target

# this calculates a vector of term frequencies for 
# each document
vect = CountVectorizer()

# this normalizes each term frequency by the 
# number of documents having that term
tfidf = TfidfTransformer()

# this is a linear SVM classifier
clf = LinearSVC()

pipeline = Pipeline([
    ('vect',vect),
    ('tfidf',tfidf),
    ('clf',clf)
])

scores = cross_val_score(pipeline,X_train,y_train,cv=3,scoring='f1_micro')

scores

scores.mean()

# now train and predict test instances
pipeline.fit(X_train,y_train)
y_preds = pipeline.predict(X_test)

# calculate f1
f1_score(y_test, y_preds, average='micro')
    
  

 

Pipeline with Cross-Validation (GridSearchCV)

 

교차 검증하고 동시에 최상의 파라미터 구성을 선택하려면 GridSearchCV를 사용한다. 이를 통해 모델을 랜덤 분할하여 일반화 상태가 양호한지 또는 오버핏인지를 알아내는 KFold 전략을 사용하여 다양한 하이퍼 파라미터 구성을 쉽게 테스트할 수 있다. 또한 GridSearchCV를 사용하면 반복할 하이퍼 파라미터 구성 값을 가진 ParameterGrid를 정의하고 모든 조합을 테스트하고 점수를 매긴다.

 

from sklearn.feature_extraction.text import CountVectorizer,TfidfTransformer
from sklearn.metrics import f1_score
from sklearn.model_selection import GridSearchCV
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline
from sklearn.datasets import fetch_20newsgroups

cats = ['alt.atheism', 'sci.space']
newsgroups_train = fetch_20newsgroups(subset='train', categories=cats)
newsgroups_test = fetch_20newsgroups(subset='test', categories=cats)

X_train = newsgroups_train.data
X_test = newsgroups_test.data
y_train = newsgroups_train.target
y_test = newsgroups_test.target

pipeline = Pipeline([
    ('vect',CountVectorizer()),
    ('tfidf',TfidfTransformer()),
    ('clf',LinearSVC())
])

# this is where you define the values for
# GridSearchCV to iterate over

# l1 penalty is incompatible with other configs
param_grid = [
    {
        'vect__max_df':[0.8,0.9,1.0],
        'clf__penalty':['l2'],
        'clf__dual':[True,False]
    },
    {
        'vect__max_df':[0.8,0.9,1.0],
        'clf__penalty':['l1'],
        'clf__dual': [False]
    }
]

# do 3-fold cross validation for each of the 6 possible
# combinations of the parameter values above
grid = GridSearchCV(pipeline, cv=3, param_grid=param_grid,scoring='f1_micro')
grid.fit(X_train,y_train)

# summarize results
print("Best: %f using %s" % (grid.best_score_, 
    grid.best_params_))
means = grid.cv_results_['mean_test_score']
stds = grid.cv_results_['std_test_score']
params = grid.cv_results_['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))
    
# now train and predict test instances
# using the best configs
pipeline.set_params(clf__penalty='l2',vect__max_df=0.9,clf__dual=True)
pipeline.fit(X_train,y_train)
y_preds = pipeline.predict(X_test)

# calculate f1
f1_score(y_test, y_preds, average='micro')

 

Custom Pipeline step

Trasnaper를 통해 파이프라인에서 수행할 수 있는 모든 작업(변수, 데이터 사전 처리 등)

 

# -*- coding: utf-8 -*-

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class ToDenseTransformer(BaseEstimator,TransformerMixin):
    # here you define the operation it should perform
    def transform(self, X, y=None, **fit_params):
        return X.todense()

    # just return self
    def fit(self, X, y=None, **fit_params):
        return self

class SelectColumnsTransfomer(BaseEstimator, TransformerMixin):
    """ Select dataframe columns
    """

    def __init__(self, columns=None, ravel=None):

        if columns is None:
            self.columns = []
        elif type(columns) is not list:
            self.columns = [columns]
        else:
            self.columns = columns

        if ravel is None:
            self.ravel = False
        else:
            self.ravel = ravel

    def transform(self, X, **transform_params):

        cpy_df = X[self.columns].copy()

        if self.ravel:
            return cpy_df.values.ravel()
        else:
            return cpy_df

    def fit(self, X, y=None, **fit_params):
        return self


class DataframeFunctionTransformer(BaseEstimator, TransformerMixin):
    """
    Apply an arbitrary function to a Dataframe column, as you would use a `map` funcion
    """

    def __init__(self, column_name, func, none_treatment=None):
        """
        :param column_name: the name of the dataframe column to which the function will be applied
        :param func: the function object, e.g. lambda
        :param none_treatment: what to do with NaN, Nones, etc. Default behaviour is to perform no
            special treatment, i.e. the function itself should treat nulls. Other options: 'return_none',
            returns the input itself in case it's null-lie (as per pd.isnull)
        """
        self.column_name = column_name
        self.func = func
        self.none_treatment = none_treatment

    def transform(self, in_df, **transform_params):
        cpy_df = in_df.copy()

        if self.column_name not in cpy_df.columns.values:
            raise ValueError('Provided column name is not part of the dataframe: "{}" '.format(self.column_name))

        if self.none_treatment is None:
            cpy_df[self.column_name] = cpy_df[self.column_name].map(self.func)
        elif self.none_treatment.upper() == "RETURN_NONE":
            cpy_df[self.column_name] = cpy_df[self.column_name].map(lambda x: x if pd.isnull(x) else self.func(x))
        else:
            raise ValueError(
                'Provided none treatment is invalid. Expected one of {}, got: '.format((None, 'return_none'),
                                                                                       self.none_treatment))

        return cpy_df

    def fit(self, X, y=None, **fit_params):
        return self

 

 

출처

https://medium.com/vickdata/a-simple-guide-to-scikit-learn-pipelines-4ac0d974bdcf

http://queirozf.com/entries/scikit-learn-pipeline-examples

 

 

반응형