데이터실무

DA, SQL, DB보안 등 실무자를 위한 위한 DB기술 바이블!

스파크 프로그래밍과 스파크 스트리밍

데이터 처리
스트리밍 데이터 처리
스파크 프로그래밍과 스파크 스트리밍
작성자
admin
작성일
2021-02-15 14:14
조회
5207

스파크 개요

스파크 소개

스파크(Apache Spark)는 UC버클리대학 AMPLab에서 내놓은 대용량 분산 처리ㆍ분석 오픈소스다. 가장 대표적인 하둡의 맵리듀스가 맵과 리듀스 두 종류로만 돼 있어서 한계가 있는 반면, 스파크는 여러 가지 오퍼레이터를 지원한다. 그리고 SQL-On-Hadoop에서 언급한 하둡의 한계점 대부분을 해결해서 성능이 뛰어나다(General And Fast). 또한 맵리듀스와 같이 분석 엔진만 있는 것이 아니라, SQLㆍ스트리밍ㆍ머신러닝 라이브러리가 하나의 프로젝트에 유기적으로 묶여 있어서 관리가 편하다. 이 세 가지를 쉽게 조합해 하나의 프로그램으로 만들 수도 있다. 특히 메모리를 최대한 활용해 성능이 좋을 뿐만 아니라, Iteration 작업을 통해 큰 성능 향상을 기대할 수 있다. Iteration 작업은 머신러닝과 같이 복잡한 분석에서 자주 발생하기 때문에 특히 도움이 된다.
위와 같은 장점 덕분에 다른 오픈소스에 비해 역사가 그렇게 길지 않음에도 지난 2014년 2월 아파치 톱 프로젝트가 됐고, 해외에서는 가장 ‘핫한’ 오픈소스 가운데 하나로 주목 받고 있다. 참고로 스파크는 지난 2010년, 「Spark: Cluster Computing with Working Sets」이라는 논문 발표와 함께 본격적으로 발전하기 시작했다.


스파크 아키텍처
설계 목표

스파크는 크게 3가지의 설계 목표를 가지고 시작을 했다.



① Low latency (interactive) queries on historical data

과거 데이터들을 빠르게 처리해 대화형 질의가 가능하도록 한다. 이것은 SQL-On-Hadoop 파트에서 언급된 내용으로, Low Latency가 가능해야 데이터 탐색 - 분석 ? 탐색 ? 분석의 반복 과정을 통해제대로 된 데이터 분석을 할 수 있다.

② Low latency queries on live data(streaming)

실시간으로 들어오는 데이터를 분석할 수 있어야 한다. 이것은 스톰을 소개할 때 언급했던 내용이다. 스파크는 실시간 스트리밍 처리ㆍ분석도 가능하도록 설계됐다.

③ Sophisticated data processing

복잡한 분석도 가능해야 한다. Anomaly detection, Trend analysis 등 복잡한 분석도 가능해야 좀 더 나은 의사결정을 할 수 있다고 생각했다. 머하웃(Mahout)이나 R과 같은 프로젝트의 목표와 비슷하다.
위와 같은 3가지 설계 목표는 최근 이슈가 되고 있는 처리ㆍ분석 단계의 장점들을 수년 전부터 추구하고 있다. 또한 이 각각이 잘 동작한다는 것도 중요하지만, 무엇보다 이 3가지가 하나의 오픈소스 안에서 처리됨에 따른 관리의 편의성과 프로그램의 확장성 측면에서 좋은 평가를 받고 있다.

[그림 Ⅲ-3-11] 스파크 모듈

[그림 Ⅲ-3-11] 스파크 모듈

위 그림은 스파크의 전체 모듈을 표현한 것이다.


  • 스파크 코어: 스파크의 중심 부분으로 다음 챕터에서 설명할 중요 기능들이 들어 있다. 상단에 위치한 모듈들을 융합해 프로그램을 구현할 수 있도록 해준다.
  • 클러스터 매니저: 스파크 자체에 내장된 것 아닌, 스파크의 자원을 관리하기 위한 외부 모듈들이다. 가장 이슈가 되고 있는 클러스터 매니저인 Yarn, Mesos, AWS를 모두 지원한다.
  • 데이터 소스: 이 부분도 스파크에 내장된 모듈이 아니다. 외부의 데이터 저장소와 연계할 수 있음을 표현한 것이다. 가장 일반적인 HDFS나 아마존의 S3, NoSQL인 Cassandra, SAP의 Hana를 지원한다. 물론 로컬 파일도 가능하다.
  • 상단 모듈: 위에서 언급한 설계 목표에 부합되는 모듈들이다. Interactive SQL, Streaming, Machine Learning, Graph Processing이 가능하다. 아직 개발중이지만 R과 연동해 스파크 엔진으로 분석이 가능하도록 한 SparkR과 약간의 정확도를 낮추는 대신 빠른 속도로 조회가 가능한 BlinkDB도 있다.


구성 요소

작동 프로세스

