Chapter 2 - 맵리듀스

맵리듀스는 데이터 처리를 위한 프로그래밍 모델이며, 하둡에서는 다양한 언어로 작성된 맵리듀스 프로그램을 구동 시킬 수 있다.

 

맵과 리듀스

맵리듀스 작업은 맵 단계리듀스 단계로 구분된다. 각 단계는 입력과 출력으로 키-값의 쌍을 가진다.

맵 단계의 출력이 리듀스 단계의 입력으로 보내지는 과정은 맵리듀스 프레임워크에 의해 처리된다.

이 과정에서 키-값 쌍은 키를 키준으로 정렬되고 그룹화된다.

맵리듀스의 논리적 데이터 흐름

  • 맵 단계
    준비 단계로써 필요한 값을 찾는 역할을 하며 리듀스 함수를 위해 데이터를 제공
    잘못된 레코드를 걸러주는 작업도 맵 단계에서 수행
  • 리듀스 단계
    키를 기준으로 그룹화된 리스트를 반복하여 필요한 값을 추출

맵리듀스의 동작 방식을 이해하기 위해 Java로 작성된 맵리듀스 애플리케이션 소스코드를 살펴본다.

애플리케이션은 [ 맵 함수, 리듀스 함수, 잡을 구동하기 위한 코드]로 이루어져 있다.

하둡은 최적화된 네트워크 직렬화를 위해 자체적으로 기본 타입 셋을 제공한다.(org.apache.hadoop.io참고)

  • 맵 함수 
    추상 map() 메서드를 정의하는 Mapper 클래스로 구현된다. 
public class FunctionMapper
	extends Mapper<LongWritable, Text, Text, IntWritable> {
    
    ...
    
    @Override
    public void map(LongWritable key, Text value, Context context) 
    	throws IOException, InterruptedException {
     	
        Stirng line = value.toString();
        
        String key = getMapOutKey(line);
        Integer value = getMapOutValue(line);
        
        if(!isInvaildValue(value)) {
            context.write(new Text(key), new IntWritable(value));
        }
        
    }
    
    private String getMapOutKey(String str) {
    	...
        return result
    }
    
    private Integer getMapOutValue(String str) {
    	...
        return result
    }
    
    private Boolean isInvalidValue(Integer value) {
        ...
        return result
    }
}

위의 정의된 함수는 입력 키-값 쌍으로 LongWritable과 Text을 받고, 출력 키-값 쌍으로 Text와 IntWritable을 출력하는 클래스를 구현하였다. 출력되는 값은 context 인스턴스의 write 메서드를 통해 출력 키-값를 전달한다. 이와 더불어서 map 단계에서는 값에 대한 검증도 수행해야한다.

 

  • 리듀스 함수
    맵 함수와 유사하며 그룹화된 맵 함수의 출력 값을 통해 함수를 작성한다.
public class FunctionReducer
    extends Reducer<TExt, IntWritable, Text, IntWritable> {
    
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
        
        Integer reducerOutValue = getReducerOutValue(values);
        context.write(key, new IntWritable(reducerOutValue);
    }
    
    private Integer getReducerOutValue(Iterable<IntWritable> values) {
        ...
        return result
    }
}

 

  • Job 구동을 위한 코드
    잡 명세서를 작성하는 코드로 하둡 클러스터에서 실행할 때는 JAR 파일로 묶어야한다. 
    JAR 파일의 이름을 명시적으로 지정하거나 Job의 SetJArByClass() 메서드를 이용하여 클래스를 하나 지정하면 하둡은 관련 JAR 찾아서 클러스터에 배치해준다.

import org.apache.hadoop.mapreduce.Job;

public class Function {

    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(Function.class);
        job.setJobName("Function");
    
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
        job.setMApperClass(FuctionMapper.class);
        job.setReducerClass(FunctionReducer.class);
    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        System.exit(job.watForCompletion(true) ? 0 : 1);
    }
}

Job 객체를 생성할 때는 입력/출력 경로를 지정한다.

입력 경로는 FileInputFormat의 정적 메서드인 addInputPath()를 호출하여 지정한다. 입력 경로는 하나의 파일이나 디렉터리 혹은 특정 패턴의 파일 일 수 있으며, 다수의 입력 경로를 지정하려면 addInputPath()를 여러번 호출해서 사용한다.

