데이터실무

DA, SQL, DB보안 등 실무자를 위한 위한 DB기술 바이블!

하이브

데이터 처리
SQL On Hadoop
하이브
작성자
admin
작성일
2021-02-15 14:10
조회
4670

맵리듀스의 대안 하이브와 피그

맵리듀스의 복잡성

하이브는 하둡에서 맵리듀스를 직접 돌리는 대신, 사용자가 SQL(Simple Query Language)로 쿼리를 작성하면 이것을 자동으로 맵리듀스 작업으로 변경해주는 쿼리 엔진이다. 맵리듀스는 기본적으로 자바 기반의 프레임워크이므로 기존 데이터베이스나 DW를 통해 분석을 하던 개발자가 아닌 사용자들에게는 다소 어렵게 느껴질 수 있다. 하이브는 이것을 극복하도록 개발된 하둡 기반 쿼리엔진이다. 이 엔진을 사용하면 맵리듀스를 작성하지 않고 쿼리 언어만으로 하둡의 비정형 데이터 분석이 가능하다.
맵리듀스를 처음 배울 때, 가장 많이 나오는 예제는 WordCount다. 가장 간단하고 맵리듀스의 작동원리를 가장 잘 표현해줄 수 있는 코드이기 때문이다. 다시 한 번 간단하게 원리를 설명한다. 레코드리더가 매퍼로 데이터를 넘겨주면, 매퍼가 이것을 각 워드 단위로 키(key)와 값(value)으로 나눈다. 이어서 리듀서가 이것을 워드 당 빈도를 세서 워드 당 카운트를 만드는 흐름으로 코드가 작동한다.

[그림 Ⅲ-2-4] 데이터 예제

[그림 Ⅲ-2-4] 데이터 예제

이제 WordCount보다 약간의 요건을 추가해 위 데이터를 바탕으로 ‘18세에서 25세 연령대의 사용자가 가장 많이 방문하는 사이트 5개를 맵리듀스 코드로 작성해 찾으라’는 분석 요청이 들어왔다고 가정해 보자. 아직 맵리듀스에 익숙하지 않다면, 처음부터 쉽게 방법을 찾을 수는 없을 것이다. 일단 데이터가 있으므로 고심 끝에 다음과 같은 로직을 구상할 수 있다.

[그림 Ⅲ-2-5] 분석 샘플로직

[그림 Ⅲ-2-5] 분석 샘플로직

자 이제 이것에 대해 맵리듀스만 작성하면 된다. 샘플로 작성된 코드는 다음과 같다.

[그림 Ⅲ-2-6] 샘플 맵리듀스

[그림 Ⅲ-2-6] 샘플 맵리듀스

논리적으로는 아주 간단한 로직인 것 같으면서도 실제로 구현하려면 위와 같이 많은 코드가 필요하다. 맵리듀스 코드 자체보다도 코드의 복잡성과 양을 보여주는 것이 목적이므로 가독성 있게 코드를 표시하지는 않고 위와 같이 한 화면에 표시했다.
하지만 힘들게 코드를 만들었는데 갑자기 요구사항이 ‘남성과 여성이 좋아하는 상위 5개의 사이트를 각각 뽑는 맵리듀스를 작성하라’고 바뀌었다고 해보자(데이터 분석이란 항상 그렇다. 일단 한 가지를 보자마자 바로 다른 것을 보고 싶어 한다). 하둡이 비록 기존보다는 훨씬 쉽게 분산 처리와 분석을 할 수 있게 만들어 줬지만, 개발자 중심의 프레임워크이고 개발자들도 맵리듀스라는 개념에 익숙하지 않으면 구현하기 힘들다. 따라서 일반적으로 맵리듀스를 이용해 데이터 분석을 할 때 다음과 같은 문제가 있다.


  • 사용자보다는 개발자 중심이라서 복잡
  • 중복된 코딩과 반복적인 노력의 지속
  • 실행을 위한 버전 관리, 환경 설정의 복잡성

이런 복잡함을 줄이기 위해 사용자별로 하둡 맵리듀스를 자동으로 작성해 주는 고차원 언어를 만들어냈다. 야후(yahoo)는 Pig라는 고차원 언어를 만들어 냈고, 페이스북은 하이브라는 쿼리엔진을 만들어 냈다


아파치 피그

아파치 피그는 데이터 처리를 위한 고차원 언어다. 야후에서 개발한 Pig는 야후 데이터 분석의 30%정도를 담당하고 있다. 2007년에 처음 등장한 이후로 꾸준히 업데이트 되고 있으며, 현재는 순수 맵리듀스 대비 약 80% 정도의 성능을 지원한다.
Pig를 사용해 앞에서 나왔던 특정 연령별 톱 5 사이트를 구하는 Pig 스크립트는 다음과 같다.

Users = load 'users' as (name, age); Fltrd = filter Users by age >= 18 and age <=25; Pages = load 'pages' as (User, url); Jnd = join Fltrd by name, Pages by user; Grpd = grp Jnd by url; Smmd = foreach Grpd generage group, COUNT(Jnd) as clicks; Strd = order Smmd by clicks desc; Top5 = limit Strd 5; store Top5 into 'top5sites';

위에서 나왔던 순수 맵리듀스 코드 대비 길이가 1/20 정도로 줄었다. 언어가 아주 직관적이므로 Pig의 문법을 모르는 사람이 보더라도 대강의 내용을 파악할 수 있을 정도다. 코드가 간결하기 때문에 Pig 사용자는 코드 오류보다 알고리즘에 집중할 수 있다. 무엇보다 코드 길이가 짧아서 사용자가 데이터 분석에 집중할 수 있도록 한다.


하이브

하이브 소개와 아키텍처

하이브는 페이스북 주도로 개발된 패키지다. 페이스북 서비스도 초창기에는 아주 간단한 로그 서버 몇 대와 오라클 클러스터를 연결한 구조로 운영됐다. 하지만 점차 로그 양이 늘어나고 처리해야 할 알고리즘과 모델이 늘어나면서 오라클에 들어가는 라이선스 비용이 부담이 됐다. 그래서 페이스북은 오라클에서 하던 배치 작업을 맵리듀스로 수행하기로 결정한다. 결정을 내릴 당시 기본적인 요구 사항은 다음과 같았다고 한다.


  • 사용자를 위한 CLI 제공
  • 코딩 없이 Ad-hoc 질의를 할 수 있는 기능 제공
  • 스키마 정보의 관리기능 지원

즉, 기존에 사용하던 데이터베이스와 아주 유사한 환경의 분석 플랫폼을 원했던 것이다. 이러한 요구 사항을 바탕으로 만들어낸 것이 바로 하이브다.

[그림 Ⅲ-2-7] 하이브 내부 구조도

[그림 Ⅲ-2-7] 하이브 내부 구조도


  • Metastore: 하이브에서 생성한 테이블 스키마를 저장하는 곳. 테스트용으로는 더비(derby) 임베디드 데이터베이스가 사용되고, 실제 서비스용으로는 MySQL과 같은 데이터베이스를 사용할 수 있다.
  • HiveQL: 사용자로부터 입력 받은 쿼리를 분석해 쿼리 수행 계획을 작성하고 이에 따르는 멥리듀스 코드를 생성한다.
  • 맵리듀스 작업: HiveQL에 의해 생성된 맵리듀스로 맵리듀스 작업이 수행된다. TS/SelOperator들을 수행할 수 있도록 코드가 생성된다. SerDe라는 추상 계층을 통해 계산에 필요한 데이터를 맵리듀스의 인풋 아웃풋 포매터로부터 읽어 들인다.

[그림 Ⅲ-2-8] 하이브의 워크플로(Hive Wiki)

[그림 Ⅲ-2-8] 하이브의 워크플로(Hive Wiki)

하이브의 내부 절차는 아래와 같다.


  • executeQuery: 사용자가 쿼리를 실행한다.
  • getplans : 드라이버(Driver)는 컴파일러(Compiler)에 쿼리 플랜을 요청한다.
  • getMetaData : 컴파일러는 쿼리를 파싱(parsing)한 후 의미분석기와 로지컬 플랜 생성기를 거친 후 맵리듀스 코드를 생성한다. 이 과정에서 하이브 테이블의 컬럼 정보와 컬럼 매핑(metadata) 정보를 Metastore로부터 가져온다.
  • sendMetaData: 컴파일러에서 요청한 테이블의 메타정보를 전달한다.
  • sendPlans: 컴파일러의 결과물인 맵리듀스와 플랜을 실행엔진(execution engine)으로 전달한다.
  • executePlan: 컴파일러에서 생성한 플랜을 수행한다.
    1. - DDL 작업을 위해서 메타데이터에 접근한다. 더불어 job.xml을 생성해 쿼리 플래너가 작성한 맵리듀스 작업을 수행한다.
    2. - 맵리듀스 작업이 완료되면 실행엔진에 시그널을 보낸다.
    3. - 맵리듀스 결과 파일을 쿼리 시 설정한 테이블이나 파일 시스템에 저장한다.
  • fetchResult: 쿼리 결과를 요청하면 실행엔진(8:sendResults)과 파일 시스템을 거쳐서 (9:fetchResults)결과를 가져온다.
