데이터실무

DA, SQL, DB보안 등 실무자를 위한 위한 DB기술 바이블!

맵리듀스 개발환경의 이해

데이터 처리
분산병렬배치처리
맵리듀스 개발환경의 이해
작성자
admin
작성일
2021-02-15 14:04
조회
2120

하둡의 맵리듀스는 일반적으로 자바로 개발한다. 다른 언어로 개발해 하둡의 스트리밍 기능으로 운용할 수 있지만, 여기에서는 순수 자바로만 작성한다. 선호하는 자바 IDE를 사용하면 되는데, 여기서는 IDE로 이클립스를, 빌드 도구로는 메이븐(Maven)을 선택했다.


맵리듀스 맛보기

하둡 클러스터 환경에서 하둡 예제 가운데 하나인 WordCount로 맵리듀스를 실행해보자. 실행 절차는 다음과 같다.


  • 하둡 시스템(클러스터)에 로그인
  • HDFS에서 자신의 계정에 데이터 디렉터리 생성
  • HDFS 명령어를 이용해 데이터 업로드
  • 맵리듀스 예제 실행
  • 결과 확인
하둡 클러스터에 로그인

ssh로 접근하게 된다. 시스템마다 포트 번호가 다를 수 있으니 확인 후 접속한다. 다음은 ryan이라는 아이디로 hdp.pvm.centos라는 호스트의 하둡 시스템에 접근하는 예다.

$ ssh ryan@hdp.pvm.centos


HDFS에서 자신의 계정에 데이터 디렉터리 생성

다음은 HDFS에서 사용자 계정에 data라는 디렉터리를 만드는 과정이다. ryan이라는 계정은 필자가 ryan:hdfs 소유권으로 직접 만든 디렉터리이자 사용자다.

$ sudo -u hdfs hdfs dfs -mkdir /user/ryan $ sudo -u hdfs hdfs dfs -chown ryan:hdfs /user/ryan $sudo -u hdfs hdfs dfs -mkdir /user/ryan/data $ sudo -u hdfs hdfs dfs -lsr /user/ryan drwxr- xr-x -ryan hdfs 0 2014-09-02 00:42 /user/ryan/data


HDFS 명령어로 데이터 업로드

임의의 데이터를 HDFS의 /user/{id}/data 디렉터리에 업로드한다. 어떤 데이터를 업로드하든 신경 쓸 필요는 없다. 테스트이므로 편의상 텍스트 위주의 데이터를 업로드하자. put이나 copyFromLocal 명령으로 다음과 같이 수행하면 된다.

$ hadoop fs -put ./Data/book1.txt /user/ryan/data/ $ hadoop fs -lsr /user/ryan drwxr-xr-x - ryan hdfs 0 2014-09-02 00:46 /user/ryan/data -rw-r--r-- 3 ryan hdfs 449415 2014-09-02 00:46 /user/ryan/data/book1.txt


맵리듀스 예제 실행

하둡을 설치할 때 자동으로 설치된 예제 프로그램을 실행해 보자. 예제 프로그램은 하둡을 어떻게 설치했느냐에 따라 다르지만, 기본으로 설치했을 때에는 /usr/lib/hadoop-mapreduce에 있다. 다음과 같이 실행하면 샘플 예제 프로그램을 살펴볼 수 있다.

$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar An example program must be given as the first argument. Valid program names are: aggregatewordhist: An Aggregate based map/reduce program that counts the words in the input files. aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files. bbp: A map/reduce programm that uses Bailey-Borwein-Plouffe to compute exact digits of Pi. dbcount: An example job that count the pageview counts from a database. distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi. grep: A map/reduce program that counts the matches of a regex in the input. join: A job that effects a join over sorted, equally partitioned datasets multifilewc: A job that counts words from several files. pentomion: A map/reduce title laying program to find solutions to pentomino problems. pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method. randomtextwriter: A map/reduce program that writes 10GB of random textual data per node. randomwriter: A map/reduce program that writes 10GB of random data per node. secondarysort: An example defining a secondary sort to the reduce. sort: A map/reduce program that sorts the data written by the random writer. sudoku: A sudoku solver. teragen: Generate data for the terasort terasort: Run the terasort teravalidate: Checking results of terasort wordcount: A map/reduce program that counts the words in the input files. wordmean: A map/reduce program that counts the average length of the words in the input files. wordmedian: A map/reduce program that counts the median length of the words in the input files. wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.

여기서 아래 부분에 있는 WordCount라는 프로그램으로 ‘초간단 테스트’를 해보자. 이제 데이터도 HDFS에서 업로드했으니 WordCount를 실행한다.


