데이터실무

DA, SQL, DB보안 등 실무자를 위한 위한 DB기술 바이블!

Spark MLlib

데이터 분석
기계학습
Spark MLlib
작성자
admin
작성일
2021-02-15 14:21
조회
1674

MLlib소개

MLlib는 Spark SQL과 스파크 스트리밍(Spark Streaming)과 같이 스파크 내부의 서브 프로젝트다. 이름에서 알 수 있듯이 머신러닝을 위해 만들어졌다. 사실 머신러닝 프로그램은 매우 많고, 이미 상용 시장에서도 널리 활용되고 있다. 오픈소스만 나열하자면, 빅데이터용은 아니지만 가장 일반적인 분석 오픈소스인 R, 파이썬의 scikit-learn, 빅데이터용이고 하둡과 연계로 유명해진 머하웃, 최근 주목을 받고 있는 H20 등이 있다.
앞서 언급했듯이 여러 가지 분석툴이 있고, 특히 대용량 데이터 분석에서는 머하웃이 각광받고 있었다. 그런데 굳이 다시 스파크 서브 프로젝트로서 MLlib를 만든 이유는 무엇이고, 그 장점은 어떤것이 있을까? 대용량 분석이 가능한 머하웃과 비교해 보며 MLlib의 특징에 대해 알아 보자.


다른 프로그램과 연동

계속 이야기해 왔던 다른 채널 혹은 분석 방법과의 연계다. 스파크(Spark)는 동일한 환경에서 여러가지 분석 방법을 지원한다(same pipeline). 다른 프로그램과 마찬가지로 스파크 엔진 위에서 작동하며 스트리밍이나 SQL과 연동할 수 있다.


인메모리

스파크는 데이터를 RDDs 형태로 메모리에 올릴 수 있다고 했었다. 다른 데이터를 메모리에 올려서 처리하면 디스크를 사용하는 맵리듀스나 머하웃보다 (스파크 진영에서 주장하길) 10배에서 100배까지 빠른 결과를 얻어낼 수 있다. 머신러닝 부분에서는 스트리밍이나 SQL On Hadoop보다 속도 논란이 적다. 그만큼 압도적인 성능을 보여주고 있다.
아래는 비록 스파크 진영에서 테스트한 결과이지만, 확실하게 Spark MLlib가 머하웃보다는 높은 성능을 지원함을 보여주고 있다.

[그림 Ⅳ-1-24] 머하웃과 MLlib 속도비교

[그림 Ⅳ-1-24] 머하웃과 MLlib 속도비교


지원 언어

기본적으로 머하웃은 자바만 지원한다. 그런데 MLlib는 스파크와 같이 스칼라, 자바, 파이썬 모두를 지원한다. 물론 두 프로젝트 모두 다른 언어로 포팅하거나 서드파티로 만드는 작업들은 진행되고 있다. 그럼에도 일반적인 범위 안에서는 머하웃은 자바만 지원하고, 스파크는 자바는 물론 스칼라와 파이썬까지 지원한다. 단 MLlib도 완벽하게 모든 부분에 있어서 세 가지 언어를 지원하는 것은 아니다. 메인은 스칼라이고 matrix 같은 부분에 대해 파이썬은 지원하지 않는 등 제한사항이 있다.


지원 알고리즘

지원 알고리즘은 아직까지 머하웃이 훨씬 많다. MLlib는 1.0 버전 현재 K-Means, regression tree, SVM, nanve Bayes 등 15개 가량의 알고리즘을 지원한다. 그러나 MLlib는 아직 시작한 지 얼마 되지 않았지만, 머하웃에 비해 기능 추가 속도가 빠르다. 실제로 1.1 혹은 1.2 버전이 넘어가기 전까지 지원 알고리즘을 두 배 정도로 늘릴 계획이라고 한다.


지원 환경

스파크는 어느 하나에 귀속된다기보다는 범용성의 목적으로 만든 빅데이터 플랫폼이다. 그래서 Yarn, Mesos, Amazon 등의 Management를 이용할 수 있다. 데이터 또한 HDFS나 S3, Hbase 등 다양한 소스로부터 받을 수 있다. MLlib도 스파크 기반으로 작동하기 때문의 위에서 언급한 Management들을 사용할 수 있을 뿐 아니라, 스트리밍으로부터 받은 데이터로도 분석이 가능하다.


Graph Processing

Graph는 Vertex와 Edge로 구성됐으며, 다양한 방법으로 각각의 Vertex에 연결할 수 있도록 구성돼 있다. 또한 각각의 연결에는 가중치와 방향을 부여할 수 있어서 도로망, 소셜 네트워크, 데이터 구조, 생물 유전학, 문서구조 등과 같이 행렬로만 표현하기에는 서로 관계도 너무 복잡하고 iteration(반복처리)이 많은 분야에 많이 사용된다.

[그림 Ⅳ-1-25] Graph 알고리즘

[그림 Ⅳ-1-25] Graph 알고리즘