일반 RDBMS와 하이브의 차이점

일반 RDBMS와 비교했을 때 하이브의 단점은 다음과 같다.


  • 쿼리 응답 속도가 느리다.(작은 데이터 기준)
  • 레코드 단위 Insert, delete, update를 지원하지 않고, 트랜잭션도 지원하지 않는다. 그래서 이런 작업을 할 때는 case문 등을 이용해 복잡하게 구현해야 한다.
  • 통계정보도 바로 확인할 수 없으며, 인풋 데이터의 오류도 바로 확인할 수 없다.

이와 같이 RDBMS에 비해 하이브의 단점들이 너무 많아 보인다. 그러나 이와 같은 단점들 중에 몇 가지는 빅데이터를 처리하기 위한 어쩔 수 없는 설계이기도 하다. 이러한 설계가 빅데이터 처리시에 장점으로 작용하기도 한다. RDBMS와 하이브는 데이터를 관리함에 있어서 근본적인 차이가 있다.


Schema On WRITE(RDBMS)

Schema On WRITE는 데이터를 입력할 때(Write) 테이블의 스키마 형태를 미리 선언하고, 스키마에 맞게 넣으면서 데이터 형태나 컬럼 등을 점검한다. 그러므로 일반적인 RDBMS에서는 테이블 스키마를 선언하기 전까지 데이터를 넣을 수 없다. 물론 파일 형태로 그대로 넣을 수는 있겠지만, 데이터를 제대로 읽을 수 없다. 또한 테이블 스키마가 변경되면, 테이블을 드롭하고 데이터를 리로드해야 한다.
물론 이와 같은 것이 기존처럼 작은 데이터일 때는 거의 문제가 되지 않는다. 하지만 처리해야 할 데이터들이 수백 TB라면, 그리고 foreign key가 변경됐다면 어떻게 될까? 수백 TB를 완벽하게 테이블 설계를 할 때까지 미리 넣을 수도 없고 급하게 설계해서 넣었다고 하더라도 다시 수정하려면 문제가 된다. 또한 테이블 설계를 변경하려는데 하필 변경해야 하는 key가 외래키(foreign key)라면 그와 연결되는 테이블은 모두 다시 적재해 줘야 한다. 그런데 위에서 이 데이터가 수백 TB라고 가정했다. 수백 TB를 다시 적재하려면 굉장한 시간과 노력이 들어갈 것이다. 이제 하이브에서는 어떻게 처리하는지 보자(이 처리 방법은 대부분의 SQL-On-Hadoop에서도 비슷하다).


Schema On READ(하이브)

Schema On WRITE는 데이터를 읽을 때(Read) 스키마 형태에 맞게 됐는지 확인하고 최대한 유연하게 데이터를 읽어온다. 예를 들어 데이터가 선언된 테이블의 컬럼 수보다 작으면 null로 처리하거나 데이터형을 임의로 바꿔서 처리해 준다.
그러므로 하이브에서는 테이블 스키마를 선언하기 전에도 데이터를 HDFS에 넣을 수 있다. 미리 데이터를 넣어 놓고 테이블을 생성ㆍ분석을 하면서 변경하더라도 상대적으로 문제가 되지 않는다. 따라서 데이터 사이즈가 크고 비정형 데이터가 많아서 데이터의 타입이나 컬럼들이 명확하지는 않을때 유연하게 대처하기 위해서 Schema On Read를 사용하게 됐다. 물론 이 방식이 만능은 아니지만, 빅데이터 처리를 할 때 장점이 많아서 대부분의 SQL-On-Hadoop들이 이 방식을 지원한다.


하이브 설치
하이브 패키지 설치

하이브를 설치하기 위한 선결조건은 아래와 같다.


  • 리눅스 또는 맥(OSX) 시스템
  • 자바(java) 1.6 이상
  • 하둡(hadoop) 1.0 이상
  • Tez 엔진 사용 시 하둡 2.0 이상

여러 가지 변형을 통해 다른 시스템에서도 활용할 수 있는 방법도 있지만, 위 사양이 가장 기본조건이다. 보통 하이브를 설치하기 위해서는 하이브 사이트(hive.apache.org)의 다운로드 메뉴에서 패키지를 내려 받아서 설치할 수 있으며, Ambari와 같은 자동 패치키 설치 소프트웨어를 이용할 수도 있다. 여기에서는 가장 기본적인 방법인 직접 다운로드를 받아서 설치하는 방법으로 진행하겠다.
하이브뿐 아니라 대부분의 오픈 소스들의 파일을 다운받을 때 주의할 점은 파일명 중간에 bin이 들어간 버전을 받아야 한다는 것이다. 일반적인 경우 bin이 붙은 파일과 붙지 않은 파일의 차이점은 소스코드 형태의 패키지와 실행 가능한 패키지(압축 파일 이름에 bin이 붙어있음)의 차이다. 오픈소스의 장점이 내부 구조를 파악할 수 있고, 필요 시 소스코드를 수정해 기능을 바꿀 수 있다는 점이다. 그렇기 때문에 오픈소스 프로젝트들은 소스버전과 그 소스를 컴파일한 실행가능 버전 2가지로 배포한다. 또한 사용자 환경에 따라서 참조 라이브러리를 다르게 설정해서 직접 컴파일을 해야 하는 경우가 있는데 이 경우에도 소스 버전이 필요하다.
여기에서는 일반적인 환경에서 테스트하기 위해서 빌드된 버전을(이름에 bin이 포함된) 받도록 한다. 파일을 다운 받은 이후에는 tar 명령어를 사용해 압축을 해지한다. 그리고 버전관리를 용이하게 하기 위해 심볼릭링크를 생성한다. 이는 윈도우에서 ‘바로가기’와 비슷한 역할을 한다.
설치 위치는 실습을 간단하게 하기 위해서 하둡 계정의 홈 디렉터리(/home/hadoop)로 하는데, 실제 업무에서는 다양한 계정의 사람들이 사용하기 위해 /usr/local에 설치하기도 한다. 자동설치의 경우 라이브러리 파일과 실행파일, 환경설정 파일들을 각각 다른 경로에 넣기도 한다.

// http://mirror.apache-kr.org/hive에서 다운로드 $ wget http://mirror.apache-kr.org/hive/hive-0.13.1/apache-hive-0.13.1-bin.tar.gz $ tar - xvfz hive-0.13.0-bin.tar.gz $ ln- s hive-0.13.0-bin hive

그리고 HIVE_HOME이라는 환경변수에 방금 설치한 하이브의 경로를 사용자의 bash_profile에 입력한다. 만일 하둡이 관련 설정이 없다면, HADOOP_HOME이라는 환경변수를 추가로 입력한다. $HADOOP_HOME이라는 변수가 입력돼 있지 않다면, 하이브가 테스트를 수행할 때 하둡 관련 정보를 찾을 수 없으므로 주의하자.

$vi ~/.bash_profile / export HIVE_HOME=/home/hadoop/hive, / export HADOOP_HOME=/home/hadoop/Hadoop


메타 데이터용 데이터베이스 설정

하이브는 테이블 데이터를 저장하기 위한 별도의 데이터베이스를 사용한다. 디폴트값은 더비(Derby)라는 임베디드 파일형 데이터베이스다. 이 데이터베이스의 경로를 다음과 같이 절대경로(full path)로 바꿔 준다. 그렇지 않을 경우에 하이브 명령어를 수행하는 경로마다 데이터베이스 파일을 생성하고 사용하므로, 만들어 두었던 하이브 테이블이 없어지는 일이 발생한다.
먼저 $HIVE_HOME/conf 아래의 hive-default.xml.template 를 hive-site.xml로 이름을 바꿔 저장한 다음, derby database 항목을 찾아 아래와 같이 변경한다.

$ cd $HOME_HIVE/conf $ cp hive-default.xml.template hive-site.xml $ vi hive-site.xml 변경 전 <property> <name>javax.jdo.option.ConnectionURL</name><value>jdbc:derby:;databaseName=metastore_db;create=true</value><description>JDBC connect string for a JDBC metastore</description></property>변경 후 <property><name>javax.jdo.option.ConnectionURL</name><value>jdbc:derby:;databaseName=/home/hadoop/hive/metastore_db;create=true</value><description>JDBC connect string for a JDBC metastore</description></property>


