| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | ||||
| 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 11 | 12 | 13 | 14 | 15 | 16 | 17 |
| 18 | 19 | 20 | 21 | 22 | 23 | 24 |
| 25 | 26 | 27 | 28 | 29 | 30 | 31 |
- 1차원 DP
- 2차원 dp
- 99클럽
- @BeforeAll
- @BeforeEach
- @Builder
- @Entity
- @GeneratedValue
- @GenericGenerator
- @NoargsConstructor
- @Query
- @Table
- @Transactional
- Actions
- Amazon EFS
- amazon fsx
- Android Studio
- ANSI SQL
- api gateway 설계
- api gateway 필터
- ApplicationEvent
- argocd
- assertThat
- async/await
- AVG
- AWS
- aws autoscaling
- aws eks
- aws iam role
- AWS KMS
- Today
- Total
기록
[Kafka × Elasticsearch 기반 상품 검색 시스템] 3. 상품 색인 본문
상품 검색 엔진의 핵심은 단순한 데이터 저장이 아니라, “검색 가능한 데이터로 변환하는 것”이다.
API 서버가 Kafka를 통해 발행한 상품 등록 이벤트는 Query 서버(Spring WebFlux)를 거쳐 Elasticsearch로 전달되며, 이 과정을 담당하는 것이 바로 상품 색인기(Product Indexer) 이다. 색인기는 데이터를 읽고, 가공하고, 분석기를 적용해 검색 효율이 높은 형태로 재구성한다.
1. Query 서버 구조
Query 서버는 Spring WebFlux 기반으로 구현되어 있으며, Kafka Listener와 Reactive Elasticsearch를 결합해 완전 비동기 방식으로 동작한다.
색인기는 등록 요청이 몰려와도 스레드를 점유하지 않으며, Reactor 기반 스트림으로 I/O를 처리한다. 즉, 단일 인스턴스로도 수천 건의 이벤트를 동시 처리할 수 있는 구조이다.