워드카운트 맵리듀스 예제 실행

워드카운트 맵리듀스 예제 실행

위와 같이 실행하면 사용법이 나온다.?은 데이터 경로를,?은 wordcount가 실행결과를 출력할 경로를 의미한다. 여기서는 /user/{id}/data를 만들어 그 안에 데이터를 업로드 했으므로 다음과 같이 실행하고 로그를 살펴보자.

$hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount /user/ryan/data/user/ryan/output/book 14/09/02 00:53:08 INFO impl.TimelineClientImpl: Timeline service address: http://hdp.pvm.centos:8188/ws/v1/timeline/ 14/09/02 00:53:08 INFO client.RMProxy: Connecting to ResourceManager at hdp.pvm.centos/10.211.55.8: 8050 14/09/02 00:53:09 INFO input.FileInputFormat: Total input paths to process : 1 14/09/02 00:53:09 INFO mapreduce.JobSubmitter: number of splits: 1 14/09/02 00:53:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1409582940230_0005 14/09/02 00:53:10 INFO impl.YarnClientImpl: Submitted application application_1409582940230_0005 14/09/02 00:53:10 INFO mapreduce.Job: The url to track the job: http://hdp.pvm.centos:8088/proxy/application_1409582940230_0005/ 14/09/02 00:53:10 INFO mapreduce.Job:Running job: job_1409582940230_0005 14/09/02 00:53:18 INFO mapreduce.Job: Job job_1409582940230_0005 running in uber mode : false 14/09/02 00:53:18 INFO mapreduce.Job: map 0% reduce 0% 14/09/02 00:53:28 INFO mapreduce.Job: map 100% reduce : 0% 14/09/02 00:53:37 INFO mapreduce.Job: map 100% reduce 100% 14/09/02 00:53:37 INFO mapreduce.Job: Job job_14095829408230_0005 completed successfully 14/09/02 00:53:37 INFO mapreduce.Job: Conters: 49 File System Counters FILE: Number of bytes read=166572 FILE: Number of bytes written=533555 FILE: Number of read operations=0 FILE : Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=449531 HDFS: Number of bytes written = 118874 HDFS: Number of read operations=6 HDFS: Number of large read operations=0 HDFS: Number of write operations =2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots(ms)=7050 Total time spent by all reduces in occupied slots (ms)=5729 Total time spent by all map tasks (ms)=7050 Total time spent by all reduce tasks (ms)=5729 Total vcore-seconds taken by all map tasks=7050 Total vcore-seconds taken by all reduce tasks=5729 Total magabyte-seconds taken by all map tasks=7219200 Total megabyte-seconds taken by all reduce tasks=5866496 Map-Reduce Framework Map input records=12324 Map output records=78876 Map output bytes=752596 Map output materialized bytes=166572 Input split bytes=116 Combine input records=78876 Combine output records=12195 Reduce shuffle bytes=166572 Reduce output records=12195 Spilled Records=24390 Shuffled Maps =1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=130 CPU time spent (ms)=3190 Physical memory (bytes) snapshot=647483392 Virtual memory (bytes) snapthot=3026345984 Total committed heap usage (bytes)=554110976 Shuffle Errors BAD_ID=0 CONNECTION=0 ID_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=449415 File Output Format Counters Bytes Written=118874

어떤 Exception과 에러가 없다면 성공적으로 끝난 것이다. 중간 부분의 “Job job_1409582940230_0005 completed successfully”라는 메시지에서 job_1409582940230_0005는 Job ID다. 하둡에서는 모든 맵리듀스의 실행 단위를 Job이라고 부르며, 실행할 때마다 Job ID가 생성된다.


결과 확인

WordCount라는 맵리듀스를 실행해 보았다. 뒤에서 설명하겠지만, 텍스트 데이터를 읽어 단어를 세는 프로그램임을 금방 알 수 있다. 결과를 확인해 보자. 다음은 part-r-00000 파일의 실행 결과다.

$ hadoop fs -cat /user/ryan/output/book/part-r-00000 │ more ! 1 !" 2 " 53 "). 1 "- 1 "-- 4 "--and 1 ". 1 "... 5 "...for 1 "A 15 "ALL 1 "ARE 1 "Aaah,1 "Aargh! 1 "Abou'1 "About 3 "Absolutely 1 "Absolutely," 1

결과가 정상적으로 나왔음을 알 수 있다. 참고로 필자가 올린 데이터는 인터넷에서 구한「해리포터」1권의 내용이다.


개발환경 구축

