포스트

Spring Batch 최적화: Partitioning 및 멀티스레딩

I. 문제 상황

before-result (그림 I-1) 기능 개선 전, 배치 실행 결과

공공 API로부터 전국의 기업 정보를 동기화하는 배치 기능을 구현했다. 약 120만 건의 데이터를 페이지네이션 방식으로 가져와 데이터베이스에 저장하는 단순한 작업이지만, 처리 시간은 예상보다 훨씬 길었다.

1
2
3
4
5
총 데이터: 약 1,200,000건
페이지당 데이터: 1,000건
총 페이지 수: 약 1,200페이지

단일 스레드 순차 처리: 68분

문제는 명확했다. 모든 페이지를 순차적으로 처리하다 보니:

  1. API 호출 대기 시간이 누적된다
  2. 네트워크 I/O 대기 중 CPU가 유휴 상태로 놀고 있다
  3. 데이터베이스 저장 작업도 순차적으로 진행된다

이는 전형적인 I/O Bound 작업이며, 병렬 처리로 극적인 성능 향상을 기대할 수 있는 케이스다.

II. 해결 방안: Partitioning + Multi-Threading

Spring Batch는 대용량 데이터 처리를 위한 두 가지 강력한 기능을 제공한다.

  1. Partitioning: 데이터를 논리적으로 분할해 독립적으로 처리
  2. TaskExecutor: 각 파티션을 별도 스레드에서 병렬 실행

이 두 가지를 조합하면 다음과 같은 구조가 된다.

1
2
3
4
5
6
7
Master Step (조정자)
  ↓ 데이터 분할
  ├─ Woker Step [Partition 0 (Thread 1) → Page 0-239]
  ├─ Woker Step [Partition 1 (Thread 2) → Page 240-479]
  ├─ Woker Step [Partition 2 (Thread 3) → Page 480-719]
  ├─ Woker Step [Partition 3 (Thread 4) → Page 720-959]
  └─ Woker Step [Partition 4 (Thread 5) → Page 960-1199]

각 파티션은 독립적인 스레드에서 실행되며, 서로 영향을 주지 않는다.

III. 구현 상세

전체 구조는 다음 5개의 컴포넌트로 구성된다.

1
2
3
4
5
6
7
8
9
UpdateCorporationDocumentConfig    # Job 및 Step 정의
  ↓
CorporationPartitioner             # 데이터 분할 전략
  ↓
CorporationClientReader            # 외부 API 읽기
  ↓
CorporationDocumentWriter          # DB 저장
  ↓
CorporationSyncScheduler           # 스케줄 실행

1. Job 구성 (UpdateCorporationDocumentConfig)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Configuration
@RequiredArgsConstructor
public class UpdateCorporationDocumentConfig {
    private final int GRID_SIZE = 5;

    @Bean
    public Job updateCorporationDocument() {
        return new JobBuilder("updateCorporationDocument", jobRepository)
                .start(masterStep())
                .build();
    }