설치 테스트

설치가 완료됐으면, 다음과 같이 테스트를 해본다. 하이브 설치 디렉터리로 이동해 bin/hive 명령어로 수행해 본다. 메타데이터가 잘 연결돼 있는지 보기 위해 show tables 명령어가 잘 수행되는지 확인한다.

$ bin/hive, / hive> show tables, / OK, / Time taken: 6.391 seconds

아직 테이블을 생성한 것이 없기 때문에 테이블은 보이지 않지만 조회는 성공했다. 그리고 하둡과의 연동을 확인하기 위해서 dfs -ls 명령어를 수행한다.

hive> dfs -ls ; -rw-r--r-- 1 root supergroup 4 2012-10-11 16:29 /user/root/a -rw-r--r-- 1 root supergroup 4 2012-10-11 16:29 /user/root/b drwxr-xr-x -root supergroup 0 2013-10-01 03:00 /user/root/temp drwxr-xr-x -root supergroup 0 2013-10-30 01:24 /user/root/temp2 drwxr-xr-x -root supergroup 0 2013-10-30 08:32 /user/root/temp3 drwxr-xr-x -root supergroup 0 2013-10-01 03:35 /user/root/test2 hive>

정상적으로 설정돼 있다면, 위와 같이 자신이 갖고 있는 하둡의 파일 리스트가 나온다. 만일 나타나지 않으면, 환경 설정파일에서 HADOOP_HOME 설정이 잘 돼 있는지 확인해 본다.


하이브 실습
DDL 테스트 실습

테이블을 생성하기 위해 하이브의 DDL(Data Definition Language)을 사용해 테이블을 생성하는 실습을 한다. 일반적인 데이터베이스의 DDL과 크게 다르지 않다. 생성된 테이블을 조회하기 위해서는 describe 또는 describe extended로 확인할 수 있다.

// 테이블 생성 hive> CREATE TABLE pokes (foo INT, bar STRING); hive> CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY (ds STRING); hive>describe invites; OK foo int bar string ds string Time taken: 0.152 seconds //테이블 상세 정보 조회(각 컬럼뿐만 아니라 저장 위치나 파티션 정보들을 볼 수 있다.) hive>describe extended invites; OK foo int bar string ds string Detailed Table Information Table(tableName:invites, dbName: default, owner: root, createTime: 1384689115, lastAccessTime: 0, retention: 0, sd: StorageDescriptor(cols: [FieldSchema(name: foo, type: int, comment: null), FieldSchema(name: bar, type: string, comment: null), FieldSchema(name: ds, type: string, comment: null)], location: hdfs://vm2:9000/user/hive/warehouse/invites, inputFormat: org.apache.hadoop.mapred.TextInputFormat, outputFormat: org.apache.hadoop.mapred.TextInputFormat, outputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets: -1, serdeInfo: SerDeInfo(name: null, serializationLib: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters: {serialization.format=1}), bucketCols: [], sortCols:[], parameters: {}), partitionKeys: [FieldSchema(name:ds, type: string, comment: null)], parameters: {transient_lastDdlTime=1384689115}, viewOriginalText:null, viewExpendedText:null, tableType:MANAGED_TABLE) Time taken: 0.063 seconds hive>


데이터를 활용한 실습

이번 실습에서는 나스닥과 뉴욕 증권거래소의 데이터를 사용해 2010년도의 평균 주가 상위 5개업체를 찾는 방법에 대해 실습하겠다. 이 예제에서 사용할 샘플 데이터는 인포침스에서 (www.infochimps.com)에서 제공하는 나스닥(NASDAQ) 거래량 데이터다. 인포침스의 웹사이트에 접근한 다음, resource 탭의 datamarket place를 선택한다. 이어서 search 항목에서 NASDAQ을 입력하면, 필요한 데이터가 검색된다. 해당 페이지를 찾을 수 없을 경우 https://github.com/reillywatson/nasdaq-outliers에서 받으면 된다

[그림 Ⅲ-2-9] 나스닥 거래량 입력 화면

[그림 Ⅲ-2-9] 나스닥 거래량 입력 화면

데이터를 다운로드한 후 압축을 푼다. 그 다음 하둡에 user/hadoop/stock이라는 디렉터리를 만든다. 마지막으로 압축을 푼 데이터 중에 NASDAQ_daily로 시작하는 데이터를 하둡으로 올린다.

$ hadoop fs -mkdir /user/hadoop/stock, / $ hadoop fs -put NASDAQ_daily_prices_* /user/hadoop/stock/

이번에 사용할 데이터의 구조는 아래와 같다. 이 데이터에 상응하는 하이브 테이블을 생성하기 위해 하이브 스크립트를 작성한다.

[그림 Ⅲ-2-10] NASDAQ data 샘플

[그림 Ⅲ-2-10] NASDAQ data 샘플

하이브용 스크립트(stock.hql)를 작성한다.

