3.1 이클립스로 스파크 프로젝트 생성
가상 머신의 이클립스를 GUI로 사용하기 위해서는 X 윈도 시스템을 사용해야한다.
호스트 OS 마다 다른 X 윈도 시스템을 사용해야하며, 나는 mac OS에서 사용할 수 있는 XQuartz를 사용한다.
( 다운로드 링크 : https://www.xquartz.org/ )
다운로드/설치 후 프로그램을 사용학 위해서는 SSH에서 -X 옵션을 사용하고, 사용할 프로그램을 실행시킨다.
# eclipse GUI로 실행
$ ssh -X spark@1192.168.10.2 /home/spark/eclipse/java-mars/eclipse/eclipse
실행 후에는 이클립스테 스칼라 IDE 플러그인을 설치한다.
- 이클립스 툴바에서 Help > Install new Software 클릭 후 Add 클릭
- Add Repository 창이 나타나면 아래 내용을 입력 후 OK 버튼 클릭한다.
- Name : scala-ide
- Location : http://download.scala-ide.org/sdk/lithium/e44/scala211/stable/site - 목록에서 Scala IDE for Eclipse와 그 하위 항목들을 선택한다.
- 선택한 항목들을 확인 후 그 다음 화면에서 라이센스 동의하고 플러그인을 설치
- 재시작 후 위와 같은 방법으로 스칼라 메이븐 플러그인 설치
( Name - m2eclipse-scala, Location -http://alchim31.free.fr/m2e-scala/update-site 만 다름 )
책에서는 메이븐 프로젝트를 생성할 수 있도록 scala-archetype-sparkinaction 아키타입을 미리 준비해두었다.
메이븐 프로젝트를 생성하기 위해서 아래 과정을 수행해보자
- 이클립스 툴바에서 File > New > Project > Maven > Maven Project
- Coonfigure 클릭 후 Maven > Archetype 섹션에서 Add Remote Catalog 클릭 후 아래 내용 입력
- Catalog File : https://github.com/spark-in-action/scala-archetype-sparkinaction/raw/master/archetype-catalog.xml
- Description : Spark in Action - 생성된 Archetype 선택 후 프로젝트 매개변수 설정
Group Id : org.sia
Artifact Id : chapter03App
Package : org.sia.chapter03App
위와 같이 설정 후에 프로젝트가 만들어져야하는데 몇 가지 에러가 발생해서 수정이 필요하였다.
첫 번째 에러는 error code 501 https required maven 이다.
여러 해결책을 수행해보았는데, 그 중 마지막 방법으로 해결 가능하였다.
- 방법 1 . ecplise.ini 파일 수정
- 방법 2. pom.xml 파일 수정
- 방법 3. pom.xml 파일 수정
첫 라인의 xmlns 의 http 부분을 https로 수정
이후 Project configuration is not up-to-date with pom.xml. Run project configuration update. 도 발생하였다. 프로젝트의 jre 라이브러리 버전이 메이븐 설정파일에 명시되어 있는 버전과 일치하지 않아서 발생하는 문제로써 Maven 프로젝트를 업데이트하면 쉽게 해결되는 문제이다. ( 프로젝트 우 클릭 > Maven > Update Project )

3.2 스파크 애플리케이션 개발
한 게임 회사의 모든 직원 명단과 각 직원이 수행한 푸시 횟수를 담은 일일 리포트를 개발해보자.
깃허브 아카이브의 하루치 데이터를 샘플 데이터로 사용하자 시작해본다.
가상 머신 터미널에서 다음 명령어를 사용한다.
# 데이터 저장 디렉터리 생성
$ mkdir -p $HOME/sia/github-archive
# Github 활동로그 다운로드
$ cd $HOME/sia/github-archive
$ wget http://data.githubarchive.org/2015-03-01-{0..23}.json.gz
# 파일 압축 해제
$ gunzip *
# 파일 내용 중 상위 1줄 출력
$ head -n 1 2015-03-01-0.json
# 파일 내용 JSON 포맷으로 출력
$ head -n 1 2015-03-01-0.json | jq '.'
앞서 생성한 JSON 데이터를 로드하기 위한 기능을 스파크 SQL과 DataFrame 에서 제공한다.
DataFrame에 대한 이야기는 이후에 다루고, 지금은 스파크 SQL으로 통한 JSON 로드에 초점을 맞춘다.
스파크 SQL의 메인 인터페이스는 SQLContext 클래스이다. ( 스파크 코어의 SparkContext와 유사 )
SQLContext의 read 메서드는 다양한 데이터를 입수하는데 사용할 수 있는 DataFrameReader 객체를 반환한다.
아래 예제와 같이 수행할 수 있다.
import org.apache.spark.sql.SparkSession
object App {
def main(args : Array[String]) {
val spark = SparkSessionbuilder()
.appName("GitHub push counter")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
// 시스템 함수를 통해 $HOME 변수 추출
val homeDir = System.getenv("HOME")
val inputPath = homeDir + "/sia/github-archive/2015-03-01-0.json"
// Json 데이터 추출
val ghLog = spark.read.json(inputPath)
// type이 PushEvent인 것만 필터링
val pushes = ghLog.filter("type = 'PushEvent'")
//스키마, 내용 출력
pushes.printSchema
pushes.show(5)
//actor.login 개수로 그룹화된 데이터 집계
val grouped = pushes.groupBy("actor.login").count
//count 값 기준 내림차순 정렬
val ordered = grouped.orderBy(grouped("count").desc)
// 직원 목록 사용을 위한 스칼라 컬렉션 로드
import scala.io.Source.formFile
val empPath = homeDir = "/first-edtion/ch03/ghEmployees.txt"
val employees = Set() + (
for {
line <- fromFile(empPath).getLines
} yield line.trim
)
// Set 기반 필터링을 위해 함수 정의 및 스파크의 사용자 정의 함수 활용
val isEmp = user => employees.contains(user)
val isEmployee = spark.udf.register("isEmpUdf", isEmp)
val filtered = ordered.filter(isEmployee($"login"))
이대로 예제 프로그램을 실행하면 스파크는 employees Set을 대략 200회( 필터링 작업을 수행할 태스크 개수) 가까이 반복적으로 네트워크에 전송하게 될 것이다.
이러한 반복 작업을 막기 위해 스파크에서는 공유 변수를 설정할 수 있다. 공유 변수는 클러스터의 각 노드에 정확히 한 번만 전송하고, 클러스터 노드에 자동으로 캐시되므로 프로그램 실행 중에 바로 접근이 가능하다. 또한 가십 프로토콜을 사용하여 워커 노드들이 서로 공유 변수를 교환하며 클러스터 전체에 공유 변수를 유기적으로 확산시킨다.
일반 변수인 employees 뒤에 공유 변수 설정 코드를 추가하고, 공유 변수에 접근 시 value 메소드를 사용해야한다. 공유 변수를 사용하는 예제 코드는 다음과 같다.
import org.apache.spark.sql.SparkSession
import scala.io.Source.formFile // 직원 목록 사용을 위한 스칼라 컬렉션 로드
object App {
def main(args : Array[String]) {
val spark = SparkSessionbuilder()
.appName("GitHub push counter")
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
// 시스템 함수를 통해 $HOME 변수 추출
val homeDir = System.getenv("HOME")
val inputPath = homeDir + "/sia/github-archive/2015-03-01-0.json"
// Json 데이터 추출
val ghLog = spark.read.json(inputPath)
// type이 PushEvent인 것만 필터링
val pushes = ghLog.filter("type = 'PushEvent'")
//스키마, 내용 출력
pushes.printSchema
pushes.show(5)
//actor.login 개수로 그룹화된 데이터 집계
val grouped = pushes.groupBy("actor.login").count
//count 값 기준 내림차순 정렬
val ordered = grouped.orderBy(grouped("count").desc)
val empPath = homeDir = "/first-edtion/ch03/ghEmployees.txt"
val employees = Set() + (
for {
line <- fromFile(empPath).getLines
} yield line.trim
)
// 공유 변수 선언
val bcemployees = sc.broadcast(employees)
// Set 기반 필터링을 위해 함수 정의 및 스파크의 사용자 정의 함수 활용
import spark.impicits._
val isEmp = user => bcemployees.value.contains(user)
val isEmployee = spark.udf.register("isEmpUdf", isEmp)
val filtered = ordered.filter(isEmployee($"login"))
작성된 애플리케이션을 검증 및 운영 환경으로 넘기기 전에 appName, appMaster, inputPath, empPath 변수를 애플리케이션의 매개변수로 만들어야한다. 모든 매개변수 값을 미리 파악한다고 해도 매개변수 값은 애플리케이션 외부에서 지정하는 것이 현명하며, 이를 통해 애플리케이션을 더욱 유연하게 만들 수 있다.
단일 json 파일이 다운 받은 모든 Json 파일에 대해서 분석하려면 inputPath 값을 *을 사용하여 변경한다.
// val inputPath = homeDir + "/sia/github-archive/2015-03-01-0.json" 대신 *를 사용
val inputPath = homeDir + "/sia/github-archive/*.json"
3.3 애플리케이션 제출
애플리케이션을 스파크 클러스터에서 실행하려면 애플리케이션이 모든 의존라이브러리에 접근할 수 있어야한다. 의존하는 라이브러리를 살펴보려면 pom.xml 파일의 Dependency Hierachy 탭에서 확인할 수 있다. 애플리케이션을 검증하기 위해서는 의존 라이브러리 또 같이 포함되어야하며, 포함시키는 방법은 두 가지가 있다.
- spark-submit 스크립트의 --jars 매개변수에 프로그램에 필요한 JAR 파일을 모두 나열해 실행자로 전송
- 모든 의존성 라이브러리를 포함하는 uberjar를 빌드
첫 번째 방법은 여러 클러스터를 운영할 경우 라이브러리를 일일이 배치해야하므로 다소 번거롭기 때문에, 두 번째 방법을 실습을 진행한다.
우선 추가하고 싶은 라이브러리에 대한 의존성도 발생할 수 있다. 의존성의 의존성을 해결하기 위해 maven-shade-plugin 을 사용하며, uberjar를 빌드하는데 사용된다. 각 의존 라이브러리가 필요한 단계를 지정할 수 있는 scope 속성이 있다. 이를 통해 해당 라이브러리와 이 라이브러리의 모든 라이브러리를 uberjar에서 제외할지(provided), 해당 라이브러리가 애플리케이션의 컴파일 및 런타임 단계에서 모두 사용할지(complie)을 결정할 수 있다.
이후 pom.xml 파일을 변경 후에는 이클립스 프로젝트를 갱신해야한다.
SparkConf에 애플리케이션 이름과 스파크 마스터를 지정하는 부분을 제거하고, spark-submit 인수로 전달하자.
최종 인자로 받아야하는 항목은 다음과 같다.
- 입력 JSON 파일 경로
- 직원 파일 경로
- 출력 파일 경로
- 출력 파일 형식
위의 순서대로 인자를 설정한 예제 프로그램 코드는 아래와 같다.
package org.sia.chapter03App
import org.apache.spark.sql.SparkSession
import scala.io.Source.fromFile
/**
* @author ${user.name}
*/
object GitHubDay {
def main(args : Array[String]) {
val spark = SparkSession.builder().getOrCreate()
val sc = spark.sparkContext
val homeDir = System.getenv("HOME")
val inputPath = homeDir + "/sia/github-archive/*json"
val ghLog = spark.read.json(args(0))
val pushes = ghLog.filter("type= 'PushEvent'")
val grouped = pushes.groupBy("actor.login").count
val ordered = grouped.orderBy(grouped("count").desc)
val employees = Set() ++ (
for {
line <- fromFile(args(1)).getLines
} yield line.trim
)
val bcEmployees = sc.broadcast(employees)
import spark.implicits._
val isEmp = user=> bcEmployees.value.contains(user)
val sqlFunc = spark.udf.register("SetContainUdf", isEmp)
val filtered = ordered.filter(sqlFunc($"login"))
filtered.write.format(args(3)).save(args(2))
}
}
uberjar로 빌드시키기 위해 이클립스 툴바에서 Run > Run Configurations... 클릭후
왼쪽에서 탭 Maven Build > Build uberjar 를 선택한다. ( 이후 아래와 같이 입력)

Run 이후에는 프로젝트의 target 디렉터리에 chapter03App-0.0.1-SNAPSHOT.jar 파일이 생성됨을 볼 수 있다.
이제 로컬에 설치된 스파크에서 애플리케이션을 테스트한다. 이때 spark-submit 을 사용한다.
spark-submit은 스파크 애플리케이션을 제출하는 일종의 헬퍼 스크립트로 애플리케이션을 스파크 클러스트에서 실행하는데 사용한다. 스파크 루트 디렉터리 아래 bin 디렉터리에 위치한다. 아래 명령어를 통해 스파크 애플리케이션을 제출한다.
//스파크 로그 파일 확인
$ tail -f /usr/local/spark/logs/info.log
// spark-submit 명령어
$ spark-submit --class org.sia.chapter03App.GitHubDay \
--master local[*] --name "Daily GitHub Push Counter" \
chapter03App-0.0.1-SNAPSHOT.jar "$HOME/sia/github-archive/*.json" \
"$HOME/first-edition/ch03/ghEmployees.txt" "$HOME/sia/emp-gh-push-output" "json"
// 실행 결과 확인
cat $HOME/sia/emp-gh-push-output/part-r-00000-b24f792c-c0d0-425b-85db-3322aab8f3e0.json
'빅데이터 > spark' 카테고리의 다른 글
Spark in Action - 2장 스파크의 기초 (0) | 2021.09.15 |
---|---|
Spark in Action - 1장 아파치 스파크 소개 (0) | 2021.09.12 |
Spark in Action - 0장 책 목차 소개 (0) | 2021.09.11 |