스파크의 프로그램들을 클러스터 내에서 각각 독립적으로 관리된다. 이때 사용되는 것이 이전 장에서 언급했던 SparkContext다. 이 SparkContext가 포함된 각각의 메인 프로그램을 드라이버 프로그램이라고 한다. 드라이버 프로그램에서는 RDDs의 transform이나 조작을 정의해 놓고, 스파크 마스터에 요청해서 클러스터 매니저를 통해 워커 노드에서 실제적인 처리가 되도록 한다. 클러스터 매니저는 Yarn, Mesos, AWS 혹은 스탠드얼론 모두를 지원하며, 각 프로그램들에 자원 할당을 한다.
마지막으로 Worker Node는 실제로 데이터를 처리하는 노드들을 의미한다(개념적으로 하둡의 NodeManager와 비슷하다). 프로그램의 요청이 오면 각각 Executor를 실행시키고, 내부에서 태스크와 캐시를 생성한 후, 프로그램의 코드를 받아 실제적인 데이터 계산과 저장 역할을 한다. 이때 스파크 내부에서는 Yarn의 Application Master처럼 Executor들은 프로그램마다 독립적으로 관리된다. 따라서 서로 간섭이 발생하지 않는다는 장점이 있지만, 값들을 서로 공유하기 위해 별도의 저장소를 필요로 하는 단점도 있다.

[그림 Ⅲ-3-12] 스파크 클러스터 노드

[그림 Ⅲ-3-12] 스파크 클러스터 노드


RDDs

RDD(Resilient Distributed Dataset)는 2012년 ‘Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing’라는 Berkeley 논문에서 처음 나온 용어다. RDD는 클러스터의 메모리들을 활용해 데이터를 분산ㆍ저장(Distributed Cached)하는 것으로, 기존의 분산 메모리와는 구분되는 몇 가지 특징이 있다. 아래에서 RDD의 특징과 동작 흐름에 대해 알아보자(기존 분산 메모리와 공통된 특징도 언급된다).

RDD의 특징


  • Read Only: 데이터를 수정할 수 있게 되면 데이터 유실 시 복구가 어려워진다. 데이터 수정이나 특정 시점에서 복구하는 기술은 생각보다 많은 리소스와 구현 복잡도를 가지고 있다. 그렇기 때문에 과감하게 Read Only(읽기 전용)으로 기능을 제한하여 처리 속도를 올릴 수 있도록 하였다. 대신 수정이나 추가가 필요할 경우 새로운 메모리를 확보해 새로운 값을 가질 수 있다.
  • 폴트톨러런스: 데이터 유실 시 다시 복구가 된다. 만약에 RDD의 파티션 일부가 유실됐을 경우, 해당 값이 생성된 경로를 통해 다시 데이터를 복구시킨다. 이때 사용되는 데이터 생성된 경로 혹은 수식을 lineage라고 한다. 아래의 그림은 HDFS에 위치한 특정 Text 파일에서 “ERROR”라는 단어로 시작하는 문장들을 필터링하고 탭(\t)을 기준으로 분리하는(Map) 코드와 데이터 흐름을 나타내고 있다.
  • 여기에서 Mapped RDD 중에 일부 데이터가 손실됐다면 아래의 데이터 도출 경로(Lineage)를 통해 재계산을 해서 해당 데이터를 다시 복구해낸다. 이 Lineage를 통한 데이터 복구는 RDDs가 메모리 자원의 한계로 모든 백업 데이터나 여러 개의 Replication을 갖기 힘들다는 단점을 해결한 중요한 개념이다.

[그림 Ⅲ-3-13] RDD 변환 과정(Lineage)

[그림 Ⅲ-3-13] RDD 변환 과정(Lineage)


  • Need not exist in physical storage: 위에서 데이터는 메모리에 분산 임시 저장한다고 소개했다. 그러므로 RDDs는 내부에 별도의 물리적인 공간이 필요 없다. HDFS와 같이 매우 안정적이고 큰 저장 공간을 이용하여 데이터가 필요할 때마다 메모리에 올려서 읽기 전용 임시 저장소를 만든다(RDDs). 또한 데이터 유실 시 Lineage를 이용해 HDFS에서 데이터를 복구한다.
  • Lazy: 계산을 바로 수행하지 않다가 전체 요청이 완료되면 그때 시작한다. 이 부분은 실습부분에서 다시 설명하겠다.
  • Replication: 하드디스크에 분산ㆍ저장하는 것도 가능하다.

RDD의 동작 흐름


  • Load: HDFS와 같은 안정적인 저장소로부터 파일을 읽어온다.
  • Parallelizing: 드라이버 프로그램의 scala collection을 이용해 여러 조각으로 나누고 각 노드에 분리해 보낸다.
  • Transforming: 사용자(혹은 개발자)가 원하는 방식으로 데이터 세트들을 변형ㆍ계산을 한다.
  • Persistence: RDD는 별도의 작업을 하지 않으면 임시로만 저장되고, 계산을 바로 수행하지 않도록(Lazy) 설정돼 있다. 이것을 Cache 명령을 통해 미리 계산하고, 항상 메모리에 상주해 있도록 할 수 있다. HDFS에 저장해 두는 것도 가능하다.


지원 언어