Graph 알고리즘을 이용할 수 있는 오픈소스도 있다. 대표적으로 구글에서 Pregel을 오픈소스화해서 내놓은 Apache Giraph가 있다. 페이스북이 지난 13년에 발표한 내용(http://goo.gl/PurGt1)을 보면, 하이브에서 15시간이 걸리던 작업을 Giraph로 하면 9분으로 줄어들었다고 한다. Giraph 덕분에 CPU 활용을 20배 활용할 수 있게 됐고, 전체적으로 100배 이상의 성능을 얻을 수 있었다.
아직 알파 버전이지만 스파크에도 GraphX라는 Graph 프로세싱 서브 프로젝트가 있다. 참고로 원래 Pregel을 본 따서 만들었던 Bagel이라는 프로젝트도 있었지만, GraphX로 통합되고 있다.


Next Mahout

머하웃이 현재는 맵리듀스를 기반으로 하지만, 다음 중요 로드맵 중에 하나로 스파크 엔진을 기반으로 머하웃의 알고리즘을 수행하는 서브 프로젝트를 진행중이다. 정확히 이야기하자면, 서브 프로젝트라기보다는 codebase 자체를 바꾼다고 한다. 그래서 당분간은 머하웃의 코드 업데이트를 기대하기는 어려울 것이다. 다만 제대로 이전된다면 풍부한 머하웃 사용자가 굳이 Spark MLlib를 사용하지 않아도 되므로 앞으로 진행 상황을 지켜봐야 할 것이다.


실습

각 알고리즘에 대한 자세한 내용은 머하웃에서 언급했으므로 이번 실습에서는 각 알고리즘이 스파크에서 얼마나 간단하게 구현되는지 위주로 설명하겠다.


Example
KMeans

import org.apache.speark.mllib.clustering.KMeans import org.apache.spark.millib.linalg.Vectors //데이터를 읽고 파싱한다. val data = sc.textFile("data/mllib/kmeans_data.txt") val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))) //KMeans를 이용하여 데이터를 2개의 클래스로 클러스터링한다. val numClusters = 2 val numInterations = 20 val clusters = KMeans.tran(parsedData, numClusters, numInterations) //Evaluate clustering by computing Within Set Sum of Squared Errors val WSSSE = clusters.computeCost(parasedData) println("Within Set Sum of Squared Errors = " + WSSSE)


PCA

import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.distributed.RowMatrix val mat : RowMatrix = ... // 상위 10개의 주성분을 계산하고 로컬에 저장한다 val pc : Matrix = mat.computerPrincipalComponents(10) matrix. // 상위 10개의 주성을 통해서 row들을 나눈다 val projected : RowMatrix = mat.multiply(pc)


Decision Tree

import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.mllib.util.MLUtils // 파일을 읽고 파싱하고 캐슁을 한다 val dataq = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache() //DecisionTree 모델을 학습한다 val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val impurity = "gini" val maxDepth = 5 val maxBins = 100 val model = DecisionTree.trainClassfier(data, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins) //모델을 평가하고 에러를 계산한다. val labelAndPreds = data.map { point => val prediction = model.predict(point.features)(point.label, prediction)} val trainErr = labelAndPreds.filter(r=> r._1 !=r._2).count.toDouble / data.count println("Training Error = " + trainErr) println("Learned classification tree model:\n" + model)


NavieBayes

import org.apache.spark.mllib.classification.NaiveBayes import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabledPoint val data = sc.textFile("data/mllib/sample_naive_bayes_data.txt") val paresdData = data.map{ line=> val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))} //데이터를 트레이닝용 60프로와 테스트용 40프로로 나눈다. val splits = parsedData.randomSplit(Arrary(0.6, 0.4), seed = 11L) val traning = splits(0) val test = splits(1) val model = NaiveBayes.train(training, lambda = 1.0) val predictionAndLabel = test.map(p=> (model.predict(p.features), p.label)) val accuracy = 1.0 * predictionAndLabel.filter(x=> x._1 = x._2).count() / test.count()


연계

이번에는 스파크의 서브 프로젝트와 어떻게 연계되는지를 알아 보겠다.


Spark SQL + MLlib

// 하이브처럼 외부의 데이터도 쉽게 가져올 수 있다. val trainingTable = sql(""" SELECT e.action, u.age, u.latitude, u.longitude FROM Users u JOIN Events e ON u.userID = e.userID""") //SQL을 실행하면 RDD형태로 반환되고 MLlib에서 쉽게 사용이 가능하다. val training = trainingTable.map {row => val features = Vectors.dense(row(1), row(2), row(3)) LabeledPoint(row(0), features)} val model = SVMWithSGD.train(training)


Spark Streaming + MLlib

//스트리밍으로 들어오는 데이터를 바로 MLlib에서 처리가 가능하다. val model : KMmeansModel = ... val tweets = twitterUtils.createStream(ssc, Some(authorizations(0))) val statuses = tweets.map(_.getText) val filteredTweets = statuses.filter(t=> model.predict(featurize(t)) ==clusterNumber) filterTweets.print()


Graphx + MLlib

// graph를 반영한다. val graph = Graph(pages, links) val pageRank: RDD[(Long, Double)] = graph.staticPageRank(10).vertices //페이지 라벨(스팸여부)과 피쳐를 읽어온다 val labelAndFeatures: RDD[(Long, (Double, Seq((Int, Double))))] = ... val training: RDD[LabeledPoint] = labelAndFeatures.join(page.Rank).map{case (id, ((label, features), pageRank)) => LabeledPoint(label, Vectors.sparse(features ++ (1000, pageRank)))} // 로지스틱 회기분석을 이용하여 스팸 디렉터를 학습시킨다. val model = LogisticRegressionWithSGD.train(traning)