출력 경로는 FileOutputFormat의 정적 메서드인 setOutputPath()로 지정한다. 이 경로는 리듀스 함수가 출력 파일을 저장할 디렉터리이다. 잡을 수행하는 시점에 지정한 디렉터리가 존재하면 절대 안되며, 만약 존재한다면 잡은 실행하지 않는다.

Job의 waitForCompletion() 메서드는 잡을 제출한 후 잡이 모두 끝날 때까지 기다린다. 반환값은 성공(true)와 실패(false)로 나눈다. 

 

분산형으로 확장하기

시스템 전체를 한눈에 내려다보면서 대용량 데이터를 어떻게 처리하는지 살펴보자.

 

데이터 흐름

맵리듀스 잡클라이언트가 수행하는 작업의 기본 단위로 입력데이터, 맵리듀스 프로그램, 설정 정보로 구성된다. 

하둡은 잡을 맵 태스크와 리듀스 태스크로 나누어 실행하며, 각 태스크는 YARN을 이용하여 스케쥴링되고 클러스터의 여러 노드에서 실행된다.

하둡은 맵리듀스 잡의 입력을 (입력)스플릿이라고 불리는 고정 크기로 조각을 분리한다. 각 스플릿마다 하나의 맵 태스크를 생성하고 스플릿의 각 레코드를 사용자 정의 맵 함수로 처리한다. 전체 입력을 통째로 처리하는 것보다 스플릿으로 분리된 조각을 처리하는 것이 부하 분산에 좋은 효과를 볼 수 있기 때문에 더 빠르다.

맵 태스크를 실행하는 노드를 찾기 위해서 데이터 지역성 최적화를 고려하여 실행 노드를 찾는다. 실행 노드의 우선 순위는 아래와 같다.

a. 입력 데이터가 있는 노드 - 입력 스플릿에 해당하는 HDFS 블록 복제본이 저장된 노드들이 후보

b. 블록 복제본이 저장된 동일 랙에 속한 다른 노드

c. 외부 랙의 노드 - 랙 간 네트워크 전송이 불가피하게 일어나는 worst case

 

맵 태스크의 결과는 HDFS가 아닌 로컬 디스크에 저장된다. 맵의 결과는 중간 결과물이자 잡 이후 제거 대상이며, HDFS에 저장 시 내부 복제로 인해 불필요한 과정이 많아지기 때문이다. 이와 달리 리듀스 태스크의 결과는 안정성을 위해 HDFS에 저장된다. 

 

리듀스 태스크 수에 따라 맵리듀스의 흐름을 분류할 수 있다. 리듀스 태스크 수는 입력 크기와 상관 없이 독립적으로 지정할 수 있지만 기본적으로 리듀스가 여럿이면 맵 태스크는 리듀스 수만큼 파티션을 생성하고 맵의 결과를 각 파티션에 분배한다. 

단일 리듀스 태스크의 맵리듀스 데이터 흐름

리듀스 태스크가 다수인 상황의 일반적인 흐름은 아래와 같다.

다수의 리듀스 태스크가 있는 맵리듀스 데이터 흐름

리듀스 태스크가 없는 경우는 아래와 같으며, 모든 처리 과정을 완전히 병렬로 처리하는 경우에 적합하다.

리듀스 태스크가 없는 맵리듀스 데이터 흐름

 

컴바이너함수

클러스터에서 맵리듀스 잡이 사용하는 네트워크 대역폭은 한계가 있기 때문에 맵과 리듀스 태스크 사이의 데이터 전송을 최소화할 필요가 있다. 맵의 결과를 처리하는 컴바이너 함수를 사용하여 리듀스에 전달하는 값을 최소화 할 수 있다.

컴바이너 함수를 사용하기 위해서는 잡을 구동하는 코드 내에서 컴파이너 클래스를 명시하면 사용 가능하다.

public class FunctionWithCombiner {

    public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(Function.class);
        job.setJobName("Function");
    
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
        job.setMApperClass(FuctionMapper.class);
        // Set Combiner Class
        job.setCombinerClass(FunctionReducer.class);
        job.setReducerClass(FunctionReducer.class);
    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        System.exit(job.watForCompletion(true) ? 0 : 1);
    }
}

 

+ Recent posts