스파크는 상당부분이 스칼라(Scala)라는 객체지향 성격과 함수형 성격을 모두 가지는 언어로 프로그래밍 돼 있다. 스칼라는 자바에 비해 매우 간결하면서도 다양한 기능을 제공한다. 자료형 유연성 등의 편리성과 JVM 및 닷넷(.net)과의 호환성 등도 지원하여 최근 빅데이터 분야에서 떠오르고 있다. 그렇다고 해서 스칼라만 지원하는 것은 아니다. 기본적으로 스칼라와 더불어 자바, 파이썬을 지원한다. 자바는 가장 범용적인 언어 중 하나이고 맵리듀스와 같이 많은 오픈소스들이 자바를 기반으로 프로그램을 만들 수 있도록 하고 있다. 또한 파이썬은 최근 간결성과 다양한 기능으로 사용자가 많아지고 있다.
스파크에서 3가지 언어를 대부분 지원하지만, 모든 기능을 3가지 언어에 대해 동일하게 지원하지는 않는다. Spark SQL에서 Language-Integrated queries는 스칼라만, Spark Streaming은 스칼라와 자바, MLlib의 각종 Matrix는 스칼라와 자바에서 지원한다. 또한 셀 환경은 스칼라와 파이썬만 지원한다. 이 밖에도 많은 부분에서 지원되지 않는 경우가 있다. 그렇기 때문에 가급적이면 스칼라를 권장하고 자바나 스파크를 사용할 경우, 사용하려는 기능을 제공하는지 미리 확인해야 한다.
참고로 스파크에서 스칼라와 자바의 코드 간결성 차이를 보자. 아래의 코드는 저장소에서 텍스트 파일을 읽어와서 RDD에 저장하고 “error” 단어가 포함된 문장을 추출해 총 몇 개가 나오는지 계산하는 프로그램이다. 아래와 같이 스칼라가 처음에는 익숙하지 않은 문법이라 어려울 수는 있지만 훨씬 간결하고 다양한 기능을 제공한다.

// Java: JavaRDD<String> lines = sc.textFile(...); lines.filter(new Function<String, Boolean>(){Boolean call(String s){return s.contains('error'); } }).count(); // Scala: val lines = sc.textFile(...) lines.filter(x=>x.contains("ERROR")).count()


스파크 실습
스파크 설치

샤크(Shark) 부분에서도 스파크 설치에 대해 다뤘지만, 샤크에서 지원 가능한 스파크와 스칼라의 버전이 다르므로 본인 설정에 맞춰서 변경해 주어야 한다.



스칼라 설치

스파크 1.0.2 버전은(원고 작성 시 안정화 버전 가운데 가장 최신이었으며 출판 당시 현재는 1.2.1버전이 나왔다.) 스칼라 2.10 버전과 호환되므로 가능하면 스칼라 2.10대 버전을 설치하는 것이 좋다.

$ wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz $ tar xvfz scala-2.10.4.tgz #Path 설정 export SCALA_HOME=/path/to/scala-2.10.4


스파크 설치

스파크는 하둡 1, CDH4, 하둡 2 등 여러 가지 버전을 지원한다. 본인의 하둡 설정에 맞는 파일을 http://spark.apache.org/downloads.html에서 다운받는다. 예제에서는 스파크 1.0.2 버전 중에서 하둡 2 호환 버전을 기준으로 설명하겠다.)

$ wget http://mirror.apache-kr.org/spark/spark-1.1.1/spark-1.1.1-bin-hadoop2.3.tgz $ tar wvfz spark-1.0.2-bin-hadoop2.tgz #Slave 노드의 호스트명들을 써 준다. # 만약 StandAlone으로 구동시킬 경우 localhost로 설정한다. $ vi conf/slaves #하둡, 스파크 경로 등 설정을 해준다. $ vi conf/spark-env.sh export SCALA_HOME=/path/to/scala-2.10.0 export SPARK_WORKER_MEMORY=16g # Path 설정 export SPARK_HOME=/path/to/ spark-1.0.2-bin-hadoop2

그런데 스파크도 하둡 버전에 따라서 빌드를 새로 해줘야 하는 경우가 있다. 이 경우에는 아래와 같이 진행하되, 하둡 버전 부분은 사용자 설정에 맞게 바꿔 준다.

# 빌드시 미리 MAVEN에서 사용할 메모리 공간을 늘려준다. export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m" # 소스 버전을 내려 받는다. # 최신 버전을 다운받을 경우 git을 이용하면 편리하다. $ wget https://github.com/apache/spark/archive/v1.0.2.tar.gz $ git clone https://github.com/apache/spark.git # 소스 디렉토리에서 빌드를 한다. $ mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package # 예제 프로그램 사용 시 $ sbt/sbt assembly

추가로 필요에 따라서 log4j.properties의 noti 수준도 변경하자. 기본 설정으로는 너무 많은 로그가 출력된다. 간단히 보기 위해서는 아래와 같이 설정한다.

# Set everything to be logged to the console log4j.rootCategory=WARN, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1} : %m%n #Setting to quiet third party logs that art too verbose log4j.logger.org.eclipse.jetty=WARN log4j.logger.org.apache.spark,repl.SparkIMain$exprTyper=INFO log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO



스파크 실행

