| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | ||||
| 4 | 5 | 6 | 7 | 8 | 9 | 10 |
| 11 | 12 | 13 | 14 | 15 | 16 | 17 |
| 18 | 19 | 20 | 21 | 22 | 23 | 24 |
| 25 | 26 | 27 | 28 | 29 | 30 | 31 |
- 1차원 DP
- 2차원 dp
- 99클럽
- @BeforeAll
- @BeforeEach
- @Builder
- @Entity
- @GeneratedValue
- @GenericGenerator
- @NoargsConstructor
- @Query
- @Table
- @Transactional
- Actions
- Amazon EFS
- amazon fsx
- Android Studio
- ANSI SQL
- api gateway 설계
- api gateway 필터
- ApplicationEvent
- argocd
- assertThat
- async/await
- AVG
- AWS
- aws autoscaling
- aws eks
- aws iam role
- AWS KMS
- Today
- Total
기록
[Kafka × Elasticsearch 기반 상품 검색 시스템] 2. 상품 등록 이벤트 발행 본문
이전 편에서 전체 구조와 CQRS 설계를 살펴보았다. 이번 글에서는 그중 API 서버(Spring MVC) 가 어떻게 상품 등록 요청을 처리하고, 이를 Kafka 이벤트로 발행하는지를 구체적으로 다룬다. 이 서버는 시스템의 Command 사이드(Command Side) 로, 사용자의 “쓰기” 요청을 받아 시스템 내 “변화”를 만들어내는 역할을 담당한다. 실제 상품 데이터는 즉시 저장되지 않고, 이벤트 형태로 브로커에 발행되어 비동기적으로 처리된다. 이로써 API 서버는 빠르고 안정적으로 응답을 반환하며, 검색 서버(Query Side)는 색인 작업에 집중할 수 있다.
1. 구조 개요
상품 등록 흐름은 단순하면서도 명확한 3단계로 구성된다.
[1] ProductController → [2] ProductService → [3] KafkaEventProducer
| 계층 | 역할 |
| Controller | 클라이언트의 HTTP 요청을 수신하여 DTO로 변환하고, 비즈니스 로직을 호출한다. |
| Service | 요청을 처리하고 KafkaEventProducer를 통해 이벤트를 전송한다. |
| Producer | KafkaTemplate을 사용해 실제 메시지를 브로커로 전송한다. |
토픽 이름은 product-register이며, 시스템 전역에서 Topic 클래스로 상수 관리한다. 이 구조는 단순하지만, 유지보수성과 일관성 측면에서 매우 중요하다. 하나의 토픽 이름이 하드코딩된 여러 위치에 흩어지면, 환경 전환이나 버전 관리 시 쉽게 충돌을 일으킬 수 있다.
2. 상품 등록 API
API 서버는 /products/register 엔드포인트를 제공한다. 상품 등록 요청이 들어오면, 컨트롤러는 이를 DTO로 받아 서비스 계층으로 전달한다. 이 시점에서 실제 DB 저장은 수행되지 않는다. 대신 상품 데이터는 Kafka 이벤트로 변환되어 Query 서버가 색인하도록 위임된다.
@RestController
@RequiredArgsConstructor
@RequestMapping("/products")
public class ProductController {
private final ProductService productService;
@PostMapping("/register")
public ResponseEntity<String> register(@RequestBody ProductRegisterRequest request) {
productService.registerProduct(request);
return ResponseEntity.ok("Product registered event published.");
}
}
이 방식의 장점은 API 서버의 응답 속도와 독립성이다. 만약 Elasticsearch가 일시적으로 응답하지 않더라도, API 서버는 Kafka에 이벤트만 발행하면 즉시 요청을 완료할 수 있다. 장애가 색인 서버에 국한되므로 시스템 전체의 가용성을 유지할 수 있다.
3. Kafka 메시지 DTO
ProductRegisterRequest는 상품 등록 이벤트를 표현하는 데이터 객체다. 이 객체는 Kafka의 메시지 페이로드로 직렬화되어 전송된다.
@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ProductRegisterRequest {
private Long productId;
private String name;
private String description;
private String category;
private Double price;
private Long eventTimestamp;
}
이 모델은 “상품 등록 요청”과 “색인 이벤트”의 경계선 역할을 한다. eventTimestamp는 이벤트의 최신성을 판별하는 기준이 된다. Query 서버는 Kafka로부터 동일 상품 ID의 여러 이벤트를 받을 수 있는데, 이 필드를 비교하여 더 최신의 이벤트만 반영한다. 즉, 단순한 DTO이지만 데이터 일관성을 보장하는 핵심 단서가 된다.
4. KafkaEventProducer
Kafka로의 실제 전송은 KafkaEventProducer가 담당한다. Spring Kafka의 KafkaTemplate<String, Object>를 이용해 비동기 전송을 수행하며, 성공·실패 시 로그를 남긴다.
@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaEventProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
public void send(String topic, Object message) {
kafkaTemplate.send(topic, message)
.thenAccept(result -> log.info("Sent to topic: {}", topic))
.exceptionally(ex -> {
log.error("Failed to send to topic: {}", topic, ex);
return null;
});
}
}
.send() 메서드는 CompletableFuture를 반환하므로, 프로듀서는 응답을 기다리지 않고 즉시 반환한다. .thenAccept()를 통해 전송 성공을 비동기적으로 확인하고, .exceptionally() 블록에서 실패를 로깅한다. 실패 시 재시도 정책이나 DLQ(Dead Letter Queue)를 도입하면, 운영 안정성을 크게 향상시킬 수 있다. 이 클래스는 실제 비즈니스 로직과 독립적이므로, 다른 이벤트 타입(예: product-update, product-delete)으로도 쉽게 재사용 가능하다.
5. ProductService
서비스 계층은 비즈니스 의도를 담는 부분이다. ProductService는 상품 등록 요청을 받아 KafkaEventProducer를 호출하고, product-register 토픽으로 이벤트를 전송한다.
@Service
@RequiredArgsConstructor
@Slf4j
public class ProductService {
private final KafkaEventProducer kafkaEventProducer;
public void registerProduct(ProductRegisterRequest request) {
kafkaEventProducer.send(Topic.PRODUCT_REGISTER, request);
log.info("Published ProductRegisterEvent for id={}", request.getProductId());
}
}
단순히 이벤트를 발행하는 것뿐만 아니라, 향후 이 로직 안에서 유효성 검사, 중복 확인, 상품 상태 관리 등을 확장할 수 있다. 즉, 지금은 단순하지만 “도메인 로직의 진입점” 역할을 한다.
6. Topic 상수 관리
토픽 이름은 시스템 전역에서 동일해야 한다. Producer와 Consumer가 서로 다른 문자열을 참조하면 즉시 장애로 이어지기 때문에, 상수 클래스로 통일 관리한다.
public class Topic {
public static final String PRODUCT_REGISTER = "product-register";
}
이 단순한 한 줄이 유지보수성을 크게 높인다. 환경별로 토픽을 다르게 설정하고 싶다면, 나중에 @ConfigurationProperties 기반 설정 객체로 확장할 수 있다.
7. 실행 흐름 요약
상품 등록 요청이 들어오면 아래와 같은 이벤트 흐름이 실행된다.
[사용자]
POST /products/register
{
"productId": 1001,
"name": "무선 마우스",
"description": "2.4GHz 블루투스 마우스",
"category": "전자기기",
"price": 24900,
"eventTimestamp": 1730900000000
}
[API 서버 내부]
ProductController → ProductService.registerProduct()
→ KafkaEventProducer.send("product-register", request)
Kafka 토픽에는 다음과 같은 JSON 메시지가 쌓인다.
{
"productId": 1001,
"name": "무선 마우스",
"description": "2.4GHz 블루투스 마우스",
"category": "전자기기",
"price": 24900.0,
"eventTimestamp": 1730900000000
}
이 메시지는 Query 서버의 ProductRegisterListener가 수신해 Elasticsearch 색인으로 이어진다. 따라서 API 서버는 등록을 완료했을 뿐, 검색 데이터는 아직 반영되지 않은 상태다. 그러나 사용자는 즉시 응답을 받고, 색인은 몇 초 내로 비동기 반영된다.
8. Kafka 설정 요약
application-local.yml에 정의된 Kafka 프로듀서 설정은 다음과 같다.
spring:
kafka:
bootstrap-servers: localhost:29092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
J
sonSerializer를 사용하여 DTO를 JSON으로 자동 직렬화한다. 이 설정은 Spring Kafka가 내부적으로 Jackson ObjectMapper를 활용하여 객체를 문자열로 변환하기 때문에, 별도 변환 코드가 필요 없다. 즉, 단순한 설정으로도 안정적인 이벤트 직렬화가 가능하다.
9. 검증 및 관측
이벤트가 제대로 발행되는지 확인하려면 Kafka UI(localhost:8085)나 CLI를 사용한다.
docker exec -it prologue-kafka kafka-console-consumer \
--bootstrap-server localhost:29092 \
--topic product-register \
--from-beginning
콘솔에 출력된 JSON이 예상한 구조와 일치하면 성공이다. 운영 환경에서는 Kafka에 전송된 이벤트를 모니터링하기 위해 Kafka UI, Prometheus Exporter, Dead Letter Topic 등을 조합해 관측 체계를 구축할 수 있다.
'Web > Spring' 카테고리의 다른 글
| [Kafka × Elasticsearch 기반 상품 검색 시스템] 4. 상품 검색 API 설계 (0) | 2025.12.15 |
|---|---|
| [Kafka × Elasticsearch 기반 상품 검색 시스템] 3. 상품 색인 (0) | 2025.12.08 |
| [Kafka × Elasticsearch 기반 상품 검색 시스템] 1. 상품 검색 엔진 설계 (0) | 2025.11.24 |
| [Swagger] Swagger에서 JWT 인증 처리하기 (0) | 2025.10.26 |
| [Swagger] annotation 정리 (0) | 2025.10.26 |