데이터실무

DA, SQL, DB보안 등 실무자를 위한 위한 DB기술 바이블!

스톰

데이터 처리
스트리밍 데이터 처리
스톰
작성자
admin
작성일
2021-02-15 14:13
조회
1601

하둡의 한계

지난 10년간 데이터 프로세싱 분야는 많은 발전이 있었다. 특히 하둡(Hadoop)은 대용량 데이터를 분산저장 및 분석함에 있어서 혁명을 가져 왔다. 현재는 구글(Google)이나 페이스북(Facebook)과 같은 해외 주요 기업뿐 아니라 국내 기업ㆍ기관에서도 하둡을 이용해 대용량 데이터를 분산 저장ㆍ분석하고 있다. 더 나아가 업계에서는 단순히 대용량 처리뿐만 아니라 실시간 처리까지 바라고 있다. 그러나 하둡은 배치성 처리에 특화된 시스템으로 실시간 처리에는 적합하지 않다.

사용자가 자신의 웹 페이지에 특정 시간대의 페이지 뷰 데이터를 요청한다고 생각해 보자.

Function mapper(allData){if(data.url=url&&data).timestamp>=start &&data.timestamp<=end{output.collect(url, one);}} Function reducer(url, one){sum+= values.next().get(); output.collect(key, new IntWritabel(sum));}

실제로 작동하기 위해서는 훨씬 많은 코드와 라이브러리를 연결해야 하지만, 위는 대략의 로직을 보기 위한 슈도코드다. 핵심 로직만 보면 간단하다. 클러스터가 잘 갖추어져 있으면, 테라바이트(TB) 단위의 데이터를 수십 분 안에 처리해 결과를 얻을 수도 있다. 그러나 단순히 자신의 웹 페이지의 페이지 뷰를 확인하기 위해 어떤 사용자가 수십 분을 기다릴 수 있을까? 게다가 본인뿐만 아니라 수만 명의 페이지 방문객이 동일한 상황에 직면하게 된다면 어떻게 될까? 위와 같이 매번 요청이 있을 때마다 전체 데이터를 읽어서 맵리듀스(Mapreduce)한다는 것은 매우 비효율적인 일이다. 그래서 많은 기업이 중요한 데이터에 대해 맵리듀스로 매시간 혹은 매일 미리 모든 페이지의 페이지 뷰를 구하는 것과 같은 배치 작업을 수행한다.
그렇다면 최근 실시간 분야에 있어서 이슈가 되고 있는 tajo, shark, nosql은 어떨까? 분명 이 솔루션들은 메모리 활용과 색인 등을 통해 빠른 응답성을 지원하지만, 필요할 때마다 대용량 데이터를 처리해야 하는 것은 마찬가지다. 또한 일반적으로 인바운딩 데이터가 계속 들어오는 것이 아니라, 배치성으로 들어오므로 최신의 데이터로는 분석을 하지 못한다. 그러므로 앞단에서는 스톰(storm)을 이용해 사전 작업과 실시간 분석을 하고 스톰 뒤에서 tajo, shark 등으로 고속 분석이 가능하게 구성하는 경우가 많다.
데이터가 continuous, unbounded, rapid, time-varying stream와 같은 속성을 가진다면, 기존의 맵리듀스와 같은 패러다임이 아닌, 새로운 접근이 필요하다. 이와 같은 데이터들은 여러 분야에서 많이 발생하지만, 위와 같은 한계 때문에 상당수가 버려지고 있었다(물론 HDFS의 출현에 따라 저장 비용 자체는 많이 낮아져서 저장은 충분히 가능하다). 예를 들면 네트워크 모니터링, 센서 데이터, 통화 내역(Telecom Call Record), 웹 로그, 생산관리 데이터, 금융 데이터 등이 이에 해당한다. 나열된 로그 예제에서는 최근 주목 받고 있는 IoT(Internet of Things) 관련 분야나 빅데이터 적용이 한창인 통신회사, 앞으로 전망이 높은 제조 분야 등이 포함된다.
다시 페이지 뷰 이야기로 돌아와서 새로운 접근법에 대해 고민해 보자. 만약에 정의돼 있는 비즈니스 로직에 대한 결과를 요청할 때마다 작업을 수행하는 것이 아니라, 클릭할 때마다 웹 페이지의 카운트를 늘려 나간다면, 전체 데이터를 매번 접근할 필요도 없고 거의 실시간(real-time)으로 사용자의 요청에 응답할 수 있지 않을까?

Function getData(continuousData){addcount(url)}

아까와 같이 슈도코드(Pseudocode) 형태 코드이므로 심플하게 구현할 수 있을 것 같다. 그러나 이것을 가능하게 하기 위해서는 다음과 같은 사항이 필요하다.


  • 메시지 큐: 데이터를 순차적으로 받아서 임시 저장한 후 처리 로직으로 전달해주기 위한 프로세서
  • 워커(Worker): 큐로부터 데이터를 받아서 계산을 하거나 database를 업데이트한 후 다른 큐로 데이터를 전달하기 위한 프로세서
  • 스케쥴러: 이 모든 작업들을 관리하고 작업 실패 시 재시도하는 작업 등을 수행하는 프로세서
  • ResourceManager: 큐와 워커에 자원을 할당하고 회수하는 프로세서

단순히 페이지 뷰를 구하기 위해 시작한 작업인데 손도 못 댈 만큼 일이 커져버렸다. 심지어 쌓여있는 데이터가 서버 한 대로 처리할 수 없을 만큼 큰 용량이라면, 분산 처리를 해야 하므로 위 프로세서들은 수십 배 더 복잡해 질 것이다. 트위터(Twitter)는 수년 전부터 이와 같은 고민을 하고 있었다. 이에 BackType를 인수해 스톰(storm)이라는 리얼타임 솔루션을 만들어 실제로 트위트 분석 작업과 안티스팸 작업 등에 사용하고 있다.