이제부터 이클립스로 맵리듀스 개발환경을 구현해 보자. 하둡 개발을 위한 표준 환경은 없지만, 앞서 소개했던 이클립스와 메이븐을 많이 이용한다.


이클립스 메이븐 프로젝트 생성

먼저 이클립스를 실행한다. 아마도 실행하면 [그림 Ⅲ-1-2] 이클립스 자바 퍼스펙티브 초기화면과 비슷한 화면이 출력될 것이다. 필자는 이클립스 LUNA를 설치했다. [File > New > Other]의 Wizards: 텍스트 박스에 Maven이라고 입력해 Maven Project를 선택한다. 이어서 [그림 Ⅲ-1-3]처럼 Group Id와 Artifact Id 모두 WordCount로 하고, 패키지 이름은 kr.or.kodb로 지정하면, 메이븐 프로젝트 Template이 만들어진다.

[그림 Ⅲ-1-2] 이클립스 자바 퍼스펙티브 초기화면

[그림 Ⅲ-1-2] 이클립스 자바 퍼스펙티브 초기화면

[그림 Ⅲ-1-3] 메이븐 프로젝트 생성 초기화면

[그림 Ⅲ-1-3] 메이븐 프로젝트 생성 초기화면

프로젝트에 있는 pom.xml 파일을 다음과 같이 수정한다.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maben.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVerfion>4.0.0</modelVerfion><groupId>WordCount</groupId><version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging><name>WordCount</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-common</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-shuffle</artifactId><version>2.4.1</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.7</source><target>1.7</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifacId>maven-surefire-plugin</artifacId><configuration><skip>false</skip><environmentVariables><argLine>-DfileLocation=${basedir}/file</argLine></environmentVariables></configuration></plugin></plugins></build></project>


Mapper/Reducer 클래스 생성

이제 앞서 실행했던 샘플 예제 WordCount 파일을 열어서 복사한다. 소스코드 전문은 다음과 같다.

package kr.or.kodb; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.haddop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount{public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()){word.set(itr.hasMoreTokens()){word.set(itr.nextToken()); context.write(word, one);}}} public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{int sum = 0; for (IntWritable val : values){sum += val.get();} result.set(sum); context.write(key, result);}} public static void main(String[] args) throws Exception{Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length<2){System.err.println("Usage: wordcount <in> [<in>...]<out>");System.exit(2);} Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for(int i = 0; i < otherArgs.length -1; ++i){FileInputFormat .addInputPath(job, new Path(otherArgs[i]));} FileOutputFormat.setOutputPath(job, new Path(otherArgs.length -1])); System.exit(job.waitForCompletion(true)? 0 : 1); }}


자바 애플리케이션 실행

프로젝트 이름의 오른쪽을 클릭해 [Run As > Run Configurations …]을 선택한다. 왼쪽의 Java Application을 클릭해 실행 프로그램을 생성한다. 이름을 WordCount라고 입력하는데 이 예에서는 WordCount(2)로 했다. 이어서 오른쪽 두 번째의 Arguments 탭을 클릭해 Program arguments 항목에서 데이터 입력 및 출력 경로를 입력한다. 필자는 입력 경로로 /Users/Ryan/Workspace/Java/WordCount/data를, 출력 경로로 /Users/Ryan/Workspace/Java/WordCount/output이라 입력했다. (프로젝트 내부에 해당 경로를 [그림 Ⅲ-1-5]와 같이 만들었다.)

[그림 Ⅲ-1-4] 워드카운트 실행(커맨드라인 파라미터 추가)

[그림 Ⅲ-1-4] 워드카운트 실행(커맨드라인 파라미터 추가)

[그림 Ⅲ-1-5] 워드카운트 애플리케이션 실행

[그림 Ⅲ-1-5] 워드카운트 애플리케이션 실행


맵리듀스 애플리케이션 실행

이클립스는 큰 문제 없이 실행될 것이다. 결과 또한 앞서 하둡 시스템에서 실행했던 것과 비슷할 것이다. 출력 결과는 다음 그림의 output 디렉터리에서 확인할 수 있다.

[그림 Ⅲ-1-6] 워드카운트 맵리듀스 결과 화면

[그림 Ⅲ-1-6] 워드카운트 맵리듀스 결과 화면


빌드

이제 메이븐 빌드를 실행해 보자. 메이븐은 해당 pom.xml 파일이 있는 곳에서 실행해도 되고, 이클립스에서 실행해도 된다. 여기서는 이클립스에서 실행한다. 우선 프로젝트 이름에서 마우스 오른쪽 버튼을 클릭해 [Run As > Maven install]을 선택하면 [그림 Ⅲ-1-7]처럼 실행된다. 그리고 이클립스 Package Explorer에서 ‘프로젝트 새로 고침’을 선택하면 target 디렉터리 안에 WordCount-0.0.1-SNAPSHOT.jar 파일이 생성된 것을 확인할 수 있다. 이 파일을 하둡 시스템에 배포한다. (하둡 시스템에 자동으로 설치된 예제와 같은 기능을 한다.)




