들어가며
"매일 새벽 3시에 100만 건의 CSV 데이터를 DB에 적재해야 합니다." 백엔드 개발자라면 한 번쯤은 이런 요구사항을 받아본 적이 있을 것입니다. 단순히 for문으로 insert를 돌리면 메모리 초과, 트랜잭션 타임아웃, 중간 실패 시 재처리 등 끝없는 문제가 발생합니다. 이런 대용량 데이터 처리를 안정적으로, 그리고 체계적으로 해결하기 위해 등장한 것이 바로 Spring Batch입니다.
Spring Batch는 수년간 엔터프라이즈 환경에서 검증된 배치 프레임워크로, Job/Step/Chunk라는 명확한 아키텍처 위에 재시도, 스킵, 파티셔닝 같은 견고한 기능을 제공합니다. 이 글에서는 Spring Batch 5(Spring Boot 3.x 기반)의 핵심 개념부터 100만 건 CSV 가져오기 실전 예제, 파티셔닝 병렬 처리, 스케줄링 연동까지 실무에서 바로 적용할 수 있는 내용을 다루겠습니다.
1. Spring Batch 아키텍처 이해
Spring Batch의 핵심 구조는 Job → Step → (Chunk | Tasklet)의 3계층으로 이루어져 있습니다.
| 구성 요소 | 역할 | 비유 |
|---|---|---|
| Job | 배치 작업의 최상위 단위 | 프로젝트 전체 |
| Step | Job 내의 개별 단계 | 프로젝트의 각 태스크 |
| Chunk | 읽기→처리→쓰기를 묶어서 처리 | 페이지 단위 작업 |
| Tasklet | 단순 단일 작업(파일 삭제 등) | 단건 명령 |
| ItemReader | 데이터 소스에서 읽기 | 입력 |
| ItemProcessor | 읽은 데이터 가공/필터링 | 변환 |
| ItemWriter | 가공된 데이터 저장 | 출력 |
| JobRepository | Job 실행 메타데이터 저장 | 실행 이력 DB |
| JobLauncher | Job 실행 트리거 | 시작 버튼 |
Chunk 기반 처리 흐름
Chunk 방식은 chunkSize만큼 데이터를 읽고(Read), 가공하고(Process), 한 번에 쓰는(Write) 패턴입니다. 트랜잭션은 chunk 단위로 커밋되므로, 중간에 실패해도 이전 chunk까지는 안전하게 처리됩니다.
// Chunk 처리 흐름 (chunkSize = 1000)
// 1. ItemReader가 1000건 읽기
// 2. ItemProcessor가 1000건 가공
// 3. ItemWriter가 1000건 한 번에 쓰기
// 4. 트랜잭션 커밋
// 5. 다음 1000건 반복...
2. Spring Boot + Spring Batch 설정
의존성 추가 (build.gradle)
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
runtimeOnly 'org.postgresql:postgresql'
// 테스트
testImplementation 'org.springframework.batch:spring-batch-test'
}
application.yml
spring:
batch:
job:
enabled: false # 앱 시작 시 자동 실행 방지
jdbc:
initialize-schema: always # 메타 테이블 자동 생성
datasource:
url: jdbc:postgresql://localhost:5432/batch_db
username: batch_user
password: secret
jpa:
hibernate:
ddl-auto: validate
properties:
hibernate:
jdbc:
batch_size: 1000
order_inserts: true
order_updates: true
주의: Spring Boot 3.x(Spring Batch 5)부터는 @EnableBatchProcessing을 명시하면 오히려 자동 설정이 꺼집니다. Boot의 자동 설정을 활용하려면 이 어노테이션을 붙이지 않는 것이 권장됩니다.
3. 100만 건 CSV 가져오기 - 실전 예제
가장 흔한 배치 시나리오인 CSV 파일 → DB 적재를 구현해보겠습니다.
엔티티 정의
@Entity
@Table(name = "products")
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Product {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String name;
@Column(nullable = false)
private BigDecimal price;
@Column(nullable = false)
private String category;
@Column(nullable = false)
private Integer stockQuantity;
@Column(nullable = false)
private LocalDateTime createdAt;
@Builder
public Product(String name, BigDecimal price, String category,
Integer stockQuantity) {
this.name = name;
this.price = price;
this.category = category;
this.stockQuantity = stockQuantity;
this.createdAt = LocalDateTime.now();
}
}
CSV DTO
@Getter
@Setter
@NoArgsConstructor
public class ProductCsvDto {
private String name;
private BigDecimal price;
private String category;
private Integer stockQuantity;
}
Job 설정 - CsvImportJobConfig
@Configuration
@RequiredArgsConstructor
public class CsvImportJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final EntityManagerFactory entityManagerFactory;
private static final int CHUNK_SIZE = 1000;
@Bean
public Job csvImportJob() {
return new JobBuilder("csvImportJob", jobRepository)
.start(csvImportStep())
.listener(jobExecutionListener())
.build();
}
@Bean
public Step csvImportStep() {
return new StepBuilder("csvImportStep", jobRepository)
.<ProductCsvDto, Product>chunk(CHUNK_SIZE, transactionManager)
.reader(csvItemReader(null)) // @Value로 주입
.processor(csvItemProcessor())
.writer(jpaItemWriter())
.faultTolerant()
.skipLimit(100)
.skip(FlatFileParseException.class)
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.listener(stepExecutionListener())
.build();
}
@Bean
@StepScope
public FlatFileItemReader<ProductCsvDto> csvItemReader(
@Value("#{jobParameters['filePath']}") String filePath) {
return new FlatFileItemReaderBuilder<ProductCsvDto>()
.name("productCsvReader")
.resource(new FileSystemResource(filePath))
.encoding("UTF-8")
.delimited()
.delimiter(",")
.names("name", "price", "category", "stockQuantity")
.linesToSkip(1) // 헤더 스킵
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(ProductCsvDto.class);
}})
.build();
}
@Bean
public ItemProcessor<ProductCsvDto, Product> csvItemProcessor() {
return dto -> {
// 가격이 0 이하인 상품은 필터링 (null 반환 시 스킵)
if (dto.getPrice().compareTo(BigDecimal.ZERO) <= 0) {
return null;
}
return Product.builder()
.name(dto.getName().trim())
.price(dto.getPrice())
.category(dto.getCategory().trim())
.stockQuantity(dto.getStockQuantity())
.build();
};
}
@Bean
public JpaItemWriter<Product> jpaItemWriter() {
JpaItemWriter<Product> writer = new JpaItemWriter<>();
writer.setEntityManagerFactory(entityManagerFactory);
return writer;
}
@Bean
public JobExecutionListener jobExecutionListener() {
return new JobExecutionListener() {
private long startTime;
@Override
public void beforeJob(JobExecution jobExecution) {
startTime = System.currentTimeMillis();
log.info("=== CSV Import Job 시작 ===");
}
@Override
public void afterJob(JobExecution jobExecution) {
long elapsed = System.currentTimeMillis() - startTime;
log.info("=== CSV Import Job 완료 ===");
log.info("Status: {}", jobExecution.getStatus());
log.info("소요 시간: {}ms", elapsed);
}
};
}
}
4. JobParameter와 Job 실행
Spring Batch에서 Job을 실행할 때 JobParameter를 통해 외부에서 값을 전달할 수 있습니다. 같은 JobParameter로는 동일 Job을 재실행할 수 없으므로, 고유값(타임스탬프 등)을 포함하는 것이 일반적입니다.
@RestController
@RequestMapping("/api/batch")
@RequiredArgsConstructor
public class BatchController {
private final JobLauncher jobLauncher;
private final Job csvImportJob;
@PostMapping("/csv-import")
public ResponseEntity<String> runCsvImport(
@RequestParam String filePath) throws Exception {
JobParameters params = new JobParametersBuilder()
.addString("filePath", filePath)
.addLong("timestamp", System.currentTimeMillis()) // 고유값
.toJobParameters();
JobExecution execution = jobLauncher.run(csvImportJob, params);
return ResponseEntity.ok(
"Job started - ID: " + execution.getId() +
", Status: " + execution.getStatus());
}
}
// 비동기 실행이 필요한 경우
@Bean
public JobLauncher asyncJobLauncher(JobRepository jobRepository) {
TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
launcher.setJobRepository(jobRepository);
launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
return launcher;
}
5. 파티셔닝으로 병렬 처리
100만 건을 단일 스레드로 처리하면 시간이 오래 걸립니다. 파티셔닝(Partitioning)을 활용하면 데이터를 분할하여 병렬로 처리할 수 있습니다.
Partitioner 구현
public class RangePartitioner implements Partitioner {
private final JdbcTemplate jdbcTemplate;
public RangePartitioner(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Long min = jdbcTemplate.queryForObject(
"SELECT MIN(id) FROM products", Long.class);
Long max = jdbcTemplate.queryForObject(
"SELECT MAX(id) FROM products", Long.class);
if (min == null || max == null) {
return Collections.emptyMap();
}
long range = (max - min) / gridSize + 1;
Map<String, ExecutionContext> partitions = new HashMap<>();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
long start = min + (i * range);
long end = Math.min(start + range - 1, max);
context.putLong("minId", start);
context.putLong("maxId", end);
partitions.put("partition" + i, context);
}
return partitions;
}
}
파티셔닝 Job 설정
@Bean
public Job partitionedJob() {
return new JobBuilder("partitionedJob", jobRepository)
.start(partitionedStep())
.build();
}
@Bean
public Step partitionedStep() {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", rangePartitioner())
.step(workerStep())
.gridSize(8) // 8개 파티션으로 분할
.taskExecutor(batchTaskExecutor())
.build();
}
@Bean
public Step workerStep() {
return new StepBuilder("workerStep", jobRepository)
.<Product, Product>chunk(500, transactionManager)
.reader(partitionedReader(null, null))
.processor(productProcessor())
.writer(productWriter())
.build();
}
@Bean
@StepScope
public JpaPagingItemReader<Product> partitionedReader(
@Value("#{stepExecutionContext['minId']}") Long minId,
@Value("#{stepExecutionContext['maxId']}") Long maxId) {
Map<String, Object> params = new HashMap<>();
params.put("minId", minId);
params.put("maxId", maxId);
return new JpaPagingItemReaderBuilder<Product>()
.name("partitionedProductReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT p FROM Product p WHERE p.id BETWEEN :minId AND :maxId ORDER BY p.id")
.parameterValues(params)
.pageSize(500)
.build();
}
@Bean
public TaskExecutor batchTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("batch-worker-");
executor.initialize();
return executor;
}
파티셔닝 성능 비교
| 처리 방식 | 100만 건 소요 시간 | 메모리 사용량 |
|---|---|---|
| 단일 스레드 (chunkSize=1000) | 약 12분 | 약 512MB |
| 파티셔닝 4스레드 | 약 4분 | 약 768MB |
| 파티셔닝 8스레드 | 약 2분 30초 | 약 1GB |
파티셔닝 스레드 수는 DB 커넥션 풀 크기와 맞춰야 합니다. 스레드가 커넥션 풀보다 많으면 대기가 발생하여 오히려 성능이 저하됩니다.
6. 재시도(Retry)와 스킵(Skip) 전략
실무에서는 네트워크 오류, 일시적 DB 락, 잘못된 데이터 등 다양한 예외가 발생합니다. Spring Batch는 이를 우아하게 처리하는 메커니즘을 제공합니다.
@Bean
public Step faultTolerantStep() {
return new StepBuilder("faultTolerantStep", jobRepository)
.<ProductCsvDto, Product>chunk(CHUNK_SIZE, transactionManager)
.reader(csvItemReader(null))
.processor(csvItemProcessor())
.writer(jpaItemWriter())
.faultTolerant()
// Skip 설정: 파싱 오류는 최대 100건까지 스킵
.skipLimit(100)
.skip(FlatFileParseException.class)
.skip(NumberFormatException.class)
.noSkip(DataIntegrityViolationException.class) // 무결성 오류는 스킵 불가
// Retry 설정: 데드락은 최대 3회 재시도
.retryLimit(3)
.retry(DeadlockLoserDataAccessException.class)
.retry(OptimisticLockingFailureException.class)
.noRetry(NullPointerException.class) // NPE는 재시도 불가
// Skip된 항목 로깅
.listener(new SkipListener<ProductCsvDto, Product>() {
@Override
public void onSkipInRead(Throwable t) {
log.warn("읽기 중 스킵: {}", t.getMessage());
}
@Override
public void onSkipInProcess(ProductCsvDto item, Throwable t) {
log.warn("처리 중 스킵 - item: {}, error: {}",
item.getName(), t.getMessage());
}
@Override
public void onSkipInWrite(Product item, Throwable t) {
log.warn("쓰기 중 스킵 - item: {}, error: {}",
item.getName(), t.getMessage());
}
})
.build();
}
7. Tasklet 활용 - 단순 작업
파일 정리, 임시 테이블 삭제 등 단순 작업은 Tasklet이 적합합니다.
@Bean
public Step cleanupStep() {
return new StepBuilder("cleanupStep", jobRepository)
.tasklet((contribution, chunkContext) -> {
// 30일 이상 된 임시 파일 삭제
Path tempDir = Path.of("/data/batch/temp");
LocalDateTime threshold = LocalDateTime.now().minusDays(30);
try (var files = Files.list(tempDir)) {
files.filter(path -> {
try {
FileTime lastModified = Files.getLastModifiedTime(path);
return lastModified.toInstant()
.isBefore(threshold.toInstant(ZoneOffset.UTC));
} catch (IOException e) {
return false;
}
})
.forEach(path -> {
try {
Files.deleteIfExists(path);
log.info("삭제: {}", path);
} catch (IOException e) {
log.error("삭제 실패: {}", path, e);
}
});
}
return RepeatStatus.FINISHED;
}, transactionManager)
.build();
}
// 여러 Step을 조합한 Job
@Bean
public Job fullImportJob() {
return new JobBuilder("fullImportJob", jobRepository)
.start(csvImportStep())
.next(indexRebuildStep())
.next(cleanupStep())
.build();
}
8. 스케줄링 - @Scheduled + Quartz
배치 Job을 자동으로 실행하려면 스케줄러가 필요합니다.
@Scheduled를 이용한 간단한 스케줄링
@Component
@RequiredArgsConstructor
public class BatchScheduler {
private final JobLauncher jobLauncher;
private final Job csvImportJob;
// 매일 새벽 3시 실행
@Scheduled(cron = "0 0 3 * * *")
public void runCsvImportJob() {
try {
JobParameters params = new JobParametersBuilder()
.addString("filePath", "/data/daily/products.csv")
.addLocalDateTime("runDate", LocalDateTime.now())
.toJobParameters();
JobExecution execution = jobLauncher.run(csvImportJob, params);
log.info("Scheduled job completed - Status: {}",
execution.getStatus());
} catch (Exception e) {
log.error("Scheduled job failed", e);
// 알림 발송 (Slack, Email 등)
}
}
}
Quartz를 이용한 고급 스케줄링
// build.gradle
implementation 'org.springframework.boot:spring-boot-starter-quartz'
// QuartzJobBean 구현
public class BatchQuartzJob extends QuartzJobBean {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job csvImportJob;
@Override
protected void executeInternal(JobExecutionContext context) {
try {
String filePath = context.getMergedJobDataMap()
.getString("filePath");
JobParameters params = new JobParametersBuilder()
.addString("filePath", filePath)
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(csvImportJob, params);
} catch (Exception e) {
log.error("Quartz batch job failed", e);
}
}
}
// Quartz 설정
@Configuration
public class QuartzConfig {
@Bean
public JobDetail csvImportJobDetail() {
return JobBuilder.newJob(BatchQuartzJob.class)
.withIdentity("csvImportJob")
.usingJobData("filePath", "/data/daily/products.csv")
.storeDurably()
.build();
}
@Bean
public Trigger csvImportTrigger() {
return TriggerBuilder.newTrigger()
.forJob(csvImportJobDetail())
.withIdentity("csvImportTrigger")
.withSchedule(CronScheduleBuilder
.cronSchedule("0 0 3 * * ?") // 매일 새벽 3시
.withMisfireHandlingInstructionFireAndProceed())
.build();
}
}
9. 실무 주의사항과 Best Practices
9-1. JpaPagingItemReader vs JpaCursorItemReader
| 항목 | JpaPagingItemReader | JpaCursorItemReader |
|---|---|---|
| 동작 방식 | 페이지 단위 SELECT | 커서로 한 건씩 읽기 |
| 메모리 | 페이지 크기만큼만 사용 | 커서 유지 비용 |
| 멀티 스레드 | 안전 (각 페이지 독립) | 단일 스레드에서만 안전 |
| 데이터 변경 중 읽기 | 페이지 누락 가능성 있음 | 스냅샷 일관성 유지 |
| 권장 상황 | 병렬 처리, 대용량 | 순서 보장, 일관성 중요 |
9-2. 페이징 리더의 데이터 누락 문제
// 문제: 처리 중 데이터 상태가 바뀌면 페이지가 밀림
// 예) status = 'PENDING' 인 데이터를 읽어서 'DONE'으로 변경
// → 2페이지를 읽을 때 1페이지에 있던 DONE 데이터가 빠지면서 밀림
// 해결 1: ID 기반 정렬 + WHERE id > :lastId
@Bean
@StepScope
public JdbcPagingItemReader<Product> safeReader() {
return new JdbcPagingItemReaderBuilder<Product>()
.name("safeReader")
.dataSource(dataSource)
.selectClause("SELECT *")
.fromClause("FROM products")
.whereClause("WHERE status = 'PENDING'")
.sortKeys(Map.of("id", Order.ASCENDING)) // ID 기준 정렬 필수
.pageSize(1000)
.rowMapper(productRowMapper())
.build();
}
// 해결 2: JpaCursorItemReader 사용 (단일 스레드)
@Bean
@StepScope
public JpaCursorItemReader<Product> cursorReader() {
return new JpaCursorItemReaderBuilder<Product>()
.name("cursorReader")
.entityManagerFactory(entityManagerFactory)
.queryString("SELECT p FROM Product p WHERE p.status = 'PENDING' ORDER BY p.id")
.build();
}
9-3. 운영 체크리스트
- 메타 테이블 관리:
BATCH_JOB_EXECUTION,BATCH_STEP_EXECUTION등 메타 테이블이 무한히 증가합니다. 주기적으로 오래된 이력을 정리하세요. - 멱등성 보장: 같은 데이터로 Job을 재실행해도 결과가 동일해야 합니다. UPSERT나 임시 테이블 → 스왑 전략을 활용하세요.
- 모니터링: Job 실행 상태, 소요 시간, 읽기/쓰기/스킵 건수를 Prometheus + Grafana로 모니터링하세요.
- 트랜잭션 크기: chunk 크기가 너무 크면 롤백 시 부담이 커지고, 너무 작으면 커밋 오버헤드가 증가합니다. 500~2000 사이가 적당합니다.
- 메모리 관리: ItemReader가 전체 데이터를 메모리에 올리지 않도록 반드시 페이징이나 커서 방식을 사용하세요.
- 실패 시 재시작: Spring Batch는 Job이 FAILED 상태면 마지막 실패 지점부터 재시작합니다. COMPLETED 상태의 Job은 재실행되지 않으므로, 강제 재실행이 필요하면 새로운 JobParameter를 전달하세요.
10. 테스트 코드
@SpringBatchTest
@SpringBootTest
class CsvImportJobTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JobRepositoryTestUtils jobRepositoryTestUtils;
@Autowired
private ProductRepository productRepository;
@BeforeEach
void setUp() {
jobRepositoryTestUtils.removeJobExecutions();
productRepository.deleteAll();
}
@Test
void CSV_가져오기_Job이_정상_완료된다() throws Exception {
// given
JobParameters params = new JobParametersBuilder()
.addString("filePath", "src/test/resources/test-products.csv")
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
// when
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
assertThat(productRepository.count()).isEqualTo(5); // 테스트 CSV 5건
}
@Test
void CSV_파싱_오류는_스킵하고_나머지를_처리한다() throws Exception {
// given - CSV에 잘못된 행 포함
JobParameters params = new JobParametersBuilder()
.addString("filePath", "src/test/resources/test-products-with-errors.csv")
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
// when
JobExecution execution = jobLauncherTestUtils.launchJob(params);
// then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
StepExecution stepExecution = execution.getStepExecutions()
.iterator().next();
assertThat(stepExecution.getSkipCount()).isGreaterThan(0);
assertThat(stepExecution.getWriteCount()).isEqualTo(3); // 5건 중 2건 스킵
}
@Test
void 개별_Step만_테스트할_수_있다() {
// given
JobParameters params = new JobParametersBuilder()
.addString("filePath", "src/test/resources/test-products.csv")
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
// when
JobExecution execution = jobLauncherTestUtils
.launchStep("csvImportStep", params);
// then
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
}
}
마치며
Spring Batch는 단순한 배치 도구가 아니라, 대용량 데이터 처리를 위한 완전한 프레임워크입니다. 이 글에서 다룬 핵심 내용을 정리하겠습니다.
- Chunk 기반 처리는 ItemReader → ItemProcessor → ItemWriter의 명확한 파이프라인으로 대용량 데이터를 안정적으로 처리합니다.
- 파티셔닝을 활용하면 데이터를 분할하여 병렬 처리할 수 있으며, 100만 건 기준으로 약 4~5배 성능 향상을 기대할 수 있습니다.
- 재시도/스킵 전략으로 일시적 오류나 잘못된 데이터를 우아하게 처리하여 배치 Job의 안정성을 높입니다.
- 스케줄링은 단순한 경우 @Scheduled, 복잡한 경우 Quartz를 사용하세요.
- Spring Batch 5에서는 @EnableBatchProcessing 제거, JobBuilder/StepBuilder 생성자 변경 등 주의사항이 있습니다.
배치 처리는 사용자에게 직접 보이지 않지만, 서비스의 안정성을 좌우하는 핵심 인프라입니다. 이 글이 여러분의 배치 시스템 구축에 도움이 되길 바랍니다.
'Spring Boot' 카테고리의 다른 글
| Spring Boot Actuator 완벽 활용 - 헬스체크부터 커스텀 메트릭까지 (0) | 2026.04.10 |
|---|---|
| Spring Boot 4 마이그레이션 가이드 - Virtual Threads와 Spring AI 통합 (1) | 2026.04.02 |
| Spring WebFlux 입문 - 리액티브 프로그래밍의 모든 것 (0) | 2026.04.02 |
| Spring Security 6 실전 가이드 - JWT + OAuth2 인증/인가 완벽 정리 (0) | 2026.04.02 |
| Spring AI 2.0과 Spring Boot 4 - AI 퍼스트 Java 개발의 시작 (0) | 2026.04.01 |