본문 바로가기

Spring

Spring Boot Batch 적용하기 - 1

1. 스프링 배치 (Spring Batch)

 

스프링 배치(Spring Batch)는 대용량 데이터를 처리하기 위한 프레임워크로 스프링 기반에서 동작합니다.

대량의 데이터를 효율적으로 처리하기위 해다양한 기능을 제공해줍니다. 

 

  • 로깅 및 추적
  • 작업 처리 통계
  • 작업 재시작
  • 건너뛰기/리소스 관리
  • 트랜잭션 관리

위 와같이 대용량 데이터 처리를 안정적으로 처리를 제공해줍니다.

spring batch

스프링 배치(Spring Batch) 위와 같은 아키텍쳐 구성을 가지고 있습니다.

 

※ Batch 와 Scheduler 차이

배치(Batch)는 논리적 또는 물리적으로 구성된 데이터를 그룹화하여 일괄 처리하는 방법을 의미합니다. 스케줄러(Scheduler)는 주어진 작업을 정의된 시간에 수행할 수 있게 도와주는 도구를 의미합니다.

 

배치는 대량의 데이터를 일괄적으로 처리를 할 뿐, 특정 주기마다 자동으로 돌아가는 스케줄링과는 관련이 없습니다.

스프링 배치는 스케줄러와 함께 사용 할 수 있도록 설계 되었으며 Quartz, Spring Scheduler, Jenkins등 전용 Scheduler를 사용하여야 합니다.

 

2. 스프링 배치 (Spring Batch) 용어

 

Job

 Job은 배치 처리 과정을 추상화한 인터페이스로 하나 또는 그 이상의 Step을 포함하여 스프링 배치 계증에서  가장 상위에 위치합니다. 개별적인 고유 이름을 가지고 있으며 실행시 필요한 파라미터와 함께 JobInstance를 구별하는데 사용됩니다.

 

JobInstance

JobInstance는 Job의 실행의 단위를 말합니다. Job을 실행시키게 되면 하나의 JobInstance가 생성되며 JobInstance가 실패하여 다시 실행을 시키더라도 실패에 대한 JobInstance 데이터만 처리하게 됩니다.

 

JobParameters

JobInstance는 JobParameters로 객체를 구별하게 됩니다. JobParameters 구별자 이외에도 매개변수 전달 역할로도 사용됩니다.

데이터형으로는 String, Double, Long, Date 4가지 형식만을 지원합니다.

 

JobExecution

JobExecution은  JobInstance에 대한 실행의 시도를 나타내는 객체입니다. 동일한 JobInstance에 대한  시도마다 개별적인 JobExecution이 생성됩니다. JobExecution은 실행에 대한 상태, 시작시작, 종료시간, 생성시간 등의 정보를 담고 있습니다.

 

 

Step

step은 Job의 하위 단계로 실제 배치 처리가 되는 작업의 단위를 말합니다. 한개 이상의 Step으로 Job은 구성되어 있으며 순차적으로 처리됩니다. Step은 두가지의 방식을 제공합니다.

  • Chunk : ItemReader, ItemProcessor, ItemWriter을 이용하여 처리
  • Tasklet : 하나의 작업을 처리

 

StepExecution

StepExecution은 Step의 한번의 실행의 시도를 나타내는 객체입니다. Step의 실행에 대한 상태, 시간 등의 정보를 포함하고 있으며 읽은 데이터수, 쓴 아이템의수, 커밋 수, Skip 데이터 수 등의 상세 정보도 포함하고 있습니다.

 

ExecutionContext

ExecutionContext는 Job/Step 실행도중 데이터를 공유하는데 사용되는 저장소입니다. 스프링 배치는 JobExecutionContext, StepExecutionContext 두가지 종류의 컨텍스를 제공하며 다른 범위를 가지고 있습니다. JobExecutionContext는 commit 시점에 저장되는 반면 StepExecutionContext는 실행 사이에 저장이 됩니다. ExecutionContext를 통하여 스탭 간 데이터 공유가 가능하며 Job 실패시 ExecutionContext를 통한 마지막 실행 값을 재구성 할 수 있습니다.

 

JobRepository

JobRepository는 모든 배치 처리에 대한 정보를 담고있는 매커니즘입니다. Job의 실행정보(JobExecution), Step의 실행정보(StepExecution), Job 파라미터 등을 저장하고 관리합니다. Job이 실행이 되면 JobRepository는 JobExecution과 StepExecution을 생성하게 되며 Execution 정보들을 저장하고 조회합니다.

 

JobLauncher

JobLauncher는 Job과 JobParameters를 받아 Job을 실행하는 역할을 합니다.

 

ItemReader

