데이터실무

DA, SQL, DB보안 등 실무자를 위한 위한 DB기술 바이블!

오픈 API 수집방법(소셜 데이터 수집)

데이터 수집
데이터 수집 실무
오픈 API 수집방법(소셜 데이터 수집)
작성자
admin
작성일
2021-02-15 13:21
조회
6142

데이터 수집에서 로그 데이터, 관계형 데이터베이스의 데이터, 웹에 존재하는 데이터의 수집방법에 대해 알아보았다. 마지막으로 소셜 네트워크 서비스의 twitter API를 사용해 데이터를 수집하는 방법을 알아 본다. 먼저 이번 단락에서 소개할 내용의 개괄적인 그림은 다음과 같다.

[그림 Ⅰ-2-16] 트위터 수집 처리과정

[그림 Ⅰ-2-16] 트위터 수집 처리과정


트위터 정보생성

트위터 API를 사용하기 위해서는 트위터 개발자 사이트에 등록하고 접근하기 위한 정보를 생성해야 한다. 트위터 계정이 있다면 별도로 만들 필요는 없다.


트위터 API용 정보생성

[그림 Ⅰ-2-17] 트위터 계정 생성 화면

[그림 Ⅰ-2-17] 트위터 계정 생성 화면

계정을 생성하였다면 트위터 개발자 사이트(https://apps.twitter.com )에 접근해서 API를 사용할 수 있는 정보를 생성한다.

[그림 Ⅰ-2-18] 트위터 앱 개발 관리화면

[그림 Ⅰ-2-18] 트위터 앱 개발 관리화면


개발 애플리케이션 생성

[그림 Ⅰ-2-19] 트위터 앱개발 관리화면

[그림 Ⅰ-2-19] 트위터 앱개발 관리화면

계정을 생성하였다면 트위터 개발자 사이트(https://apps.twitter.com )에 접근해서 API를 사용할 수 있는 정보를 생성한다.

[그림 Ⅰ-2-20] 트위터 앱개발 정보 입력화면

[그림 Ⅰ-2-20] 트위터 앱개발 정보 입력화면


  1. ① Name: 생성할 application의 이름이다. 원하는 이름으로 입력하면 된다.
  2. ② Description: application을 설명하는 부분으로 간단히 자신의 app에 대한 설명을 입력한다.
  3. ③ Website: 자신이 소유한 도메인이 있다면 넣어주고, 없다면 임의의 url을 입력해준다.
  4. ④ Callback URL: 인증된 후에 연결될 URL을 입력해 준다. OAuth를 사용하는 애플리케이션은 반드시 이 URL을 입력하도록 돼 있기 때문이다. 역시 URL을 가지고 있지 않다면 임의의 URL을 넣어주도록 한다.

[그림 Ⅰ-2-21] 트위터 앱개발 정보 입력화면

[그림 Ⅰ-2-21] 트위터 앱개발 정보 입력화면

입력이 끝났으면 “Create your Twitter application” 버튼을 클릭해 애플리케이션 계정 정보를 발급 받을 수 있는 애플리케이션 관리화면으로 이동한다.


Application용 계정 정보생성

트위터용 Application을 생성했으면, 이 App에 접근할 계정 정보를 생성해준다. 생성정보는 다음과 같이 4종류다.


  • - Consumer key (사용자가 만든 API에 대한 키 값)
  • - Consumer secret (사용자 API의 비밀번호)
  • - Access token (API에 접근하기 위한 토큰 값)
  • - Access token secret (API 토큰의 비밀번호)
애플리케이션 관리화면

[그림 Ⅰ-2-22] 컨슈머키 확인화면

[그림 Ⅰ-2-22] 컨슈머키 확인화면

애플리케이션 정보 화면으로 이동하면 consumer key와 consumer secret key가 발급됐다. 해당정보는 ‘Test OAuth’ 버튼을 클릭하면 확인할 수 있다.


애플리케이션 계정정보 받기

화면의 세 번째 탭인 ‘Keys and Access Tokens’를 클릭해 Access Token과 Access Token secret키를 발급 받으면 트위터 데이터를 수집하기 위한 모든 키 발급이 끝난다.

[그림 Ⅰ-2-23] 액세스 토큰 발급화면

[그림 Ⅰ-2-23] 액세스 토큰 발급화면

Token 관련 키를 생성하기 위해 화면에서 ‘Create my access token’을 클릭한다. 에러 메시지가 없었다면 키가 생성됐을 것이다. 화면 우측 상단의 ‘Test OAuth’ 버튼을 클릭하면 다음 화면이 나타나며 4가지 api 키가 다음과 같이 화면에 출력될 것이다.

[그림 Ⅰ-2-24] API 계정정보 확인 화면

[그림 Ⅰ-2-24] API 계정정보 확인 화면


트위터 플룸 소스

트위터 API를 통해 데이터 소스를 가져오는 방법은 여러 가지가 있으나, 여기서는 twitter4j를 사용해 플룸의 소스를 개발하고, 필요한 키워드를 읽어오는 방법을 사용한다. 플룸의 소스는 ‘AbstractSource’ 클래스를 상속받고 ‘EventDrivenSource’와 ‘Configurable’을 구현해 작성한다. 그리고 ‘Configure’, ‘start’, ‘stop’ 메소드를 구현(override)하면 된다.

public class TwitterSource extends AbstractSource implements EventDrivenSource, Configurable {


Configure 메소드 구현

트위터 API에 접속하기 위한 정보나 플룸 셋업에 필요한 정보를 읽어들이는 함수다. 여기서는 API 접속에 필요한 정보를 플룸 작업 정보에서 읽어오게 하였다. 그리고 트위터의 모든 스트림을 읽어오는 대신에 관심 있는 정보(keyword)만을 가지고 오도록 만들었다.

@Override public void configure(Context context){// API 접속 정보를 configuration 파일에서 읽어옴. consumerKey = context.getString("consumerKey"); consumerSecret = context.getString("consumerSecret"); accessToken = context.getString("accessToken"); accessTokenSecret = context.getString("accessTokenSecret"); String keywordString = context.getString("keyword"); logger.info("Consumer Key: '" + consumerKey + "'"");


스타트 메소드 구현

플룸 에이전트가 트위터에 접속하고 스트림을 가져올 수 있도록 twitter4j의 StatusListener를 내부 클래스로 먼저 생성한다. StatusListener는 json형태로 트위터에서 정보를 가지고 온다.

StatusListener listener = new StatusListener(){ //The onStatus method is executed every time a new tweet comes in. public void onStatus(Status status) { //header와 raw json 형태의 트위트로 이벤트 구성. logger.bebug(status.getUser().getScreenName()+":"+status.getText()); //시간을 확인할 수 있도록 timestamp를 입력한다. headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime())); //헤더와 시간을 조합해 event를 생성한다. Event event = EventBuilder.withBody(DataObjectFactory.getRawJSON(status).getBytes(), headers); channel.processEvent(event);} // This listner will ignore everything except for new tweets public void onDeletionNotice(StatusDelectionNotice statusDeletionNotice){} public void onTrackLimitationNotice(int numberOfLimitedStatuses){}public void on ScrubGeo(long userId, long upToStatusId){}public void onException(Exception ex){}};

StatusListener 클래스를 작성했으면, 이것을 ‘tweeterstream’ 클래스의 ‘addListener’를 사용해 추가한다. 그리고 트위터 접속용으로 생성한 계정정보(consumer, token)를 setOAuthConsumer 메소드와 ‘AccessToken’ 클래스를 사용해 등록한다.

twitterStream.addListener(listener); //컨슈머키와 시크릿 등록 twitterStream.setOAuthConsumer(this.consumerKey, this.consumerSecret); AccessToken token = new AccessToken(this.accessToken, this.accessTokenSecret); twitterStream.setOAuthAccessToken(token); // Set up a fileter to pull put industry-relevant tweets if (keywords.length ==0){ logger.debug("starting up Twitter sampling..."); twitterStream.sample();}else{logger.debug("Starting up Twitter filtering..."); //필요한 키워드만을 트래킹함. FilterQuery query = new FilterQuery().track(keywords). setIncludeEntities(true); twitterStream.filter(query);} Super.start();


스톱 메소드 구현

플룸 에이전트를 중지할 때 사용할 메소드를 개발한다. 트위터 스트림을 종료하도록 한다.

public void stop() { logger.debug("Shutting down Twitter sample stream..."); twitterStream.shutdown(); super.stop();}


플룸 잡 컨피그레이션

다른 일반적인 플룸 잡 컨피그레이션과 같이 데이터 소스와 싱크를 지정해준다. 개발한 플룸소스를 사용하는것이 일반적인 경우와 다를뿐이다. 여기 예제에서는 트위터에서 정보를 읽어들여서 하둡에 저장하도록 설정하였다.

TwitterAgent.sources = Twitter TwitterAgent.channels = MemChannel TwitterAgent.sinks = HDFS TwitterAgent.source.Twitter.type = com.sstrato.flume.source.TwitterSource -----① TwitterAgent.sources.Twitter.channels = MemChannel TwitterAgent.source.Twitter.cousumerKey = OGeX833jE ----- ② TwitterAgent.source.Twitter.consumerSecret = mm ----- ③ TwitterAgent.sources.Twitter.accessToken = 132363521 ----- ④ TwitterAgent.sources.Twitter.keyword = hadoop, big data, analytics, bigdata, business intelligence, mapreduce, data warehouse, data warehouse, data warshousing, mahout, hbasem nosql, newsql, businessintelligence, cloudingcomputing --- ⑥ TwitterAgent.sinks.HDFS.channel = MemChannel TwitterAgent.sinks.HDFS.type = hdfs TwitterAgent.sinks.HDFS.type = hdfs.path = hdfs://vm2:9000/user/flume/tweets/%Y/%m/%d/%H/-----⑦ TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text TwitterAgent.sinks.HDFS.hdfs.bathSize = 1000 TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 TwitterAgent.sinks.HDFS.hdfs.rollCount = 1000 TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600TwitterAgent.channels.MemChannel.type = memory TwitterAgent.channels.Memchannel.capacity = 10000 TwitterAgent.channels.Memchannel.transactionCapacity = 100


  1. ① 플룸의 소스로 사용할 작업 클래스의 이름을 입력한다.
  2. ② 컨슈머 키를 입력한다.
  3. ③ 컨슈머 시크릿을 입력한다.
  4. ④ 액세스 토큰을 입력한다.
  5. ⑤ 액세스 토큰 시크릿을 변경한다.
  6. ⑥ 트위터에서 가져올 키워드를 입력한다. 콤마(,) 기호로 분리해 관심 있는 키워드들을 입력한다.
  7. ⑦ 하둡에 저장한다. 여기서 하둡 네임노드의 이름은 vm2이고, 디렉터리는 ‘/user/flume/tweets/년/월/일/시’ 에 트위터 스트림에서 끌어온 데이터를 저장한다.

위의 컨피그레이션을 사용해 플룸 잡을 시작한다.


하이브의 트위터 데이터용 설디 구현

플룸 트위터 소스를 통해서 하둡에 데이터를 수집하기 시작했으면 이젠 하이브에서 간단한 쿼리를 통해서 데이터 분석을 하도록 한다. 데이터 분석을 바로 시작하기 전에 플룹 작업이 쌓은 데이터를 확인한다. ‘hadoop fs ?text /user/flume/tweets/연/월/일/시’ 로 하면 아래와 같은 결과를 확인할 수 있다.

$ hadoop fs -text / user/flume/tweets/2014/04/05/01/00/* { "retweeted_status":{"contributors":null, "text":"$Crowdsourcing = drivers already generate traffic data for your smartphone to suggest alternative routes when a road is clogged. $bigata", "geo":null, "retweeted":false, "in_reply_to_screen_name":null, "truncated":{ "urls":[], "hashtags":[{

‘twitter4j’에서 생성한 데이터는 위와 같이 제이슨(JSON)형태의 반정형(semi-structured)데이터이다. 이것을 하이브에서 직접 읽어들이기 위해서는 제이슨 데이터를 읽어들이기 위한 설디(SerDe, Serializer/Deserializer)를 만들어야 한다. 설디는 특정 포멧의 데이터를 하이브에서 읽거나 쓸 때 사용하는 데이터 추상화 계층으로서 시리얼라이저는 데이터를 작성할 때, 디시리얼라이저는 데이터를 읽어들일 때 사용한다. 테이블 생성 시 별다른 정보를 입력하지 않으면 ‘LasinessSerde’가 디폴트로 사용된다. 디폴트 설디는 특정 케릭터를 딜리미터로 지정할 수 있으며, 공백기와 콜론 또는 임의의 값을 지정해 컬럼을 구분짓는 값으로 사용할 수 있다.
하이브는 여러 종류의 설디를 내부적으로 갖고 있지만, 아쉽게도 제이슨 형태의 데이터를 처리하는 설디는 없다. 그래서 자체적으로 개발해 주어야 한다. 하이브는 배열(array), 맵(map), 스트럭트(struct)와 같은 복잡한 형태의 데이터도 지원한다.
이중에서 제이슨처럼 키안에 또다른 키/밸류로 중첩돼 있는 데이터 형태의 경우, 스트럭트가 가장 적합한 형태의 데이터 타입이다.


제이슨용 설디 개발

설디를 개발하기 위해서는 설디 클래스(org.apache.hadoop.hive.serde2.SerDe)의 메소드를 구현(implement)하면 한다. 이 추상 클래스에서 ‘initialize’, ‘serialize’, ‘deserialize’ 3개의 함수를 구현하여 제이슨 데이터를 읽어오도록 개발해야 한다.


Initialize 함수

‘Initialize’ 함수는 테이블에 대한 정보를 읽어온다. 설디는 컬럼의 이름과 타입을 읽어 오며 이 정보를 시리얼라이제이션/디시리얼라이제이션에 사용한다.

public void initialize(Configuration conf, Properties tbl) throws SerDeException { // 메타서버에 접속해 컬럼의 이름을 읽어온다. String colNamesStr = tbl.getProperty(serdeContants.LIST_COLUMNS); colNames = Arrays.asList(colNamesStr.split(",")); // 컬럼의 타입 정보를 읽어 온다. 추후 컬럼 이름과 매칭해서 사용한다. String colTypesStr = tbl.getProperty(serdeConstants.List_COLUMN_TYPES); List<TypeInfo> colTypes = TypeInfoUtils.getTypeInfoFromTypeString(colTypesStr); //위에서 생성한 컬럼 이름과 타입을 통해 로(row) 타입을 생성한다. rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStrucTypeInfo(colNames, colTypes); row01 = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);}


역직렬화 함수 개발

역직렬화(Deserialize) 코드에서는 하이브의 ‘ObjectInspector’ 인터페이스가 동작할 수 있도록 이 형식에 맞춰서 객체를 전달한다. 여기서는 제이슨 형태의 데이터가 넘어오고, 이것을 잭슨(Jackson)라이브러리를 통해 파싱한다. 그리고 이 정보를 열(row) 정보에 추가해 하이브에서 row로 인식할 수 있도록 한다.

public Object deserialize(Writable blob) throws SerDeException { Map <?,?> root = null; row.clear (); try { ObjectMapper mapper = new ObjectMapper(); // 잭슨을 사용한 오브젝트 값 추출 root = mapper.readValue(blob.toString(),Map.class);} catch (Exception e) { throw new SerDeException(e);} // 소문자 전환 Map<String, object> lowerRoot = new HashMap(); for(Map.Entry entry: root. entrySet()){ lowerRoot.put((String)entry.getKey()).toLowerCase(), entry.getValue());} root = lowerRoot; Object value = null; for (String fieldName : rowTypeInfo.getAllStructFieldNames()){try{// 컬럼 타입에 맞추어서 제이슨 값 추출 TypeInfo fieldTypeInfo = rowTypeInfo.getStructFieldInfo(fieldName); value = paresField(root.get(fieldName), fieldTypeInfo);} catch (Exception a) { value = null; } // row 정보에 추가. row.add(value);} return row; }


직렬화 함수 개발

직렬화(Serialization) 함수는 데이터를 하이브로 넘겨받아 다시 제이슨 형태로 하둡 파일 시스템에 저장한다.

public Writable serialize (Object obj, ObjectInspector oi) throws SerDeException { //하이브로부터 넘겨받은 열 (row) 데이터를 분리해낸다. Object deparsedObj = deparseRow(obj, oi); ObjectMapper mapper = new ObjectMapper(); try { // 넘겨받은 데이터를 하둡에 스트링으로 풀어서 저장한다. return new Text (mapper.writeValueAsString(deparsedObj));}catch (Exception e1){ throw new SerDeException(e1); }}


설디 전체 코드

package com.sstrato.hive.serde; public class JSONSerDe implements SerDe { private StructTypeInfo rowTypeInfo; private ObjectInspector row01; private List<String> colNames; private List<Object> row = new ArrayList<Object>(); @Override // 테이블의 컬럼개수, 컬럼의 타입정보를 얻어오고 기본구분자를 정의한다. public void initialize(Configuration conf, Properties tbl) throws SerDeException{ //Get a list of the table's column names. String colNameStr = tbl.getProperty(serdeConstants.LIST_COLUMNS); // 구분자를 ","로 정의한다. colNames = Arrays.asList(colNameStr.split(",")); // Get a list of TypeInfos for the columns. This list lines up with // the list of column names. String colTypesStr = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); List<TypeInfo> colTypes = TypeInfoUtils.getTypeInfosFromTypeString(colTypesStr); rowTypeInfo = (StructTypeInfo) Type InfoFactory.getStruckTypeInfo(colNames, colTypes); row01 = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(rowTypeInfo);} @Override //역직렬화로테이블에 저장될 각 컬럼의 타입을 저장하여 리턴 한다. public Object deserialize(Writable blob) throws SerDeException { Map <?,?> root = null; row.clear(); try{ ObjectMapper mapper = new ObjectMapper(); // This is really a Map <String, Object>. For more information about how // Jackson parses JSON in this example, see // http://wiki.fasterxml.com/JacksonDataBinding root = mapper.readValue(blob.toString(), Map.class); } catch ( Exception e){ throw new SerDeException(e); } // Lowercase the keys as expected by hive Map<String, Object> lowerRoot = new HashMap(); for (Map.Entry entry: root.entrySet()){lowerRoot.put(((String)entry.getKey()).toLowerCase(), entry.getValue());} root = lowerRoot; Object value = null; for ( String fieldName : rowTypeInfo.getAllStructFieldNames()){try { TypeInfo fieldTypeInfo = rowTypeInfo.getStringFieldTypeInfo(fieldName); value = parseField(root.get(fieldName), fieldTypeInfo); } catch (Exception e){ value = null; } row.add(value); } return row; } private Object parseField(Object field, TypeInfo fieldTypeInfo){ switch(fieldTypeInfo.getCategory()){case PRIMITIVE: //Jackson will return the right thing in this case, so just return // the object if (field instanceof String){field = field.toString().replaceAll("\n", "\\\\n");} return field; case LIST: return parseList(field, (ListTypeInfo) fieldTypeInfo); case MAP: return parseMap(field, (MapTypeInfo) fieldTypeInfo); case STRUCT: return parseStruct(field, (StructTypeInfo) fieldTypeInfo); case UNION: //Unsupported by JSON default: return null; }} private Object parsseStruct (Object field, StructTypeInfo fieldTypeInfo){Map<Object, Object> map = (Map<Object, Object>)field; ArrayList<TypeInfo> structTypes = fieldTypeInfo.getAllStructFieldTypeInfos(); ArrayList<String> structNames = fieldTypeInfo.getAllStructFieldNames(); List<Object> structRow = new ArrayList<Object>(structTypes.size()); for (int i = 0; i < structNames.size(); i++) { structRow.add(parseField(map.get(structNames.get(i))), structTypes.get(i)));} return structRow; } private Object parseList(Object field, ListTypeInfo fieldTypeInfo){ArrayList<Object> list = (ArrayList<Object>)field; TypeInfo elemTypeInfo = fieldTypeInfo.getListElementTypeInfo(); for (int i = 0; i <list.size(); i++){ for (Map.Entry<Object, Object> entry : map.entrySet()){ map.put(entry.getKey(), parseField(entry.getValue(), valueTypeInfo)));}return map; } @Override public ObjectInspector getObjectInspector() throws SerDeException{return row01;} @Override public SerDeStats getSerDeStats(){return null;} @Override public Class<? extends Writable> getSerializedClass(){return Text.class;} @Override public Writable serialize(Object obj, ObjectInspector oi) throws SerDeException{Object deparsedObj = deparseRow(obj, oi); ObjectMapper mapper = new ObjectMapper(); try{ return new Text (mapper.writeValueAsString(deparsedObj)); } catch (Exception e1) { throw new SerDeException(e1);}} private Object deparseObject(Object obj, ObjectInspector oi){switch (oi.getCategory()){ case LIST: return deparseList(obj, (ListObjectInspector)oi); case Map: return deparseMap(obj, (MapObjectInspector)oi); case PRIMITIVE : return deparsePrimitive(obj, (Primitive


트위터 테이블 생성

설디 개발이 완료 되었으면, 이제 트위터 분석용 테이블을 생성한다. 먼저 방금 개발한 설디 클래스를 하이브 세션에 ‘ADD JAR’명령어를 추가한 후 아래와 같이 하이브 테이블을 생성한다.

ADD JAR/usr/local/hive/lib/hive-serdes-1.0-SNAPSHOT.jar;---------------① CREATE EXTERNAL TABLE tweets ( id BIGINT, created_at STRING, source STRING, favorited BOOLEAN, retweeted_status STRUCT<--------------② text:STRING, user:STRUCT<screen_name: STRING, name: STRING>,---------------------③ retweet_count:INT>, entities STRUCT<--------------------④ urls:ARRAY<STRUCT<expanded_url:


  1. ① 하이브에 개발한 설디를 사용할 수 있게 추가한다.
  2. ② STRUCT 형식의 retweeted_status라는 컬럼을 지정한다.
  3. ③ retweeted_status 컬럼 안에, user 컬럼을 STRUCT 형태로 지정한 다음 그에 해당하는 컬럼을 별도로 지정한다.
  4. ④ entities라는 컬럼을 STRUCT 형태로 지정한다. 각각의 내부 값 역시 STRUCT 형태로 정의한 다음 필요한 값들을 정의한다.
  5. ⑤ 스캔영역을 줄이기 위해 ‘datehour’ 값을 기준으로 파티셔닝한다.
  6. ⑥ 테이블을 external로 지정했으므로 파일이 저장돼 있는 위치를 지정한다.
파티션 데이터 추가

테이블을 생성한 이후 플룸을 통해서 시간대별로 받은 데이터를 하이브의 ‘tweets’ 테이블에 로드한다. 아래 예의 경우 2015년 1월 28일 01시 데이터를 파티션에 추가한 것이다. 파티션을 추가한 경우에 쿼리문에서 ‘where column=파티션’을 수행할 때 하둡의 전체 저장영역을 스캔해 정보를 추출해내는 것이 아니라, 지정한 파티션에서만 데이터를 읽음으로써 작업 시간을 줄여주는 효과가 있다.

hive>ALTER TABLE tweets ADD IF NOT EXISTS PARTITION (datahour = 2015052801) LOCATION 'user/flume/tweets/2015/01/28/01';


테이블 분석 예제

테이블을 생성하고 데이터도 정상적으로 로드됐다면 데이터 분석을 해볼 차례다. 먼저 어떤 타임존에서 가장 많은 트위트를 생성하는지 확인해보자.

hive > SELECT user.time_zone, SUBSTR(created_at, 0, 3), COUNT(*) AS total_count FROM tweets WHERE user.time_zome IS NOT NULL GROUP BY user.time_zone, SUBSTR(created_at, 0, 3) ORDER BY total_count DESC LIMIT 15; ..... Eastern Time (US & Canada) Sat 29260 Pacific Time (US & Canada) Sat 29167 Central Time (US & Canada) Sat 15735 London Sat 11228 Kuala Lumpur Sat 8936 Quito Sat 6733 Ruyadh Sat 6697 Amsterdam Sat 4517 Chennai Sat 4495 Mountain Time (US & Canada) Sat 4493 Greenland Sat 4482 Mumbai Sat 4480 Tokyo Sat 4477 Alaska Sat 4473 Abu Dhabi Sat 4472 Time taken: 195.413 seconds, Fetched: 15 row(s)

가장 많은 트위트를 생성한 지역은 미국ㆍ캐나다인 것을 확인할 수 있다. 두 번째로는 사용자가 생성한 트위트 중에서 가장 많은 태그 정보를 확인하도록 한다.

SELECT LOWER(hashtags.text), COUNT(*) AS total_count FROM tweets LATERAL VIEW EXPLODE(entities.hashtags) t1 AS hashtags GROUP BY LOWER(hashtags.text) ORDER BY total_count DESC LIMIT 15; ... bigdata 7038 opendata 4467 analytics 2350 job 2289 businessmgmt 2248 data 2246 big 2238 facebook 2236 chicago 2236 cloudcomputing 85 cloud 83 iot 78 api 60 internetofthings 56 hdoop 38 Time taken: 203.028 seconds, Fetched: 15 row(s)

다운 받은 트위트 중에서 가장 많은 태그는 ‘bigdata’와 ‘opendata’인 것을 확인할 수 있었다.


파이썬을 이용한 트위터 데이터 수집과 시각화

빅데이터 관련 부분에서 최근 가장 주목을 받고 있는 언어가 파이썬이다. 글로벌 사례를 보면 데이터 분석 분야 에서도 R의 사용추세보다 파이썬의 사용추세가 더 가파르게 증가하고 있다. 인터프리터 언어인 파이썬은 구조적으로 이해하기 쉽기 때문에 컴파일 언어보다 프로그램의 생산적인 측면에서 많은 강점을 가지고 있다. 또한 데이터 분석에서 널리 쓰이고 있는 R보다 많은 분석패키지들을 지원하고 있다. 이처럼 파이썬은 빅데이터 분야에서 요구하는 유연한 개발 지원에 적합한 언어의 강점을 모두 가지고 있기 때문에 사용자들을 더 많이 끌어 들이는 것 같다. 이번에는 파이썬을 이용해 트위트를 수집하는 방법과 트위트한 데이터를 어떻게 시각화를 하는지 알아 보자.


트위터용 파이썬 패키지 설치

파이썬의 많은 패키지 중에 트위터를 지원하는 패키지를 설치해 트위터의 데이터를 수집해 보자.


윈도우에서 tweepy 설치

https://github.com/tweepy/tweepy 에 접속해 tweepy를 내려 받는다.

[그림 Ⅰ-2-25] 기트허브 접속화면

[그림 Ⅰ-2-25] 기트허브 접속화면

Download Zip 버튼을 클릭해 tweepy 를 내려 받은 후 압축을 푼다. 커맨드 창에서 압축을 푼 디렉터리로 이동해 다음 명령을 실행한다.

$ python setup.py install


리눅스에서 tweepy 설치

루트 권한으로 접속해 pip을 실행한다.

# pip install tweepy


트위터 단어 필터링 구현

트위터에서 단어를 필터링해 트위트되는 내용을 출력하는 프로그램을 구현해 보자.


파이썬 트위터 접속 테스트

간단하게 트위터에 접속해 어떻게 작동하는지 살펴보자.
twit_test.py 프로그램을 작성하자.

import tweepy -----------------------① consumer_key = 'SRXugVpgEmSxFm0qGDgQz6hHO' ----------------② consumer_skey = 'dkakldlakjdakjadkadieekllkada' access_token = '196044070-G20nm84Vv7S7MuBsOUidVEkupSGgAzV1pxTVan22' access_stoken = 'aldkalakdkfSDEDKI0kelkkafjdlkakdfa' class listener(tweepy.StreamListener): -------------------③  def on_data(self, data):   print data   return data  def on_err(self, status):   print status auth = tweepy. OAuthHandler(consumer_key, consumer_skey) -------------- ④ auth.set_access_token(access_token, access_stoken) twitterStreaming = tweepy.Stream(auth, listener()) twitterStreaming.filter(track=("[korea"])


  1. ① 파이썬의 Twitter용 모듈인 tweepy를 임포트한다.
  2. ② 트위터 사용을 위해 받은 4개의 키를 2~5라인에 차례로 대입해 준다.
  3. ③ Tweepy 모듈의 트위터의 스트림을 리슨하는 StreamListener를 선언한다.
  4. ④ 12~13 라인은 2~5라인에서 대입한 트위터 api 키를 사용을 선언한다. 14~15 라인은 12~13라인에서 선언한 api를 통해 korea라는 단어를 필터링해 8번 라인에 선언한 data로 프린트한다.

테스트를 실행해 보자.

$ python twit_test.py {"created_at":"Thu Jan 29 23:48:45 +0000 2015", "id":560947689710297088, "id_str" : "560947689710297088", "text" : "RT @usnavykorea: Because #partnerships matter, the U.S. Navy and ROK NAVY continue to go together. Katchikapshida! #USNavy #Korea http:\/\/t\u2026","source":"\u003ca href=\"http:\/\/twitter.com\" rel=\"nofollow\"\u003eTwitter Web Client\u003c\/a\u003e"."truncated":false,"in_reply_to_status_id":null, "in_reply_to_status+id_str":null, "in_reply_to_user_id":null, "in_reply_to_user_id_str":null, "in_reply_to_screen_name":null, "user" : {"id" : 25930421, "id_str":"25930421","name":"7th Fleet","screen_name":"US7thFleet","location":"Yokosuka, Japan","url" :"http:\/\/www.c7f.navy.mil","description":"At any give time, there are 60-70 ships, 200-300 aircraft and 40,000 Navy and Marine Corps personnel assigned to the U.S. 7th Fleet.","protected":false, "verified":false,"followers_count":19469, "friends_count":36,"listed_count":784, "facourites_count":1, "statuses_count":2764, "created_at":"Mon Mar 23 01:57:30 +0000 2009", "utc_offset":32400, "time_zone":"Tokyo", "geo_enabled":false, "lang":"en", "contributors"_enabled":false, "is_translator":false, "profile_background_color":"B3C0CE","profile_background_image_url":"http:\/\/abs.twing.com\/images\/thems\/theme1\/bg.png""profile_background_image_url":"http:\/\/abs.twimg.com\/images\/thems\/theme1\/bg.png", "profile_background_title":false, "profile_link_color":"FF9900", "profile_sidebar_border_color":"B3C0CE","profile_sidebar_fill_color":"181C3A","profile_text_color":"A6B4C5", "profile_use_background_image":false, "profile_image_url":https":"\/\/pbs.twimg.com\/profile_images\/106833299\/c7f_logo_square_normal.jpg,"profile_banner_url":"https:\/\/pbs.twimg.com\/profile_profile_banners\/25930421\/1402623596","default_profile":false, "default_profile_image":false, "following":null, "follow_request_sent":null, "notifications":null},"geo":null, "coordinates":null,"place":null, "contributors":null, "retweeted_status": {"created_at":"Thu Jan 29 23:12:59 +0000 2015", "id":560938686939156480, "id_str":"560938686939156480", "text":"Because #partnerships matter, the U.S. Navy and ROK NAVY continue to go together. Katchikapshida! #USNavy #Korea http:\/\/t.co\/hVgWZBGuVh", "source":"\u003ca" href=\"http\/\/www.facebook.com\/twitter\" eply_to_status_id_str":null, "in_reply_to_user_id":null, "in_reply_to_user_id_str":null, "in_reply_to_screen_name":null, "user":{"id": 42992331, "id_str" : "42992331", "name":US Navy Korea","screen_name":"usnavykorea", "location":"Seoul, Republic of Korea", "url":"http:\/\/www.cnic.navy.mil\/ko"}

현재 트위트하고 있는 내용 중 ‘korea’가 들어간 문자로 필터링돼 출력되는 것을 확인할 수 있다.


트위터에서 단어 필터링 프로그램 구현

트위터에 연결해 트위트들을 스트리밍으로 가져오는 테스트는 해보았으니 좀 더 프로그램을 개선해보자. 특정 단어를 검색해 트위트하는 내용을 단어별로 분리해 특정 단어에 대해 어떤 단어가 얼마나 트위트되는지를 구현하는 프로그램을 작성해 보자.



twit_word_cloud.py 프로그램 작성

kodb라는 프로젝트를 생성한다.

import tweepy ① import json import re import sys ② reload (sys) sys.setdefaultencoding('utf-8') consumer_key = 'SRXugVpgEmSxFm0qGDgQz6hHO' ③ consumer_key = 'dkakldlakjdakjadkadieekllkada' access_token = '196044070-G20nm84Vv7S7MuBsOUidVEkup_SGgAzV1pxTVan22' access_stoken = 'aldkalakdkfsDEDKIOkelkkafjdlkakdfa' def get_api():  api_key = consumer_key  access_token_secret = access_stoken  auth = tweepy.OAuthHandler(api_key, api_secret)  auth.set_access_token(access_token, access_token_secret)  return auth class CustomStreamListener(tweepy.StreamListener): ④  def_init_(self, *args, **kwargs):   super (CustomStreamListener, self)._init_(*args, **kwargs)   self.count = 0   with open ('restrict.word') as f: ⑤    self.common = set(line.strip() for line in f)   self.all_words = {}   self.pattern = re.compile("[^/w]") ⑥  def on_status(self, status):   print 'Got a Tweet'  self.count += 1  tweet = status.text  tweet = self.pattern.sub('',tweet)   words = tweet.split() ⑧  for word in words:  if len(word) > 2 and word ! = '' and word not in self.common:  if word not in self.all_words: self.all_words[word] = 1  else:  self.all_words[word] +=1 if_name_=='_main_': ⑨ 1 = CustomStreamListener()  try: auth = get_api() s = "obamacare" ⑩ twitterStreaming.filter(track=[s.decode('utf-8').encode('utf-8')])  except KeyboardInterrupt:  print '-----total tweets-----'  print 1.count json_data = json.dumps(l.all_words, indent=4)   width open('word_data.json', 'w') as f:    print >> f, json_data    print s



  1. ① 필요한 모듈을 임포트한다. 트위터를 위한 tweepy, 정규표현식을 위한 re 모듈을 임포트한다.
  2. ② 한글처리를 위해 모듈과 함수를 선언했다. 지금은 필요없을지 모르지만, 한글처리 문제를 위해 삽입했다.
  3. ③ 트위터에 연결하기 위해 api 키를 선언한다.
  4. ④ 트위터에서 트위트되는 단어들을 스트리밍 처리하기 위해 여러 조건과 함께 선언한 클래스다.
  5. ⑤ 수집할 때 금지단어를 restrict_word라는 파일로 만든다. restrict_word는 편집기를 사용해 미리 준비해 두자.
  6. ⑥ 정규표현식을 이용해 숫자나 특수문자로 시작하는 단어를 제외하고 문자로 시작하는 단어만 추출한다.
  7. ⑦ 트위터에 트위트가 될 때마다 ‘Got a Tweet’을 화면에 출력한 후, 같은 단어가 존재하면 단어의 숫자를 증가시킨다.
  8. ⑧ 가져온 문장을 단어로 분리한다.
  9. ⑨ 트위트를 수집하는 클래스를 실행한다.
  10. ⑩ 검색할 단어를 지정한다. 여기서는 2014년 발표한 ‘obamacare’라는 단어를 입력했다.
  11. ⑪ 스톱 메소드가 없이 키보드에서 인터럽트를 발생시키면 프로그램 수행을 중단하고 트위트한 데이터를 기록하는 예외처리를 한다. 트위터에서 트위트한 데이터를 word_data.json이라는 파일로 저장한다.

프로그램 실행


$ python twit_word_cloud.py Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet Got a Tweet  ..... ^(-----total tweets-----)235 Obamamcare


‘obamacare’라는 단어로 약 30분 동안 트위트되는 단어들을 수집했다.
편집기로 어떤 단어들이 수집됐는지 확인하자.

$ vi word_data.json "essay" : 2, "all" : 2, "code" : 2, "IrZWBITI3B" : 1, "trickle" : 1, "Fix" : 1, "Gut" : 3, "Obamacare" : 183, "month" : 2, "BWbceRoLUe" : 1, "deadline" : 1, "per" : 13, "apparatchiks" : 1, "ridiculous" : 1, "rlkMmQaRcZ" : 1, "issues" : 3, "KLSouth" : 1, "votes" : 1,


Obamacare, obamacare가 많이 나오는 것을 볼 수 있다. 이 단어를 restrict_word에 추가해 다시 프로그램을 실행한 후 확인해 보자.

$ vi word_data.json "essay" : 1, "all" : 1, "code" : 1, "Bankrupt" : 1, "Debts" :  1, "NARRATIVE" : 1, "results" : 1, "Action" :  1, "sleep" : 1, "Looming" : 3, "Jim_Harper" : 1, "covered" : 3, "Indiana" : 1, "hijos" : 1, "hijos" : 1, "Wing" : 2, "Rewritten" : 1, "g7D4gUTtJ" : 1, "CCOT" : 1, "const" : 1, "AmyMek" : 2, "sharethis" : 1, "concerned" : 1,



Obamacare나 obamacare가 수집되지 않았다.


데이터 시각화

이번에는 어떤 단어가 많이 추출됐는지를 확인하기 위해서 데이터를 시각화해 보자. Obamacare와 관계된 단어를 분리해 개수를 추출했으므로 얼마나 많은 단어가 나왔는지 워드 클라우드 방법으로 데이터를 시각화해 보자. 수집된 데이터를 R이나 d3.js같은 툴을 사용해 표현할 수 있지만 여기에서는 방법이나 툴의 설명은 필요한 부분만 설명하고 수집된 데이터가 어떻게 활용될 수 있는지에 대한 부분에 집중하기로 한다. 트위트한 데이터를 확인해 보기 위해 로컬 환경에서 구현해 보자. 플랫폼 독립적이다.
“https://github.com/jasondavies/d3-cloud” 사이트에 접속해 d3.js를 다운로드한다. 자바스크립트 기반이기에 html 코드를 작성해 테스트해보자.

Visual.html로 파일을 작성하자.



html 코드 작성


<!DOCTYPE html> <meta charset="utf-8"> <body>     <script src="../lib/d3/d3.js"></script>     <script src="../d3.layout.cloud.js"></script>     <script>     var fill = d3.scale.category20();         mydata = d3.json("word_data.json", function(error, json){             if(error) return console.warn(error);             data = json;             d3.layout.cloud().size([300, 300]).words(mydata.map(function(d){                 return {text : d[0], size: d[1]};             }))             .padding(5)             .rotate(function(){                 return ~~(Math.random()*2)*90;             })             .font("Impact")             .fontSize(function(d){return d.size;})             .on("end", draw)             .start();             function draw(words){                 d3.select("body").append("svg")                 .attr("width", 300)                 .attr("height", 300)                 .append("g")                 .attr("transform", "translate(150, 150)")                 .selectAll("text")                 .data(words)                 .enter().append("text")                 .style("font-size", function(d) {return d.size + "px";})                 .style("font-family", "Impact")                 .style("fill", function(d, i) {return fill(i);})                 .attr("text-anchor", "middle")                 .attr("transform", function(d){                     return "translate"(" + [d.x, d.y] + ")rotate("+d.rotate +")";                 })                 .text(function(d){return d.text;})''             }         }     </script>



작성한 html을 실행해보자.

[그림 Ⅰ-2-26] 데이터 시각화 결과

[그림 Ⅰ-2-26] 데이터 시각화 결과

결과를 보면 Obamacare에 대해 트위트에 올라온 단어들 중 Per라는 단어가 가장 많으며 false, POOR, ill, out등 등 부정적인 단어들도 많이 보인다. 문장의 조어 역할을 하는 단어들을 제외하는 등 좀 더 수정을 할 필요는 있다.
데이터 수집을 실무적인 관점에서 귀납적으로 접근해 프로그램의 설치→ 프로그램의 기능실습→ 프로그램 응용 순서대로 각 수집기술의 기본적인 활용방법에 대해 학습했다. 수집방법별로 어떤 것은 설치 및 기능이 주를 이루는 기술도 있었고, 어떤 것은 기술 활용이 주를 이루는 것도 있었다. 본장의 서두에도 기술했듯이 수집기술들은 계속 발전하고 기능 또한 추가되고 있어 기술들에 대해 부단한 학습을 해야 ‘데이터 수집’이라는 목적을 달성하기 위한 유연한 아키텍처를 구성할 수 있다. 데이터를 수집하다 보면 데이터를 내부 시스템으로 획득까지의 순간이 가장 어렵다. 일단 내부로 데이터를 획득하면 그 형태에 상관없이 데이터 활용을 위한 다양한 형태로 변형은 우리가 학습한 내용만 가지고도 충분히 구현이 가능하다. 1장 데이터 수집의 이해와 2장 데이터 수집실무는 서로 별개의 장이 아니라 ‘데이터 수집’ 목적을 이루기 위한 이론과 기능이라는 두 개의 날개로 생각하고 ‘데이터 수집’을 이해하기 바란다. 마지막으로 ‘데이터 수집’ 단원에서 다룬 내용은 다른 단원과 완전히 독립적인 것도 있고 서로 의존관계에 있는 것도 있기 때문에 다른 단원의 학습 또한 데이터 수집의 관점에서 꼭 필요하니 열심히 학습할 것을 당부한다.