타조(Tajo)는 공식 홈페이지에 문서화가 완전히 진행되지 않아서 중요한 옵션을 모두 언급했지만, 스파크는 http://spark.apache.org/docs/latest/configuration.html 등에 세부 옵션들이 문서화돼 있으므로 참고하기 바란다. 간단한 실습을 할 때에는 특별히 옵션을 수정할 것은 없고, 사양에 따라서 메모리 설정과 경로만 수정하면 된다.
스파크 실행 방법은 하둡이나 타조 등과 비슷하게 개별적으로 데몬을 구동하는 방법과 모든 클러스터의 데몬을 한 번에 구동시키는 방법이 있다. 아래는 스파크 실행 파일들이고, bin 폴더가 아닌 sbin에 위치하고 있다. 또한 start-all.sh처럼 다른 오픈소스에서도 자주 사용되는 명령어도 있으니 path를 걸지 말고 직접 실행하는 것을 추천한다.

# 실행 sbin/start-master.sh - 스크립트를 실행하는 node에서 master 인스턴스를 실행한다. sbin/start-slaves.sh - conf/slaves에 명시된 node들에 slave 인스턴스를 실행한다. sbin/start-all.sh - 위 두 가지 명령을 수행한다. # 종료 sbin/stop-master.sh - 스크립트를 실행하는 node에서 master 인스턴스를 종료한다. sbin.stop-slaves.sh-conf/slaves에 명시된 node들에 slave 인스턴스를 종료한다. sbin/stop-all.sh - 위 두 가지 명령을 수행한다.


스파크 실행 확인

마지막으로 정상적으로 설치됐는지 스파크 내부에 내장돼 있는지 예제를 실행해 본다. Prebuild 버전이 아니라 직접 빌드한 때에는 예제 프로그램도 빌드해 줘야 아래 예제가 실행된다.

bin/run-example SparkPi Pi is roughly 3.14524

위 예제는 파이를 구하는 프로그램으로 실행할 때마다 소수점 이하의 값들은 조금씩 다르게 나타난다.
http://masterip:8080 로 접속하면 웹 화면을 통해서도 스파크의 동작 상태를 볼 수 있다.

[그림 Ⅲ-3-14] 스파크 모니터링 페이지

[그림 Ⅲ-3-14] 스파크 모니터링 페이지


프로그램 시작

셀 시작하기

셀(Shell)은 파이썬을 써 본 사람에게는 익숙할 것이다. 대화형(Interactive)으로 프로그래밍할 수 있는 방식이다. 대화형 프로그램은 결과를 즉시 확인하거나 데이터 분석 시 매우 유용하다. 스파크에서는 스칼라 버전의 셀과 스파크 버전의 셀 두 가지가 존재한다고 소개했다. 여기에서는 전체 기능을 사용할 수 있는 스칼라 버전을 소개한다.

# Scala Shell bin/spark-shell #Python Shell bin/pyspark

만약에 다른 서버에 접속하고 싶으면, --master spark://IP:PORT 옵션을 이용하면 된다. 위에서처럼 아무 값도 주지 않으면 디폴트로 설정된 값으로 연결한다. 또한 ?cores <numCores> 옵션을 이용해 셀이 사용할 CPU 수도 조정할 수 있다. 이외에도 추가 jar를 로딩해오는 옵션 등 몇 가지 기능을 제공한다. spark-shell --help를 입력하면, 자세한 옵션을 확인할 수 있다.
아래는 spark-shell을 실행한 화면이다. 아래와 같이 여러 로그가 뜬 이후에 scala 프롬프트가 뜬다. 로그 설정을 따로 하지 않았다면 훨씬 많은 로그가 나올 것이다.

[그림 Ⅲ-3-15] spark-shell 실행화면

[그림 Ⅲ-3-15] spark-shell 실행화면

먼저 간단한 사용법을 알아보기 위해 아래와 같이 도움말을 실행해 보자. 아래 설명에서와 같이 좀 더 자세한 설명을 보려면 :help 명령어를 사용하면 된다.

scala> :help All commands can be abbreviated, e.g. :he instead of : help. Those marked with a * have more detailed help, e.g. : help imports. :cp<path> add a jar or directory to the classpath :help [command] print this summary or command-specific help :history [num] show the history (optional num is commands to show) :h? <string> search the history :imports [name name ...] show import history, identifying sources of names :implicits [-v] show the implicits in scope :javap <path:class> disassemble a file or class name :load <path> load and interpret a Scala file :load<path> load and interpret a Scala file :padte enter paste mode: all input up to ctrl-D compiled together :quit exit the repl :replay reset execution and replay all previous commands :reset reset the repl to its initial state, forgetting all session entries :sh <command line> run a shell command (result is implicitly => List[String]) : silent disable/enable automatic printing of results :type [-v] <expr> display the type of an experssion without evaluating if : warnings show the supperessed warnings from the most recent line which had any


스파크 셀 실습

간단한 예제를 통해서 스파크 셀을 사용해 보자. 참고로 실제로 입력해야 하는 부분은 스칼라 프롬프트가 있는 라인만이며, 그 아래 줄은 실행 시 반환되는 메시지 혹은 값이다.

# README.md 파일을 읽어 온다. 설정에 따라 셀을 실행한 위치일 수도 있고 HDFS의 위치일 수도 있다. scala>val textFile = sc.textFile("README.md") textFile: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console> :12 # 아이템의 숫자를 반환한다. 여기서는 라인 수다. scala>textFile.count() res0:Long = 127 # 첫 번째 아이템(첫번째 라인)을 반환한다. scala>textFile.first() res1: String = # Apache Spark