itemReader는 배치 작업에 처리할 데이터를 읽어오는 역할을 하며, 여러 형식의 데이터 소스(데이터베이스, 파일, 메세지 큐 등) 부터 데이터를 읽어 오는 다양한 구현체가 제공됩니다.

 

ItemProcessor

itemProcessor는 itemReader로부터 읽어온 아이템을 처리하는 역할을 합니다. 필요에 따라 사용할 수 있으며 데이터 필터링 변환 등의 작업을 수행할 수 있습니다.

 

ItemWriter

itemWriter는 처리된 데이터를 최종적으로 기록하는 역할을 합니다. 다양한 구현체를 통하여 데이터베이스 기록, 파일 생성, 메세지 발행 등 다양한 방식의 데이터를 사용할 수 있습니다.

 

Tasklet

tasklet은 간단한 단일 작업을 수행 할 때 사용 됩니다. (리소스 정리, 시스템 상태 체크 등)

 

JobOperator

JobOperator는 외부 인터페이스로 Job의 실행과 중지, 재시작 등의  배치 작업 흐름 제어를 담당합니다. JobOperator를 통해 JobLauncher와 JobRepository에 대한 직접적인 접근 없이도 배치 작업을 수행하고 상태를 조회 할 수 있습니다.

 

JobExplorer

JobExplorer는 Job의 실행 이력을 조회하는데 사용됩니다. JobRepository에서 제공하는 정보와 유사하지만, JobRepository는 주로 Job의 실행 도중인 상태에 대해 업데이트하고 관리하는 반면, JobExplorer는 주로 읽기 전용 접근에 초점을 맞추고 있습니다.

 

3. 스프링 배치 (Spring Batch) 메타 테이블

스프링 배치는 배치 작업의 상태를 관리하기 위한 메타 데이터를 저장하는 아래 6개의 테이블들을 자동으로 생성합니다. 또한 배치 작업을 수행하면 자동으로 생성된 테이블들의 컬럼 값들이 채워집니다.

spring batch table

 

1. BATCH_JOB_INSTANCE

BATCH_JOB_INSTANCE 테이블은 JobInstance와 관련된 모든 정보를 가지며, 전체 계층 구조의 최상위 역할을 합니다.

 

2. BATCH_JOB_EXECUTION

BATCH_JOB_EXECUTION 테이블은 JobExecution와 관련된 모든 정보가 들어있습니다. JobExcution은 JobInstance가 실행될 때마다 시작시간, 종료시간, 종료코드 등 다양한 정보를 가지고 있습니다.

 

3. BATCH_JOB_EXECUTION_PARAMS

BATCH_JOB_EXECUTION_PARAMS 테이블은 JobParameters와 관련된 모든 정보가 들어있습니다.

 

4. BATCH_JOB_EXECUTION_CONTEXT

BATCH_JOB_EXECUTION_CONTEXT 테이블은 작업의 실행 컨텍스트와 관련된 모든 정보가 들어있습니다. 각 JobExecution마다 정확히 하나의 JobExecutionContext가 있습니다. 이 ExecutionContext 데이터는 일반적으로 JobInstance가 실패 시 중단된 위치에서 다시 시작할 수 있는 정보를 저장하고 있습니다.

 

5. BATCH_STEP_EXECUTION

BATCH_STEP_EXECUTION 테이블은 StepExecution 객체와 관련된 모든 정보가 저장됩니다. BATCH_JOB_EXECUTION 테이블과 유사하며, 생성된 각 JobExecution에 대해 항상 단계당 하나 이상의 항목이 존재합니다. STEP의 EXECUTION 정보인 읽은 수, 커밋 수, 스킵 수 등 다양한 정보를 추가로 담고 있습니다.

 

6. BATCH_STEP_EXECUTION_CONTEXT

BATCH_STEP_EXECUTION_CONTEXT 테이블은 StepExecutionContext와 관련된 모든 정보가 저장되며, 스텝 실행당 정확히 하나의 ExecutionContext가 있으며, 특정 스텝 실행을 위해 유지되어야 하는 모든 데이터가 포함되어 있습니다. 이 ExecutionContext 데이터는 일반적으로 JobInstance가 실패 시 중단된 위치에서 다시 시작할 수 있는 정보를 저장하고 있습니다.

 

4. 스프링 배치 (Spring Batch) 실전 연습

Spring Batch 5버전으로 간단한 Job을 생성해보았습니다. 기존에 4버전과는 약간의 차이가 존재하니 버전에 맞게 설정해주시면 됩니다.

 

마리아 디비를 사용하였으며 sequence 전략이 다르므로 테이블로 처리가 됩니다.

spring batch table

 

