데이터실무

DA, SQL, DB보안 등 실무자를 위한 위한 DB기술 바이블!

샤크 소개와 아키텍처

데이터 처리
SQL On Hadoop
샤크 소개와 아키텍처
작성자
admin
작성일
2021-02-15 14:12
조회
1232

이번 절에서 다룰 내용은 샤크(Shark, Spark SQL)다. 굳이 샤크와 Spark SQL을 병기한 이유는 원고가 작성되는 시점에서 ‘샤크에 대한 개발은 중지하고 Spark SQL에 개발 리소스를 할당하겠다’는 발표가 있었기 때문이다. 다음에 다루겠지만 샤크는 Spark 엔진 위에서 돌던 하이브 기반 프로젝트다. 하이브의 변경 사항을 매번 반영하기엔 너무 비효율적이라 아예 Spark SQL이라고 하는 새로운 프로젝트를 만들기 시작했고, 샤크의 대부분의 기능을 승계해서 넣기로 했다. 그러나 아직까지 샤크를 쓰는 곳이 많고 Spark SQL이 완전히 안정화된 상태가 아니라서 여기서는 둘 다 언급한다. 이외에도 Hive Jira에(아파치 프로젝트들이 대부분 사용하는 이슈트레커) 하이브에서 Spark 엔진을 사용할 수 있도록 하는 ticket이 진행되고 있다(HIVE-7292). 이것은 아직 개발이 많이 남아있기 때문에 따로 언급하지는 않지만 내년쯤에는 다시 이슈가 될 수 있을 것이다.
참고로 이 단원은 SQL-On-Hadoop 이후에 스트리밍 처리를 다루고 있으므로 샤크와 Spark SQL을 먼저 소개한다. 하지만 Spark와 스칼라라는 프로그래밍에 대한 선지식이 있어야 더 이해하기 쉽다. 셀(shell)이나 SQL문 실행만 한다면 자세히는 몰라도 되지만, Spark의 강력한 기능 중 하나인 연동기능을 쓰지 못하는 것이니 미리 알아 두는 것이 좋다. Spark는 3장 스트리밍 데이터 처리에서 소개하고 있으니 Spark 부분을 먼저 읽고, 스칼라에 대한 내용은 http://goo.gl/k0merD를 통해 간단히 알아보자.


샤크

샤크 아키텍처

샤크(Shark)는 Spark의 Sark와 하이브의 H를 합쳐서 만든 단어로 Spark 엔진 위에서 하이브가 동작하도록 만든 프로그램이다(Spark에 대해서는 ‘3단원 3장 스트트리밍 데이터 처리’에서 좀 더 자세히 소개한다). SQL-On-Hadoop과 같이 하이브의 단점을 개선한 샤크의 특징은 다음과 같다.


  • 맵리듀스의 의 실행 오버헤드를 없앴다.
  • 스텝별 중간 작업에서 발생하는 불필요한 하드디스크 쓰기를 없앰
  • Columnar 방식을 사용
  • 가능한 메모리를 많이 사용해 처리
  • 인메모리, 하드디스크, 하이브리드 테이블
  • 셔플 성능 향상
  • JDBC 프로그래밍보다 더 유연한 Spark와 연계
타조 아키텍처

[그림 Ⅲ-2-34] 샤크의 메모리 활용

[그림 Ⅲ-2-34] 샤크의 메모리 활용

가장 큰 특징 중 하나는 Spark 엔진을 사용함으로써 메모리를 최대한 많이 활용한다는 점이다. 이 방식을 사용함에 따라서 반복 작업은 수 배에서 수십 배 빨리 처리가 가능하고, 인터랙티브한 쿼리도 지원할 수 있게 됐다. 단, 너무 큰 데이터는 아주 높은 성능을 기대하기 어려울 수 있고, 하이브에 비해서 메모리 관리를 더 잘해 줘야 한다.
그러나 모든 기능을 전부 새로 구현한 것은 아니다. 기존의 하이브의 사용자가 워낙 많고, 하이브 엔진(맵리듀스)을 제외하면 이미 성숙한 오픈소스이므로 일부만 변경작업을 하고 나머지는 하이브의 라이브러리를 그대로 이용했다.

