데이터실무

DA, SQL, DB보안 등 실무자를 위한 위한 DB기술 바이블!

맵리듀스 실습

데이터 처리
분산병렬배치처리
맵리듀스 실습
작성자
admin
작성일
2021-02-15 14:06
조회
1790

첫 번째로 앞에서 실행해 본 WordCount 맵리듀스를 학습하는 차원에서 맵리듀스의 근간인 매퍼, 리듀서, 컴바이너, 셔플링과 소팅에 대해 살펴본다. 또한 데이터 입력과 출력 포맷, 카운터에 대해 알아본다. 두 번째로 맵리듀스를 단위 테스트할 수 있는 MRUnit에 대해 학습한다. 마지막으로 세번째에는 맵리듀스를 모니터링할 수 있는 잡트레커 웹 인터페이스를 살펴본다.


WordCount 자세히 살펴보기

맵리듀스 드라이버 분석

public static void main(String[] args) throws Exception{ Configuration conf = new Configutation(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length<2){System.err.println("Usage: wordcount <in> [<in>...]<out>"); System.exit(2);} Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(Tokenizer.Mapper.class); job.setCombinerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length -1; ++i){FileInputFormat.addInputPath(job, new path(otherArgs[1]));} FileOutputFormat.setOutputPath(job, new path(otherArgs[otherArgs.length - 1])); system.exit(job.waitForCompletion(true)? 0 : 1); }

맵리듀스 구동 클래스의 메소드에 매퍼와 리듀서 코드를 정의한다. 위 코드에서 ‘Configuration’은 하둡의 설정파일을 읽는 것이다. 하둡의 설정파일은 많지만, 맵리듀스는 mapred-site.xml 파일을 기본으로 읽어 설정파일 값을 메모리에 로드한다. GenericOptionParser는 맵리듀스 잡을 실행할 때의 커맨드라인 입력 값이다. 이 예제에서는 데이터의 입력과 출력 위치를 처리한다. 잡을 설정하기 위해서 Job 클래스를 사용한다. 현재로서는 Job 객체 생성 방법이 Deprecated돼 있으므로 Job.getInstance() 메소드를 사용하고, 이후 setJarClass()에 잡 드라이버 클래스를 정의한다.
setMapperClass() 메소드는 매퍼 클래스를 정의하는 부분이다. 여기서는 TokenizerMapper.class라는 매퍼를 정의했다. setReducerClass() 메소드는 리듀서 클래스를 정의하는 부분이다. setCombinerClass()는 매퍼 사이드 리듀서라는 컴바이너를 정의한다. 매퍼, 리듀서, 컴바이너는 조금 뒤에 살펴볼 것이다. setOuputKeyClass(), setOutputValueClass()는 출력 값의 키와 출력 값의 밸류에 대한 파일의 타입을 정의한다.
여기서 적용한 워드카운트 키값은 텍스트 타입이고, 밸류는 IntWritable 타입이다. 워드카운트가 무엇인가? 해당 문서가 몇 단어로 구성됐는지 살펴보는 프로그램이다. 즉 리듀서에서 키/밸류를 고려해 출력한 것이다. 드라이버 클래스에서 마지막으로 job.waitForCompletion() 메소드가 호출되면 비로소 매퍼 클래스가 호출된다.


매퍼 클래스

public static class TokenizerMapper extends Mapper<object, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException{StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()){word.set(itr.nextToken()); context.write(word, one);}}}

모든 매퍼 클래스는 org.apache.hadoop.mapreduce.Mapper 클래스로부터 상속 받는다. 이 클래스로부터 상속을 받을 때 한 가지 먼저 생각해야 할 것이 있다. 즉, 읽을 데이터의 키 타입과 밸류 타입이 무엇인지를 설계해야 한다. 마찬가지로 출력의 키 타입과 밸류 타입도 무엇인지를 설계해야 한다. 리듀서가 있다면, 리듀서에도 같은 고민을 해야 한다. WordCount에서 입력 키는 라인 시작 옵셋(offset)이고, 밸류는 라인의 데이터가 될 것이다. 그래서 키는 Object 타입으로 지정했고, 입력 라인은 String이기 때문에 밸류는 Text 타입으로 지정했다. 출력 키는 Text, 밸류는IntWritable로 각각 지정했다. 한 라인을 읽어서 라인의 문자열을 토큰 단위로 구분하고 반복자를 통해 context로 전달한다. Mapper 클래스는 map 메소드 외에도 setup, cleanup, run이라는 3개의 메소드를 갖고 있고 다음과 같이 요약할 수 있다.