이와 같이 대화형(인터프리터)으로 프로그램을 확인하면서 구현할 수 있다.

빌드 후 런칭하기

프로그램을 빌드하기 위해서는 아래와 같은 maven 설정이 필요하다.

<plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </plugin> <pluginRepositories> <pluginRepository> <id> scala-tools.org </id> <name>Scala-tools Maven2 Repository</name> <url>http://scala-tools.org/repo-releases</url> </pluginRepository> </pluginRepositories> <dependencies> <dependency> <groupId> org.scala-lang </groupId> <artifactId>scala-library</artifactId> <version>2.10.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>your-hdfs-version</version> </dependency> </dependencies>

스파크는 프로그램을 launching하기 위한 스크립트를 지원한다.

./bin/spark-submit\-class<main-class> -master <master-url>\-deploy-mode <deploy-mode>\...#other options <application-jar> \ [application-arguments]


  • class: 실행시킬 애플리케이션의 클래스
  • master: 클러스터 접속 정보(예: spark://master IP:7077)
  • deploy-mode: 드라이버를 worker 노드에 배포할 것인지(cluster) 로컬에서 사용할 것인지(client) 설정(default: client)*
  • application-jar: jar 파일 위치. 단 위치를 지정할 때 hdfs://path와 같이 명확한 위치로 입력해야 한다.
  • application-arguments: 실행한 애플리케이션의 메인 클래스에서 사용할 인자값
공식 홈페이지에서 추천하는 아래의 샘플들을 보면 환경마다 설정하는 방법을 쉽게 알 수 있다.

# Run application locally on 8 cores ./bin/spark-submit \ --class org.apache.spark.example.SparkPi\--master local[8] \ /path/to/examples.jar\100 #Run on a Spark standalone cluster ./bin/spark-submit\--class org.apache.spark.examples.Sparkpi\--master spark://spark-masterIP:7077\--executor-memory 20G\--total-executor-cores 100\ /path/to/examples.jar\1000 export HADOOP_CONF_DIR=XXX ./bin/spark-submit\--class org.apache.spark.examples.SparkPi\--master yarn-cluster\#can also be 'yarn-client' for client mode --executor-memory 20G\--num-executors 50\ /path/to/examples.jar\1000



스파크 프로그래밍

스파크 프로그래밍은 위에서 언급한 spark-shell 모드에서도 대부분 사용할 수 있다. 빌드 후 배포과정이 번거롭거나 클러스터를 구성하지 않은 때에는 아래 예제들을 spark-shell 모드에서 실행해보아도 큰 무리가 없다. 단, 셀 모드에서는 자동으로 SparkConetext를 sc라는 변수로 만들어 놓는다는 점만 기억해 두자.
아래 코드는 특정 위치(main 인자값 0)로부터 텍스트 파일을 읽어 특정 임계치를 넘는(threshold, main 인자값 1) 단어들의 빈도를 세는 것이다. 계속 해왔던 맵리듀스에 로직이 더 추가됐음에도 하둡에서 자바로 구현한 것보다 훨씬 심플하게 구현됐다. 또한 추가 설명이 필요 없을 정도로 가독성도 좋아졌다.
참고로 자바 버전으로 이것을 구현하면 약 세 배의 코드가 필요하다. 그리고 성능 또한 스칼라가 자바보다 떨어지지 않으며, 파이썬보다도 빠르다.

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SparkWordCount{def main(args: Arrary[String]){val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))val threshold = args(1).toInt // 지정한 파일을 단어 단위로 자른다. val tokenized = sc.textFile(args(0)).flatMap(_.split(" ")) // 각각의 단어의 출현 빈도를 카운트한다. val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _) // threshold 이하의 값들은 버린다. val filtered = wordCounts.filter(_._2 &gt;=threshold) // 필터링된 단어들을 카운트한다. val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _) System.out.println(charCounts.collect().mkString(","))}}

아래 코드는 프로그램 작성 시 클러스터에 접속하기 위한 필수 클래스를 import하고, 클러스터 정보 입력해주는 것이다. 빌드 후 배포해서 프로그램을 실행할 때 필수적인 부분이다. 차후에 나오는 코드들에서는 이 부분이 생략될 수 있으니 예제를 테스트할 때 참고하자.

import org.apache.spark.SparkContext import org.apache.spark.spark.Context._ import org.apache.spark.SparkConf ... val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf) ...


RDD Persistence

RDDs의 특징 중에 하나가 데이터를 계속 저장하고 있는 것이 아니라 필요할 때마다 데이터를 불러서 재계산하는 것이라고 했다. 그런데 반복적으로 쓰이는 값은 매번 재작업을 하면 매우 비효율적일 것이다. 그래서 임의로 데이터를 메모리에 계속 상주하게 해서 재활용할 수 있도록 코드를 짤 수 있다. 이 부분은 이전에 언급한대로 스파크를 이용해 iteration을 이용한 분석을 할 때, 성능 향상의 가장 중요한 부분 중 하나다.

val lines = sc.textFile("data.txt") val lineLengths = lines.map(s=> s.length) val totalLength = lineLengths.reduce((a, b) => a+b)

