Spring Batch 최적화: Partitioning 및 멀티스레딩
I. 문제 상황
공공 API로부터 전국의 기업 정보를 동기화하는 배치 기능을 구현했다. 약 120만 건의 데이터를 페이지네이션 방식으로 가져와 데이터베이스에 저장하는 단순한 작업이지만, 처리 시간은 예상보다 훨씬 길었다.
1
2
3
4
5
총 데이터: 약 1,200,000건
페이지당 데이터: 1,000건
총 페이지 수: 약 1,200페이지
단일 스레드 순차 처리: 68분
문제는 명확했다. 모든 페이지를 순차적으로 처리하다 보니:
- API 호출 대기 시간이 누적된다
- 네트워크 I/O 대기 중 CPU가 유휴 상태로 놀고 있다
- 데이터베이스 저장 작업도 순차적으로 진행된다
이는 전형적인 I/O Bound 작업이며, 병렬 처리로 극적인 성능 향상을 기대할 수 있는 케이스다.
II. 해결 방안: Partitioning + Multi-Threading
Spring Batch는 대용량 데이터 처리를 위한 두 가지 강력한 기능을 제공한다.
- Partitioning: 데이터를 논리적으로 분할해 독립적으로 처리
- TaskExecutor: 각 파티션을 별도 스레드에서 병렬 실행
이 두 가지를 조합하면 다음과 같은 구조가 된다.
1
2
3
4
5
6
7
Master Step (조정자)
↓ 데이터 분할
├─ Woker Step [Partition 0 (Thread 1) → Page 0-239]
├─ Woker Step [Partition 1 (Thread 2) → Page 240-479]
├─ Woker Step [Partition 2 (Thread 3) → Page 480-719]
├─ Woker Step [Partition 3 (Thread 4) → Page 720-959]
└─ Woker Step [Partition 4 (Thread 5) → Page 960-1199]
각 파티션은 독립적인 스레드에서 실행되며, 서로 영향을 주지 않는다.
III. 구현 상세
전체 구조는 다음 5개의 컴포넌트로 구성된다.
1
2
3
4
5
6
7
8
9
UpdateCorporationDocumentConfig # Job 및 Step 정의
↓
CorporationPartitioner # 데이터 분할 전략
↓
CorporationClientReader # 외부 API 읽기
↓
CorporationDocumentWriter # DB 저장
↓
CorporationSyncScheduler # 스케줄 실행
1. Job 구성 (UpdateCorporationDocumentConfig)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Configuration
@RequiredArgsConstructor
public class UpdateCorporationDocumentConfig {
private final int GRID_SIZE = 5;
@Bean
public Job updateCorporationDocument() {
return new JobBuilder("updateCorporationDocument", jobRepository)
.start(masterStep())
.build();
}
@Bean
public Step masterStep() {
return new StepBuilder("masterStep", jobRepository)
.partitioner("workerStep", corporationPartitioner)
.step(workerStep())
.gridSize(GRID_SIZE)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Step workerStep() {
return new StepBuilder("workerStep", jobRepository)
.<Corporation, Corporation>chunk(1000, transactionManager)
.reader(corporationClientReader)
.writer(corporationDocumentWriter)
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(GRID_SIZE);
executor.setMaxPoolSize(GRID_SIZE);
executor.setQueueCapacity(GRID_SIZE);
executor.setThreadNamePrefix("corporation-batch-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
이 구성의 핵심은 Master-Worker 패턴이다.
- Master Step: 데이터를 5개의 파티션으로 분할하고 각 Worker Step을 스레드풀에 할당
- Worker Step: 실제 데이터 읽기/쓰기를 Chunk 단위(1000건)로 처리
- TaskExecutor: 5개의 스레드를 생성해 각 파티션을 병렬 실행
GRID_SIZE는 StepExecution의 수량을 의미한다. 위 코드에선 WorkStep 하나만 매핑되기 때문에, GRID_SIZE는 WorkerStep의 수량과 같다. WorkerStep의 수량과 동일하게, Thread의 PoolSize 또한 GRID_SIZE로 맞췄다.
2. 데이터 분할 전략 (CorporationPartitioner)
(그림 I-2) Partitioner를 통한 데이터 분할
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
@RequiredArgsConstructor
public class CorporationPartitioner implements Partitioner {
private final CorporationClient corporationClient;
private static final int PAGE_SIZE = 1000;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
int totalCorporationCount = corporationClient.totalCorporationCount();
int totalPages = (int) Math.ceil((double) totalCorporationCount / PAGE_SIZE);
log.info("Corporation Pages: totalCount={}, totalPages={}", totalCorporationCount, totalPages);
Map<String, ExecutionContext> partitions = new HashMap<>();
int pagesPerPartition = (int) Math.ceil((double) totalPages / gridSize);
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
int startPage = i * pagesPerPartition;
int endPage = Math.min(startPage + pagesPerPartition, totalPages);
context.putInt("partitionNumber", i);
context.putInt("startPage", startPage);
context.putInt("endPage", endPage);
context.putInt("pageSize", PAGE_SIZE);
partitions.put("partition" + i, context);
}
return partitions;
}
}
Partitioner는 데이터를 어떻게 나눌지 결정하는 전략을 정의한다. 여기서는 페이지 기반 분할 방식을 사용했다.
예를 들어 총 1200페이지를 5개 파티션으로 나누면:
1
2
3
4
5
Partition 0: Page 0-239 (240 pages)
Partition 1: Page 240-479 (240 pages)
Partition 2: Page 480-719 (240 pages)
Partition 3: Page 720-959 (240 pages)
Partition 4: Page 960-1199 (240 pages)
각 파티션의 메타데이터(startPage, endPage 등)는 ExecutionContext에 저장되어 Worker Step으로 전달된다. 이를 통해 각 스레드는 자신이 처리해야 할 페이지 범위를 알 수 있다.
2-1. 동적 분할의 중요성
단순히 고정된 페이지 범위로 나누지 않고, 런타임에 전체 데이터 개수를 조회해 동적으로 분할한다. 이는 데이터가 증가하거나 감소해도 유연하게 대응할 수 있게 한다.
1
2
int totalCorporationCount = corporationClient.totalCorporationCount();
int totalPages = (int) Math.ceil((double) totalCorporationCount / PAGE_SIZE);
3. 데이터 읽기 (CorporationClientReader)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Component
@StepScope
@RequiredArgsConstructor
public class CorporationClientReader extends AbstractPagingItemReader<Corporation> {
private final CorporationClient corporationClient;
@Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber;
@Value("#{stepExecutionContext['startPage']}") Integer startPage;
@Value("#{stepExecutionContext['endPage']}") Integer endPage;
@Value("#{stepExecutionContext['pageSize']}") Integer pageSize;
private int currentPage;
@Override
protected void doOpen() throws Exception {
setPageSize(pageSize);
super.doOpen();
currentPage = startPage;
log.info("[Partition-{}]: pages={}-{} (Total {} pages)",
partitionNumber, startPage, endPage - 1, endPage - startPage);
}
@Override
protected void doReadPage() {
if (results == null) results = new CopyOnWriteArrayList<>();
else results.clear();
if (currentPage >= endPage) return;
List<Corporation> corporations = corporationClient.getCorporations(currentPage + 1, getPageSize());
if (corporations != null && !corporations.isEmpty()) results.addAll(corporations);
currentPage++;
}
}
CorporationClientReader는 Spring Batch의 AbstractPagingItemReader를 상속받아 구현했다. 핵심은 @StepScope와 SpEL을 활용한 파티션별 메타데이터 주입이다.
1
@Value("#{stepExecutionContext['startPage']}") Integer startPage;
이 표현식은 각 Worker Step의 ExecutionContext에서 startPage 값을 읽어온다. 덕분에 동일한 Reader 클래스가 5개의 다른 스레드에서 각기 다른 페이지 범위를 처리할 수 있다.
3-1. Thread-Safe 컬렉션 사용
1
if (results == null) results = new CopyOnWriteArrayList<>();
멀티스레딩 환경에서는 동시성 문제를 항상 염두에 두어야 한다. CopyOnWriteArrayList는 읽기 작업이 많고 쓰기 작업이 적은 경우 효율적인 Thread-Safe 컬렉션이다.
4. 데이터 쓰기 (CorporationDocumentWriter)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
@StepScope
@RequiredArgsConstructor
public class CorporationDocumentWriter implements ItemWriter<Corporation> {
private final CorporationRepository corporationRepository;
@Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber;
@Override
public void write(Chunk<? extends Corporation> chunk) {
List<Corporation> corporations = chunk.getItems()
.stream()
.map(u -> (Corporation) u)
.toList();
corporationRepository.saveAll(corporations);
}
}
Writer는 상대적으로 단순하다. Chunk 단위로 받은 데이터를 일괄 저장한다. saveAll()을 사용하면 JPA의 배치 insert 최적화를 활용할 수 있다.
여기서도 partitionNumber를 주입받는데, 이는 로깅이나 모니터링 용도로 활용할 수 있다.
5. 스케줄 실행 (CorporationSyncScheduler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
@RequiredArgsConstructor
public class CorporationSyncScheduler {
private final JobLauncher jobLauncher;
private final Job updateCorporationDocument;
@Scheduled(cron = "0 0 3 15 4 *", zone = "Asia/Seoul")
public void runUpdateCorporationDocument() {
try {
log.info("Starting UpdateCorporationDocument Batch Job");
JobParameters jobParameters = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(updateCorporationDocument, jobParameters);
log.info("UpdateCorporationDocument Batch Job Completed");
log.info("Start: {}, End: {}, Duration: {}m",
jobExecution.getStartTime(),
jobExecution.getEndTime(),
Duration.between(jobExecution.getStartTime(), jobExecution.getEndTime()).toMinutes());
} catch (Exception e) {
log.error("UpdateCorporationDocument Batch Job failed", e);
}
}
}
JobParameters에 timestamp를 추가해 매 실행마다 고유한 Job Instance가 생성되도록 했다.
IV. Chunk 지향 처리
Spring Batch의 핵심 개념 중 하나가 Chunk 지향 처리다.
1
2
3
4
5
6
7
Read (1000건)
↓
Process (1000건 변환/검증)
↓
Write (1000건 일괄 저장)
↓
Commit (트랜잭션)
Chunk Size를 1000으로 설정했다는 것은:
- Reader가 1000건을 읽을 때까지 반복
- 1000건이 모이면 Writer로 전달
- Writer가 완료되면 트랜잭션 커밋
이를 통해 메모리 사용량을 제어하고, 트랜잭션 범위를 적절히 관리할 수 있다. 만약 Chunk Size가 너무 크면 메모리 부족이 발생하고, 너무 작으면 트랜잭션 오버헤드가 증가한다.
V. 성능 개선 결과
Before: 단일 스레드 순차 처리
1
2
3
4
총 처리 시간: 68분
평균 페이지당 처리 시간: 3.4초
1200 pages × 3.4s = 4,080초 ≈ 68분
After: 5개 파티션 병렬 처리
1
2
3
4
5
6
7
총 처리 시간: 14분
파티션당 페이지 수: 240 pages
평균 페이지당 처리 시간: 3.5초 (병렬 오버헤드 포함)
240 pages × 3.5s ÷ 5 threads = 840초 ≈ 14분
성능 개선율: 79% 감소 (68분 → 14분)