public void setup(Mapper.Context context)

이 메소드는 map 메소드가 호출되기 전에 딱 한 번 호출된다. 선행작업을 할 것이나 자원의 초기화가 필요할 때 이 메소드에서 구현한다. map 메소드 내에서 분산 캐시로 파일을 읽어 오픈할 때도 이 메소드를 사용한다.

public void cleanup(Mapper.Context context)

이 메소드는 setup 메소드와 반대되는 역할을 한다. 매퍼가 돌면서 마지막으로 정리할 것이 있으면 이 메소드에서 한다.

public void run(Mapper.Context context)

Mapper 클래스의 Drive 메소드라고 생각하면 된다. 모든 매퍼는 실행이 되면서 반복적으로 이 함수를 호출한다.


리듀서 클래스

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOExeption, InterruptedExeption{int sum = 0; for(IntWritable val: values){sum += val.get();} result.set(sum); context.write(key, result);}}

모든 리듀서 클래스는 org.apache.hadoop.mapreduce.Reducer 클래스로부터 상속을 받는다. 리듀서 클래스도 매퍼 클래스와 마찬가지로 입력 키/밸류, 출력 키/밸류 타입을 설계해야 한다.
리듀서는 매퍼 출력의 키/밸류 타입을 받을 것이므로 입력 키/밸류 타입은 매퍼의 출력 키/밸류와 같고, 출력 키/밸류는 최종 맵리듀스 잡의 키/밸류일 것이다. 잡의 드라이버 클래스에서 setOuputKeyClass(), setOutputValueClass() 메소드로 정의했기 때문에 타입이 같아야 한다. Reducer 클래스도 마찬가지로 setup, cleanup, run이라는 3개의 메소드를 갖고 있으며, 메소드의 의미는 매퍼와 같다.


MapReduce 변수 타입

매퍼와 리듀서에서 사용하는 타입들은 모두 하둡에서 사용하는 타입이다. 이 타입들의 유형은 다음과 같다.

[표 Ⅲ-1-1] MapReduce 변수타입


타입 설명
Text String 타입에 해당하며 Text에서 String 타입은 toString() 메소드를 호출한다.
IntWritable Integer, int형 타입이다. 값을 얻으려면 get() 메소드를 호출한다.
LongWritable Long, long형 타입이다. 값을 얻으려면 get() 메소드를 호출한다.
FloatWritable Float, float형 타입이다. 값을 얻으려면 get() 메소드를 호출한다.
BooleanWritable Boolean, boolean형 타입이다. 값을 얻으려면 get() 메소드를 호출한다.
ArrayWritable 하나 이상의 값을 한 번에 저장해야 한다면 이 타입을 사용해야 한다. Array 특성상 각 원소는 동일한 타입을 가져야만 한다.
NullWritable null과 같다. 값이 없음을 의미한다.
기타 이외에는 org.apache.hadoop.io 아래 많은 타입이 살펴본다.

