본문 바로가기
개발 이야기

실시간 주문 알림 도입기(서버 to 클라이언트)

by 신재권 2024. 7. 27.

대부분의 사람이 음식점에서 밥을 먹고있을 때, 매장 스피커로 '배달의 민족 주문~' 이라는 소리를 들어봤을 것이다.

위와 비슷하게 주문이 발생할경우 소리가 발생하게 해달라는 실시간 주문 알림이라는 요구사항이 들어와 구현을 담당하게 됐다.

 

웹 어드민성 페이지에서 마트 관계자들이 간단하게 주문, 상품들을 관리할 수 있는 웹 플랫폼이 존재한다.

마트 관계자는 항상 해당 페이지를 보고 있기 때문에, 여기다 해당 기능을 구현하기로 결정하였다.

 

서버 to 클라이언트 통신 방식이 여러가지 있는 것을 알게되었고, 각 방식의 장단점을 먼저 분석해봤다.

 

Polling

클라이언트가 서버로 http request를 계속 보내 이벤트 내용을 전달받는 방식

대충 이런 흐름이다.

1. 클라이언트 요청

2. 서버에 이벤트가 발생했나 확인

3. 없네.. 다시 돌아감

4. 있다면, 이벤트 들고감

 

보이듯이 요청이 계속 가기 때문에 서버 부담이 급증한다.

 

Long Polling

Polling 방식과 같은 방식이지만, 서버에서 조금 더 기다리는 방식이다.

즉 요청을 하고 서버에서 조금 대기하는 방식이다.

Polling 방식보다는 서버 요청이 덜하겠지만, 서버의 부담은 여전하다.

 

WebSocket

양방향 통신이 가능하다.

ws 프로토콜을 통해 웹소켓 포트에 연결되어 있는 모든 클라이언트에게 이벤트 방식으로 응답

 

Server Sent Events(SSE)

서버의 데이터를 실시간, 지속적으로 스트리밍 하는 기술

html5 표준

양방향이 아닌 server to clinet 단방향

재접속의 저수준 처리 자동 지원

 

이렇게 크게 4가지 방식이 존재하였는데, 현재 요구사항에서는 연결 수립 후 양방향 통신이 필요없기 때문에 단방향 통신 방식인 SSE 기술을 선택하게 되었다.

우리는 스케일 아웃 환경이라 SSE + Redis Pub/Sub을 활용하였다.

 

일단 100% 전달을 보장하지 않아도 되기 때문에 이벤트 유실 상황에 대해서는 고려하지 않았다. 그래서 쉽게 구현했던 것 같다.

 

Sse 쪽 코드와 Redis Pub/Sub의 대략적인 코드이다.

@Service
class SseService(
    @Value("\${order.notification.sse.timeout}") private val timeOut: Long,
    @Value("\${order.notification.sse.reconnect-time}") private val reconnectTime: Long,
    private val sseLocalRepository: SseLocalRepository,
) {
    // 알림 구독 함수
    fun subscribe(key: String): SseEmitter {
        val emitter =
            SseEmitter(timeOut).also {
                it.onCompletion {
                    logger<SseService>()
                        .info("SseEmitter Close, key={}", key)
                    sseLocalRepository.deleteByKey(key)
                }
                it.onTimeout {
                    logger<SseService>()
                        .info("SseEmitter Time Out, Retry Connection key={}", key)
                    it.complete()
                }
                it.onError { throwable ->
                    logger<SseService>()
                        .error(
                            "SseEmitter Connection Error, key={}, message={}",
                            key,
                            throwable.message,
                        )
                    it.complete()
                }
            }

        sseLocalRepository.save(key, emitter)

        // 재연결 요청 시 503 방지, 첫 연결 시 dummy data 발송
        notify(
            key,
            event()
                .name("connect")
                .data("connected")
                .reconnectTime(reconnectTime),
        )

        return emitter
    }

    // 알림 발송 함수
    fun notify(
        prefix: String,
        eventData: SseEmitter.SseEventBuilder,
    ) {
        val keySet = sseLocalRepository.getAllKeyByPrefix(prefix)

        keySet.forEach { key ->
            sseLocalRepository.get(key)?.let { emitter ->
                runCatching {
                    emitter.send(eventData)
                }.onFailure { e ->
                    // client의 브라우저 종료/새로고침 행위로 Event Stream 이 닫힌다
                    emitter.completeWithError(e).also { _ ->
                        logger<SseService>()
                            .error("SseEmitter Error Send Message key={}, message={}", key, e.message)
                        sseLocalRepository.deleteByKey(key)
                    }
                }
            }
        }
    }

    @PreDestroy
    fun atShutDownServer() {
        sseLocalRepository.getAll().forEach { it.complete() }
    }
}

@Service
class RedisOrderNotifySubscribeService(
    @Value("\${order.notification.redis.topic}") private val orderTopicName: String,
    private val objectMapper: ObjectMapper,
    private val redisMessageListenerContainer: RedisMessageListenerContainer,
    private val sseService: SseService,
    private val redisTemplate: RedisTemplate<String, Any>,
) : MessageListener {
    @PostConstruct
    fun subscribeTopic() {
        redisMessageListenerContainer.addMessageListener(this, ChannelTopic(orderTopicName))
    }

    override fun onMessage(
        message: Message,
        pattern: ByteArray?,
    ) {
        val dto =
            objectMapper.readValue(
                redisTemplate.stringSerializer.deserialize(message.body),
                OrderNotifyDto::class.java,
            )
        val prefix = SseKeyManager.getSseKeyOfOrderNotification(dto.martId)

        sseService.notify(
            prefix,
            event()
                .name(orderTopicName)
                .data("${dto.orderId} 주문 발생"),
        )
    }
}

@Component
@Transactional(propagation = Propagation.REQUIRES_NEW)
class OrderEventListener(
    @Value("\${order.notification.redis.topic}") private val orderTopicName: String,
    private val redisRepository: CommonRedisRepository<String, OrderNotifyDto>,
) {
    @Async
    @TransactionalEventListener(
        phase = TransactionPhase.AFTER_COMMIT,
        classes = [OrderEvent::class],
    )
    fun onOrderEvent(orderEvent: OrderEvent) {
        redisRepository.publishAtChannel(
            orderTopicName,
            OrderNotifyDto(
                orderEvent.martId,
                orderEvent.orderId,
            ),
        )
    }
}

override fun publishAtChannel(
        channel: String,
        message: T,
    ) {
        redisTemplate.convertAndSend(
            channel,
            objectMapper.writeValueAsString(message),
        )
    }

 

 

정리하면 플로우는 다음과 같다.

1. 웹 클라이언트와 서버가 우선 SSE 연결 수립이 되어 있어야 한다.

2. 주문이 발생하면(서버 입장에서는 결제가 완료되면) 스프링 이벤트를 발행하여 Redis pub/sub 발행

3. 브로드 캐스트로 모든 파드에 이벤트가 발행되기 때문에 파드 중 sse 연결된 객체가 존재한다면, 클라이언트로 응답

 

 

지금은 이벤트 유실 상황에 대해 고려가 돼있지 않다. 나중에 SSE를 필요로하는 요구사항이 들어온다면 더 디벨롭 해볼 것 같다.