스톰 소개와 아키텍처

스톰 소개

스톰은 트위터로 합병된 BackType에서 최초 개발된 것으로 하둡에서 처리하지 못하는 실시간 데이터를 대용량으로 처리 할 수 있는 실시간 분석 시스템이다. 트위터는 BackType을 합병후 트위터의 실시간 분석과 최적화, 안티 스팸 등에 활용하다가 스톰이라는 이름의 프로젝트를 오픈소스로 공개했다. 2013년 9월, 아파치 인큐베이터 프로젝트에 등록됐으며 2014년 9월 아파치 톱 프로젝트로 올라섰다. 주로 자바와 클로저라는 언어로 구현돼 있고, 핵심 부분은 간결성을 위해 클로저로 구현돼 있다. 스톰은 위에서 언급한 Message Queue, Worker 등의 기능을 빌트인으로 제공해 실시간 계산 작업이 가능하도록 해 준다. 이것을 이용해 지속적인 메시지를 처리하거나 실시간으로 계산 및 데이터베이스 업데이트를 하는 등 스트리밍 프로세싱과 지속적 처리(continuous computation)를 가능하게 한다. 또한 고비용의 병렬 계산을 동적으로 수행하는 Distributed RPC에도 사용될 수 있다.

스톰의 주요 특징은 다음과 같다.


  • 심플 프로그래밍 모델: 맵리듀스가 병렬처리 프로세싱 구현의 복잡도를 낮춰주는 것과 같이 스톰 또한 분산 real-time 프로세싱 구현의 복잡도를 낮춰준다.
  • 다양한 언어로 구현 가능: 어떤 언어든 사용자가 익숙한 언어를 이용해 구현할 수 있다. 클로저(Clojure), 자바, 루비, 파이썬을 기본으로 제공하며 그 밖에 언어도 Storm communication protocol의 구현만으로도 사용이 가능하다.
  • 폴트톨러런트: 스톰은 worker process나 노드의 장애를 자동으로 관리해준다.
  • Horizontally scalable: 멀티플 스레드, 프로세스, 서버를 이용해 병렬 처리가 가능하며, 추가 확장이 쉽다.
  • Guaranteed message processing: 작업 실패 시에는 데이터의 시작 단계부터 다시 재처리해주는 replaying message system이 구현되어있기 때문에 하둡과 같이 각 메시지가 유실되지 않는다.
  • 빠른 속도: Netty(혹은 ZeroMQ)를 사용해 메시지를 빠르게 처리할 수 있도록 설계돼 있다.
  • 로컬 모드: 스톰에서는 클러스터 모드와 로컬 모드를 지원한다. 개발 시 클러스터 테스트가 어려울 경우 로컬 모드로 테스트해 번거로운 배포 작업을 피하면서 단위 테스트를 용이하게 할 수 있다.
  • 쉬운 관리: 하둡과는 달리 클러스터를 관리하는 작업이 매우 간단하다. 복잡한 설정이나 관리 포인트가 없이 매우 단순하면서도 강인하다.
스톰 소개

스톰의 클러스터는 마스터 노드(Nimbus)와 워커노드(Supervisor)로 구성되며, 주키퍼(Zookeeper)를 이용해 관리한다.


  • 님버스: 님버스(Nimbus)라는 이름의 데몬이 마스터 노드 역할을 한다. 여기서 마스터 노드는 하둡의 잡트레커와 유사한 개념으로서 작업 할당과 실패 확인 등 관리 역할을 한다.
  • 수퍼바이저: 수퍼바이저(Supervisor) 데몬이 실제적으로 워커 프로세스의 시작과 종료, 실행 상태 모니터링 등을 수행하며, 하둡의 태스크 트레커와 유사하다.
  • 주키퍼: 스톰에 포함된 모듈은 아니며, 아파치 프로젝트 중 하나다. 분산된 노드 간의 관리를 수행하고 시스템의 안정성을 유지하도록 관리해주는 역할을 한다.

[그림 Ⅲ-3-1] 스톰 아키텍처

[그림 Ⅲ-3-1] 스톰 아키텍처

스톰에서는 Stream, Spout, Bolt, Task, Worker, Topology 등의 용어를 정의하고 있다.


  • Stream: 스톰의 중요한 추상 개념이다. 데이터가 끊임없이 연속적으로 들어올 때, 그 각각의 데이터를 병렬 분산해 Tuple이라는 형태로 관리한다. Stream은 integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays 형태로 Tuple을 지정할 수 있다. 또한 시리얼라이저하게 자신만의 타입을 구현할 수도 있다.
  • Spout: Stream의 데이터 소스다. 하나의 Spout은 하나 이상의 Stream을 발생시킬 수 있으며, Kestrel, RabbitMQ, Kafka 등을 통해 외부로부터 데이터를 받아 오는 역할을 한다.

    [그림 Ⅲ-3-2] Stream, Tuple, Spout

    [그림 Ⅲ-3-2] Stream, Tuple, Spout

  • Bolt: 실제로 데이터 처리를 수행한다. 연속적인 연산을 하거나 DB에 기록하는 등의 작업을 한다. 하나의 Bolt는 1개 이상의 input Stream을 받을 수 있고, 1개 이상의 output Stream을 발생시킬 수 있다.

    [그림 Ⅲ-3-3] Bolt

    [그림 Ⅲ-3-3] Bolt

  • Topology: Stream의 그루핑에 의해 Spout와 Bolt 조합으로 이뤄진 네트워크다. Topology는 다양한 조합의 형태로 가능하다. Spout는 Kafka, Flume 등과 연동해 데이터 스트림을 받고, Bolt 중 몇 개는 HDFS, HBase, RDBMS 등에 결과 데이터를 쓸 수 있다. 이것에 대한 자세한 내용은 다음 절에서 다루겠다.

    [그림 Ⅲ-3-4] 스톰 토폴로지

    [그림 Ⅲ-3-4] 스톰 토폴로지