왜 하둡의 맵리듀스는 이렇게 기본적인 객체타입이나 Primitive 타입을 사용하면 되는데도, 래퍼(Wrapper) 클래스를 만들어서 제공하는 것일까? 그것은 매퍼와 리듀서 사이에 네트워크로 통신을 하기 때문이다. 객체가 네트워크로 전송되면서 데이터 형태가 바뀌거나 다르게 해석이 될 수 있으므로 직렬화/역직렬화를 위한 자료형이 필요했기 때문이다. 사실 이러한 타입은 사용자가 정의할 수 있는데, 이때 키는 WritableComaparable에 따라, 밸류는 Writable 인터페이스에 따라 구현하면 된다.
Writable 인터페이스는 직렬화/역직렬화를 위한 하둡의 특성상 키/밸류가 디스크에 저장되거나 네트워크로 전송되기 때문에 필요하다. Writable 인터페이스에는 write(), readField()라는 두 개의 메소드가 있다. write()는 객체가 직렬화될 때 호출되는 메소드로, 데이터를 저장하는 일을 한다. readField()는 역직렬화될 때 호출되는 메소드로 write()에서 저장한 데이터를 읽을 때 호출된다.
WritableComparable 인터페이스는 Writable 인터페이스를 구현한 인터페이스로, 객체 간을 비교할 수 있게 해준다. 하둡에서 맵과 리듀스에서 사용하는 키들은 소팅할 수 있어야 하는데, 이때 이 인터페이스로 구현한 클래스에서 비교 로직을 구현할 수 있다. 구현해야하는 메소드는 write(), readField() 메소드 외에 compareTo() 메소드가 있는데, compareTo() 메소드를 사용해서 비교로직을 구현할 수 있다.


입력 포맷

입력과 출력 포맷을 어디에서 사용할지 생각해보자. 하둡은 다양한 데이터를 다루고 저장한다. 입력 포맷의 첫 번째 역할은 입력 파일 해석이고 두 번째는 레코드에서 키/밸류를 어떻게 나눌지 결정하는 것이다. 출력 포맷은 입력 포맷에 비해 단순하다. 하둡에 어떤 포맷으로 저장할 것인가에만 중점을 둔다. 입/출력 자료는 항상 setInputFormatClass()와 setOutputFormatClass() 메소드를 호출하여 구성한다. WordCount 예제에서는 기본 TextInputFormat을 사용하므로 생략되었다. 한 가지 알아야 하는 것은 입력 포맷은 대부분 출력포맷에 대응된다는 점이다. 예를 들어, TextInputFormat이면 TextOutputFormat이 존재한다.



① TextInputFormat

하둡의 가장 기초적인 포맷은 TextInputFormat이다. TextInputFormat은 다음과 같은 특징이 있다.
  • FileInputFormat에서 상속된다.
  • 텍스트 파일을 대상으로 한다.
  • Gzip으로 압축한 .gz 압축 파일도 대상이 된다.
  • 라인 하나(\n, \r)가 입력 레코드가 된다.
  • 한 레코드에서 키는 라인의 파일 옵셋(offset)이고 타입을 Object로 하지 않는다면 일반적으로 LongWirtiable이 된다.
  • 한 라인의 밸류는 라인 전체에 스트링되며 Text 타입이다.

② KeyValueTextInputFormat

이 입력 포맷은 일반적으로 데이터 형태가 키/밸류인데, 키와 밸류 사이에 구분자(separator)를 기준으로 데이터를 해석한다. 구분자를 지정하지 않으면 하둡에서는 기본 값인 탭(\t)이 입력된다. 이것은 하둡 설정으로 변경할 수 있다. 설정할 때의 키 값은 key.value.separator.in.input.line이고 키와 밸류 타입은 Text가 된다. KeyValueTextInputFormat은 출력 포맷이 없다. TextOutputFormat이 비슷한 기능을 수행해주기 때문이다.

③ SequenceFileInputFormat

하둡의 고유 파일 포맷은 시퀀스 파일이다. 마찬가지로 FileInputFormat을 상속 받았으면 키와 밸류는 어떠한 타입도 가능하지만, 생성될 때의 타입을 고려해야만 한다. 다른 포맷은 addInputPath(s)로 데이터의 위치를 알려주었지만, SequenceFileInputFormat은 SequenceFileInputFormat. addInputPath(s)로 데이터의 위치를 알린다. 또한 시퀀스 파일은 기본적으로 맵파일(MapFile)을 읽는 데 사용할 수 있다. 맵파일은 인덱스 파일과 데이터 파일이 각각 시퀀스 파일 형태로 구성된 것이다. 그래서 SequenceFileInputFormat.addInputPath(s)로 데이터 파일을 읽으면 먼저 맵파일 형태로 로드하고, 맵파일을 읽지 못하면 시퀀스 파일로 읽게 된다.