CREATE EXTERNAL TABLE IF NOT EXISTS stocks ( …… ① exchange STRING, symbol STRING, ymd STRING, price_open FLOAT, price_high FLOAT, price_low FLOAT, price_close FLOAT, volume INT, price_adj_close FLOAT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','......② location '/user/hadoop/stock'; ......③


  1. ① 하이브용 외부 테이블(external table)을 생성하는 명령어. 테이블 생성 시 external을 붙이면, 데이터를 하이브가 관리하는 하둡 디렉터리로 옮길 필요가 없어서 간단히 테이블을 생성할 수 있다. 데이터의 위치는 테이블 생성 마지막에 location으로 지정해 준다.
  2. ② 하이브 테이블에서 컬럼의 구분자를 지정. 여기서 사용하는 샘플의 경우 콤마로 구분하므로 ,를 입력해 줬다.
  3. ③ 하이브가 사용할 하둡 데이터 위치. 테이블 생성시 external로 지정했기 때문에 location 을 사용했다. 테이블을 external로 지정하지 않을 경우에는 load 명령어를 사용한다.

하이브 스크립트를 완성했으면 커맨드 창에서 hive.cmd를 수행한다.

$ hive -f stock.hql Hive history file = /use/hadoop/history/hive_job_log_root_2013092 40156_344434990.txt OK Time taken: 3.25 seconds

명령어가 정상적으로 수행됐다면, 하이브에서 테이블이 생성된 것을 describe 명령어로 확인할 수 있다.

$ bin/hive -e "describe extended stocks" OK exchange string symbol string ymd string price_open float price_high float price_low float price_close float volume int price_adj_close float Detailed Table Information Table(tableName:stocks, dbName: default, owner: hyejung, createTime: 1379955366, lastAccessTime: 0, retention: 0, sd: StorageDescriptor(cols: [fieldSchema(name: exchange, type: string, commet: null), FieldSchema(name:symbol, type: string, comment: null), FieldSchema(name: ymd, type: string, comment: null), FieldSchema(name: price_open, type: float, comment:null), FieldSchema(name: price_high, type: float, comment: null), FieldSchema(name: price_low, type: floar, comment: null), FieldSchema(name: price_close, type: float, comment: null), FieldSchema(name: volume, type: int, comment: null), FieldSchema(name: price_adj_close, type: float, comment: null)], location: hdfs://localhost:8020/user/hadoop/stock, inputFormat: org.apache.hadoop.mapred.TextInputFormat, outputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed: false, numBuckets: -1, serdeInfo: SerDeInfo(name: null, serializationLib: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, paramaters: {serialization.format=,, field.delim=,}), bucketCols:[], sortCold:[], parameters:{}),partitionKeys: [], parameters: {EXTERNAL=TRUE, transient_lastDdlTime=1379955366}, viewOriginalText:null, viewExpandedText: null, tableType: EXTERNAL_TABLE) Time taken : 0.737 seconds

하이브 테이블이 잘 생성됐다. 그렇다면 하둡 사용자 화면으로 돌아가서 하이브 작업을 수행해 보자. 여기에서는 애플의 장종가가 50달러를 넘었던 연도를 뽑아보는 것을 예제로 수행해 본다. 전체 쿼리문은 아래와 같다.

SELECT symbol, avg(price_open) AS openprice FROM stocks WHERE ymd = '2010' GROUP BY symbol ORDER BY openprice DESC limit 5


하이브와 맵리듀스 성능 비교

하이브는 같은 목적으로 작성한 맵리듀스보다 약 20% 속도가 느리다고 했다. 실제로 그런지 확인을 해 보겠다. 먼저 하둡 맵리듀스를 방금 생성한 테이블 테이터가 있는 하둡의 데이터 디렉터리를 대상으로 해 구동한 다음, 시간을 측정한다. WordCount 예제 프로그램은 하둡에 포함된 hadoop-examples에서 사용하고 시간은 time이라는 명령어를 hadoop 명령어 앞에 넣는다. 먼저 WordCount를 수행해보도록 하겠다.

time hadoop jar hadoop-exmples-1.1.3.jar wordcount /user/hive/warehouse/stocks /out/1 13/11/17 23: 30:11 INFO input.FileInputFormat: Total input paths to process : 36 13/11/17 23:30: 11 INFO util.NativeCodeLoader: Loaded the native-hadoop library 13/11/17 23:30:11 WARN snappy.LoadSnappy: Snappy native library not loaded 13/11/17 23:30:11 INFO mapred.JobClient: Running job: job_201311172250_0018 13/11/17 23:30:12 INFO mapred.JobClient: map 0% reduce 0% 13/11/17 23:30:28 INFO mapred.JobClient: map 5% reduce 0% ... 13/11/17 23:33:37 INFO mapred.JobClient: Reduce output records=8487522 13/11/17 23:33:37 INFO mapred.JobClient: Virtual memory (bytes) snapshot = 67939352576 13/11/17 23:33:37 INFO mapred.JobClient: Map output records=8487557 real 3m27.581s user 0m2.018s sys 0m0.303s

WordCount는 3분 27초 정도가 나왔다. 그러면 이번에 하이브로 동일한 내용의 쿼리를 수행한다. 하이브에서 사용하는 사용자 정의 함수 중에서 explode를 사용해 하나의 라인을 단어(word) 단위로 나누고 이것을 group by로 묶어서 단어의 개수를 새어 본다. 이것을 위한 쿼리문용 파일을 (wordcount.hql) 아래와 같이 작성한다.

create table sample ( line string); dfs - cp/user/hive/warehouse/stocks/user/hadoop/sample; load data inpath '/user/hadoop/sample/stocks' into table sample; select word, count(1) as count FROM (select explode(split(line, ',')) as word FROM sample ) w GROUP by word ORDER BY word;

파일을 완성했으면, 역시 다음 명령어로 하이브를 통해 WordCount를 수행해 본다.

[root@vm2 hive]# time hive -f wordcount.q WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files. Logging initialized using configuration in jar: file:/usr/local/hive-0.8.1/lib/hive-common-0.8.1.jar!/hive-log4j.properties Hive history file=/tmp/root/hive_job_log_root_201311172342_1640623017.txt OK exchange 72 stock_price_adj_close 72 stock_price_close 72 stock_price_high 72 stock_price_low 72 stock_price_open 72 stock_symbol 72 stock_volume 72 Time taken: 201.353 seconds

생각과는 달리 하이브의 결과가 더 빨랐다. 여기서 알 수 있는 것은 적당히 맵리듀스로 로직을 개발한다면 오히려 하이브보다 더 느릴 수 있다는 점이다. 그래서 처음에는 하이브나 Pig 같은 툴로 빠르게 개발해 적용한 다음, 필요 시 맵리듀스를 직접 개발하는 것이 훨씬 효과적이다.


중첩쿼리 실습

애플의 평균 주가가 50달러가 넘었던 연도를 알아보자. 먼저 다음과 같은 쿼리를 통해 작업을 수행한다.

hive >select year(ymd), avg(price_close) as test from stocks where symbol=‘AAPL’ group by year(ymd) having test>50.0 Ended Job = job_201311172250_0021 MapReduce Jobs Launched: ... Job 0 : Map: 2 Reduce : 1 Accumulative CPU: 15.55 sec HDFS Read: 481100881 HDFS Write: 257 SUCESS Total MapReaduce CPU Time Spent: 15 seconds 550 msec OK 1987 53.88968399108163 1991 52.49553383386182 1992 54.80338610251119 1999 57.77071460844949 2000 71.74592876261757 2005 52.401745992993554 2006 70.81063753105255 2007 128.27390423049016 2088 141.9190115054888 2009 146.81412711976066 2010 204.72159912109376 Time taken: 67.491 seconds

동일한 내용의 쿼리를 이번에는 다음과 같이 중첩쿼리(Sub Query)로 수행해 본다.

hive> SELECT s2.year, s2.avg FROM > (SELECT year(ymd) AS year, avg(price_close) AS avg FROM stocks > WHERE exchange = 'NASDAQ' AND symbol = 'AAPL' > GROUP BY year(ymd)) s2 > WHERE s2.avg> 50.0; MapReduce Jobs Launched: .. Job 0 : Map: 2 Reduce : 1 Accumulative CPU: 16.11 sec HDFS Read: 481100881 HDFS Write: 257 SUCESS Total MapReduce CPU Time Spent : 16 seconds 110 msec OK 1987 53.88968399108163 1991 52.4955383386182 1992 54.80338610251119 1999 57.77071460844979 2000 71.74892876261757 2005 52.401745992993554 2006 70.81063753105255 2007 128.27390423049016 2008 141.9790115054888 2009 146.81412711976066 2010 204.72159912109376 Time taken: 67.967 seconds

아주 근소하게 중첩쿼리를 쓰지 않는 쪽의 성능이 빨랐다. 필자는 1개의 컴퓨터로 만든 하둡 노드에서 데이터를 처리했으므로 성능 차이를 크게 느끼기 힘들었지만, 대개의 경우 중첩쿼리로 처리하는 경우에 성능이 이 더 좋다. 왜냐하면 중첩쿼리를 사용할 경우 첫 번째 스테이지에서 맵으로만 구성된 작업(map only) 을 수행하고, 리듀스 작업을 수행하지 않기 때문이다. 그만큼 디스크 I/O가 줄어든다.


색인을 사용한 하이브 가속화

대개의 경우 데이터베이스는 특정 테이블이나 데이터베이스에 색인(index)을 생성해서 검색 속도를 빠르게 한다. 색인을 사용할 경우 아래와 같은 장점이 있다.


  • - 데이터베이스의 특정 컬럼에 대해 검색 가능한 형태인 자료구조를 만드는 것
  • - I/O의 범위를 전체적으로 줄여주는 역할을 함
  • - 색인 자체가 기본적인 정렬이 돼 있으므로 검색 시간도 줄어듦

하이브도 자신이 관리하는 테이블에 색인을 생성해서 검색속도를 빠르게 할 수 있다. 아래와 같이 CREATE INDXE라는 명령어를 통해 앞 절에서 생성한 stocks 테이블에 symbol 컬럼을 기준으로 색인을 생성해 보자.

hive> CREATE INDEX simple_index >ON TABLE stocks(symbol) >AS 'org.apache.hadoop.hive.ql.index.compactIndexHandler' >WITH DEFERRED REBUILD

WITH DEFERRED REBUILD 옵션을 사용해서 색인 테이블 생성 즉시 색인을 만드는 게 아니라 alter와 같은 명령어로 테이블을 변경할 때 색인을 작성하도록 했다.

hive> alter index simple_index on stocks rebuild;

위 명령어로 색인을 실제로 생성한다. 이 작업을 수행할 때 혹시 memory overflow error가 발생할 수도 있다. 맵리듀스 작업 시에 많은 메모리를 사용하기 때문에 발생하는 에러다. 하둡으로 돌아가서 태스크 트레커가 생성하는 자식(Child) 프로세스의 메모리 양을 늘려주자.
이 옵션은 하둡의 mapred-site.xml에서 mapred.child.java.opts 옵션을 찾은 다음, 1024m로 수정해 줘야 한다.

# cd $HADOOP_HOME/conf #vi mapred-site.xml <property><name>mapred.child.java.opts</name><value>-Xmx1024m</value><description>Java opts for the task tracker child processes.</description></property>

파일을 수정한 다음 하둡 프로세스를 다시 수행해 줘야 한다. 그렇지 않으면, 변경한 옵션이 반영되지 않는다.

$ stop-all.sh, / $ start-all.sh

색인(Index)를 다 생성했으면, symbol 컬럼에서 쿼리를 수행할 주식 이름을 뽑아내 별도의 하둡 파일로 저장하자. 아래의 명령어는 생성한 인덱스 중에서 AAPL이라는 이름을 가진 색인정보를 /tmp/index_test_result라는 파일로 저장하는 명령어다.

hive> INSERT OVERWRITE DIRECTORY "/tmp/index_test_result" SELECT '_bucketname' , '_offsets' FROM default__stocks_simple_index__where symbol='AAPL';

이 명령어를 잘 수행했으면, 하둡의 /tmp/ 아래에 해당 파일이 생성된 것을 확인할 수 있다.

$ hadoop fs -ls / tmp Warning: $HADOOP_HOME is deprecated. Found 2 items drwxr-xr-x -root supergroup 0 2013-11-18 00:34 /tmp/hive-root drwxr-xr-x -root supergroup 0 2013-06-19 11:27 /tmp/index_test_result

이제 하이브가 이 색인 파일을 사용해 쿼리를 사용할 수 있도록 다음과 같은 설정한다. 다만 정보는 영속적이지 않으므로 hive 셀 안에서 수행해야 하고 로그아웃 하지 않도록 주의한다. 이것을 막기 위해 이러한 정보들은 사용자의 홈 디렉터리 아래에 .hiverc 파일을 작성해 저장한다 그렇지 않으면 hive 명령어를 내릴 때마다 입력해야 한다.

hive> SET hive.index.compact.file=/tmp/index_test_result; / hive> SET hive.input.format=org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;

색인을 사용할 준비가 됐다면, 이제 아래와 같은 쿼리를 한번 수행해 보자. 그리고 색인을 쓰지 않고 쿼리를 수행할 때와 시간을 비교해 보자.

hive> SELECT s2.year, s2.avg FROM (SELECT year(ymd) AS year, avg(price_close) AS avg FROM stocks WHERE symbol = 'AAPL' GROUP BY year(ymd)) s2 WHERE s2.avg>50.0;

하이브의 색인은 데이터베이스의 색인과는 좀 다르다. 데이터베이스는 자료가 입력될 때마다 색인을 갱신하지만, 하이브는 데이터의 업데이트나 입력과는 전혀 관계없이 alter라는 명령어를 명시적으로 수행하지 않으면 갱신되지 않는다. 그리고 갱신할 경우에도 변경된 부분만 별도로 계산하는 것이 아니라, 테이블이 관리하는 전체 데이터를 갖고 하기 때문에 시간이 오래 걸리고 번거롭다. 이유는 위에서 설명했듯이 Schema On Read 방식으로 구현됐기 때문이다.
하지만 위의 예에서와 같이 미리 한 번 만들어 놓으면, 속도가 빨라지는 것은 사실이므로 동일 테이블에서 특정 컬럼이나 데이터를 기준으로 쿼리를 반복적으로 수행할 경우에는 색인을 생성해 놓고 작업을 수행하는 것도 고려해 볼 일이다.


파티셔닝

일반적으로 DW에서 메인 로그의 특정일별 혹은 시간별 분석을 하는 경우가 많다. 그런데 데이터가 시계열로 쌓이고 특정 일자의 데이터를 찾기 위해서 전체 데이터를 검색해야 한다면 매우 비효율적일 것이다.
이러한 요건을 해결하기 위해서 파티셔닝(Partitioning)이라는 개념이 나왔다. 데이터마다 날짜 표시를 해놓고 원하는 시간대가 아닌 데이터는 분석 대상에서 아예 제외시키는 방법이다. RDBMS에도 있는 기능이지만 하이브에서는 더 직관적으로 돼 있다.
하이브는 HDFS를 Read해서 분석을 하는데, 이때 테이블마다 디렉터리가 할당됐으므로 쉽게 읽어올 수 있다. 하지만 파티셔닝은 이 디렉터리 안에 추가로 원하는 컬럼 값의 디렉터리를 만드는 것이다(이 경우에는 날짜 값). 그렇게 되면, 모든 데이터를 읽어와서 날짜 컬럼을 비교ㆍ분석할 필요 없이 디렉터리 이름만 비교해 원하는 데이터만 추려 내면 훨씬 간단해진다. 이것이 파티셔닝의 기본 원리다. 예제를 보도록 하자.



파티셔닝 테이블 생성


CREATE TABLE sales(sales_order_id BIGINT, order_amount FLOAT, order_date STRING, due_date STRING, customer_id BIGINT) PARTITIONED BY (country STRING, yera INT, mount INT, day INT) ;


데이터 삽입


방법 1> INSERT INTO sales PARTITION (country = 'KR', year = 2014, month = 05, day = 30) SELECT ... FROM source_table WHERE country = 'KR' AND year = 2014 AND month = 05 AND day = 30;

방법 2> LOAD DATA LOCAL INPATH ‘/data/sales/20140530.txt’ OVERWIRTE INTO TABLE sales PARTITION (country = 'KR', year = 2014, month = 05, day = 30) 방법 3> ALTER TABLE sales ADD PARTITION (country = 'KR', year = 2014, month = 05, day = 30) LOCATION 'hdfs://localhost/data/sales/20140530.txt'


파티션 정보와 디렉터리 구조

HDFS의 디렉터리 형태로 관리를 하므로 사용자 입장에서 다양한 조작을 할 수 있다. 파티션 생성시 디렉터리 이름을 명시하지 않았을 때는 어떻게 될까? /user/hive/warehouse/sales/country=kr/year=2014/month=05/day=30와 같이 직관적으로 HDFS에 디렉터리 형태로 저장된다.

show partitions sales; hdfs dfs -ls -R /user/hive/warehouse/sales


다중 입력 + 정적(static) 파티션


FROM source_table st INSERT OVERWRITE TABLE sales PARTITION(country = 'KR', year = 2014, month = 05, day = 30) SELECT * WHERE(st.country = 'KR', st.year = 2014, st.month = 05, st.day = 30) INSERT OVERWRITE TABLE sales PARTITION(country = 'KR', year = 2013, month = 06, day = 30) SELECT * WHERE (st.country = 'KR', st.year = 2013, st.month = 06, st.day = 30)

- 동적(dynamic) 파티션

INSERT OVERWRITE TABLE sales PARTITION(country, year, month, day) SELECT ..., st.country, st.year, st.month, st.day FROM source_table st;

- 파티션 변경

파티션 삭제> ALTER TABLE sales DROP IF EXISTS PARTITION (country = 'KR', year = 2014, month = 05, day = 30); 파티션 추가 > ALTER TABLE sales ADD PARTITION (country = 'KR', year = 2014, month = 05, day = 30) LOCATION 'sales/kr/2014/05/30; 디렉터리 위치 변경> ALTER TABLE sales PARTITION (country = 'KR', year = 2014, month = 05, day = 30) SET LOCATION 'sales/kr/2014/05/30;

마지막으로 파티션을 사용할 때 주의할 점은 파티션이 너무 많아지면 오히려 성능이 떨어질 수 있고, select를 할 경우 where 조건을 주지 않으면 모든 파티션을 읽기 때문에 set hive.mapred. mode=strict와 같은 옵션을 주는 것도 좋다.


Bucketing

데이터 세트를 좀 더 관리가 편리한 조각으로 분리하는 작업으로 파티션 값을 hash해 미리 설정된 버켓에 저장한다. 일반적으로 하나의 파일로 관리가 된다. Bucketing을 하면 Join을 하거나 샘플링 작업을 할 경우 성능향상을 시킬 수 있다.



Bucketing 테이블 생성


CREATE TABLE sales ( sales_order_id BIGNT, order_amount FLOAT, order_date STRING, due_date STRING, customer_id BIGINT ) PARTITIONED BY (country STRING, year INT, mount INT, day INT); CLUSTERED BY (customer_id) INTO 96 BUCKETS;


Map-side Join 및 샘플링을 하는 데 도움을 준다.


SELECT * FROM sales TABLESAMPLE (BUCKET 1 OUT 96 )



Store

압축

압축은 데이터의 물리적 크기를 줄여준다. 압축을 하면 CPU 로드가 더 발생하지만, 네트워크 사용량이 줄기 때문에 I/O 사용량이 줄어들어 쿼리 속도가 빨라진다. 그러나 데이터 종류와 쿼리 빈도에 따라 달라지므로 각 상황에 따른 테스트를 해야 한다. 압축 종류에는 Gzip, Snappy, Bzip2, LZO등이 있으며, 맵리듀스와 똑같이 splittable과 압축률 또한 고려해야 한다.

쿼리 옵션 hive.exec.compress.intermediate hive.exec.compress.output 예> set hive.exec.compress.output=true; set mapred.output.compression.codec=org.apache.hadoop.id.compress.GzipCodec; CREATE TABLE customer2 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' AS SELECT * FROM customer


Columnar


No Status City Name
S1 20 London Smith
S2 10 Paris Jones
S3 30 Paris Blake
S4 20 London Clark
S5 30 Athens Adams

위와 같은 테이블이 있다고 가정할 때, 기존 RDBMS와 같이 로우를 기준으로 테이블을 관리하면 다음과 같은 형태로 저장된다.

S1|20|London|Smith;S2|10|Paris|Jones;S3|30Paris|Blake;S4|20|London|Clark;S5|30|Athens|Adams

그러나 컬럼 기준으로 테이블을 관리하면 다음과 같은 형태로 저장된다.

S1|S2|S3|S4|S5;20|10|30|20|30;London|Paris|Paris|London|Athens;Smith|Jones|Blake|Clark|Adams

저장된 형태를 보면 직관적으로 알 수 있듯이 컬럼 형태로 저장하면, 비슷한 종류끼리(컬럼) 묶이게 되기 때문에 데이터 형식도 일치해서 압축률도 더 올라간다. 또한 일반적인 DW는 테이블의 모든 컬럼을 참고하는 것이 아니라, 특정 컬럼 몇 개만 선택하며, 그 컬럼을 Group by이나 sum 등의 작업을 하기 때문에 데이터 seeking 측면에서도 많은 이점이 있다. 물론 데이터 형식뿐 아니라 쿼리 종류에 따라서 속도, 저장 공간(압축률), 스키마 변경의 용이성이 달라진다.

컬럼 형태로 저장하는 방법은 아래와 같으며 ORC 외에도 Rcfile, Parquet 등의 저장 방법이 있다.

CREATE TABLE addresses ( name string, street string, city string, state string, zip int ) STORED AS orc tblproperties ("orc.compress"="SNAPPY");



Join

Shuffle Join

가장 일반적인 Join 방법으로 Key 값을 기준으로 같은 리듀스로 보내되 임의로 섞기(shuffling)를 수행해 처리한다.

[그림 Ⅲ-2-11] Shuffle Join

[그림 Ⅲ-2-11] Shuffle Join


Broadcast Join

스타 스키마가 메모리에 올라올 정도로 작은 디멘젼 테이블들을 사용한다. 모든 노드에 작은 테이블들을 메모리에 복제시켜서 빠르게 조인이 가능하도록 한다. 큰 테이블을 통한 단일 스캔을 하는 방식이다. 일반적인 DW의 스타-스키마 방식의 조인에서 널리 쓰인다.

[그림 Ⅲ-2-12] Broadcast Join

[그림 Ⅲ-2-12] Broadcast Join

사용 방법

select /*+ MAPJOIN(time_dim) */ count(*) from store_sales join time_dim on(ss_sold_time_sk = t_time_sk) set hive.auto.convert.join=true; select count(*) from store_sales join time_dim on(ss_sold_time_sk = t_time_sk)


Sort-Merge-Bucket Join

아래 그림을 Shuffle Join과 비교해 보면, 조인되는 데이터가 순서대로 정렬된 것을 볼 수 있다.
하둡은 기본적으로 하드디스크를 사용하므로 Seek(데이터 탐색)를 위해 디스크의 헤더가 움직이면서 해당 위치를 찾게 된다. 그러므로 이 시간을 줄이면, 훨씬 빠르게 처리가 가능하다. 아래와 같이 데이터가 일정 규칙에 의해서 정렬됐으면, Seek Time을 줄일 수 있다. 또한 Bucket을 이용해서 좀 더 효율적으로 데이터 처리가 가능하다.

[그림 Ⅲ-2-13] Sort-Merge-Bucket Join

[그림 Ⅲ-2-13] Sort-Merge-Bucket Join


조인 전략


[표 Ⅲ-2-1] 조인 전략


Type Approach Pros Cons
Shuffle Join 맵리듀스를 이용해 Key를 기준으로 shuffle해 Join side를 기준으로 조인 어떤 형태의 데이터 크기와 구성에서도 작동함 가장 많은 자원을 사용하며 가장 느린 조인 방식
Broadcast Join 작은 테이블을 모든 노드의 메모리에 올리고 매퍼는 큰 테이블을 읽어서 조인 가장 큰 테이블에서 굉장히 빠른 단일 스캔 작은 테이블이 메모리에 들어갈 정도로 작아야 함
Sort-Merge-B ucket Join Sort로 인접된 Key를 이용해 매퍼에서 효과적인 조인 어떤 크기의 테이블에서도 굉장히 빠름 사전에 자료가 정렬되고 bucketing 돼 있어야 함


Configuration

Strict Mode


hive.mapred.mode=strict

이 옵션을 설정할 경우 아래와 같은 제한이 생긴다.


  • 파티션이 있는 테이블은 where절을 줘야 한다.
  • Limit 절 없이 order by 절을 사용할 수 없다.
  • 카타시안 프로덕트를 막아준다(Where절을 ON으로 자동으로 변경해 주지 않음).
    1. ① select * from a join b where a.id = b.id;
    2. ② select * from a join b on (a.id=b.id);

맵리듀스


  • hive.exec.reducers.bytes.per.reducer: 디폴트로 데이터 사이즈에 따라 리듀스 수를 지정하는데, 이때 활용하는 메모리 값을 설정한다.
  • mapred.reduce.tasks: 리듀스 수를 지정한다.
  • hive.exec.reducers.max : 하나의 쿼리가 너무 많은 자원을 소모하지 않도록 리듀스의 최대 숫자를 제한한다.

병렬처리


  • hive.exec.parallel: TRUE로 하며, 의존관계가 없는 스테이지를 동시에 병렬로 수행한다. 이에 따라 클러스터 자원을 더 많이 사용하게 된다.

Limit 설정

하이브에서 limit 문은 데이터 전체에 대해 쿼리를 수행한 후 일부만 보여주는 방식이다. 아래와 같은 설정을 해줘야 성능상에서도 limit를 통한 이익을 얻을 수 있다.

hive.limit.optimize.enable, hive.limit.row.max.size, hive.limit.optimize.limit.file


Dynamic partition


  • hive.exec.dynamic.partition.mode: strict 모드이면, 최소 한 개의 정적 파티션을 명시해야 한다.
  • hive.exec.max.dynamic.partitions: 생성이 허용되는 최대 동적 파티션 수를 지정한다.
  • hive.exec.max.dynamic.partitions.pernode : 맵리듀스 노드에서 생성이 허용되는 최대 파티션 수를 지정한다.

JVM Reuse


  • mapred.job.reuse.jvm.num.tasks : jvm reuse 횟수 설정(1: 재사용 x, -1: 무한 재사용)
    태스크 자체의 실행시간보다 JVM의 생성 및 초기화가 더 오래 걸릴 때가 있다. 이것을 위해 설정하는 옵션이며, 자바 가상 머신 재사용이 예약된 태스크 슬롯을 잡이 완료할 때까지 점유하고 있다.

조인 순서

작은 테이블에서 큰 테이블 순서로 조인을 해야 속도가 빠르다. 그러나 쿼리를 바꾸기 힘든 경우 STREAMTALBE Hint를 이용해 강제로 조인 순서를 바꿀 수도 있다.

SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON …


복제계수

작은 테이블의 경우 복제 계수를 늘려서 data locality 기회를 높여 접근성을 올려주는 방법도 있다.

hadoop fs -setrep -R ?w 5 /hive/warehouse/sales/kr/



JDBC드라이버를 통한 하이브 연동

하이브는 JDBC를 통해 자바 클라이언트에서 접속할 수 있다. 먼저 하이브 서버를 띄워 둔다. 아래 명령어를 수행하면, 하이브 서버가 수행되고 외부로부터 커넥션을 기다리고 있다.

$ hive --service hiveserver Starting Hive Thrift Server WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.

JDBC용 연동 샘플 코드를 깃 허브로부터 내려 받는다.

$git clone https://github.com/sstratoopen/udf.git

내려 받은 코드를 이클립스에서 로딩하도록 한다. 이클립스에서 File→ import→ existing Maven 순서로 메뉴를 선택한다. 그리고 다운받은 디렉터리 아래의 pom.xml을 선택한다. 그러면 그림과 같이 메이븐 프로젝트가 로드된다.

[그림 Ⅲ-2-14] 프로젝트 로드화면

[그림 Ⅲ-2-14] 프로젝트 로드화면

프로젝트를 로드했으면, pom.xml을 더블클릭하고 dependencies 탭을 선택한다. 여기서 자신의 환경에 맞도록 dependency을 수정한다. 수정방법은 옆의 properties를 선택하고 자신의 환경에 맞는 버전을 입력하면 된다.

[그림 Ⅲ-2-15] 의존성 변경화면

[그림 Ⅲ-2-15] 의존성 변경화면

의존성을 자신의 버전과 다 일치 시켰으면, 소스 코드 중에서 HiveJdbcClient.java를 선택한 후 소스코드를 열어보자.

public static void main(String[] args) throws SQLException { try{Class.forName(driverName);} catch(ClassNotFoundException e) { e.printStackTrace(); System.exit(1); } //192.168.10.5 대신에 자신의 하이브 서버 IP를 입력한다. Connection con = DriverManager.getConnection("jdbc:hive//192.168.10.5:1000/default", "", ""); Statement stmt = con.createStatement(); String tableName = "testHiveDriverTable"; stmt.executeQuery("drop table " + tableName); //key, value 컬럼으로 이뤄진 테이블을 생성한다. 그리고 구분자는 탭으로 나뉜다. ResultSet res = stmt.executeQuery("create table" + tableName + "(key int, value string)" + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'"); //show tables String sql = "show tables'" + tableName + "'"; System.out.println("Running: " +sql); res = stmt.executeQuery(sql); if (res.next()){System.out.println(res.getString(1));} //describe table sql = "describe" + tableName; System.out.println("Running: " + sql); res = stmt.executeQuery(sql); while (res.next()){System.out.println(res.getString(1) + "\t" + res.getString(2));} //데이터를 하이브 테이블로 입력한다. //파일의 경로는 이 클라이언트 예제를 수행하는 로컴 컴퓨터의 경로다. // /home/hadoop/a.txt는 탭으로 구분되는 파일을 아무것이나 미리 생성해 둔다.

String filepath = "/home/hadoop/a.txt"; sql = "load data local inpath" '" + filepath + "' into table " + tableName; System.out.println("Running: " + sql); res = stmt.executeQuery(sql); // select * 쿼리를 수행한다. sql = "select * from" + tableName; System.out.println("Running: " + sql); res = stmt.executeQuery(sql); while (res.next()){System.out.println(String.valueOf(res.getInt(1))+"\t" + res.getString(2));} // regular hive query sql = "select count(1) from " + tableName; System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));} //regular hive query sql = "select count(1) from " + tableName; System.out.println("Running: " + sql); res = stmt.executeQuery(sql); while (res.next()){System.out.println(res.getString(1));}}


사용자 정의 함수 예제

[그림 Ⅲ-2-16] 사용자 정의 함수의 종류

[그림 Ⅲ-2-16] 사용자 정의 함수의 종류

사용자 정의 함수(UDF)는 위의 그림과 같이 크게 세 가지로 구분된다. 먼저 Standard Function은 floor(), ucase(), concat()와 같이 간단한 작업을 수행하는 사용자 정의 함수다.
UDAF는 하나 이상의 로우나 컬럼으로부터 데이터를 가져와 계산을 수행하는 함수다. abs(), sum()과 같은 함수가 여기에 속한다. UDAF는 중에 Generic UDAF가 있는데, 이것은 UDAF보다 좀 더 일반적으로 작업을 수행할 수 있도록 복잡한 인터페이스를 상속받아 구현하도록 돼 있다. 마지막으로 UDTF는 사용자 정의 테이블 생성 함수로써 앞서 예제에서 사용한 explode가 여기에 속한다.
이번 예제에서는 UDAF를 제작하는 방법에 대해 설명한다. 앞서 다운 받은 코드에서 Fullname.java를 선택한다. 이 루틴은 aapl이나 AAPL을 APPLE로 변환해 출력하는 코드다.

import java.util.Date; import java.text.SimpleDateFormat; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.io.Text; @Description(name="fullname", value = "_FUNC_(name)-for the input string"+"returns Fullname", extended = "Example_Func_('aaple') from src;\n") public class Fullname extends UDF{ public Fullname(){SimpleDateFormat df = new SimpleDateFormat("MM-dd-yyyy");} public Fullname(){ SimpleDateFormat df = new SimpleDateFormat("MM-dd-yyyy");} public String evaluate(String shortname){if("aapl".aquals(shortname) &#124;&#124; "AAPL".equals(shortname)) {return "APPLE";}else return shortname; }

이것이 UDF 전체 코드다. UDF는 하이브 클래스 중에서 UDF를 상속한 다음, evalute() 메소드에 원하는 내용만 프로그래밍해 주면 된다. 그리고 @Description는 하이브 안에서 Describe function했을 때 나오는 설명이다.
코드를 완성했으면 jar 파일 형태로 배포(export)한다. 이클립스에서 프로젝트를 선택한 다음, 오른쪽 마우스를 클릭하면 그림과 같이 메뉴가 나온다. 여기서 export를 메뉴로 jar 파일을 생성한다. 그리고 하이브 창에서 이 jar 파일을 로드하고 사용자 정의 함수를 fullname으로 입력한다.

[그림 Ⅲ-2-17] 이클립스 프로젝트 메뉴화면

[그림 Ⅲ-2-17] 이클립스 프로젝트 메뉴화면

Hive > ADD JAR /full/path/to/fullname.jar; hive > CREATE TEMPORARY FUNCTION fullname AS 'foo.Fullname'; 이렇게 한다음 다음 명령어들로 사용자 정의 함수가 잘 동작하는지 확인해 본다. hive > DESCRIBE FUNCTION fullname; hive > DESCRIBE FUNCTION EXTENDED fullname; hive > SELECT year, fullname(symbol) FROM Stocks where symbol ='aapl';


Apache Tez

Tez 소개

하이브는 맵리듀스를 기반으로 데이터 처리ㆍ분석을 한다고 소개했다. 그러나 맵리듀스 분산 처리를 쉽게 한다는 측면에서 큰 장점이 있지만, 여러 가지 단점 또한 존재한다. 기본적으로 배치 작업을 목표로 했으므로 빠른 응답속도는 기대할 수 없고, 불필요한 쓰기와 같은 속도 저하 요소들이 있다.
그래서 하둡 2.0으로 넘어오면서 이러한 단점을 개선하기 위해 여러 가지 변화가 생겼다. 하둡2.0에는 1장 분산병렬배치처리에서 다뤘던 Yarn이 맵리듀스에서 하던 리소스 관리를 한다. 기존 맵리듀스를 확장ㆍ발전시켜서 데이터 처리ㆍ분석을 하는 새로운 모듈이 이번에 다룰 Tez다.

[그림 Ⅲ-2-18] 하둡 1.0과 하둡 2.0의 구성요소

[그림 Ⅲ-2-18] 하둡 1.0과 하둡 2.0의 구성요소

또한 하둡 1.0에서 맵리듀스는 유일한 데이터 처리 엔진이었으나, 하둡 2.0에서는 데이터 처리 엔진 중 하나일 뿐이다(클라우데라 하둡은 하둡 2.0과 맵리듀스 1.0을 같이 사용할 수도 있고, Tez 모드가 아닌 순수 맵리듀스를 사용할 수도 있다). Yarn 위에서 스톰이나 Hbase와 같은 하둡에 속해 있지 않던 에코시스템들이 하나로 통합 관리가 가능해졌다. 기존의 맵리듀스는 Tez 엔진 위에서 수행되게 됐다. 자세한 내용은 1장 분산병렬배치처리를 참고하고, 여기에서는 간단한 예제와 함께 Tez의 장점에 대해 설명하겠다.

[그림 Ⅲ-2-19] 호튼웍스에서 테스트한 Hive10과 Hive13 벤치마크

[그림 Ⅲ-2-19] 호튼웍스에서 테스트한 Hive10과 Hive13 벤치마크


Tez 실습

Yarn과 Tez가 설치된 환경이라면 Tez를 사용하는 것은 매우 간단하다. 쿼리를 수행하기 전에 혹은 속성값에 아래의 값만 넣어주면 Tez 모드가 적용된다.

set hive.execution.engine=tez;

또한 기존의 로직들 중에 Map-Reduce-Map-Reduce와 같이 반복적으로 맵리듀스가 사용될 경우에는 불필요한 기록이 발생한다. 하지만 Tez에서는 Map-Reduce-Reduce 모델이 가능해 불필요한 기록을 덜어 준다. Map-Reduce-Reduce를 사용하기 위해서는 다음과 같이 설정해 주면 쿼리에 따라 필요한 경우 사용된다.

set hive.execution.engine=mr;

Tez에서는 SMB joins, SELECT TRANSFORM queries, Index creation, Skew joins 기능을 아직 지원하지 않으므로 이에 해당하는 쿼리를 실행할 때는 set hive.execution.engine=mr; 옵션으로 기존 방식으로 수행해야 한다.
이전에 생성했던 stocks 테이블을 조회하되, 먼저 맵리듀스 모드로 수행한 다음, 이후에 Tez 모드로 수행해 작동과 성능 차이를 테스트해 보자.

hive> select count(*) from stocks; Query ID = hdfs_20140923111818_b6f9b970-0aff-4a3c-a67e-306e2b6b0475 Total jobs = 1 Lanching Job 1 out of 1 Number of reduce tasks determined at compile time : 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers : set mapreduce.job.reduces=<number> Starting Job = job_1410784288179_0069, Tracking URL = http://bigdata01-02:8088/proxy/application_1410784288179_0069/ kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1410784288179_0069 Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1 2014-09-23 11:18:57, 679 Stage-1 map = 0%, reduce = 0% 2014-09-23 11:19:06, 397 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 2.41 sec 2014-09-23 11:19:21, 418 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 4.3 sec MapReduce Jobs Launched: Job 0 : Map: 1 Reduce: 1 Cumulative CPU: 4.3 sec HDFS Read : 52040344 HDFS Read : 52040344 HDFS Write : 7 SUCESS Total MapReduce CPU Time Spent : 4 seconds 300 msec OK 916120 Time taken: 43.08 seconds, Fetched : 1 row(s)

이제 Tez 모드로 다시 실행해 보자.

hive> set hive.execution.engine=tez; hive> select count(*) from stocks; Query ID = hdfs_20140923111919_61aa2506-451f-4c0a-94cc-06a9c0e76254 Total jobs = 1 Launcing Job 1 out of 1 Status: Running (application id: application_14107842888179_0070) Map 1: -/- Reducer 2: 0/1 Map 1: 0/1 Reducer: 0/1 Map 1: 0/1 Reducer 2: 0/1

Map 1: 0/1 Reducer 2: 0/1 Map 1: 1/1 Reducer 2: 0/1 Map 1: 1/1 Reducer 2: 1/1 Status: Finished successfuly OK 916120 Time taken: 20.201 seconds, Fetched: 1 row(s) hive>

Tez 엔진 사용 옵션 하나를 통해 43초가 걸리던 작업이 20초밖에 안 걸렸다. 아직 Tez에서는 작업 중에 자세한 진행률을 보기는 어렵지만, 확실히 속도가 올라간 것을 확인할 수 있다.
이어서 위에서 수행했던 쿼리를 바로 다시 실행해 보자.

hive> select count(*) from stocks; Query ID = hdfs_20140923112020_b5e6d958-797e-4c28-940e-ee0a6a073561 Total jobs = 1 Launching Job 1 out of 1 Status: Running (application id: application_1410784288179_0070) Map 1: 0/1 Reducer 2: 0/1 Map 1: 0/1 Reducer 2: 0/1 Map 1: 0/1 Reducer 2: 0/1 Map 1: 0/1 Reducer 2: 0/1 Map 1: 0/1 Reducer 2: 0/1 Status: Finished succesfuly OK 916120 Time taken: 9.615 seconds, Fetched: 1 row(s) hive>

앞서 20초 걸렸던 작업이 이제 9초에 끝났다. 이것이 가능한 이유는 이전 쿼리에서 사용했던 session등을 릴리즈하지 않고 재활용했기 때문이다. 맵리듀스에서는 같은 쿼리를 재실행했더라도 이와 같은 성능 향상을 기대하기 어렵다.
아래 쿼리로 맵리듀스와 Tez의 차이점을 더 확인해 보자.

SELECT * FROM (SELECT Count(DIFTINCT symbol), symbol FROM stocks

GROUP BY symbol UNION ALL SELECT Count(DISTINCT symbol), symbol FROM stocks1 GROUP BY symbol)a;


맵리듀스

Total jobs = 3 Launching Job 1 out of 3 Number of reduce tasks not specified. Estimated from input data size: 1 In order to change the average load for a reducer (in bytes): set hive.exec.reducers.bytes.per.reducer=<number> In order to limit the maximum number of reducers: set hive.exec.reducers.max=<number> In order to set a constant number of reducers: set mapreduce.job.reduces=<number> ... Time taken: 76.535 seconds, Fetched: 677 row(s)


Tez

Total jobs = 1 / Launching Job 1 out of 1 Tez session was closed. Reopening... Session re-established. Status: Running (application id: application_1410784288179_0077) Map 1: -/- Map 4: -/- Reducer 2: 0/1 Reducer 5: 0/1 Map 1: 0/1 Map 4: 0/3 Reducer 2:0/1 Reducer 5:0/1 Map 1: 0/1 Map 4: 0/3 Reducer 2:0/1 Reducer 5:0/1 Map 1: 0/1 Map 4: 0/3 Reducer 2:0/1 Reducer 5:0/1 Map 1: 0/1 Map 4: 0/3 Reducer 2:0/1 Reducer 5:0/1 Time taken: 41.682 seconds, Fetched: 677 row(s) .... // 재수행 Time taken: 20.851 seconds, Fetched: 677 row(s)

앞서와 동일하게 시간이 많이 단축됐을 뿐 아니라, 로그에서도 차이가 있음을 확인 할 수 있다. 맵리듀스에서는 3단계의 맵리듀스가 순차적으로 수행되는 반면, Tez에서는 전체를 한 번에 병렬로 처리하는 것을 볼 수 있다.
내부 처리에 대한 자세한 Plain은 explain 명령을 통해 다음과 같이 확인할 수 있다.


맵리듀스

hive> explain select * from (select count(distinct symbol), symbol from stocks group by symbol union all select count(distinct symbol), symbol from stocks1 group by symbol)a; OK STAGE DEPENDENCIES: Stage-1 is a root stage Stage-2 depends on stages: Stage-1, Stage-3 Stage-3 is a root stage Stage-0 is a root stage STAGE PLANS: Stage: Stage-1 Map Reduce Map Operator Tree: ... 이하 생략


Tez

STAGE DEPENDENCIES: Stage-1 is a root stage Stage-0 is a root stage STAGE PLANS: Stage: Stage-1 Tez Edges: Reducer 2 <-Map 1 (SIMPLE_EDGE), Union 3(CONTAINS) Reducer 5 <- Map 4 (SIMPLE_EDGE), Union 3 (CONTAINS) DagName: hdfs_20140923112929_d6a6fa7b-facd-4920-aeee-13bd7b5de395:5 Vertices: Map 1 Map Operator Tree: ... 이하 생략


Tez의 장점

이제 샘플 쿼리를 통해 Tez의 장점을 알아 보자.


대표적인 장점

SELECT g1.x, g1.avg, g2.cnt


  • 불필요한 HDFS 쓰기를 방지해 준다.
  • 자동으로 최적화해 맵리듀스 프로세스 처리 숫자를 줄여준다.
  • Only Mapper뿐만 아니라 Map-Reduce-Reduce 형태의 작업도 가능하다.

[그림 Ⅲ-2-20] 맵리듀스와 Tez 처리 비교

[그림 Ⅲ-2-20] 맵리듀스와 Tez 처리 비교


Shuffle Join

SELECT ss.ss_item_sk, ss.ss_quantity, inv.inv_quantity_on_hand

맵리듀스는 각각의 매퍼에서 두 개의 테이블을 읽어와 파티션 정렬을 한다. 그러나 Tez 모드에서는 각각의 매퍼에서 테이블을 따로 처리하므로 데이터를 주고받는 양이 훨씬 줄어든다.

[그림 Ⅲ-2-21] Shuffle Join

[그림 Ⅲ-2-21] Shuffle Join


BroadCast Join

SELECT ss.ss_item_sk, ss.ss_quantity, avg_price, inv.inv_quantity_on_hand

Inventory 테이블이 더 크다고 가정할 때, BroadCast 조인과 맵리듀스 단계를 줄이고, 병렬 처리를 통해 HDFS 쓰기를 한 단계 줄여서 훨씬 빠른 결과를 얻을 수 있다. 이로써 MapJoin보다 더 좋은 성능을 얻을 수 있는데, 그 이유는 불필요한 기록뿐 아니라 다중 해싱을 병렬 처리할 수 있고 HashTable이 메모리에 더 컴팩트하게 맞출 수 있기 때문이다.

[그림 Ⅲ-2-22] Broadcast Join

[그림 Ⅲ-2-22] Broadcast Join


Dynamically Partitioned Hash Join

SELECT ss.ss_item_sk, ss.ss_quantity, inv.inv_quantity_on_hand

MR에서는 하나의 매퍼에서 시쿼스하게 처리하는 반면, Tez에서는 여러 개의 매퍼에서 버켓 처리가 가능하며 불필요하게 HDFS에 기록하지 않는다.

[그림 Ⅲ-2-23] Dynamically Partitioned Hash Join

[그림 Ⅲ-2-23] Dynamically Partitioned Hash Join


Union all

SELECT count(*) FROM (

맵리듀스 모드에서 Union all 조인의 경우 두 테이블에서 한 번씩 맵리듀스를 수행하고 각 데이터를 HDFS에 저장한다. 그 이후 다시 조인 조건에 해당하는 데이터를 하나의 맵리듀스로 수행하기 때문에 속도가 느리다. 그러나 Tez 모드에서는 두 테이블을 읽으면서 미리 aggregate하기 때문에 중복처리를 줄일 수 있다.

[그림 Ⅲ-2-24] Union All

[그림 Ⅲ-2-24] Union All


Multi-insert queries

FROM (SELECT * FROM store_sales, date_dim WHERE ss_sold_date_sk = d_date_sk and d_year = 2000)

중복 스캔과 처리를 피할 수 있기 때문에 ETL 작업 시 유용하다.

[그림 Ⅲ-2-25] Multi-insert Queries

[그림 Ⅲ-2-25] Multi-insert Queries