빅데이터/Hadoop

Hadoop The Definitive Guide - Chap7 맵리듀스 동작 방법

sumni0530 2023. 4. 22. 12:36

Chapter 7 - 맵리듀스 작동 방법

이 장에서는 하둡에서 맵리듀스의 작동 방법을 자세히 살펴본다. 

 

7.1 맵리듀스 잡 실행 상세분석

맵리듀스 잡을 동작하는 방식 (링크 : https://www.oreilly.com/library/view/hadoop-the-definitive/9781491901687/ch07.html)

맵리듀스의 잡은 5개의 독립적인 단계로 구분되어 동작한다.

  • 클라이언트 : 맵리듀스 코드를 작성하며, 맵리듀스 잡을 제출
  • YARN 리소스 매니저 : 클러스터 상에 계산 리소스의 할당을 제어(잡 할당 관리)
  • YARN 노드 매니저 : 클러스터의 각 머신에서 계산 컨테이너를 시작하고 모니터링(태스크 관리)
  • 맵리듀스 애플리케이션 마스터 : 맵리듀스 잡을 수행하는 각 태스크를 제어
    애플리케이션 마스터와 맵리듀스 태스크는 컨테이너 내에서 실행
  • 분산 파일시스템 : 다른 단계 간에 잡 리소스 파일을 공유하는데 사용 (보통 HDFS을 사용)

잡이 동작하는 순서에 따라 각 단계에서 어떤 동작을 하는지 알아보자.

 

7.1.1. 잡 제출

Job은 submit() 메서드를 호출하여 제출가능하다. submit() 메서드는 내부에 JobSubmitter 인스턴스를 생성하고, submitJobInternal() 메서드를 호출한다. 잡을 제출하면 waitForCompletion() 메서드가 1초에 한 번씩 잡의 진행 상황을 조사하여 변경 내역이 있으면 콘솔로 보여준다. JobSubmitter의 잡 제출 과정은 아래와 같다.

  1. 리소스 매니저에 잡 ID로 사용될 새로운 애플리케이션 ID를 요청 (2단계)
  2. 잡의 출력 명세 확인 - 출력 디렉터리 지정되지 않았거나, 출력 디렉터리가 존재하지 않는다면 잡을제출하지 않고 에러를 전달 
  3. 잡의 입력 스플릿 계산 - 스플릿을 계산할 수 없다면(데이터가 없다면) 잡은 제출되지 않고 에러를 전달
  4. 잡 실행에 필요한 파일 복사 - 잡 JAR파일, 환경 설정 파일, 계산된 입력 스플릿 등의 잡 리소르를 공유 파일 시스템에 있는 해당 잡 ID 이름의 디렉터리에 복사 (3단계)
  5. 잡 제출 - 리소스 매니저의 submitApplication() 호출하여 잡을 제출(4단계)

 

7.1.2 잡 초기화

리소스 매니저가 submitApplication() 메서드의 호출을 받으면 YARN 스케쥴러에 요청을 전달한다. 스케쥴러는 컨테이너를 하나 할당하고, 리소스 매니저는 노드 매니저의 운영 규칙에 따라 애플리케이션 마스터 프로세스를 시작한다.(5단계) 

맵리듀스 잡의 애플리케이션 마스터는 자바 애플리케이션이며, 메인 클래스는 MRAppMaster 이다. 잡을 초기화할 때는 진행 상태를 추적하기 위해 북키핑(장부) 객체를 생성하고 태스크로부터 진행 및 종료 리포트를 받는다.(6단계)

클라이언트가 계산한 입력 스플릿 정보를 공유 파일시스템에서 읽어오고(7단계), 입력 스플릿별로 맵 태스크 객체를 생성하고, 잡의 setNumReduceTask() 메서드로 지정한 속성 값(mapreduce.job.reduces)만큼 맵 태스크 객체를 생성한다. 

애플리케이션 마스터는 잡의 크기가 작다면 자신의 JVM에서도 실행할 수 있다. 병렬 처리를 위해 새로운 컨테이너에 태스크를 할당하고 실행하는 방식이 비효율적이라고 판단될 때 우버(단일 노드에서 순차적을 실행하는 상태)될 수 있으며, 우버 태스크로 실행된다고 한다. 

우버 태스크로 실행되는 기준은 10개 미만의 매퍼와 하나의 리듀서, HDFS 블록 하나보다 작은 크기의 입력을 말한다. 사용시에는 mapreduce.job.ubertask.enable 속성을 명시적으로 활성화 해야한다. 

 

7.1.3 태스크 할당 

우버 태스크로 실행하기 적합하지 않다면 애플리케이션 마스터는 리소스 매니저에 잡의 모든 맵과 리듀스 태스크를 위한 컨테이너를 요청한다.(8단계) 맵 태스크는 리듀스 태스크보다 우선 순위가 높으며, 전체 맵 태스크의 5%가 완료되기 전까지 리듀스 태스크는 처리되지 않는다.

리듀스 태스크 요청과 달리 맵 태스크 요청은 데이터 지역성 제약이 있으며, 데이터 로컬(입력 스플릿이 저장된 노드에서 맵 태스크 실행) or 랙 로컬에서 실행해야 성능에 유리하다. 요청할 때 태스크를 위한 메모리 및 CPU 요구사항을 명시하며, 기본적으로 1,024MB의 메모리와 가상 코어 1개를 태스크에 할당한다. 

 

7.1.4 태스크 실행

리소스 매니저의 스케쥴러가 특정 노드 상의 컨테이너를 위한 리소스를 태스크에 할당하면 애플리케이션 마스터는 노드 매니저와 통신하며 컨테이너를 시작한다.(9단계) 각 태스크는 YarnChild 메인 클래스를 가진 자바 애플리케이션으로 실행되며, 전용 JVM에서 동작하기 때문에 사용자 정의 맵/리듀스 함수에서 중지 혹은 종료되어도 노드 매니저는 영향을 받지 않는다. 태스크 실행 전 잡에 필요한 리소스를 로컬로 가져오고, 최종적으로 맵과 리듀스 태스크를 실제 실행한다.(11단계)

스트리밍 처리 워크플로우 (참고 - https://inking007.tistory.com/entry/How-MapReduce-Works)

스트리밍은 사용자가 제공한 실행 파일을 시작하고 이와 통신하기 위한 목적을 가진 특별한 맵/리듀스 태스크를 실행한다. 

보통은 YarnChild를 통해 태스크를 실행하지만, 스트리밍에서는 태스크가 스트리밍 프로세스를 실행하기 때문에 노드 매니저 관점에서 자식 프로세스가 스스로 실행한 것 처럼 보인다.

 

7.1.5 진행 상황과 상태 갱신

맵과 리듀스의 진행 상황, 잡의 카운터 값, 상태 메시지 또는 명세 등의 상태 정보는 진행 과정에서 수시로 변경되며 클라이언트는 해당 정보를 전달받고 싶어한다. 태스크가 수행되는 동안 태스크는 자신의 진행 상황을 추적하여 전달한다. 맵의 경우 처리한 입력 데이터 비율로, 리듀서의 경우 진행 상황을 3단계로 나눠 처리한 입력 데이터의 비율을 추정한다. 

맵리듀스가 진행중(무엇인가를 하고 있다)이라는 걸 판단하는 동작은 다음과 같다. 

  • 입력 레코드 읽기(매퍼 혹은 리듀서)
  • 출력 레코드 쓰기(매퍼 혹은 리듀서)
  • 상태 명세의 설정(Reporter 또는 TaskAttempContext의 setStatus() 메서드를 통해)
  • 카운터 증가(Reporter의 incrCounter() 메서드 또는 Counter의 increment() 메서드를 통해)
  • Reporter 또는 TaskAttempContext의 progress() 메서드 호출

상태 갱신 전달 방법 ( 참고 : http://what-when-how.com/Tutorial/topic-2059e313/Hadoop-The-Definitive-Guide-239.html)

맵과 리듀스 태스크가 실행되면서 자식 프로세스는 부모인 애플리케이션 마스터와 인터페이스를 통해 통신한다. 태스크는 진행 상황과 상태정보를 매 3초마다 이 인터페이스를 통해 리포트한다. 

 

 

7.1.6 잡 완료

애플리케이션 마스터가 마지막 태스크가 완료되었다는 리포트를 받으면 잡의 상태를 'successful'로 변경한다. 또한 HTTP 잡 리포트가 설정되었다면 이를 수행하며 클라이언트가 콜백을 받으려면 mapreduce.job.endnotification.url 속성을 설정하면 된다. 마지막으로 잡이 완료되면 애플리케이션 마스터와 태스크 컨테이너는 작업 상태를 정리하고 OutputCommitter의 commitJob() 메서드를 호출한다. 

 

 

7.2 실패

태스크, 애플리케이션 마스터, 노드 매니저, 리소스 매니저의 실패에 대해서 고려해보자.

 

7.2.1 태스크 실패

가장 흔한 실패 유형은 맵 또는 리듀스 태스크 내 사용자 코드에서 런타임 예외를 던질 때이다. 태스크 JVM이 종료 전 애플리케이션 마스터에 에러 보고 후 해당 리소스를 사용 가능하도록 컨테이너를 풀어준다. 스트리밍 태스크에서는 프로세스가 0이 아닌 코드를 반환하면 실패로 표시된다. 

다른 유형의 실패는 JVM이 갑작스럽게 종료되는 경우이며, 맵리듀스 사용자 코드로 인해 발생할 가능성이 높다. 이때 노드 매니저는 프로세스가 종료됨을 알리고, 애플리케이션 마스터에 실패되었다고 알려준다

행이 걸린(멈춘) 태스크는 애플리케이션 마스터가 잠시 동안 진행 상황을 갱신받지 못함을 알게 되면서 해당 태스크를 실패로 표시한다. 태스크 JVM 프로세스는 태스크 실패로 간주하는 타임아웃 시간인 10분 이후 자동으로 강제 종료한다. 타임 아웃을 0으로 설정하면 타임아웃을 비활성화하여 실행 시간이 긴 태스크는 실패로 표시되지 않는다. 애플리케이션 마스터는 태스크 시도 실패를 알게 되면 해당 태스크를 다시 스케쥴링한다. 스케쥴링시 동일 노드에 동작하는 것을 피하고, 4번까지 시도한다. 

태스크 시도가 강제로 종료될 수 있는데 이는 실패와 다르다. 또 실행 기반인 노드 매니저가 실패해서 모든 태스크 시도를 실패로 표시할 때 이를 모두 강제 종료한다. 

 

7.2.2 애플리케이션 마스터 실패

YARN 내에 애플리케이션은 실패할 때 몇 번의 재시도를 할 수 있다. 기본 값은 2이며, 애플리케이션 마스터가 2번 이상 실패하면 잡이 실패로 끝난다. 실패시 작업 복구 방식은 아래와 같다.

  1. 애플리케이션 마스터가 주기적으로 리소스 매니저에 하트비트를 전달
  2. 애플리케이션 실패 이벤트 발생 시 리소스 매너지는 실패를 감지하고 새로운 마스터 인스턴스를 시작
  3. 잡 히스토리를 사용하여 실행된 모든 태스크의 상태를 복구 (태스크 재실행 불필요)

7.2.3 노드 매니저 실패

노드 매니저가 크래시에 의해 실패하거나 굉장히 느리게 수행 중이라면 리소스 매니저에 한트비트 전송을 중단 혹은 느리게 전달될 것이다.리소스 매니저는 하트비트 전송을 중단한 노드 매니저가 일정 시간(기본값 10분)동안 한번도 전송하지 않음을 인지하면 이를 컨터이너를 스케쥴링하는 노드 풀에서 제거한다. 

애플리케이션 실패 횟수가 높으면 노드 매니저 자체가 실패하지 않더라도 노드 매니저는 블랙리스트에 등록된다. 블랙리스트 등록 주체는 애플리케이션 마스터이며, 하나의 노드 매니저에서 네 개 이상의 맵리듀스 태스크가 실패하면 다른 노드에 태스크를 다시 스케쥴링한다. 

 

7.2.4 리소스 매니저 실패

리소스 매니저 없이는 잡이나 태스크 컨테이너가 실행될 수 없기 때문에 굉장히 심각한 상황이다. 리소스 매니저는 단일 고장점이며, 고가용성(High availability)를 달성하기 위해 두 개의 리소스 매니저를 Active-StandBy 설정으로 실행해야 한다. 실행 중인 애플리케이션에 대한 모든 정보는 고가용 상태 저장소에 보관되기 때문에 대기 리소스 매니저는 실패한 활성 리소스 매니저의 핵심 상태를 복구할 수 있다. 새로운 리소스 매니저가 시작되면 상태 저장소로부터 애플리케이션 정보를 읽고 클러스터에서 실행 중인 모든 애플리케이션의 애플리케이션 마스터를 재시작한다. 태스크 복구는 애플리케이션 마스터의 역할이기 때문에 리소스 매니저는 관여하지 않는다. 

대기 리소스 매니저에서 활성 리소스 매니저로의 전환은 장애극복 관리자(failover controller)가 담당한다. 기본 장애극복 관리자는 주키퍼 대표 선출을 사용해서 어떠한 시점에라도 단일 활성 리소스 매니저가 존재하도록 보장한다. 

클라이언트와 노드 매니저는 활성 리소스 매니저를 찾을 때까지 라운드 로빈 방식으로 각 리소스 매니저에 연결을 시도하며, 만약 활성 리소스 매니저가 실패한다면 대기 리소스 매니저가 활성화될 때까지 재시도를 한다.

 

7.3 셔플과 정렬

맵리듀스는 모든 리듀서의 입력이 키를 기준으로 정렬되는 것을 보장한다. 시스템이 이러한 정렬을 수행하고 맵의 출력을 리듀서의 입력으로 전송하는 과정을 셔플이라고 한다. 

셔플과 정렬 (참고 - https://datainnovations.wordpress.com/tag/shuffle-and-sort-in-mapreduce/)

7.3.1 맵 부분

맵 함수가 결과를 생산할 때 효율적인 처리를 위해 메모리에 일정 크기만큼 쓴 다음 사전 정렬을 수행한다. 각 맵 태스크는 환형 구조의 메모리 버퍼(기본 100MB)를 가지고 있으며 이곳에 결과를 기록한다. 버퍼의 내용이 특정 한계치에 도달하면 백그라운드 스레드가 디스크에 스필하기 시작한다. 버퍼 기록과 스필은 동시에 수행하며 버퍼가 가득차면 스필이 종료될 때까지 블록된다. 

디스크로 쓰기 전에 전송할 리듀서 수에 맞게 파티션으로 나누고, 키를 기준으로 인메모리 정렬을 수행한다. 컴바이너 함수가 존재하면 정렬의 출력에 대해 수행한다. 스필의 한계치에 도달하면 새로운 스필 파일이 생성되므로 맵 태스크로 인해 여러 개의 스필 파일이 존재할 수 있다. 태스크가 종료되기 전에 여러 스필 파일은 단일 출력 파일로 병합되고 정렬된다. 

스필 파일이 기준 값(기본값 3)보다 많다면 맵 출력 크기를 줄이기 위해 컴바이너 함수를 다시 실행한다. 출력 파일의 파티션은 HTTP를 통해 리듀서에 전달되며 파일 파티션을 전달하는 워커 스레드의 최대 수는 조절 가능하다. 

 

7.3.2 리듀스 부분

맵 출력 파일은 맵 태스크를 수행한 서버의 로컬 디스크에 존재하는데 리듀스 태스크를 시작하는 서버에도 맵 출력 파일이 필요하며, 맵 태스크는 각기 다른 시간에 끝날 수 있으므로 각 맵 태스크가 끝나는 즉시 복사를 시작한다. 리듀스 태스크는 복사기 스레드를 가지고 있는데, 맵 출력 인출을 병렬로 수행한다. 인출할 서버는 애플리케이션 마스터를 통해 맵 출력과 호스트 사이의 매핑 정보를 확인할 수 있다. 

모든 맵 출력이 복사되는 시점에 리듀스 태스크는 맵 출력을 병합하고 정렬 순서를 유지한다. 50개의 맵 출력이 존재하고 병합 계수가 10인 경우 다섯 개의 라운드로 구성되며, 각 라운드는 10개의 파일을 하나로 병합하여 5개의 중간 파일이 생성된다.

중간 파일을 하나의 정렬된 파일로 병합하는 최종 라운드를 가지는 대신 마지막 단계의 리듀스 함수에 곧바로 전송하여 디스크 IO를 줄인다. 

 

7.3.3 설정 조정

맵 측면에서와 리듀스 측면에서 튜닝 속성들을 변경할 수 있다. 설정가능한 속성과 이에 대한 설명은 공식 홈페이지(MapReduce Tutorial)에서 확인 가능하다. 

일반적인 설정 조정에 대한 원칙은 셔플에 가능한 많은 메모리를 할당하는 것이다. 셔플의 메모리 확보도 중요하지만 맵과 리듀스 함수가 동작하는데 충분한 메모리 확보도 필요하기 때문에 함수 작성 시 가장한 적은 메모리를 사용하도록 작성해야한다. 맵 측면에서 보면 다수의 디스크 스필을 피하는 것이 최고의 성능을 내는 방법이다. 리듀스 측면에서는 중간 데이터 전체가 메모리에 존재할 때 최고의 성능을 얻을 수 있다. 

 

7.4 태스크 실행

맵리듀스 사용자가 태스크 실행에 관해 취할 수 있는 제어사항에 대해서 알아보자.

 

7.4.1 태스크 실행 환경

맵 리듀스 API가 제공하는 Mapper 또는 Reducer의 configure() 메서드 구현을 통해 얻을 수 잇는 잡의 환경 설정이다. 

Name Type Description
mapreduce.job.id String The job id
mapreduce.job.jar String job.jar location in job directory
mapreduce.job.local.dir String The job specific shared scratch space
mapreduce.task.id String The task id
mapreduce.task.attempt.id String The task attempt id
mapreduce.task.is.map boolean Is this a map task
mapreduce.task.partition int The id of the task within the job
mapreduce.map.input.file String The filename that the map is reading from
mapreduce.map.input.start long The offset of the start of the map input split
mapreduce.map.input.length long The number of bytes in the map input split
mapreduce.task.output.dir String The task’s temporary output directory

 

스트리밍 환경변수

하둡은 잡 환경 설정 파라미터를 스트리밍 프로그램의 환경 변수로 설정한다. 숫자 이외의 문자는 _로 대체하여 유요한 이름을 만든다. 

또한 맵리듀스가 스트리밍 프로세스 실행 시 -cmdenv 옵션을 스트리밍 시작 프로그램에 입력하여 환경 변수를 설정할 수 있다.

-cmdenv MAGIC_PARAMETER=abracadabr

 

7.4.2 투기적 실행

맵리듀스 모델은 잡을 태스크로 나누고 태스크를 병렬 수행하는 형태이다. 모든 태스크가 빠르지 않으면, 즉 하나라도 느린 태스크가 있다면 전체 잡 수행을 상당히 지연시킨다. 태스크는 하드웨어의 성능 저하나 소프트웨어의 잘못된 설정 등 다양한 이유로 느려질수 있다. 하둡은 느린 태스크를 진단하거나 고치려 하지 않는 대신 태스크 수행이 예상햇던 것보다 느린 것을 감지하여 또 다른 동일한 동일한 예비 태스크를 실행한다. 이를 투기적 실행이라고 한다. 이 방법은 클러스터의 리소스 낭비가 심해지기 때문에 모든 태스크의 진행 상황을 기록하고 평균보다 심각하게 느린 태스크에 대해서만 투기적 복제 태스크를 실행한다. 원본과 복제 태스크 중 하나라도 성공되면 실행 중인 태스크는 강제 종료한다. 투기적 실행은 안정적으로 잡을 실행하는 기능은 아니다. 버그가 존재할 때 문제를 회피하기 위해 사용하는 용도이며, 근본적인 원인을 찾아 해결하는 것이 올바른 방법이다. 

투기적 실행의 궁극적인 목적은 잡 실행 시간을 줄이는 것이지만 클러스터 효율성 측면에서 비용이 발생하므로, 클러스터 상에서는 끄는 것을 선호하고 사용자가 개발 잡에 대해 필요하면 명시적으로 켜도록 하고 있다. 리듀스 태스크에 투기적 실행을 끄는 것은 좋은 예이다. 동일한 맵 출력을 인출해야하며, 네트워크 트래픽을 심각하게 증가시키기 때문이다.

 

7.4.3 출력 커미터

하둡 맵리듀스는 잡과 태스크가 깨끗하게 성공하거나 실패하도록 보장하기 위한 커밋 프로토콜을 사용한다. 이 동작은 잡에서 사용 중인 OutputCommitter로 구현했으며, getOutputCommitter() 메서드로 확인할 수 있다. 기본 값은 파일 기반의 맵리듀스에 적합한 FileOutputCommitter다. OutputCommitter는 아래와 메서드로 구성되어있다.

  • setupJob() : 잡을 실행하기 전 초기화를 수행하기 위해 사용
    최종 출력 디렉터리와 태스크 출력을 위한 임시 작업 공간인 _temporary를 최종 출력 디렉터리의 서브 디렉터리로 생성
  • commitJob() : 잡이 성공한 이후 호출, 파일 기반 구현에서 임시 작업 공간을 삭제하고 _SUCCESS라는 빈 마커 파일을 생성
  • abortJob() : 잡이 성공하지 않은 경우 호출되는 메서드, 잡의 임시 작업 공간을 삭제
  • setupTask() : 태스크 실행 시 호출되고 기본 구현체는 아무런 일도 하지 않음
  • needsTaskCommit() : 커밋 단계는 선택사항이며, false를 반환하도록하여 비활성화할 수 있음
    비활성화시 분산 커밋 프로토콜을 실행해야하는 오버헤드를 덜어주고, commitTask(), abortTask()도 호출하지 않음
  • commitTask() : 태스크가 성공하면 임시 태스크 출력 디렉터리를 최종 출력 경로로 옮김
  • abortTask(): 태스크가 성공하지 않은 경우 임시 태스크 출력 디렉터리를 삭제

 

태스크의 부차적인 파일

맵과 리듀스 태스크의 출력을 작성하는 일반적인 방법은 키-값 쌍을 수집하는 OutputCollector를 사용하는 것이다. 주의할 점은 동일한 태스크의 다중 인스턴스가 동일한 파일에 쓰지 않도록 보장하는 것이다. 애플리케이션이 태스크 작업 디렉터리에 부차적인 파일을 작성했을 때 성공적으로 완료된 태스크의 부차적인 파일은 출력 디렉터리에 자동으로 옮겨지지만 실패한 태스크의 파일은 삭제될 것이다.