[표 Ⅲ-3-1] Bolt


스톰 하둡
처리방식 리얼타임 배치
잡관리 님버스는 스톰을 통해 요청되고 실행되는 모든 잡을 관리 JobTracker
태스크 관리 수퍼바이저가 모든 워커 프로세스들을 관리 TaskTracker
처리 프로세서 Spout, Bolt를 실행하는 프로세서를 Worker라고 함 Task
처리 방식 여러방법과 조합이 가능. Map과 Reduce 두 가지 방법 밖에 없음.
스토리지 여부 스토리지가 필요 없음(물론 스토리지 사용도 가능함) HDFS가 필요함
작업 유형 작업의 끝이 없음(Continuous Processing) 맵리듀스 작업은 끝이 있음

스톰 설치

사전 준비 작업

스톰을 설치하기 위한 선결 조건은 아래와 같다.


  • 리눅스 또는 Mac(OSX) 시스템
  • 자바 1.6 이상
  • 파이썬 2.6.6 이상, unzip, git
  • 주키퍼, Zeromq, jzmq(참고로 버전에 따라 Netty 대신 ZeroMQ가 쓰이기도 하며, 스톰을 직접 빌드하기 위해서는 gcc, maven 등 각종 라이브러리가 추가로 필요)
스톰 설치하기
Zeromq 설치

Zeromq는 최근 버전에서 netty로 변경되었지만 옵션으로 사용하기 위해서는 미리 설치해두는 것이 좋다.

$ wget http://download.zeromq.org/zeromq-2.1.7.tar.gz $ tar -xzf zeromq-2.1.7.tar.gz $ cd zeromq-2.1.7 $ ./configure $ make $ sudo make install


Jzmq 설치

Jzmq는 Zeromq를 사용하기 위한 모듈이다.

$ git clone http://github.com/nathanmarz/jzmq.git $ cd jzmq $ ./autogen.sh $ ./configure $ make $ sudo make install


주키퍼 설치

주키퍼는 메시지 전달이 아니라 주로 데몬 관리를 위한 것이다. 님버스 데몬은 실패를 빠르게 복구하도록 돼 있다. 이것은 주키퍼를 이용해 분산된 노드 간의 관리를 수행하고 시스템을 견고하게 하고 있으므로 가능한 것이다. 그렇기 때문에 단일 노드 주키퍼도 가능하지만, 주키퍼 클러스터를 구성하는 것이 전체적인 시스템 안정성 측면에서 유리하다.

$ wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz $ tar xzvf zookeeper-3.4.5.tar.gz $ ln -s zookeeper-3.4.5 zookeeper $ cd zookeeper


주키퍼 클러스터 설정

$ cp conf/zoo_sample.cfgconf/zoo.cfg $ vi conf/zoo.cfg server.1=192.168.1.1:2888:3888(본인의 서버 주소) server.2 = 192.168.1.2:2888:3888 server.3 = 192.168.1.3:2888:3888

각 서버마다 다른 이름을 할당한다.

$ cat> $dataDir/myid 1 (ctrl d를 눌러서 빠져나온다.)

zoo.cfg 파일에서 server.1=192.168.1.1:2888:3888 부분은 주키퍼 1번, 서버는 192.168.1.1에 설정한다는 의미다. 이에 따라 myid에서 파일을 1로 준 것이다. 192.168.1.2 서버에서는 server.2로 설정해 주었으므로 myid 파일은 2로 설정한다. 또한 192.168.1.x 주소는 예시일 뿐이고 실제로는 각자의 실제 IP 주소를 넣어 주어야 한다.


스톰 설치

최근 페이지 변경을 많이 하면서 주소가 자주 바뀌고 있다. 아래 주소로 접속이 안된다면 http://storm.apache.org/downloads.html에서 내려 받는다.

$ wget http://mirror.apache-kr.org/storm/apache-storm-0.9.4/apache-storm-0.9.4.zip $ unzip apache-storm-0.9.5.zip $ ln -s apache-storm-0.9.4 storm


각 서버의 storm/conf/storm.yaml

storm.zookeeper.servers: -192.168.1.1 -192.168.1.2 -192.168.1.3 storm.local.dir : "/mnt/storm" nimbus.host:"192.168.1.1" #how many workers run on that machine supervisor.slots.ports: -6700 -6701 -6702 -6703 storm.messaging.transport: "backtype.storm.messaging.netty.Context" storm.messaging.netty.buffer_size:16384 storm.messaging.netty.max_retries:10 strom.messaging.netty.min_wait_ms:1000 storm.messageing.netty.max_wait_ms:5000


스톰 실행
마스터 노드 실행

$ bin/storm nimbus & $ bin/storm ui&


워커노드 실행

$ bin/storm supervisor


샘플 프로그램 테스트

#Download sample code $ git clone git://github.com/apache/storm/git $ cd storm/example/storm-starter #Compile the sample code $ mvn clear install -DskipTests=true #Submit the package to the master $ storm jar storm-starter-0.9.3-incubating-SNAPSHOT-jar-with-dependencies.jar storm.starter. ExclamationTopology ExclamationTopology #Kill the topology $ storm kill ExclamationTopology


스톰 실습

트위터는 스톰을 오프소스로 전환하면서 코드를 단순히 공개만 해놓은 것이 아니라, wiki를 통해 스톰의 기본 개념 설명과 양질의 소스코드를 제공하면서 지속적으로 업데이트하고 있다. 혼자서도 학습이 가능하도록 가능한 기본적인 실습은 Storm wiki에서 제공하는 코드 위주로 소개하겠다.