[Note]

빌드 배포시스템을 통해서 jar 파일을 개발 클러스터에 자동으로 배포하는 것이 좋다. 매번 수동으로 scp나 FTP 프로그램을 실행해 배포하는 것은 매우 귀찮은 일이다. 메이븐에 내장된 ant scp 플러그인을 이용하는 것도 좋다. 하지만 이것은 빌드 과정에서 배포이므로 Jenkins 같은 CI 툴을 이용하는 것을 추천한다.


[그림 Ⅲ-1-7] 메이븐 빌드

[그림 Ⅲ-1-7] 메이븐 빌드


배포와 실행

하둡 클러스터로 메이븐을 빌드한 jar 파일을 배포한다.

$ scp./WordCount-0.0.1-SNAPSHOT.jar ryan@hap.pvm.centos:/home/ryan/ WordCount-0.0.1-SNAPSHOT.jar

하둡 클러스터에서 배포한 파일을 다음과 같이 실행해 본다.

$ hadoop jar WordCount-0.0.1-SNAPSHOT.jar kr.or.kodb.WordCount Usage: wordcount <in> [<in>...] <out> $ hadoop jar WordCount-0.0.1-SNAPSHOT.jar kr.or.kodb.WordCount /user/ryan/data/user/ryan/data/output 14/09/02 01:53:51 INFO impl.TimelineClientImpl: Timeline service address: http://hdp.pvm.centos:8188/ws/v1/timeline/ 14/09/02 01:53:52 INFO client.RMProxy: Connecting to ResourceManager at hdp.pvm.centos/10.211.55.8:8050 14/09/02 01:53:53 INFO input.FileInputFormat: Total input paths to precess : 1 14/09/02 01:53:53 INFO mapreduce.JobSubmitter: number of splits: 1 14/09/02 01:53:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1409582940230_0006 14/09/02 01:53:54 INFO impl.YarnClientImpl: Submitted application application_1409582940230_0006 14/09/02 01:53:54 INFO mapreduce.Job: The url to track the job: http://hdp.pvm.centos: 8088/proxy/application_1409582940230_0006/ 14/09/02 01:53:54:02 INFO mapreduce.Job: Job job_1409582940230_0006 running in uber mode: false 14/09/02 01:54:02 INFO mapreduce.Job: map 0% reduce 0% 14/09/02 01:54:10 INFO mapreduce.Job: map 100% reduce 0% 14/09/02 01:54:19 INFO map 100% reduce 100% 14/09/02 01:54:19 INFO mapreduce.Job: Job job_1409582940230_0006 completed successfully 14/09/02 01:54:20 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=166572 FILE: Number of bytes written=533459 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=449531 HDFS: Number of bytes written=118874 HDFS: Number of read operations=6 HDFS: Number of large  read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=1 Launched reduce tasks=1 Data-local map tasks=1 Total time spent by all maps in occupied slots (ms)=6172 Total time spent by all reduces in occupied slots (ms)=5743 Total time spent by all map tasks (ms)=6172 Total time spent by all reduce tasks (ms)=5743 Total vcore-seconds taken by all map tasks=6172 Total vcore-seconds taken by all reduce tasks=5743 Total megabyte-seconds taken by all map tasks=6320128 Total megabyte-seconds taken by all reduce tasks=5880832  Map-Reduce Framework Map input records=12324 Map output records=78876 Map output bytes=752596 Map output materialized bytes=166572 Input split bytes=116 Combine input records=78876 Combine output records=12195 Reduce input records=12195 Reduce output records=12195 Spilled Records=24390 Shuffled Maps=1 Failed Shuffles=0 Merged Map outputs=1 GC time elapsed (ms)=110 CPU time spent (ms)=3060 Physical memory (bytes) snapshot=648716288 Virtual memory (bytes) snapshot=3026296832 Total committed heap usage (bytes)=554110976 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE-0 File Input Format Counters Bytes Read=449415 File Output Format Counters Bytes Written=118874

이 섹션에서는 예제 프로그램을 하둡 클러스터에서 실행해 보는 것으로 맵리듀스 실행방법을 알아보았다. 또한 이클립스를 이용해 맵리듀스 개발환경을 구축하고 메이븐을 통해 빌드했다.