포맷에 따른 맵태스크 수

맵태스크 수는 하둡 맵리듀스 잡을 실행하는 데 있어서 중요한 요소다. 보통 입력 파일을 설정한 블록 수로 데이터 크기를 나누면 태스크 수가 나온다. 즉 읽어야 하는 데이터가 100MB이고, 하둡에서 설정한 블록 사이즈가 64MB라면 태스크 수는 ‘데이터 사이즈/블록 사이즈’이므로 2개의 태스크가 된다. 입력 포맷은 getSplit()이라는 메소드를 갖고 있다. 입력 파일 조각으로 나누는 것을 Split라고 하며, 입력 파일 조각을 InputSplit이라고 한다. getSplits() 메소드가 실행되면, InputSplit의 리스트를 반환한다. 하지만 조각으로 나누지 못하는 데이터 파일도 있으므로 입력 포맷 가운데 하나인 isSplitable()이라는 메소드가 조각을 나눌 수 있으면 true를 반환하고, 나눌 수 없으면 false를 반환한다. 압축파일은 조각으로 나눌 수가 없어서 태스크 수에 크게 영향을 줄 수 있다. 이런 점에서 시퀀스 파일은 압축을 지원하면서 블록 단위로 조각을 나눌 수 있는 싱크포인트가 있다. 이는 어디서부터 조각을 나눴는지 기록하는 훌륭한 포매터로 볼 수 있다.


출력 포맷

① TextOutputFormat

FileOutputFormat에서 상속받았으며 FileInputFormat과 반대 역할을 한다. 키와 밸류는 탭(\t)으로 구분하며, 이전에 언급했던 것처럼 KeyValueTextIntputFormat에 상응하는 출력 포맷도 존재한다. 이 출력 포맷은 setCompressOutput, setOutputCompressorClass를 사용해 압축 여부와 어떤 압축방식을 사용할지 정할 수 있다. 이 압축 방식은 트레이드오프(Trade-off) 관계이므로 상황에 따라 선택ㆍ적용해야 한다. 가령 압축률을 높일 것인지, 압축률은 낮지만 연산에 최적화할 것인지를 고려해야 한다. 구글에서 만든 스내피(snappy) 압축은 압축률은 낮지만 연산을 매우 빠르게 한다. 반면 Lzo 압축은 연산은 상대적으로 느리지만, snappy, gzip보다 훨씬 높은 압축률을 갖는다. 압축은 (특히, snappy는) 네이티브로 설치돼 있어야 하며, mapred-site.xml 파일의 mapred.compress.map.output 파라미터와 mapred.map.output.compress.code 파라미터를 통해 압축 여부를 지정할 수 있다. 다음은 맵리듀스에서 gzip 압축 코덱을 사용하는 방법과 mapred -site.xml 설정하는 방식의 예다.

자바코드: TextOutputFormat.setCompressOutput(job, true); TextOutputFormat,.setOutputCompressorClass(job, GzipCodec.class); 설정파일: <property><name>mapred.compress.map.output</name><value>true</value></property><property><name>mapred.map.output.compression.codec</name><value>org.apache.hadoop.io.compress.GzipCodec</value></property>


② SequenceFileOutputFormat

시퀀스 파일도 압축을 지원하며, SequenceFileOutputFormat 클래스의 setOutputCompressionType 메소드를 통해 압축 방식을 결정한다. 압축 유형은 다음과 같다.

[표 Ⅲ-1-2] 압축유형



압축모드 설명
BLOCK 블록 내의 레코드들까지 압축한다.
NONE 압축을 하지 않는다.
RECORD 기본 설정 값으로 레코드별로 압축을 한다.

③ MapFileOutputFormat

앞서 설명한 출력을 맵 파일 형태로 만들어주는 출력 포맷이다. 맵 파일은 기본적으로 디렉터리로 구성되며, 디렉터리 안에 두 개의 파일이 존재한다.