    @Bean
    public Step masterStep() {
        return new StepBuilder("masterStep", jobRepository)
                .partitioner("workerStep", corporationPartitioner)
                .step(workerStep())
                .gridSize(GRID_SIZE)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step workerStep() {
        return new StepBuilder("workerStep", jobRepository)
                .<Corporation, Corporation>chunk(1000, transactionManager)
                .reader(corporationClientReader)
                .writer(corporationDocumentWriter)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(GRID_SIZE);
        executor.setMaxPoolSize(GRID_SIZE);
        executor.setQueueCapacity(GRID_SIZE);
        executor.setThreadNamePrefix("corporation-batch-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

이 구성의 핵심은 Master-Worker 패턴이다.

  • Master Step: 데이터를 5개의 파티션으로 분할하고 각 Worker Step을 스레드풀에 할당
  • Worker Step: 실제 데이터 읽기/쓰기를 Chunk 단위(1000건)로 처리
  • TaskExecutor: 5개의 스레드를 생성해 각 파티션을 병렬 실행

GRID_SIZEStepExecution의 수량을 의미한다. 위 코드에선 WorkStep 하나만 매핑되기 때문에, GRID_SIZEWorkerStep의 수량과 같다. WorkerStep의 수량과 동일하게, Thread의 PoolSize 또한 GRID_SIZE로 맞췄다.

2. 데이터 분할 전략 (CorporationPartitioner)

before-result (그림 I-2) Partitioner를 통한 데이터 분할

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
@RequiredArgsConstructor
public class CorporationPartitioner implements Partitioner {
    private final CorporationClient corporationClient;
    private static final int PAGE_SIZE = 1000;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int totalCorporationCount = corporationClient.totalCorporationCount();
        int totalPages = (int) Math.ceil((double) totalCorporationCount / PAGE_SIZE);
        log.info("Corporation Pages: totalCount={}, totalPages={}", totalCorporationCount, totalPages);

        Map<String, ExecutionContext> partitions = new HashMap<>();
        int pagesPerPartition = (int) Math.ceil((double) totalPages / gridSize);

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();

            int startPage = i * pagesPerPartition;
            int endPage = Math.min(startPage + pagesPerPartition, totalPages);

            context.putInt("partitionNumber", i);
            context.putInt("startPage", startPage);
            context.putInt("endPage", endPage);
            context.putInt("pageSize", PAGE_SIZE);
            partitions.put("partition" + i, context);
        }

        return partitions;
    }
}

Partitioner는 데이터를 어떻게 나눌지 결정하는 전략을 정의한다. 여기서는 페이지 기반 분할 방식을 사용했다.

예를 들어 총 1200페이지를 5개 파티션으로 나누면:

1
2
3
4
5
Partition 0: Page 0-239    (240 pages)
Partition 1: Page 240-479  (240 pages)
Partition 2: Page 480-719  (240 pages)
Partition 3: Page 720-959  (240 pages)
Partition 4: Page 960-1199 (240 pages)

각 파티션의 메타데이터(startPage, endPage 등)는 ExecutionContext에 저장되어 Worker Step으로 전달된다. 이를 통해 각 스레드는 자신이 처리해야 할 페이지 범위를 알 수 있다.

2-1. 동적 분할의 중요성

단순히 고정된 페이지 범위로 나누지 않고, 런타임에 전체 데이터 개수를 조회해 동적으로 분할한다. 이는 데이터가 증가하거나 감소해도 유연하게 대응할 수 있게 한다.

1
2
int totalCorporationCount = corporationClient.totalCorporationCount();
int totalPages = (int) Math.ceil((double) totalCorporationCount / PAGE_SIZE);

3. 데이터 읽기 (CorporationClientReader)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Component
@StepScope
@RequiredArgsConstructor
public class CorporationClientReader extends AbstractPagingItemReader<Corporation> {
    private final CorporationClient corporationClient;

    @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber;
    @Value("#{stepExecutionContext['startPage']}") Integer startPage;
    @Value("#{stepExecutionContext['endPage']}") Integer endPage;
    @Value("#{stepExecutionContext['pageSize']}") Integer pageSize;

    private int currentPage;

    @Override
    protected void doOpen() throws Exception {
        setPageSize(pageSize);
        super.doOpen();
        currentPage = startPage;
        log.info("[Partition-{}]: pages={}-{} (Total {} pages)", 
                 partitionNumber, startPage, endPage - 1, endPage - startPage);
    }

    @Override
    protected void doReadPage() {
        if (results == null) results = new CopyOnWriteArrayList<>();
        else results.clear();

        if (currentPage >= endPage) return;

        List<Corporation> corporations = corporationClient.getCorporations(currentPage + 1, getPageSize());
        if (corporations != null && !corporations.isEmpty()) results.addAll(corporations);
        currentPage++;
    }
}

CorporationClientReader는 Spring Batch의 AbstractPagingItemReader를 상속받아 구현했다. 핵심은 @StepScope와 SpEL을 활용한 파티션별 메타데이터 주입이다.

1
@Value("#{stepExecutionContext['startPage']}") Integer startPage;

이 표현식은 각 Worker Step의 ExecutionContext에서 startPage 값을 읽어온다. 덕분에 동일한 Reader 클래스가 5개의 다른 스레드에서 각기 다른 페이지 범위를 처리할 수 있다.

3-1. Thread-Safe 컬렉션 사용

1
if (results == null) results = new CopyOnWriteArrayList<>();

멀티스레딩 환경에서는 동시성 문제를 항상 염두에 두어야 한다. CopyOnWriteArrayList는 읽기 작업이 많고 쓰기 작업이 적은 경우 효율적인 Thread-Safe 컬렉션이다.

4. 데이터 쓰기 (CorporationDocumentWriter)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
@StepScope
@RequiredArgsConstructor
public class CorporationDocumentWriter implements ItemWriter<Corporation> {
    private final CorporationRepository corporationRepository;

    @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber;

    @Override
    public void write(Chunk<? extends Corporation> chunk) {
        List<Corporation> corporations = chunk.getItems()
                .stream()
                .map(u -> (Corporation) u)
                .toList();
        corporationRepository.saveAll(corporations);
    }
}

Writer는 상대적으로 단순하다. Chunk 단위로 받은 데이터를 일괄 저장한다. saveAll()을 사용하면 JPA의 배치 insert 최적화를 활용할 수 있다.

여기서도 partitionNumber를 주입받는데, 이는 로깅이나 모니터링 용도로 활용할 수 있다.

5. 스케줄 실행 (CorporationSyncScheduler)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
@RequiredArgsConstructor
public class CorporationSyncScheduler {
    private final JobLauncher jobLauncher;
    private final Job updateCorporationDocument;

    @Scheduled(cron = "0 0 3 15 4 *", zone = "Asia/Seoul")
    public void runUpdateCorporationDocument() {
        try {
            log.info("Starting UpdateCorporationDocument Batch Job");

            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();
            JobExecution jobExecution = jobLauncher.run(updateCorporationDocument, jobParameters);

            log.info("UpdateCorporationDocument Batch Job Completed");
            log.info("Start: {}, End: {}, Duration: {}m",
                    jobExecution.getStartTime(),
                    jobExecution.getEndTime(),
                    Duration.between(jobExecution.getStartTime(), jobExecution.getEndTime()).toMinutes());
        } catch (Exception e) {
            log.error("UpdateCorporationDocument Batch Job failed", e);
        }
    }
}

JobParameterstimestamp를 추가해 매 실행마다 고유한 Job Instance가 생성되도록 했다.

IV. Chunk 지향 처리

Spring Batch의 핵심 개념 중 하나가 Chunk 지향 처리다.

1
2
3
4
5
6
7
Read (1000건)
  ↓
Process (1000건 변환/검증)
  ↓
Write (1000건 일괄 저장)
  ↓
Commit (트랜잭션)

Chunk Size를 1000으로 설정했다는 것은:

  • Reader가 1000건을 읽을 때까지 반복
  • 1000건이 모이면 Writer로 전달
  • Writer가 완료되면 트랜잭션 커밋

이를 통해 메모리 사용량을 제어하고, 트랜잭션 범위를 적절히 관리할 수 있다. 만약 Chunk Size가 너무 크면 메모리 부족이 발생하고, 너무 작으면 트랜잭션 오버헤드가 증가한다.

V. 성능 개선 결과

Before: 단일 스레드 순차 처리

before-result (그림 V-1) 기능 개선 전, 배치 실행 결과

1
2
3
4
총 처리 시간: 68분
평균 페이지당 처리 시간: 3.4초

1200 pages × 3.4s = 4,080초 ≈ 68분

After: 5개 파티션 병렬 처리

before-result (그림 V-2) 기능 개선 후, 배치 실행 결과

1
2
3
4
5
6
7
총 처리 시간: 14분
파티션당 페이지 수: 240 pages
평균 페이지당 처리 시간: 3.5초 (병렬 오버헤드 포함)

240 pages × 3.5s ÷ 5 threads = 840초 ≈ 14분

성능 개선율: 79% 감소 (68분 → 14분)

IX. 참고

  1. Spring Batch - Partitioning
  2. Spring Batch - Multi-threaded Step
  3. Spring Batch - Chunk-oriented Processing
  4. ThreadPoolTaskExecutor - Spring Framework
이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.