ExclamationTopology의 기본 원리

public class ExclamationTopology {public static class ExclamationBolt extends BaseRichBolt{OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector=collector;}@Override public void execute(Tuple tuple){_collector.emit(tuple, new Values(tuple.getString(0)+"!!!")); _collector.ack(tuple);}@Override public void deClareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("word"));}}public static void main(String[] args) throws Exception{TopologyBuilder=new TopologyBuilder(); builder.setSpout("word", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word"); builder.setBolt("exclamin2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"); Configconf=new Config(); conf.setDebug(true); if (args !=null &amp;&amp;args.length>0){conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else{LocalCluster cluster=new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown();}}

이렇게 짧은 프로그램만으로 실시간 분산 처리가 가능하다. 이 짧은 코드 안에 스톰의 주요 개념이 모두 들어 있다. 다소 생소한 코드가 포함돼 있으므로 첫 번째 코드는 하나하나 설명하겠다. 일단 위의 코드를 실행해 보자. 실행할 때마다 조금씩 결과가 다르게 나오겠지만, “!!!!!!”가 붙은 영어 단어가 계속 출력될 것이다. 왜 이런 결과가 나오는지 아래를 살펴보자.


토폴로지

1:TopologyBuilder builder = new TopologyBuilder(); 2:builder.setSpout("words", new TestWordSpout(), 10); 3: builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("words"); 4: builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1"));

TopologyBuilder라는 이름을 보면, 정확히 어떻게 구성돼 있는지는 몰라도 토폴로지를 만드는 Builder가 있고, 그 builder에 이전에 배웠던 spout과 bolt라는 것을 set해 줌을 알 수 있다.


  1. line1 : 토폴로지를 Builder를 통해 만든다.
  2. line2 : TestWordSpout이라는 Spout Class에 “word”라는 이름을 붙여서 할당한다.
  3. line3 : 위에서 설정한 “words spout”을 shuffleGrouping이라는 방식으로 ExclamationBolt라는 Bolt Class로 보낸다. 그리고 이름은 “exclaim1”으로 한다.
  4. line4 : 위에서 설정한 “exclaim1”을 shuffleGrouping해 이번에도 똑같이 ExclamationBolt로 보내고 그 이름을 “exclaim2”로 한다.

2장 SQL On Hadoop에서 설명했듯이 spout는 토폴로지의 원천 소스가 되고, bolt해서 다음 단계로 넘기는 역할을 한다. 각 spout와 bolt에서 어떤 역할을 하는지는 몰라도 spout에서 특정 데이터를 생성하고, 똑같은 ExclamationBolt를 두 번 수행하는 것이 이 토폴로지의 전체 흐름이다.


TestWordSpout

public void nextTuple() {Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Randow(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word));}

TestWordSpout이 어떤 역할을 하는지 알아보자. 사실 이 코드는 맨 위에서 언급하지는 않았었지만, 자체적으로 내장된 샘플 코드다. "nathan", "mike", "jackson", "golda", "bertels"와 같은 리스트에서 무작위로 하나씩 뽑아서 하나의 Tuple로 emit하고 있다. emit는 다음 단계로 처리할 데이터를 전달해주고 데이터 전송이 되었음을 알려준다.


ExclamationBolt

public void nextTuple() {OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector){_collector=collector;}@Override public void execute(Tuple tuple){_collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple);} @Override public void deClareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}


  • prepare(): bolt로부터 내보내지는 Tuple들에 사용되는 OutputCollector라는 형식을 미리 준비해둔다.
  • excute(): bolt로 들어온 Tuple을 한 개씩 받아서 excute()내의 로직을 처리한다. ExclamationBolt에 excute() 내에서 첫 번째 라인은 받아온 Tuple에 “!!!”를 붙여서 내보낸다(emit). 두 번째 라인은 입력 받은 Tuple을 ack하는 것이다. 이것은 스톰의 데이터 손실을 막기 위한 방법이다.
  • declareOutputFields() :ExclamationBolt가 "word"라는 하나의 필드를 내 보내도록 선언하고 있다.

이로써 중요 코드를 모두 보았다. 종합해 보자면, TestWordSpout에서 임의의 단어를 내 보내고, 2번의 Bolt를 통해 “!!!”를 한 번씩 추가하는 작업을 함을 알 수 있다. 그래서 맨 처음 나왔던 결과와 같이 특정 단어에 “!!!!!!”이 붙어서 출력되는 것이다. 스톰은 로컬 모드(Local mode)와 분산모드(Distributed mode)라는 두 가지 동작 모드가 있다.


로컬 모드

스톰은 스레드를 이용해 워커 노드들을 모의 수행한다. 로컬 모드는 topology 개발과 테스트할 때에 유용하다.
ExclamationToplogy에서 사용됐던 코드를 통해 로컬 모드가 어떻게 동작하는지 알아보자.

1:conf.setDebug(true); 2:conf.setNumWorkers(3); 3:LocalCluster cluster = new LocalCluster(); 4:cluster.submitTopology("test", conf, builder.createTopology()); 5:Utils.sleep(10000)l 6:cluster.killTopology("test"); 7:cluster.shutdown();


  • debug 모드: true로 설정하면, spout과 bolt에서 나오는 모든 Tuple들이 로그로 기록된다. 디버깅할 때 유용하다.
  • number of threads: 로컬 모드에서 몇 개의 프로세스를 할당할 것인지 설정한다.
  • LocalCluster 객체를 생성해 클러스터를 정의한다.
  • 로컬l 내 가상 클러스터에 Topology를 제출하는 것으로 submitTopology(Topology name, Config, Topology)와 같은 형식을 가진다
  • 10초 동안 대기한다.
  • 2번에서 사용한 Topology name을 갖는 Topology를 kill한다.
  • 로컬 클러스터를 종료한다.