컴바이너

컴바이너는 맵사이드에서 운용되는 로컬 리듀서다. 리듀서는 일반적으로 정렬과 집계 역할을 한다. 이때 매퍼는 리듀서로 전달할 데이터의 크기를 줄이는 역할을 담당하는데, 리듀서로 전달할 데이터가 줄어들기 때문에 성능이 좋아진다. 그렇다면 모든 잡에 컴바이너를 적용해야 할까? 그렇지 않다. 잡 특성이 서로 다르기 때문이다. 만일 매퍼 → 리듀서 → 리듀서 이렇게 해서 최종 결과가 같다면 두번째 리듀서는 컴바이너를 사용해도 된다. 수학적으로 교환의 법칙과 결합의 법칙이 만족되는 잡에서는 컴바이너가 매우 중요한 역할을 한다. 그리고 이렇게 컴바이너를 적용할 수 있는 경우라면 리듀스 클래스를 그대로 컴바이너 클래스로 사용하는 것이 좋다. 컴바이너 클래스는 setCombinerClass() 메소드를 통해 설정한다. 그 밖에 셔플링, 소팅, 카운터 개념이 있다. 사실 셔플링과 소팅은 맵리듀스 개발과는 관계가 없다. 왜냐하면 하둡 맵리듀스 프레임워크가 알아서 해주기 때문이다. 토마스 화이트의 ‘하둡 완벽가이드’에서 나오는 다음과 같은 그림이 셔플링과 소팅 개념을 명확하게 해준다.

[그림 Ⅲ-1-8] 맵리듀스 셔플링

[그림 Ⅲ-1-8] 맵리듀스 셔플링


MapRduce 단위 테스트 MRUnit

MRUnit은 맵리듀스를 위한 자바 단위 테스트 라이브러리다. 클라우데라에서 개발한 JUnit과 같은 표준 자바 테스트 툴과 맵리듀스 프레임워크를 통합한 것이다. 공식 웹사이트는 http://mrunit.apache.org/다. WordCount의 단위 테스트를 진행해 보자. 먼저 프로젝트의 pom.xml 파일을 열어 다음을 추가한다. (항목 중에 classifier 태그를 꼭 넣자. 아직까지 하둡 버전 1.X의 레거시 코드 지원과 호환을 위해서 이 항목은 꼭 필요하다.)

<dependency><groupId>org.apache.mrunit</groupId><artifactId>mrunit</artifactId><version>1.1.0</version><classifier>hadoop2</classifier></dependency>


테스트 코드 작성

./src/test/java 패키지 디렉터리에 같은 패키지로 WordCountTest.java를 만든다. 이어서 다음과 같이 테스트 메소드를 생성한다.

public class WordCountTest{//테스트 드라이버 생성, 키/밸류 순서대로 MapDriver<object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver; @Before public void setUp(){// 실제 매퍼와 리듀서 객체를 생성한다. WordCount.TokenizerMapper mapper = new WordCount.TokenizerMapper(); WordCount.IntSumReducer reducer = new WordCount.IntSumReducer(); // 다음으로 맵 태스크 드라이버를 만들고 매퍼를 등록한다. mapDriver = new MapDriver<Object, Text, Text, IntWritable>(); mapDriver.setMapper(mapper); // 다음으로 리듀스 태스크 드라이버를 만들고 리듀서를 등록한다. reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>(); reducerDriver.setReducer(reducer); // 앞에서 생성한 매퍼와 리듀서 객체를 등록한다. mapReduceDriver = new MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable>(); mapReduceDriver.setMapper(mapper); mapReduceDriver.setReducer(reducer);} @Test public void testMapper(){mapDriver.withInput(new IntWritavble(1), new Text("cat cat dog")); mapDriver.withOutput(new Text("cat"), new IntWritavle(1)); mapDriver.withOutput(new Text("cat"), new IntWritable(1)); mapDriver.withOutput(new Text("dog"), new IntWritable(1)); try {mapDriver.runTest();} catch (IOExeption e) {fail(e.getMassage());}} @Test public void testReducer() {List<IntWritable> values = new ArrayList<IntWritable>(); values.add(new IntWritable(1)); values.add(new IntWritable(1)); reduceDriver.withInput(new Text("cat"), values); reduceDriver.withOutput(new Text("cat"), new IntWritable(2)); try{reduceDriver.runTest();} catch (IOException e){fail(e.getMassage());}} @Test public void testMapReduce(){mapReduceDriver.withInput(new IntWritable(1), new Text("cat cat dog")); mapReduceDriver.addOutput(new Text("cat"), new IntWritable(2)); mapReduceDriver.addOutput(newText("dog"), new IntWritable(1)); try{mapReduceDriver.runText();} catch (IOExeption e){fail(e.getMessage());}}}


코드 설명

일반적으로 JUnit 코드는 메소드 중심으로 테스트를 하고 클래스를 완성하지만, 맵리듀스 테스트 코드는 다음과 같은 원칙과 순서가 있다.