위 코드를 수행해 보면, totalLength가 수행되는 시점에 실제적인 계산을 한다(앞서 언급한 RDDs의 Lazy 특성과 Lineage). 이 데이터는 계속 메모리에 상주시키는 것이 아니라 리듀스가 호출될 때마다 전체를 재수행한다. 그렇다면 여기에서 lineLengths가 자주 호출돼 리듀스와 같은 연산 등을 계속 수행하면 어떻게 될까? 그때마다 데이터를 계속 읽어와서 매우 비효율적인 작업이 될 것이다. 그래서 RDDs를 메모리에 캐싱할 수 있는 기능을 넣게 됐다. 또한 메모리에도 한계가 있으므로 least-recently-used (LRU) 방식을 이용해 자동으로 uncaching을 하기도 하지만, 수동으로 하기 위한 기능도 있다. 캐싱 함수에는 크게 cache()와 persist가 있다. cache는 기본 설정인 메모리에 그대로 올리는 것이고, persist는 저장하는 방법을 지정할 수 있다. 예를 들면, MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK 등 저장소를 지정할 수도 있다. MEMORY_ONLY_2, MEMORY_AND_ DISK_2와 같이 클러스터에 복제본을 만드는 등 다양한 옵션을 지정할 수 있다. 만약 자주 사용될 데이터가 많아서 매번 이것을 명시하기는 불편할 때는 SparkWordCount에서 사용한 reduceByKey와 같이 특수한 shuffle operation을 이용하면 리듀스 작업을 수행하면서 자동으로 캐싱을 해 준다. 이와 비슷한 함수로는 sortByKey()와 collect() 등이 있다.


RDD Operations

계속 사용됐지만 자세히 설명하지 않은 함수들이 있다. 바로 RDD Operations들이다. RDD Operation은 크게 데이터를 변형시켜서 새로 만드는 Transformation과 집계 함수와 같이 주로 최종 결과를 얻어내는 action 함수가 있다. SparkWordCount와 같이 예제를 통해 주요 함수를 중간중간 설명하겠다. 전체 함수 리스트는 http://goo.gl/WWA38F를 참고하자. RDD Operations들 중 몇가지는 스칼라의 함수를 재구현한 것들이고, 함수형 언어에 대한 인식이 필요한 경우들이 있으므로 미리 스칼라를 학습하자.


Broadcast Variables

SQL-On-Hadoop에서 Star형 데이터는 Broadcast를 활용해 전체 데이터를 재분배하는 대신, 작은 데이터를 전체 클러스터에 복사해 효율적으로 처리하는 방법에 대해 언급했다. 스파크에서도 이러한 기능을 제공한다. 간단하게 broadcast 함수를 이용해 데이터를 전체 클러스터에 복사하고 value 메소드를 이용해 그 값을 로드한다.

val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value


스파크 스트리밍

스파크 스트리밍 개요

스파크 스트리ald(Spark Streaming)은 다양한 데이터 소스로부터 데이터를 받아서 실시간 스트리밍 처리할 수 있도록 해준다는 점에서 스톰과 비슷하다. 그러나 스파크 스트리밍은 스파크 API를 확장해 내장된 머신러닝 알고리즘을 수행하거나 추가 분석을 할 수도 있다. 또한 Cleansing을 통해 데이터를 저장시키고 다시 SQL 형식이나 머신 러닝 혹은 일반 스파크 API를 이용해 추가적인 분석을 유기적으로 할 수 있는 장점이 있다.

[그림 Ⅲ-3-16] 스파크 스트리밍 연동

[그림 Ⅲ-3-16] 스파크 스트리밍 연동

이와 같은 압도적인 장점이 있음에도 아직까지 스파크 스트리밍이 해결해야 하는 문제나 단점도 많다. 스파크 내부 구조에 대해서는 앞 장에서 설명했기 때문에 스톰과 비교해 드러나는 차이점을 중심으로 스파크 스트리밍만의 동작 원리와 특징을 알아보고, 간단한 예제를 설명하도록 하겠다.


스파크 스트리밍 아키텍처
데이터 처리 방식

카프카나 플룸, HDFS 등 다양한 소스로부터 발생되는 데이터를 스파크에서 사용할 수 있도록 데이터 형태를 재구성한 것을 Discretized Stream 혹은 DStream이라고 한다. 이름에서 알 수 있듯이 데이터를 작은 배치 단위로 잘라 각 노드에 분산ㆍ처리한다. 스톰에서는 이것을 사용자 임의의 튜플 단위로 처리했다.

[그림 Ⅲ-3-17] 스트리밍 데이터 변환 과정

[그림 Ⅲ-3-17] 스트리밍 데이터 변환 과정

스톰과 비교해 알아 보자. 스톰은 데이터를 작은 단위 하나 하나를 바로 처리하는 반면, 스파크 스트리밍은 Sliding window 방식과 비슷하게 일정 윈도우로 데이터를 잘라서 처리한다. 엄밀히 따지면 스트리밍이라기보다는 마이크로배치에 가깝다.

물론 스톰에서도 마이크로 배치(Trident)를 제공하지만 다음과 같은 약간의 차이가 있다.

[표 Ⅲ-3-3] 스톰과 스파크 스트리밍 처리 비교