[그림 Ⅲ-2-35] 하이브 구성도

[그림 Ⅲ-2-35] 하이브 구성도

[그림 Ⅲ-2-36] 샤크 구성도

[그림 Ⅲ-2-36] 샤크 구성도

위 그림을 보면, 하이브에서 변경된 부분을 파악하기 쉽다. 먼저 하이브와 똑같이 HDFS에서 데이터를 읽어와서 작업을 수행하고, 맵리듀스 대신 Spark를 엔진으로 처리하도록 한다. Metastore는 그대로 사용하되 클라이언트는 Optimizer와 Cache, Plan 등을 수정하고 나머지는 거의 그대로 사용했다.
이런 아키텍처는 하이브의 사용자가 기존의 SQL을 수정하지 않고 그대로 사용가능하고, JDBC 등을 통해 연결해 놓은 프로그램이나 UDF도 수정할 필요 없이 그대로 사용한다는 장점이 있다. 그러나 하이브의 코드가 계속 변경되고 새로운 기능이 추가됨에 따라 샤크에서도 변경해야 할 작업들이 많이 발생하고, 주도적으로 개발방향을 잡기가 힘들다는 단점이 있다. 그래서 현재까지 나와 있는 샤크는 하이브의 0.11 버전의 기능까지만 커버하게 됐고(글을 쓰는 시점에 하이브는 0.13까지 나와 있다), 대신 Spark SQL이라는 이름으로 전체를 새로 개발하게 된다.


샤크 설치방법

Spark 위에서 작동하는 프로그램들은 로컬, 클러스터, EC2, Yachyon 등 여러 가지 환경에서 구동이 가능하다. 여기서는 가장 일반적인 로컬 모드와 Yarn 모드에 대해 알아 보겠다.


로컬 모드 설치 방법

샤크는 Spark를 엔진으로 사용하고 있다고 소개했다. Spark는(샤크 포함) 스칼라라는 언어로 작성돼 있다. 그렇기 때문에 이번뿐만 아니라 Spark 관련 프로그램을 사용할 때는 이전에 설치했던 환경에서 추가로 스칼라가 설치돼 있어야 한다. 그 밖에 프로그램들은 대부분 압축만 풀고 본인의 설정에 맞게 환경설정 파일만 수정해 주면 된다. 그러나 로컬 모드는 Spark의 장점을 확인하기 힘들어서 사용할 일이 거의 없으므로 간단히 테스트해 보고 클러스터 모드로 넘어가 보자.



스칼라 설치


$ wget http://www.scala-lang.org/files/archive/scala-2.9.3.tgz $tar xvfz scala-2.9.3.tgz


샤크 설치


$ wget https://github.com/amplab/shark/releases/download/v0.8.0/shark-0.8.0-bin-hadoop1.tgz $wget https://github.com/amplab/shark/relases/download/v0.8.0/shark-0.8.0-bin-cd4.tgz $tar xvfz shark-*-bin-*.tgz $cd shark-*-bin-*

로컬 모드는 HDFS를 사용하지는 않지만, 라이브러리 참조로 인해서 버전 확인은 필요하다. 샤크는 다른 SQL-On-Hadoop과 달리 하둡 1.0도 지원한다. 본인의 환경에 맞는 것을 추천하지만, 가능한 하둡 2.0을 권장한다.

로컬 디렉터리

로컬 모드이므로 저장소가 따로 필요하다. 디폴트 설정은 /user/hive/warehouse다.

mkdir -p /user/hive/warehouse; chown user:user /user/hive/warehouse


경로 설정

/etc/profile이나 ~/.bashrc 혹은 ~/.bash_profile 파일에 설정한다.