분산 모드

스톰은 여러 서버들에서 분산해 동작한다. Master로 Topology를 submit하면, master는 분산 실행을 위해 그 Topology의 코드를 전달한다. 만약 worker가 죽게 되면, master가 해당 작업을 다른 곳에 재할당한다.

1:conf.setNumWorkers(20); 2:conf.setMaxSpoutPending(5000); 3:StormSubmitter.submitTopology("mytopology", conf, topology);


  1. line1 : Topology를 실행하기 위한 worker process 수를 설정한다.
  2. line2 : Spout의 pending 숫자를 조절한다. 너무 크면 queue overflow가 발생할 수 있고, 너무 작으면 성능이 나오지 않는다.
  3. line3 : 로컬 클러스터에 Topology를 제출하는 것과 동일한 방법으로 처리하면 된다.

1:conf.setNumWorkers(20);

간단하게 분산 모드의 작동 원리를 알아보자.

[그림 Ⅲ-3-5] Physical View

[그림 Ⅲ-3-5] Physical View


  • bin/storm jar를 실행하면 toplogy sumitter가 topology 관련 파일(구현 로직과 설정 파일)을 업로드한다.
  • 님버스는 주키퍼를 통해 현재 가용한 자원(supervisor)들을 확인한다.
  • supervior는 님버스로부터 topology를 내려 받는다.
  • 이후 supervisor는 JVM processor인 worker를 실행해 실제 작업을 시작한다.
WordCountTopology(다중 언어 지원, Grouping)

이제 기본적인 코드 작성법을 알았으므로 맵리듀스를 처음 배울 때 사용한 예제인 WordCount를 구현해 보자.


RandomSentenceSpout

가장 먼저 Topology의 시작점인 Spout부터 구현해 보자. 맵리듀스를 공부할 때는 텍스트 파일이나 일정한 문장을 input으로 해 WordCount를 했다. 하지만 스톰은 연속적으로 발생하는 데이터도 처리할 수 있다. 이러한 기능을 재현하기 위해 이전 예제와 같이 무작위로 스트림을 발생시키는 Spout를 구현하도록 해 보자.

public class RandomSentenceSpout extends BaseRichSpout{SpoutOutputCollector _collector; Random _rand; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){_collector = collector; _rand = new Random();} @Override public void nextTuple(){Utils.sleep(100); String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"}; String sentence = sentences[_rand.nextInt(sentences.length)]; _collector.emit(new Values(sentence));} @Override public void ack(Object id){@Override public void fail(object id){}@Override public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("word"));}}

Spout의 구조도 Bolt와 크게 다른 것이 없다. nextTuple() 부분만 내보낼 Tuple를 위한 로직을 넣어 주고, declareOutputFields에서 내 보낼 Tuple의 형태를 정의해 주면 된다.


SplitSentenceBolt

Spout에서 문장 단위로 전달을 해오기 때문에 이것을 Bolt에서 단어 단위로 나눌 필요가 있다.

public static class SplitSentence extends ShellBolt implements IRichBolt {publicSplitSentence(){super("python", "splitsentence.py");} @Override public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new fields("word"));} @Override public Map<String, Object>getComponentConfiguration(){return null;}}

위의 예제 코드를 보면, split하는 로직은 보이지 않는다. 대신 SplitSentence()라는 함수 안에서 파이썬을 언급하는 코드가 있음을 볼 수 있다. 이것은 파이썬을 이용해 Bolt를 매우 간단하게 구현한것으로, 실제 split하는 로직이 들어 있는 파이썬 코드는 아래와 같다.


splitsentence.py

import storm classSplitSentenceBolt(storm.BasicBolt); def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) SplitSentenceBolt().run()

스톰은 이와 같이 간단하게 다중 언어를 지원한다.


WordCount

이제 단어 단위로 input을 받아서 count해 주는 로직만 남았다. 맵리듀스에서도 했듯이, 로직 자체는 매우 간단하다. declareOutputFields에 “word, count”로 선언해 단어별 count를 결과로 하도록 구현하는 것만 염두에 두면 된다.

public static class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector){String word=tuple.getString(0); Integer count = counts.get(word); if(count==null) count=0; count++; counts.put(word, count); collector.emit(new Values(word, count));} @Override public void deClareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fields("word", "count"));}}


Grouping

builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count, new WordCount(), 12).shuffleGrouping("split");

ExclamationTopology에서 했던 것을 참고해 Topology를 구현해 보았다. 그러나 이 Topology를 실행해 보면, 예상했던 결과와는 다르게 나오는 것을 알 수 있다.
그 이유는 ExclamationTopology 예제에서 자세히 설명하지 않았던 shuffleGrouping라는 함수 때문이다. shuffleGrouping은 단어가 뜻하는 것과 같이 데이터를 임의로 섞어서 전송해 주는 역할을 한다.
WordCount할 때, 각 단어가 임의대로 각각의 서버에 보내지면 제대로 된 결과값을 얻지 못한다. 그러므로 맵리듀스에서는 Key에 단어를 넣고 Value에 1을 넣은 후 단어가 하나의 서버로 들어 가도록 구현한다. 스톰에서 WordCount를 구현하려면, Grouping 기능을 활용해 Count를 해야 정확한 결과를 얻을 수 있다.
먼저 스톰에서 제공하는 Grouping에 대해 알아 보자. Streaming Grouping이란 Topology에서 두 컴포넌트 간에 Tuple을 어떻게 전송할지 결정하는 것이다. 이전 예제에서 확인했듯이 Spout과 Bolt는 병렬로 많은 태스크를 이용해 실행하며, 다양하게 상호 데이터를 전송한다.

[그림 Ⅲ-3-6] Task간의 데이터 전송