데이터베이스를 다중 구성을 할 예정이어서 실제 서비스를 사용하는 도메인과 배치 두가지 형태의 데이터 소스로 구성을 하였습니다.

package com.example.batch.config;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.Objects;

@EnableBatchProcessing(
        dataSourceRef = "batchDataSource",
        transactionManagerRef = "batchTransactionManager"
)
@Configuration

public class BatchDataSourceConfig {

    @Primary
    @Bean("batchDataSource")
    @ConfigurationProperties(prefix = "spring.batch-datasource")
    public DataSource batchDataSource() {
        return DataSourceBuilder.create()
                .type(HikariDataSource.class)
                .build();
    }

    @Primary
    @Bean("batchEntityManagerFactory")
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(EntityManagerFactoryBuilder builder, @Qualifier("batchDataSource") DataSource dataSource) {
        return builder
                .dataSource(dataSource)
                .packages("com.example.batch.entity.batch")
                .persistenceUnit("batchPersistenceUnit")
                .build();
    }

    @Primary
    @Bean(name = "batchTransactionManager")
    public PlatformTransactionManager batchTransactionManager(
            @Qualifier("batchEntityManagerFactory") LocalContainerEntityManagerFactoryBean entityManagerFactoryBean) {
        JpaTransactionManager transactionManager = new JpaTransactionManager(Objects.requireNonNull(entityManagerFactoryBean.getObject()));
        transactionManager.setNestedTransactionAllowed(true);
        return transactionManager;
    }

}

 

 

 

스프링 배치 5버전 경우 JobRepository, StepRespository를 사용하지 않고 실제 빌더 객체를 생성하며 처리해야합니다. 또한 트랜잭션 매니저도 함께 설중해주면됩니다.

 

JobExecutionListener와 StepExecutionListener을 설정해주었으며 tasklet, chunk 두가지 방식으로 간단하게 구성되었습니다. 아래 코드 동작하였을때 1회 동작후 두번째부터는 chunk가 제대로 동작하지 않았는데 이는 ListItemReader가 메모리상에서 null로 설정된다고 확인하여 @StepScope를 주어 주기를 설정해주었습니다.

package com.example.batch.jobs;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Arrays;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MyJobConfig {

    private final JobExecutionListener jobExecutionListener;

    private final StepExecutionListener stepExecutionListener;

    @Bean
    public Job myJob(JobRepository jobRepository, PlatformTransactionManager batchTransactionManager) {
        return new JobBuilder("myJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .listener(jobExecutionListener)
                .start(myTaskletStep(jobRepository, batchTransactionManager))
                .next(myChunkStep(jobRepository, batchTransactionManager))
                .build();
    }

    @Bean
    @JobScope
    public Step myTaskletStep(JobRepository jobRepository, PlatformTransactionManager batchTransactionManager) {
        return new StepBuilder("myTaskletStep", jobRepository)
                .listener(stepExecutionListener)
                .tasklet(((contribution, chunkContext) -> {
                    log.info("myTaskletStep started");
                    return RepeatStatus.FINISHED;
                }), batchTransactionManager).build();
    }

    @Bean
    @JobScope
    public Step myChunkStep(JobRepository jobRepository, PlatformTransactionManager batchTransactionManager) {
        return new StepBuilder("myChunkStep", jobRepository)
                .listener(stepExecutionListener)
                .<String, String >chunk(2, batchTransactionManager)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public ItemReader<String> reader() {
        return new ListItemReader<>(Arrays.asList("item1", "item2", "item3"));
    }

    @Bean
    @StepScope
    public ItemProcessor<String, String> processor() {
        return item -> {
            log.info("processor started");
            item = item.toUpperCase();
            return item;
        };
    }

    @Bean
    @StepScope
    public ItemWriter<? super Object> writer() {
        return items -> {
            log.info("writer started");
            items.forEach(e -> log.info(e.toString()));
        };
    }

}

 

 

JobLauncher를 이용하여 간단하게 수행해보았습니다.

package com.example.batch.api;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RestController
@RequiredArgsConstructor
public class JobController {

    private final JobLauncher jobLauncher;
    private final Job myJob;


    @GetMapping("/job/run")
    public void runJob() throws Exception {
        log.info("Job started");
        jobLauncher.run(myJob, new JobParametersBuilder()
                .addDate("startTime", new Date())  // 고유한 파라미터
                .toJobParameters());
    }
}

 

Job이 정상적으로 수행되고 chunk 단위로 쪼개어서 처리되는것을 확인 할 수 있었습니다. 기본적인 배치 구조를 작성해보았으며 다음 포스팅에서는 데이터베이스 데이터를 이용하여 처리 후 장애 상황까지 재연해보겠습니다.