스톰 스톰(Trident) 스파크 스트리밍
At Most Once 지원 지원 미지원
At Least Once 지원 지원 미지원
Exactly Once 미지원 지원 지원

또한 데이터 형식이 일정하지 않으면, 처리하기 힘들다는 단점이 있다.


폴트톨러런스

위에서 언급한 At Most Once와 At Least Once를 지원하지 않는 이유는 폴트톨러런스 부분에서 문제가 발생할 수 있기 때문이다. 스파크 스트리밍은 기본적으로 스파크 기반으로 움직이므로 스파크의 기능이나 특성을 상당부분 상속한다. RDDs도 그 중 하나인데 RDDs는 스파크에서 처리 중에 데이터에 문제가 생기더라도 안전한 Input Data(예: HDFS)가 문제가 되지 않는 이상, Lineage로 재계산해 복구가 가능하다고 했다. 이 부분도 스파크 스트리밍이 그대로 상속했다. 드라이버 노드가 죽었을 때, 뒤에서 설명할 CheckPoint와 State 저장이라는 기능을 이용해 다시 재기동할 수 있다. 또한 스파크 스트리밍에서 Worker가 죽었을 때, 워커 노드가 해당 데이터 부분을 Input Data로부터 다시 계산한다. 그러나 워커 노드가 완전히 죽거나 네트워크까지 끊겼고 복제(replication)가 이뤄지지 않은 상태라면 해당 데이터는 복구가 불가능하다.
일반적으로 스톰에서는 리소스가 허용하는 한, 데이터를 놓치는 경우는 거의 없다. 그러나 스톰은 님버스가 SPOF(single-point-of-failure)이므로 님버스가 장애가 발생한 상태에서 다른 노드들까지 동시에 장애가 발생하면 데이터가 유실되는 것은 마찬가지다. 물론 이것은 가장 안전한 편인 HDFS도 한 번에 여러 대의 서버가 죽으면(복제 수보다 많은 수의 서버), 서버들이 복구될 때까지 해당 데이터를 사용할 수 없다. 하지만 HDFS는 서버가 복구만 되면 파일을 사용할 수 있다. 반면, 스트리밍 서비스는 그만큼 기다릴 수도 없고, 모든 데이터를 디스크에 저장하는 방식이 아니므로 어쩔 수 없이 취약한 부분이기도 하다. 단, 스파크 1.2 버전에서 Write Ahead Log (WAL)라는 기능을 도입하여 SPOF을 제거하였으나 상용환경에서 좀 더 테스트가 필요하다.


State Management

스파크 스트리밍은 스탠드얼론 클러스터 모드일 때와 다른 매니지먼트(Yarn, Mesos)일 때의 복구 방식이 다르다.



스탠드얼론 클러스터 모드

스파크가 스탠드얼론 클러스터 모드일 때는 자동으로 HA(High Availability)를 제공한다. 먼저 주키퍼를 이용해 상태체크를 하다가 바로 복구하는 방법이다. 이것은 스톰에서 사용하던 방식과 유사하다. 이 방식을 적용하기 위해서는 먼저 주키퍼가 설치돼 있어야 하고, 다음 [표 Ⅲ-3-4]와 같이 설정해 주면 된다.

[표 Ⅲ-3-4] 주키퍼를 이용한 HA


프로퍼티
spark.deploy.recoveryMode ZOOKEEPER (default: NONE).
spark.deploy.zookeeper.url The ZooKeeper cluster url
(예: 192.168.1.100:2181,192.168.1.101:2181..).
spark.deploy.zookeeper.dir State를 저장할 주키퍼 내 경로(default: /spark).

두 번째로 로컬 파일 시스템을 이용하는 방법이다. 주키퍼는 HA에 있어서 오랜 기간 인증 받아온 방법이므로 훨씬 불안정한 로컬 파일 시스템을 이용한 HA는 추천하지 않는다. 설정 방법은 다음과 같다.

[표 Ⅲ-3-5] 로컬 파일을 이용한 HA


프로퍼티
spark.deploy.recoveryMode FILESYSTEM
spark.deploy.recoveryDirectory State를 저장할 로컬 파일 시스템 내 경로

외부 매니지먼트(Yarn, Mesos)

처음 프로그램이 시작하면, 아래 소스 코드와 같이 StreamingContext를 생성할 때 checkpoint를 이용해 현재 현재 상태를 HDFS 등에 저장할 수 있도록 설정하고 작업을 시작한다. 작업중에 오류로 인해 다시 복구를 해야 하면, checkpoint에서 설정 값을 이용해 해당 부분부터 데이터를 복구하며 작업을 시작한다.

// StreamingContext 생성 및 설정 def functionToCreateContext():StreamingContest={val ssc = new StreamingContext(...)//StremingContext생성 val lines=ssc.socketTextStream(...)//DStream 생성 ... ssc.checkpoint(checkpointDirectory) //checkpoint directory 설정 ssc} //checkpoint에서 받아오거나 새로운 StreaminContext 생성하여 할당 val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _) context. ...(생략) // context 시작 context.start() context.awaitTermination()

간단한 예시는 아래와 같다. 4번째 Time에서 작업 실패가 발생하고 7번째 Time에서 복구가 됐으면, 그동안 밀린 데이터들을 모두 복구한다.