  1. ① 드라이버 객체를 생성한다.
  2. ② 태스크 역할을 수행할 태스크 객체 드라이버를 생성한다.
  3. ③ 태스크 드라이버에 각 매퍼 혹은 리듀서 태스크를 등록한다.
실행

[그림 Ⅲ-1-9]과 같이 @Test 어노테이션 테스트 메소드 이름을 블록 지정해 오른쪽 컨텍스트 메뉴의 [Run As > JUnit Test]를 선택해 실행한다. 초록 막대가 나오면 테스트가 성공적으로 된 것이고 그렇지 않으면 실패한 것이다.
메이븐으로 실행을 할 때, Maven Clean을 하고 Maven Install를 실행하면 [그림 Ⅲ-1-10]과 같이 콘솔에서 TEST 성공여부를 바로 알 수 있다.

[그림 Ⅲ-1-9] 맵리듀스 Junit 이용한 단위테스트

[그림 Ⅲ-1-9] 맵리듀스 Junit 이용한 단위테스트

[그림 Ⅲ-1-10] 메이븐 테스트 실행

[그림 Ⅲ-1-10] 메이븐 테스트 실행


잡트레커 웹 인터페이스

하둡 맵리듀스 잡 히스토리를 보면 다음과 같은 그림을 볼 수 있다. 잡 히스토리를 보면 잡의 시작시간, 종료시간, 잡ID, 맵/리듀스 태스크 수와 성공 여부를 등을 볼 수 있다. 또한 잡ID를 클릭하면 해당 잡의 상세한 부분을 볼 수 있다.

[그림 Ⅲ-1-11] 잡 히스토리 웹 UI

[그림 Ⅲ-1-11] 잡 히스토리 웹 UI

마찬가지로 잡ID 상세를 보면 Logs라는 컬럼의 ‘logs’를 클릭해 로그를 확인할 수 있다. 로그를 보면 해당 로그가 몇 번째 클러스터인지 확인할 수 있다. 실패했을 경우도 마찬가지다.

[그림 Ⅲ-1-12] 잡ID 상세항목

[그림 Ⅲ-1-12] 잡ID 상세항목

[그림 Ⅲ-1-13] 로그

[그림 Ⅲ-1-13] 로그

이 섹션에서는 WordCount 맵리듀스를 통해 하둡의 기본적인 내용을 학습했고 입력 포맷, 출력 포맷, 하둡 데이터 타입 등도 살펴봤다. MRUnit을 통해 하둡 맵리듀스를 코딩할 때 단위 테스트를 사용하는 방법도 습득했고, 마지막으로 맵리듀스 잡을 실행해 결과를 살펴볼 수 있는 웹 사용자 인터페이도 살펴봤다. 실제 하둡은 클라우데라, 호튼웍스, 맵알 등에서 배포하는 특정한 툴이 있다. 클라우데라는클라우데라 매니저를 통해 접근할 수 있으며, 호튼웍스는 암바리를 통해 접근할 수 있다. 그렇지 않다면 직접 URL를 통해 접근해야 한다.