| 구성요소 | 역할 |
| ProductRegisterListener | Kafka에서 상품 등록 이벤트를 구독하고 처리 흐름을 시작한다. |
| ProductService(upsert) | 수신된 데이터를 Elasticsearch에 저장하거나 갱신한다. |
| ProductEntity | Elasticsearch 도큐먼트 매핑 객체이다. |
| ProductEsReactiveRepository | ReactiveElasticsearchRepository를 상속받은 저장소 인터페이스이다. |
| products-settings.json | Nori 분석기 기반 인덱스 설정 파일이다. |
이 서버는 API 서버와 완전히 분리되어 있으며, Elasticsearch 장애나 부하가 API 응답에 영향을 주지 않으며, 이것이 CQRS의 장점이다.
2. Kafka Listener: ProductRegisterListener
Kafka 이벤트를 구독하는 리스너는 시스템의 “입력 관문”이다. 이 리스너는 product-register 토픽의 메시지를 받아 Reactive 체인을 통해 색인 서비스로 전달한다.
@Component
@RequiredArgsConstructor
@Slf4j
public class ProductRegisterListener {
private final ProductService productService;
@KafkaListener(topics = Topic.PRODUCT_REGISTER, groupId = "product-indexer")
public void onMessage(ProductRegisterRequest request) {
log.info("Received product register event: {}", request);
productService.upsert(request).subscribe();
}
}
리스너는 소비자 그룹 "product-indexer"에 속하며, Kafka의 파티션 리밸런싱에 자동으로 대응한다. 즉, 동일 그룹 내 인스턴스가 여러 개 있을 경우 Kafka가 파티션을 분배해 병렬 처리하게 된다. subscribe()를 호출해야 실제 Reactor 체인이 실행되므로, 리액티브 환경에서 반드시 명시해야 한다. 이 Listener는 데이터 소비만 담당하며, 실패 시 API 서버로 응답을 보내거나 재시도하지 않는다. Kafka는 자체적으로 오프셋 커밋(Consumer Offset) 기반으로 메시지 처리를 추적하므로, 장애가 발생해도 중복 없이 다시 처리할 수 있다. 이 덕분에 색인기는 “한 번만 처리해야 하는” 멱등성 요구를 쉽게 충족한다.
3. 색인 처리 서비스: ProductService.upsert()
색인기의 실제 로직은 서비스 계층에 있다. 이 계층은 수신된 DTO를 ProductEntity로 변환하고, Elasticsearch에 저장한다.
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductService {
private final ProductEsReactiveRepository productRepository;
public Mono<Void> upsert(ProductRegisterRequest request) {
ProductEntity entity = ProductEntity.builder()
.id(request.getProductId())
.name(request.getName())
.description(request.getDescription())
.category(request.getCategory())
.price(request.getPrice())
.updatedAt(request.getEventTimestamp())
.build();
return productRepository.save(entity)
.doOnSuccess(saved -> log.info("Indexed product id={} into Elasticsearch", saved.getId()))
.doOnError(ex -> log.error("Failed to index product id={}", request.getProductId(), ex))
.then();
}
}
\
save() 메서드는 동일한 ID의 문서가 존재하면 업데이트, 없으면 삽입한다. 즉, 이 구조는 Elasticsearch의 upsert 패턴을 그대로 따른다.
비동기로 동작하므로 수천 개의 색인 요청이 들어와도 스레드가 점유되지 않으며, Reactor의 이벤트 루프 기반으로 병렬 처리된다. .then()을 붙인 이유는 반환값을 Mono<Void>로 바꾸어, 상위 계층에서 불필요한 반환 데이터를 신경 쓰지 않게 하기 위함이다. 이 메서드의 진짜 의미는 “색인 작업이 비즈니스 로직의 일부가 아니라 후속 비동기 파이프라인으로 처리된다”는 점이다. 따라서, API 서버는 Kafka로 이벤트를 던지는 순간 책임을 끝내고, 이후 데이터 일관성과 검색 최신성을 이 서비스가 담당한다.
4. Reactive Repository: ProductEsReactiveRepository
Spring Data Elasticsearch는 Reactive 버전을 제공한다. 이는 내부적으로 Netty 기반의 비동기 HTTP 클라이언트를 사용하며, 스레드 블로킹 없이 Elasticsearch REST API를 호출한다.
@Repository
public interface ProductEsReactiveRepository extends ReactiveElasticsearchRepository<ProductEntity, Long> {
}
이 저장소는 JPA처럼 보이지만, 완전히 비동기적이다. save(), findById(), search() 모두 Mono나 Flux로 반환된다. Elasticsearch 연결이 일시적으로 지연되더라도 WebFlux의 논블로킹 모델 덕분에 처리량이 유지된다.
5. Elasticsearch 도큐먼트 매핑: ProductEntity
상품 문서의 인덱스 매핑을 정의한 클래스이다. Elasticsearch는 스키마가 자유로운 NoSQL 계열이지만, 명시적인 매핑을 지정해야 검색 품질이 향상된다.
@Document(indexName = "products")
@Setting(settingPath = "/elasticsearch/products-settings.json")
@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ProductEntity {
@Id
private Long id;
@Field(type = FieldType.Text, analyzer = "nori_korean", searchAnalyzer = "nori_korean")
private String name;
@Field(type = FieldType.Text, analyzer = "nori_korean", searchAnalyzer = "nori_korean")
private String description;
@Field(type = FieldType.Keyword)
private String category;
@Field(type = FieldType.Double)
private Double price;
@Field(type = FieldType.Long)
private Long updatedAt;
}
@Setting은 인덱스 생성 시 Nori 분석기 설정을 자동 적용한다. 상품명(name)과 설명(description)은 nori_korean 분석기를 사용해 한국어 형태소 단위로 토큰화된다. category는 Keyword 타입으로 지정되어, 정확한 일치 검색에 사용된다. updatedAt은 Kafka 이벤트 순서를 보장하기 위한 필드이며, 동일 ID 문서의 중복 이벤트 중 가장 최신 것만 반영하도록 비교 기준으로 사용된다.
6. 한국어 분석기: products-settings.json
검색 품질을 결정하는 가장 중요한 요소 중 하나가 분석기(Analyzer) 이다. Elasticsearch는 단순히 문자열을 저장하는 것이 아니라, 분석기를 통해 문장을 “토큰(token)” 단위로 나누어 색인한다. 이 프로젝트에서는 한국어 형태소 분석기인 Nori를 적용하였다.
{
"analysis": {
"tokenizer": {
"nori_tokenizer_custom": {
"type": "nori_tokenizer",
"decompound_mode": "mixed"
}
},
"analyzer": {
"nori_korean": {
"type": "custom",
"tokenizer": "nori_tokenizer_custom",
"filter": ["lowercase"]
}
}
}
}
decompound_mode: mixed는 복합명사를 “전체 단어”와 “분해된 단어”를 모두 색인하는 모드이다. 예를 들어 “무선마우스”는 ["무선마우스", "무선", "마우스"]로 색인되어, “무선” 또는 “마우스”로 검색해도 결과가 나온다. lowercase 필터는 대소문자 구분을 제거해 “Logitech”과 “logitech”을 동일하게 처리한다. 이러한 세부 설정은 한국어 상품명 검색에서 오타·복합명사 문제를 줄이고, 자연어 검색 품질을 크게 높여준다.
7. 전체 색인 파이프라인
다음은 데이터 흐름을 단계별로 정리한 것이다.
[API 서버]
ProductService.registerProduct()
→ KafkaEventProducer.send("product-register", ProductRegisterRequest)
[Kafka 브로커]
topic: product-register
↓
[Query 서버]
ProductRegisterListener.onMessage()
→ ProductService.upsert()
→ ProductEsReactiveRepository.save(ProductEntity)
↓
[Elasticsearch]
index: products
이 파이프라인은 완전 비동기이며, Kafka가 시스템 간 경계에서 버퍼 역할을 수행한다. API 서버는 Elasticsearch 상태를 몰라도 이벤트만 발행하면 되고, Query 서버는 Kafka를 통해 색인을 재처리(replay)할 수도 있다. 즉, 데이터 일관성과 시스템 복원력을 동시에 확보하는 구조다.
8. 검증 및 테스트
색인이 완료되었는지 확인하려면 Elasticsearch에서 직접 조회한다.
curl -X GET "http://localhost:9200/products/_search?q=마우스&pretty"
출력 예시:
{
"hits": {
"total": 1,
"hits": [
{
"_source": {
"id": 1001,
"name": "무선 마우스",
"description": "2.4GHz 블루투스 마우스",
"category": "전자기기",
"price": 24900.0
}
}
]
}
}
이 결과가 확인되면 Kafka → Elasticsearch 색인 파이프라인이 정상 동작한 것이다. Kibana에서도 products 인덱스의 필드 구조와 토큰화를 시각적으로 확인할 수 있다.
'Web > Spring' 카테고리의 다른 글
| [Kafka × Elasticsearch 기반 상품 검색 시스템] 5. Nori 분석기와 검색 품질 (0) | 2025.12.22 |
|---|---|
| [Kafka × Elasticsearch 기반 상품 검색 시스템] 4. 상품 검색 API 설계 (0) | 2025.12.15 |
| [Kafka × Elasticsearch 기반 상품 검색 시스템] 2. 상품 등록 이벤트 발행 (0) | 2025.12.01 |
| [Kafka × Elasticsearch 기반 상품 검색 시스템] 1. 상품 검색 엔진 설계 (0) | 2025.11.24 |
| [Swagger] Swagger에서 JWT 인증 처리하기 (0) | 2025.10.26 |