[표 Ⅲ-3-6] 시간별 작업 복구 예시


Time Number of lines in input file Output without driver failure Output with driver failure
1 10 10 10
2 20 20 20
3 30 30 30
4 40 40 [DRIVER FAILS]
no output
5 50 50 no output
6 60 60 no output
7 70 70 [DRIVER RECOVERS]
40, 50, 60, 70
8 80 80
9 90 90
10 100 100
이러한 복구방법이 가능한 이유는 작업 중간 중간에 지정한 저장소(HDFS 등)에 상태(State)를 저장해 놓기 때문이다.


Parallelism

스톰에서는 데이터의 수신 부분이나 처리부분이 다른 형태로 만들어져 있을 뿐(Spout, Bolt), 작은태스크 단위로 분산해 병렬 처리되는 점은 동일하다. 그러나 스파크에서 데이터 처리(Data Processing) 부분은 스톰과 비슷하지만, 데이터 수신(Data Receiving) 부분은 약간 다른 구조다. 스파크에서 병목이 발생하는 부분 중에 하나가 이 데이터 수신 부분이다. 외부에서 받아온 데이터를 디시리얼라이즈(deserialized)해야 하기 때문이다. 그리고 이 데이터들을 마이크배치 단위로 나눠서 처리하므로 Data Processing처럼 태스크 단위로 자유롭게 나누지는 못하고 Receiver를 더 만드는 방식으로 처리한다.


데이터 소스

스톰은 Kafka, JMS, RabbitMQ, Kestrel, 아마존, 트위터, Scribe, MongDB 등 여러 가지 채널을 통해 데이터를 받을 수 있다. 그리고 여기에 언급되지 않은 소스에서 데이터를 받고 싶으면, 플러그인 형태의 프로그램만 작성하면 된다. 그러나 스파크 스트리밍은 Kafka, 플룸, 트위터, ZeroMQ 등을지원하지만, 스파크 진영에서 수정한 버전을 사용해야 한다. 스파크 스트리밍용 데이터 소스 프로그램을 만들기에는 상대적으로 어렵고, 다른 로그 수집 등의 오픈소스 버전을 따라갈 수도 없다는 단점이 있다.


속도

속도는 논란이 많은 부분이다. SQL-On-Hadoop에서도 그랬듯이 어느 진영에서 테스트했는지에 따라 결과가 다르게 나온다. 스톰은 스파크 진영에서는 노드당 초당 1만 레코드를 처리한 반면, 스톰 진영에서는 백만 레코드도 처리 가능하다는 결과를 내놓을 정도다. 이 부분은 각자의 환경에서 테스트하는 방법이 제일 좋을 것이다.


스파크 스트리밍의 예

스파크는 최근 아파티 톱 프로젝트로 올라오면서 문서화에 신경을 많이 쓰고 있다. 최근 이슈가 된(앞서 소개했던) 스톰이나 타조 등에 비해서 훨씬 정리가 체계적이고 자세하게 돼 있다. 또한 Test코드나 Sample 코드도 잘 만들고 있다(이 파트에서 스파크의 공식 홈페이지를 많이 참고하는 이유 중에 하나다)
https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming에 접속하면 스파크 스트리밍을 확인할 수 있다.
그 중에 2가지만 간단하게 확인해 보도록 하자.


HdfsWordCount

object HdfsWordCount { def main(args: Arrary [Sting]){if (args.length<1 ){Sysrem.err.printIn("Usage:HdfsWordCount <derectory>") System.exit(1)} //Spark Example에서는 Loglevel 설정을 별도로 묶어놨다. StreamingExamples.setStreamingLogLevels() val sparkConf = new SparkConf().setAppName("HdfswordCount") //StreamingContext에서는 Microbatch의 Duration(Batch Size)을 설정할 수 있다. //여기에서는 2초를 할당했다. val ssc=neq StreamingContext(sparkConf, Seconds(2)) //기본적인 함수는 Spark Core에서 사용하던 문법과 거의 동일하다. // 파일을 읽어서 wordCount를 한다. val lines = ssc.textFileStream(args(0)) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() scc.start() ssc.awaitTermination()}} object StreamingExamples extends Logging{ def setStreamingExamples extends Logging { def setStreamingLogLevels () {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements if(!log4jInitialized{logInfo("Setting log level to [WARN] for streaming example." + " To override add a custom log4j.properties to the classpath.") //Log Level 설정 Logger.getRootLogger.setLevel(Level.WARN)}}}


NetworkWordCount

object NetworkWordCount { def main(args: Array[String]) {if (args.length<2){ System.err.println("Usage: NetworkWordCount <hostname> <port>") System.exit(1)} StreamingExamples.setStreamingLogLevels() val sparkConf = new SparkConf().setAppName("NetworkWordCount") val scc = new StreamingContext(sparkConf, Seconds(1)) //main 인자에서 전달하는 ip와 port를 기준으로 소켓을 연결한다. //문장은 MEMORY와 DISK에 RDDs 형태로 저장한다. // 동일하게 wordcount를 수행한다. val lilnes = ssc.socketTextStream(args(0), orgs(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.floatMap(_.split('' ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination()} }