export HIVE_HOME=/path/to/hive-0.9.0-shark-0.8.0-bin export SCALA_HOME=/path/to/scala-2.9.3 export PATH=$PATH:$HIVE_HOME/bin:$SCALA_HOME/bin


환경 설정

앞서 설치한 스칼라의 경로를 지정하고 하이브의 위치를 지정해 준다. 여기서 주의해야 할 것은 기존에 설치해 놓은 하이브가 아닌, 다운받은 파일 내에 들어있는 하이브(자체 패치 버전)의 위치를 지정해 준다.

$ cp shark-0.8.0/conf/shark-env.sh.template shark-0.8.0/conf/shark-env.sh $ vi shark-0.8.0/conf/shark-env.sh export SCALA_HOME=/path/to/scala-2.9.3 export HIVE_HOME=/path/to/hive-0.9.0-shark-0.8.0-bin



클러스터 모드 설치방법

클러스터 모드는 로컬 모드 설치에서 Spark만 추가로 설치해 주고, 일부 설정파일만 변경해 주면 된다. 만약 다운로드가 되지 않는다면 http://archive.apache.org/dist/spark/ 에서 필요한 버전을 내려받는다.



Spark 설치


# 하둡 버전에 따라서 필요한 버전을 내려 받는다. $wget http://spark-project.org/download/spark-0.8.0-incubation-bin-hadoop1.tgz $ wget http://spark-project.org/download/spark-0.8.0-incubation-bin-cdh4.tgz $ tar xvfz spark-*-bin-*.tgz # Slave 노드의 호스트 이름을 써 준다. $ vi spark-0.8.0/conf/slaves # Spark 경로 설정을 해 준다. $vi spark-0.8.0/conf/spark-env.sh export SCALA_HOME=/path/to/scala-2.9.3 export SPARK_WORKER_MEMORY=16g # 경로 설정 export SPARK_HOME=/path/to/spark-0.8.0


샤크 환경설정


$ vi shark-0.8.0/conf/shark-env.sh export HADOOP_HOME=/path/to/hadoop export HIVE_HOME=/path/to/hive-0.9.0-shark-0.8.0-bin export MASTER=spark://localhost:8080 export SPARK_HOME=/path/to/spark export SPARK_MEM=16g source $SPARK_HOME/conf/spark-env.sh

메모리 설정이나 경로 설정은 본인에 맞게 바꾸도록 한다.

기존 하이브와 연동

기존에 하이브를 사용하고 있었다면 각종 설정을 가져오기 위해서 기존 하이브의 xml 파일들을 패치한 하이브의 conf 디렉터리로 복사해 준다.