[그림 Ⅲ-3-6] Task간의 데이터 전송

주로 쓰는 Grouping 방법


  • Shuffle grouping: 임의로 각 파티션에 grouping해 처리한다.
  • Fields grouping: 동일한 필드값을 가지고 있는 Tuple끼리 grouping한다.
  • All grouping: 해당 Stream을 모든 bolt에 replicate한다.
  • Global grouping: 가장 낮은 ID를 가지고 있는 곳(부하가 적은 곳)에 전체 Stream을 보낸다.
  • Custom grouping: 사용자가 원하는 임의의 방법으로 grouping할 수 있도록 구현할 수 있도록 해당 기능을 제공하고 있다.

[그림 Ⅲ-3-7] Grouping 종류

[그림 Ⅲ-3-7] Grouping 종류


Topology

다시 Topology로 돌아가 보자. 맵리듀스의 Key를 이용한 WordCount를 구현하기 위해서는 같은 단어끼리 묶을 수 있는 Fields 방식을 이용하면 된다.

builder.setSpout("sentences", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

FiledsGrouping 사용 방법은 ShuffleGrouping과는 달리 “FieldsGrouping(input, 구분할 filedname)”과 같은 방식으로 이용하면 된다.
위의 Topology 구성 방법을 해석해 보면, spout에서 지속적인 Stream이 발생하고 split bolt에서 ShuffleGrouping을 이용해 각 서버에 골고루 분산해 효율적으로 처리한다. 그리고 FieldsGrouping을 이용해 “word”fileds를 기준으로 각 서버에 전송해 WordCount를 수행한다.

마지막으로 TF-IDF를 구현하면서 스톰의 Trident 기능을 알아 보자. 그러나 지면 관계상 지금은 코드를 모두 다루기보다는 스톰을 이용해 어떻게 TF-IDF를 구할 수 있는지를 간단하게 알아 보자.


TF-IDF

TF-IDF는 Term Frequency-Inverse Document Frequency의 약자로 여러 검색엔진에서 활용하는 데이터 처리 알고리즘이다. 여러 문서가 있을 때, 특정 단어가 특정 문서 안에서 얼마나 중요한 의미를 갖는지 통계적 수치를 구하는 것이다. 검색엔진에서는 검색 결과의 순위를 결정하거나 문서들 사이에 비슷한 정도를 구하기 위해 활용된다.

구현을 하기에 앞서서 TF-IDF가 수학적으로 어떻게 정의돼 있는지 위키피디아를 참고해 알아 보자.

builder.setSpout("sentences", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));




TF-IDF의 수학적 정의

간단하게 설명하면 TF-IDF는 단어 빈도와 역문서 빈도의 곱이다.

단어 빈도?tf(t,d)에서 이 값을 산출하는 가장 간단한 방법은 단순히 문서 내에 나타나는 해당 단어의 총 빈도수를 사용하는 것이다. 문서 내에서 단어 t의 총 빈도를 f(t,d)라 할 때, 가장 단순한 tf 산출 방식은 tf(t,d) = f(t,d)가 된다. 그 밖에 TF값을 산출하는 방식에는 다음과 같은 것들이 있다.


  1. - boolean 빈도: tf(t,d) = 1: t가 d에 한 번이라도 나타나면 1, 아니면 0;
  2. - Log 스케일 빈도: tf(t,d) = log (f(t,d) + 1);
  3. - 증가빈도: 문서 길이에 따라 단어의 빈도값 조정

역문서 빈도는 한 단어가 문서 전체에서 얼마나 공통적으로 나타나는지를 나타내는 값이다. 전체 문서의 수를 해당 단어를 포함한 문서의 수로 나눈 뒤, 로그를 취해 얻을 수 있다.




위의 내용을 참고로 해 우리가 사용할 공식을 아래와 같이 도출했다.

[그림 Ⅲ-3-8] TF-IDF 수식

[그림 Ⅲ-3-8] TF-IDF 수식


  • tf(t,d): 주어진 문서 d에서 단어 t가 몇 번이 나타났는지 구한 것으로, 단어의 빈도를 뜻한다.
  • df(t): 단어 t가 전체 문서를 통틀어 얼마나 빈번하게 나타나는지를 나타낸다.
  • D: 문서들의 총합을 뜻한다.

위 수식을 기반으로 아래의 Spout과 Bolt만 구현하면 된다.


  • TF-IDF를 구할 대상인 문서를 Stream으로 발생시킬 Spout
  • Stream을 단어 단위로 자르는 TokenizerBolt
  • tf(t,d), df(t), D를 각자 구하고 TF-IDF 수식에 대입해 주는 Bolts

하지만 맵리듀스를 학습할 때도 느꼈겠지만, 모든 로직을 직접 코딩하는 것보다는 Pig와 같은 고급(high-level) 언어가 훨씬 간단하다. Tokenizer와 같이 세부적인 조작이 필요할 때는 제외하더라도 tf(t,d), df(t), D와 같이 “select .. group by”로 간단히 구현될 만한 부분은 고급 언어로 작성하는 것이 좋다. 이와 같은 로직을 위해 스톰에서는 Trident라는 기능을 추가했다.


Trident

Trident는 분산ㆍ실시간으로 high-level형 처리를 해주는 기능으로, 스톰 0.8 버전에서 처음 등장했다. Trident는 스톰의 트랜잭션 기능과 DRPC 기능을 활용하고, 지연 없이 빠르게 처리할 수 있는 쿼리를 혼합해 분산 환경에서 빠른 성능을 지원한다.
또한 pig나 cascading과 같은 높은 수준의 추상화 배치성 프로세스와 비슷한 로직을 구현할 수 있도록 joins, aggregations, grouping, functions, filters를 제공해 더 간단하게 로직을 구현할 수 있다.

