본문 바로가기

카테고리 없음

Elasticsearch hadoop

Elasticsearch hadoop 소개 


Elasticsearch에서는 강력한 Aggregation을 지원하지만 Aggregation에서 지원하지 않는 대용량 처리에 있어서는 불편한점이 있다. 


예를 들어 event를 분석한다고 했을 때, 데이터를 간단하게 json으로 index한 후에 각 event의 count (term aggergation), event의 시간단위의 count (date histogram) 등을 kibana를 통해 간단하게 볼 수 있다. 





하지만 event간의 상관 관계를 분석한다거나, data가 많아져서 data를 aggregation한다던가 할 때에는 어떻게 해야할까??

다시말해, HDFS에서 읽어들여서 spark에서 machine learning을 돌리거나 event flow를 연결하여 새로운 의미의 정보를 생산하는 map reduce를 돌려야한다면 말이다. 


Elasticsearch에서는 es에 저장된 data들을 hadoop의 map reduce, spark, hive, storm 등에 올려서 분석할 수 있도록 Elasticsearch hadoop을 지원한다.

이로써 실시간 모니터링 시스템과 배치시스템을 아우르는 Lambda architecture를 Elasticsearch 중심으로 구현해 낼 수 있다. 


Elasticsearch for Apache Hadoop 페이지 


Elasticsearch hadoop은 크게 두가지로 나누어 보면 좋은데 


첫번째로는 Elasticsearch 자체를 hadoop ecosystem에서 운영할 수 있도록 yarn 위에 띄우는 것 ( Elasticsearch on YARN

두번째로는 Elasticsearch의 데이터를 읽어 들여 hadoop ecosystem의 solution들을 적용해 보는 것이다. ( Elasticsearch for Apache Hadoop )


여기서는 두번째에 대한 이야기를 다룬다. 



Installation 


Project에 Maven을 사용하여 dependency를 추가하면 된다. 


<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-hadoop</artifactId> <version>2.2.0</version> 

</dependency>


자세한 Install은 es hadoop의 install page를 참고한다. 




Architecture


hadoop에서는 parallelism을 사용하기위해서 splits라는 개념을 사용한다. hdfs혹은 다른 data source로 부터 InputSplit들을 만들어내면 그 갯수만큼 parallel하게 돌 수 있는 것이다. 

Elasticsearch에서는 shard를 split으로 사용한다. 따라서 index생성시 정해준 shard의 갯수 만큼 parallelism이 확보된다. 


따라서 map/reduce는 각각 read할 index의 총 shard개수와 write할 index의 총 shard개수만큼 동시에 실행될 수 있다. 


또 hadoop의 data locality와 같이 같은 장비에 있는 데이터는 local mapper가 읽어들임으로써 리소스의 이득을 볼 수 있다. 


더 자세한 부분은 es hadoop의 architecture page를 참고한다. 



Map/Reduce integration



그럼 hadoop job을 돌려 보자. 


Es hadoop에서 Elasticsearch에 있는 데이터를 read / write하기 위해서 InputFormat과 OutputFormat을 제공한다. 

Elasticsearch의 data가 json format이기 때문에 당연하게도 json data를 java로 mapping한 MapWritable 객체를 가지고 읽고 쓰도록 되어 있다. 


Configuration 


Configuration conf = new Configuration(); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists");  

...


위와 같이 Hadoop dirver에서 es.node들을 지정해주고 resource를 지정해주어 어떤 index/type에서 읽어올 것인지를 알려준다. 



Reading data from elasticsearch


map configuration

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists/");            
conf.set("es.query", "?q=me*");                       
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(MapWritable.class);        
...

job.waitForCompletion(true);



elasticsearch에서 data를 읽기위해서는 위의 1번과 같이 resource를 지정해주고 

2번과 같이 데이터 query를 입력해 준다. 

3번 처럼 MapWritable class를 Map의 outputvalue class로 지정해준다. 


Mapper implementation

public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { Text docId = (Text) key; MapWritable doc = (MapWritable) value; ... 

}}


위에서와 같이 configuration을 지정해놓으면 Mapper에서 key로 document의 id를 value로 MapWritable화 된 json document를 받아올 수 있다. 





writing data to elasticsearch 


configuration

Configuration conf = new Configuration(); conf.setBoolean("mapred.map.tasks.speculative.execution", false); conf.setBoolean("mapred.reduce.tasks.speculative.execution", false); conf.set("es.nodes", "es-server:9200"); conf.set("es.resource", "radio/artists"); Job job = new Job(conf); job.setOutputFormatClass(EsOutputFormat.class); job.setMapOutputValueClass(MapWritable.class); ... 

job.waitForCompletion(true);


1, 2번과 같이 speculative execution을 disable한다. 

3번에서 결과가 쓰여질 index/type을 명시한다. 

4번처럼 output value를 MapWritable class로 명시한다. 




Mapper implementation

public class SomeMapper extends Mapper { @Override protected void map(Object key, Object value, Context context) throws IOException, InterruptedException { // create the MapWritable object MapWritable doc = new MapWritable(); ... context.write(NullWritable.get(), doc); 

}}


위와 같이 Docuement를 생성하고 write한다.