cp /etc/hive/conf/*.xml /path/to/hive-0.9.0-bin/conf/


슬레이브 노드

Spark와 샤크를 슬레이브 노드에 모두 복사해 준다. 단, 이 때에도 마스터 노드의 경로와 같은 위치에 설치해야 한다.

실행


# spark 디렉터리에서 아래와 같이 실행. start-all.sh이 중복되는 경우가 있으므로 경로를 걸지 않고 직접 들어가는 것이 좋을 경우가 있다. $ bin/start-all.sh



샤크 사용방법

샤크는 쿼리 파서로 하이브 라이브러리를 사용하기 때문에 샤크의 사용법은 몇 가지를 제외하고 하이브와 동일하다. 이 부분에서는 하이브와 다른 사용 방법 위주로 설명하겠다.



실행

Log 설정으로도 가능하지만 일반모드, info모드, debug 모드 각각 실행파일이 따로 존재한다. 자세한 로그를 보고 싶을 경우 사용한다.

$ bin/shark $ bin/shark-withinfo $ bin/shark-withdebug

# 직접 쿼리 입력 $ bin/shark -e "select*from tablename" # 쿼리 파일 수행 $ bin/shark - i query.sql # 도움말

$ bin/shark - H # 메모리 테이블을 다시 읽기 하지 않음 $ bin/shark --skipRddReload


캐싱 데이터

샤크는 테이블 전체를 메모리에 올릴 수 있다(물론 메모리 한도 안에서만 가능하다). 테이블 전체를 메모리에 올리면(캐싱) 하드디스크보다 훨씬 빠르게 처리할 수 있으므로 자주 사용되는 테이블을 아래와 같이 캐싱해 놓는다.

CREATE TABLE testable2 TBLPROPERTIES ("shark.cache" = "true") AS SELECT * from testable; CREATE TABLE testable2_cached AS SELECT * from testable;

사용방법은 Table Property에 명시해주거나 테이블 이름 뒤에 _cached라는 이름을 붙여주면 된다. 그런데 아직까지 Bucket 기능은 완전히 지원하지 않기 때문에 주의가 필요하다.

SharkServer

샤크는 데이터를 메모리에 캐싱할 수 있다고 했는데, 메모리에 캐싱하는 시점은 샤크 CLI를 처음 실행할 때다. 그런데 샤크 CLI에 접속할 때마다 캐싱하면 비효율적이므로 sharkserver를 실행해 계속 캐싱하도록 설정할 수 있다.

# 샤크 서버 실행 $ bin/shark - service sharkserver <port> # 샤크서버 접속 $ bin/shark - h <shark-host> -p <port>


Spark와 연동


sc = SharkEnv.initWithSharkContect("SparkSharkExample") val users = sc.sql2rdd("select * from testable") rdd.cache()

위 코드는 스칼라로 작성됐고 context와 같이 설명해야 할 것이 많기 때문에 3장 스트리밍 데이터 처리에서 다루도록 하고 위와 같이 sql 형태로 생성된 데이터를 Spark에서 rdd라는 형태로 메모리에 저장할 수 있다는 것만 기억해 두자. 또한 프로그래밍이 아니라 bin/shark-shell을 통해 위와 같이 셀 형태로 Spark와 연동하는 것이 가능하다.


Spark SQL

Spark SQL 소개

샤크는 0.9.2버전을 끝으로 더 이상 개발되지 않을 예정이다. 그 대신 Spark SQL이 그 역할을 대신하는데 아직까지 완전한 버전이 아니고 보완이 필요한 상태이다. 가장 최근에 나온 1.1.0 버전에서는 일부 구현됐지만, CLI나 JDBC도 최근에 구현되었고 하이브나 기타 SQL-On-Hadoop에 비해 안정화도 많이 부족하다.
API의 변경을 최소화 한다고 목표를 잡고 있지만, 아직까지는 상용으로 사용하기는 불안정한 측면이 있다. 다른 SQL-On-Hadoop이나 샤크를 사용하다가 안정화가 이뤄진 후에 Spark SQL로 전환하는 것이 좋다.
샤크는 Spark와 연계됐지만 상대적으로 분리된 프로젝트였다. 하지만 Spark 버전이 올라감에 따라 Spark Streaming, Spark MLlib 등과 같이 Spark SQL도 하나의 프로젝트로 관리되고 있다. 홈페이지에는 각각 다른 프로젝트로 구분하고 있지만 편의상 나눈 것일 뿐, 실제로는 하나의 프로젝트안에 포함돼 있다.

Spark SQL은 아직 완전한 버전은 아니지만 여러 장점이 있다.
성능 향상

가장 눈에 띄는 부분은 성능 향상이다.

[그림 Ⅲ-2-37] 샤크와 Spark SQL의 성능 차이(테스트: Databricks)

[그림 Ⅲ-2-37] 샤크와 Spark SQL의 성능 차이(테스트: Databricks)

위 그림은 샤크 0.9.2 버전과 Spark SQL의 성능 차이를 TPC-DS Benchmark를 이용해 비교한 것이다. SparkSQL에서 아래와 같이 Query rewrite, Transform와 같이 최대한 빠르게 필요한 데이터를 선택하는 기법이나 Runtime Bytecode Generation과 같이 JVM 부분을 개선하는 방법을 사용했다.


Adding SchemaRDD to RDDs

샤크에서 RDD와 연동하는 부분을 잠깐 설명했는데, SparkSQL에서는 훨씬 간단하게 RDD와 연동이 되고 여러 가지 Operator를 이용할 수 있게 됐다.


Spark SQL 실습

이번에 실습할 내용은 스칼라로 직접 프로그래밍 하는 방식이다. Spark SQL은 아직 셀을 제공하지 않기 때문에 직접 프로그래밍 하는 방법밖에 없다(1.1.0버전에서 완벽하진 않지만 셀 기능이 추가됐다). Spark를 이용한 자세한 프로그래밍은 3장 스트리밍 데이터 처리를 참고하자.


사용 언어와 실행방법

Spark Sql 프로그래밍은 스칼라, 자바, 파이썬으로 할 수 있다. 서드파티로 다른 언어도 지원하지만이 3가지 가운데 한 언어를 사용하는 것을 추천한다. Spark에서의 지원기능 차이와 간결성에 있어서 스칼라가 훨씬 좋기 때문에 스칼라를 쓰는 것이 제일 좋다. 이 책에서도 가능한 스칼라 위주로 설명할 예정이다. 책에 나오는 예제는 실습의 편리성을 위해서 Spark Committer들의 확인을 받고 Spark 공식 홈페이지(https://spark.apache.org)를 참고했다. 다른 언어로도 테스트하고 싶으면 위페이지에서 확인해 보면 된다.
코드 실행방법은 맵리듀스처럼 코드를 작성하고 직접 빌드해 실행하는 방법도 있고, bin/spark-shell을 통해 아래 프로그램들을 인터랙티브하게 실행할 수도 있다. 그러나 셀 모드는 스칼라와 파이썬 버전만 가능하고 자바 버전은 지원하지 않는다.


SparkContext

SparkContext는 Spark Cluster에 접근하기 위한 접점이라고 보면 된다. Spark Cluster에 접속하기 위한 정보가 포함된 Class이며, 분산 처리하기 위해서 꼭 미리 선언해 줘야 한다.

val sc: SparkContext val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.createSchmaRDD


RDDs

샤크에서도 잠깐 언급했듯이 테이블의 데이터를 프로그래밍으로 쉽게 조작할 수 있다. 아래의 코드는 스칼라의 Case라는 클래스를 이용해 데이터를 읽고 Person이라는 테이블을 만들고, SQL 쿼리를 수행하는 코드다. 단, 스칼라 2.10에서는 컬럼 수가 22개를 넘을 수 없으므로 주의하자.

// 기존의 sc를 읽어서 SQLContext로 변경 val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.createSchemaRDD //NAME(String), Age(Int) 형태의 Person case class를 만듬 case class Person(name: String, age: Int) //텍스트 파일로부터 데이터를 읽어와서 Person에서 선언한 형식으로 Table을 생성한다. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(=>Person(p(0), p(1).trim.toInt)) people.registerTempTable("people") //sqlContext의 sql()를 이용해 SQL 쿼리를 수행한다. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <=19") //조회 결과를 출력한다. teenagers.map(t=>"Name:" + t(0)).collect().foreach(printIn)


JSON 파일 읽기

Spark SQL에서는 JSON 형태의 RDD 혹은 파일을 자동으로 스키마를 맞춰서 데이터 로드를 할 수 있다. 이때 사용되는 함수는 아래와 같다.

val people = sqlContext.jsonFile(path) val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)


하이브 데이터 읽어오기

하이브를 사용하기 위해서는 하이브 라이브러리가 필요하다. 처음 빌드 시에 ?P hive 옵션을 통해 hive jar 파일을 만들고 각 워커 노드에 복사해 둔다.

//Context는 SQLContext 대신 HiveContext를 이용한다. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) hiveContext.hql("CREATE TABLE IF NOT EXISTS src (ket INT, value STRING)")

hiveContext.hql("LOAD DATA INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") //쿼리결과 출력 hiveContext.hql("FROM src SELECT key, value").collect().foreach(printIn)

1.1.0 버전에서는 샤크에서처럼 SparkSQL도 셀 모드를 지원한다. bin/spark-sql을 실행하면, Spark SQL Shell로 진입하게 된다. localmode로 하이브의 데이터를 읽어올 수 있다.


캐싱

테이블의 데이터를 In-memory columnar format으로 캐싱할 수 있다. 자동으로 압축해서 메모리 사용을 최소화해 gc에 따른 시간 소모도 줄여서 속도를 높일 수 있다.

// Cacing cacheTable("tableName") //UnCacning uncacheTable("tableName")


샘플 코드 보기

이 코드는 Spark 코드 내 org.apache.spark.examples.sql에 포함돼 있다. 위에서 다뤘던 내용을 포함해 전반적인 내용을 보기 좋으므로 간단히 살펴보자.

// .. 아파치 라이선스 주석 생략 package org.apache.spark.example.sql import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SQLContext case class Record(key : Int, value: String) object RDDRealtion{def main(args: Array[String]){//SpakrCluster에 등록하기 위한 각종 정보 SparkConf val sparkConf = new SparkConf().setAppName("RDDRelation") // sparkconf를 이용해 SparkContext를 만들고 SparkContext를 SQLContext로 변환 val sc = new SparkContext(sparkConf) val sqlContext = new SQLContext(sc)

import sqlContext._ val rdd = sc.parallelize((1 to 100).map(i=> Record(i, s"val_$i"))) //automatically inferred using scala reflection. rdd.registerTempTable("records") //Once tables have been registered, you can run SQL queries over them. printIn("Result of SELECT *:") sql("SELECT*FROM records").collect().foreach(printIn) //Aggregation query도 지원한다. val count = sql("SELECT COUNT(*) FROM records").collect().head.getLong(0) printIn(s"COUNT(*):$count") //The results of SQL queries are themseleves RDDs and support all normal RDD functions. The // items in the RDD are of type Row, which allows you to access each column by ordinal. val rddFromSql = sql("SELECT key, value FROM records WHERE key < 10") printIn("Result of RDD.map:") rddFromSql.map(row=>s "Key: ${row(0)}, Value: ${row(1)}").collect.foreach(printIn) //Scala DSL을 이용해서 쿼리 작성이 가능하다. rdd.where('key == 1').ordeBy('value.asc').select('key).collect().foreach(printIn) //saveAsParquetFile 를 사용하면 parquet file 형식으로 저장할 수 있다. rdd.saveAsParquetFile("pair.parquet") //ParquetFile 읽기도 가능하다. 스키마는 parquet 파일에 내장됐다. val parquetFile = sqlContext.parquetFile("pair.parquet") //Scala DSL은 parquet파일로 쿼리 수행이 가능하다. parquetFile.where('key==1).select('value as' a).collect().foreach(printIn) //위에서 생성된 파일들을 Table로 기록한다. 매뉴얼과 달리 최신 소스에는 아래와 같이 registerTempTable() 함수를 이용한다. parquetFile.registerTempTable("parquetFile") sql("SELECT * FROM parquetFile").collect().foreach(printIn)}}


하이브 대체 지원

SparkSQL은 Spark 안에서 가장 빨리 발전하고 있는 컴포넌트 중 하나다. 그러나 아직까지 하이브에서 지원하는 기능을 모두 지원하지는 않는다. 일반적인 SQL문이나 Operator, 심지어 UDF와 SerDes도 지원하므로 지원하지 않는 대표적인 몇 가지만 뽑아 보겠다.


  • Buckets
  • Non-equi OuterJoin, Unique Join
  • Single Query multi insert
  • Hadoop archive
  • Automatically map join
  • Automatically number of reducers
  • Merge multiple small file