Trident를 이해하기 위한 WordCount 예제를 보자.

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"),3, new Values("the cow jumped over the moon"), new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"), new Values("how many apples can you eat")); spout.setCycle(true);

WordCountTopology에서 다뤘던 예제와는 조금 다른 형태의 코드이지만, Value()에 있는 문장들을 반복해 FixedBatchSpout라는 이름의 spout을 통해 Stream을 발생시키고 있음을 알 수 있다.

TridentTopology topology = new TridentTopology(); TridenStatewordCounts = topology.newStream("spout", spout) .each(new Fields("sentence"), new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")) .parallelismHint(6);

이제 가장 핵심이 되는 코드를 보자. 클래스는 다르지만 TridentTopology를 생성하고, newStream이라는 함수를 통해 spout를 할당하는 것까지는 일반적인 스톰 Topology와 같다. 그러나 다음 라인을 보면 each, groupby, persistentAggregate와 같이 pig와 비슷한 문법이 사용되고 있다.
Trident에 대해 자세히 알지 못해도 spout로부터 Stream을 받고, 각각을 split() 함수를 통해 word 단위로 나눠 grouping을 한다. persistentAggregate에서 grouping해 전달하는 단어들을 count()를 통해 단어 수를 세고 지속적으로 업데이트하는 역할을 함을 알 수 있다. 이와 같이 Trident를 이용하면, 훨씬 간단하고 직관적으로 로직을 구현할 수 있다.


트톨러런스

실시간 처리 시스템은 성능도 중요하지만, 장애 대비 기능이 매우 중요한 요소다. 물론 배치 작업도 장애가 발생해서는 안되겠지만, 실시간 처리 시스템은 더욱 더 장애에 민감하고 무중단을 추구해야 한다. 그래서 스톰에서는 주키퍼를 이용해 시스템 장애가 발생하더라도 무중단으로 대처할 수 있도록 구성됐다.



Worker가 죽었을 때

작업시간이 너무 오래 걸리거나 메모리 등의 문제로 worker가 제대로 동작을 못하고 죽는 경우가 있다. 이 경우에는 worker를 관장하는 해당 노드의 supervisor가 worker를 새로 구동시키고 새로운작업을 할당한다. 또한 하나의 노드에서 worker가 계속 죽을 경우 님버스에 요청해 해당 worker를 다른 노드에 재할당해 준다.

Supervisor가 죽었을 때

Worker는 여전히 살아있으므로 할당된 작업까지는 계속 수행한다.

Worker 노드가 죽었을 때

전원 등의 문제로 노드 자체가 죽었을 때 님버스는 다른 worker 노드에 작업을 재할당한다.

님버스가 죽었을 때

Supervisor와 worker가 계속 살아 있으므로 작업은 계속 수행된다. 그러나 님버스에서 수행하던 작업 재할당 같은 것은 수행되지 않으므로 최대한 빨리 님버스를 복구하는 것이 좋다.

Single point of failure?

현재로써는 님버스가 죽으면 작업은 계속 수행되지만 작업 재할당 등이 되지 않으므로 님버스가 살아있는 것이 매우 중요하다. 이러한 점 때문에 nimbus HA를 준비중에 있다.


Guaranteeing Message Processing

처리 데이터에 따라 다르겠지만, 스트리밍으로 처리하더라도 데이터 유실은 발생하지 않는 것이 좋다. 많은 실시간 스트리밍 소프트웨어들이 최대한 데이터 유실을 하지 않으면서 성능을 내기 위해서 노력중이지만, 모든 것을 다 만족시킬 수는 없는 트레이드오프 성격을 갖고 있다. 스톰에서는 이것을 위해서 여러 가지 옵션을 준비했다. 위에서 실습한 소스코드에서 ack(), fail() 함수를 보았을 것이다. 스톰에서는 이 함수를 이용해 Message(Tuple)가 완전히 처리됐는지 확인한다.

public class SplitSentence extends BaseRichBolt{OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector){_collector = collector;} public void execute(Tuple tuple){String sentence = tuple.getString(0); for(String word: sentence.split(" ")){_collector.emit(tuple, new Values(word));}_collector.ack(tuple);} public void declareOutputFields(OutputFieldsDeclarer declarer){declarer.declare(new Fielsd("word"));}}

일반적으로 메시지가 무사히 다음 단계로 전달되면 ack() 함수를 호출한다. 마지막 Bolt까지 ack()함수가 호출된다면, 해당 Tuple이 완전히 처리했다고 인지하고 Spout에서 해당 Message(Tuple)를 큐에서 삭제하도록 한다.
특수한 경우 fail() 함수를 호출해 고의로 전달 실패를 발생시킬 수도 있지만, 장애로 인해 ack()함수를 호출할 수 없을 때가 있다. Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS에서 설정된 시간안에 완전히 처리되지 못하면, 해당 Tuple은 실패한 것으로 간주한다. 그리고 실패한 Tuple은 님버스에 의해 다시 Tuple을 만들어서 전달하도록 재시도한다.
이때 모든 Message들을 다 추적하려면 메모리 사용이 많아지므로 Tuple마다 Task ID와 64비트 ack val을 부여한다. 그리고 이 ack val은 xor 연산을 이용해 전체 과정을 완료하면, 해당 ack val이 0이 된다. 0이 되면 Tuple tree가 완료된 것으로 인지하고, 님버스가 큐에서 해당 Message(Tuple)를 삭제한다.

참고로 위에서 언급한 Trident는 트랜잭션 기능을 조합해 메시지 전달을 보장하면서도 무조건 1번만 처리되도록 구현된 일종의 마이크로배치다.


스톰의 확장성

중간에 잠깐 언급했지만, 스톰은 다른 오픈소스와 연동할 수 있다. 예를 들어 플룸으로부터 수집한 로그를, Redis를 큐처럼 이용해 Spout과 연결할 수도 있다. 또한 scribe나 jms 등 여러 가지 방법으로 데이터 소스와 연결할 수 있다. 그리고 스톰은 저장 기능을 갖고 있지 않으므로 Cassandra나 HBase, MongoDB, HDFS, RDBMS 등과 Bolt를 연결해 데이터를 저장할 수도 있다. 일반적으로 Log수집으로부터 발생한 로그를 스톰을 이용하여 정규화하고, 정규화한 데이터를 HDFS나 NoSQL와 같은 다양한 저장소에 저장한다. 또한 실시간 조회가 필요한 통계 데이터는 스톰을 이용하여 RDBMS나 메모리 기반인 Redis에 넣어서 웹페이지와 연동하는 경우가 많다.


스톰의 기타 기능

UI

현재 작동중인 Topology 정보와 수행 시간, Worker(spout, bolt)별 처리 스트림 양 등을 웹 페이지를 통해 간편하게 볼 수 있다.
해당 웹 페이지를 보기 위해서는 “bin/storm ui” 명령을 통해 UI 데몬을 실행하고, http://<storm-ui-server>:8080으로 접속하면 아래와 같은 화면을 볼 수 있다.

[그림 Ⅲ-3-9] UI 웹 페이지 예시

[그림 Ⅲ-3-9] UI 웹 페이지 예시


CLI

스톰에서는 클라이언트를 이용해 리모트 클러스터에 여러 가지 콘솔 명령을 내릴 수 있다. 중요 CLI(Command line Interface)에 대해 알아 보자.

[표 Ⅲ-3-2] 스톰의 주요 CLI


CLI 문법 설명
nimbus nimbus 님버스 데몬을 실행한다.
supervisor storm supervisor supervisor 데몬을 실행한다.
Ui storm ui UI 데몬을 실행한다.
drpc storm drpc DRPC 데몬을 실행한다
Log Viewer UI storm logviewer 0.9 버전 이상에서만 가능하다. 이전 버전까지는 worker들의 로그를 보려면 ssh를 통해 각 서버에 들어간 다음, tail 등을 이용해야 했으나 logviewer를 이용하면 쉽게 로그를 볼 수 있다.
Jar storm jar topology-jar-path class ... 스톰 실행해 보기에서 확인했듯이 실제로 실행할 Topology를 설정해 준다. Jar 파일로 만든 프로그램과 실행시킬 메인 메소드가 들어 있는 클래스를 지정해주면 StormSubmitter에 의해 만든 topology가 submit된다.
kill storm kill topology-name [-w wait-time-secs] topology-name에 해당하는 이름을 가진 topology를 kill한다. 먼저 topology의 spout을 정지시키고, 전체 프로세스가 끝나면 최종적으로 worker를 셧다운을 해서 기타 상태들을 모두 클리어한다. 또한 ‘?w’ 옵션을 이용해 일정 시간 이후에 종료되도록 설정할 수도 있다.
activate storm activate topology-name 지정한 topology의 spout를 활성화한다.
deactivate storm deactivate topology-name 지정한 topology의 spout를 비활성화한다.
rebalance storm rebalance topology-name 클러스터의 worker들을 rebalancing해 주는 명령어다.
예를 들어 10개의 노드에 4개의 worker가 각각 작동하고 있었다고 하자. 이때 작업 양이 많아져서 추가로 10개의 노드를 추가하더라도 특정 노드에만 편중돼 불균형 상태가 돼 성능 향상을 크게 볼 수 없다. 이때 rebalance 명령을 하면, 각 노드에 2개의 worker만 동작하도록 rebalancing해 준다. 이것은 topology를 kill/resubmit하는 수고를 덜어준다
repl storm repl 스톰의 설정 및 작동중인 topology의 작동 정보를 들여다볼 수 있다. 이것은 디버깅할 때 아주 유용하다.
classpath storm classpath 스톰을 위해 설정된 classpath들을 출력한다.
localconfvalue storm localconfvalueconf-name 로컬 스톰의 config들을 출력한다.
remoteconfvalue storm
remoteconfvalueconf-name
클러스터 스톰의 config들을 출력한다.

스톰의 가능성

스톰의 확장성과 기능에 대해 아직 다루지 못한 내용이 많이 있다. 직접 클러스터를 구축할 필요없이 AWS(Amazon Web Services)를 이용하는 방법이나 Yarn-storm, 저장소(hdfs, nosql, RDBMS)와 연계, R을 이용한 리얼타임 분석, 리얼타임 머신 러닝, 리얼타임 추천 시스템 등 스톰을 이용한 실시간 분석 방법은 매우 다양하다. 물론 아직 대용량 분석 또한 대중화 되지 않았다. 그러나 아래 그림과 같이 기존의 배치성 대용량 분석은 이미 지나버린 결과에 대한 추적일 뿐이고, 결과가 나오기까지 최소 수 분에서 수 시간을 기다려야 한다. 이제 업계는 지나간 일에 대한 분석보다 실시간 분석 결과를 원한다. 대용량 배치성 분석을 준비하고 있다면, 스톰과 같은 솔루션을 이용한 실시간 분석을 병행해 준비하는 것이 대용량 데이터 흐름을 반영한 조치일 것이다.


UI

현재 작동중인 Topology 정보와 수행 시간, Worker(spout, bolt)별 처리 스트림 양 등을 웹 페이지를 통해 간편하게 볼 수 있다.
해당 웹 페이지를 보기 위해서는 “bin/storm ui” 명령을 통해 UI 데몬을 실행하고, http://<storm-ui-server>:8080으로 접속하면 아래와 같은 화면을 볼 수 있다.

[그림 Ⅲ-3-10] Batch의 한계

[그림 Ⅲ-3-10] Batch의 한계