티스토리 뷰
FCM 푸시 알림 발송 개발 (feat. Bucket4j API Throttling, Redisson 분산락)
daeuun 2024. 11. 27. 00:39
이번 포스팅은 FCM Push Notification 개발과 개발하면서 겪은 문제 해결에 관한 글입니다.
1. 커스텀 푸시 알림 개발 요청사항이 들어오다.
하루냥에서 알림 발송은 기존에 Flutter에서 매일 지정된 시간에 메세지를 발송하는 FCM 푸시 알림을 사용하고 있었다.
하지만 여러 BM을 만들면서 매일 보내는 알림과는 별개로 관리자의 입맛에 맞게 보내는 커스텀 알림 메세지가 필요한 시점이 와버렸다!
앱으로, 무료로 푸시 알림을 보내기에 FCM을 사용하지 않을 수 없었고 클라이언트에서 보내는 알림과 별개로 서버에서 개발하게 되었다.
이번 푸시 알림의 개발 요구사항은 다음과 같다.
- 발송 대상을 설정하여 원하는 사용자를 지정할 수 있다. (전체 사용자/그룹 사용자)
- 발송 타입에 따라 원하는 시간에 알림을 발송할 수 있다. (즉시 발송/예약 발송/주기 발송)
- 메세지 제목, 내용, 이미지 첨부, Landing URL을 설정할 수 있다.
요구사항을 고려해서 어떻게 서버에서 개발해야할지 구현에 필요한 기능을 정리해보았다.
2. 구현 필요한 기능
- 발송 필수 정보 수집
- 발송 필수 정보로 유저 정보에 기기 식별자인 device token 데이터를 등록해야 한다.
- 로그인시 클라이언트에서 수집한 FCM token을 유저 정보에 저장하고, device token이 있는 유저를 알림 수신자로 등록한다.
- 알림 발송 시기
- 즉시 발송: 시간을 지정하지 않고 현재 시각에 대한 알림을 발송한다.
- 예약 발송: 특정 시각(예:
2025-01-01 00:00:00
)에 알림을 발송한다. - 주기 발송: 일정 시각(예:
매일 17:00
/매주 수요일 20:00
)을 특정해 알림을 발송한다.
- 대량 발송 여부
- 전체 사용자에게 알림 발송 → 현재 앱 규모로 보면 수신자가
16,000
건이 넘기 때문에 대량 발송에 해당한다.
- 전체 사용자에게 알림 발송 → 현재 앱 규모로 보면 수신자가
- 전송 실패 가능성 대응
- 예상치 않은 서버 상황에 의한 전송 실패를 대비하여 실패 건에 대하여 재시도 설계가 필요하다.
3. 알림 발송하는 시기
발송 시기에 대한 요구사항은 두가지였다.
- 현재 시각 기준 발송
- 미리 발송 시간을 지정해놓고 해당 시간에 알림을 발송하는 주기 알림 발송 기능
현재 시각에 대한 발송은 FCM api를 호출하도록 만들면 되지만, 발송 시간을 지정하는 기능을 위해 DB 구조 설계가 필요했다.
✅ 알림 테이블 구성
하나의 테이블에 메시지 정보, 발송 스케줄, 수신자 토큰, 실패 여부 등의 데이터를 모두 포함하면 데이터가 중복되고 관리하기 복잡해지기 때문에, 정규화를 통해 분리하는 것이 적합하다고 판단했다.
따라서, 알림 메시지의 정보와 스케줄 정보는 notification_schedule
테이블에 저장하고,
알림 이력은 notification_history
테이블로 분리하여 관리하기로 했다.
notification_schedule 테이블
알림 메시지와 스케줄 정보를 저장한다.
기본적인 발송 정보인 title
(제목), body
(내용), image
(이미지 url), land_url
(랜딩 url) 뿐만 아니라
주기 발송에 대한 schedule_at
(발송시간), repeat_cycle
(스케줄 cron정보) 를 포함하게 되었다.
notification_history 테이블
알람 발송 이력을 저장한다.
schedule_id
(어떤 스케줄에 대한 이력인지 구분하기 위한 스케줄 식별자), device_id
(수신자의 Token, 디바이스 식별값), success
(성공여부), fail_count
(재발송을 위한 실패 횟수) 와 같이 실패 시 재발송을 위한 데이터도 함께 저장되어 있다.
✅ 특정 시간에 작업 수행을 위한 스프링 스케줄러
특정 시간을 세팅하고 발송 API 호출하는 기능은 스프링 스케줄러를 사용해서 구현하였다.
스프링에서는 간단히 @Scheduled
어노테이션을 붙여서 스케줄러를 구현할 수 있다. 하지만 클라이언트에서 설정한 값으로 스케줄링 시간을 동적으로 변경하기 위해 ThreadPoolTaskScheduler
클래스를 사용하였다.
schedule
에 등록할 task
(여기서는 sendNotifications
로 푸시알림 시작하는 메서드)와 cron
표현식을 넘겨주면 동적으로 지정된 시간에 스케줄링 하게 된다.
다음 스케줄링 작업 시간은 ScheduledFuture<?>
의 인스턴스로 등록하고 관리한다.
@Component
class DynamicScheduler(
private var scheduler: ThreadPoolTaskScheduler,
) {
private val logger = LoggerFactory.getLogger(DynamicScheduler::class.java)
private val scheduledFutureMap: ConcurrentMap<Long, ScheduledFuture<*>> = ConcurrentHashMap()
init {
scheduler = ThreadPoolTaskScheduler()
scheduler.initialize()
}
fun registerScheduler(taskId:Long, task: Runnable, cron: String) {
stopScheduler(taskId)
val schedule = scheduler.schedule(task, CronTrigger(cron))
scheduledFutureMap[taskId] = schedule
logger.info("[SCHEDULE] start schedule taskId: $taskId, time: $cron")
}
fun stopScheduler(taskId: Long) {
scheduledFutureMap[taskId]?.let {
it.cancel(false)
scheduledFutureMap.remove(taskId)
logger.info("[SCHEDULE] stop schedule taskId: $taskId")
}
}
}
4. API Throttling, Redisson 분산락을 활용한 FCM 푸시 알림 발송
4.1. sendNotifications
- 알림 발송 로직 시작 지점
- 전체 발송 실패 여부를 판단해서 실패시 재발송 처리한다.
/**
* FCM 알림 메세지를 발송한다. 발송 실패여부를 판단하여 스케줄을 재등록한다.
*/
fun sendNotifications(apiKey: String?, schedule: NotificationSchedule) {
val scheduleId = schedule.id ?: error("schedule id is null")
val maxRetries = 3
val scheduledTime = LocalDateTime.now().plusSeconds(60) // 재발송을 위한 시간 설정
val targetHistories = notificationHistoryRepository.findAllByScheduleIdAndSuccessAndFailCountLessThan(
scheduleId,
false,
maxRetries
)
if (targetHistories.isEmpty()) return
val tokens = targetHistories.map { it.deviceId }
val allSuccess = sendMessageForTarget(tokens, apiKey, schedule)
if (allSuccess) {
return
} else {
val cron = DateTimeUtil.convertToCron(scheduledTime)
dynamicScheduler.registerScheduler(scheduleId, { sendNotifications(apiKey, schedule) }, cron)
}
}
sendMessageForTarget
메서드로 수신자(타겟)에게 메세지를 발송한다.
만약 모든 발송이 성공하지 않으면 예외처리 되어 설정한 시간(코드에서는 60s
)에 대해 스케줄이 등록되어 재발송 처리한다.
4.2. sendMessageForTarget
- 분산 요청과 알림 중복 발송을 방지하기 위해 Redisson 분산락을 적용한다.
- 대량 발송시 서버 부하를 줄이기 위해 API Throttling으로 호출량을 조절한다.
private fun sendMessageForTarget(tokens: List<String>, apiKey: String?, schedule: NotificationSchedule): Boolean {
val limit = 100
val targetTokens = tokens.toMutableList()
val locks = mutableListOf<RLock>()
try {
for (tokens in targetTokens.chunked(limit)) {
tokens.forEach { token ->
val lock = redissonClient.getLock(token) // 1.token에 대한 락 걸기 시도
val hasLock = lock.tryLock(0, 60, TimeUnit.SECONDS) // 2.락 획득시 60초 유지
if (hasLock) {
locks.add(lock)
} else {
logger.info("[NOTI] Lock acquisition failed for token: $token")
targetTokens.remove(token) // 3. 락 실패시 해당 토큰에 대한 요청 제외
return false
}
}
if (!tryConsume(apiKey)) { // 4. 쓰로틀링 시도
logger.info("[NOTI] API rate limit exceeded for API key: $apiKey")
return false
}
if (!sendMessages(tokens, schedule)) {
return false
}
}
} catch (e: Exception) {
return false
}
return true
}
private fun tryConsume(apiKey: String?): Boolean {
if (apiRateLimiter.tryConsume(apiKey)){
return true
}
try { // 버킷의 토큰을 획득하지 못한 경우 리필 후 재시도
logger.info("[NOTI] Waiting for token refill.")
Thread.sleep(TOKEN_REFILL_SECONDS * 1000) // 설정된 리필 시간 1s 대기
return apiRateLimiter.tryConsume(apiKey)
} catch (e: InterruptedException) {
logger.info("[NOTI] Thread interrupted during processing. Message: ${e.message}")
Thread.currentThread().interrupt()
return false
}
}
💡 분산 요청을 위한 Redisson 분산락 활용 (Distributed Lock)
Redisson은 RLock
API를 통해 간단히 락을 구현할 수 있고, lock의 TTL을 자동으로 관리해주어 데드락을 방지한다.
✅ Redisson?
Redis에 접근할 수 있는 다양한 클라이언트들이 존재한다. 대표적으로 Jedis, Lettuce, Redisson등이 있다.
Redisson은 비교적 합리적인 방식으로 Lock 획득 재시도 기능이 구현되어 있다.
Lettuce는 스핀락이라고 불리는 일종의 폴링 기법을 활용해서 Lock 획득을 재시도하고,
Redisson은 Redis의 Pub/sub 기능을 사용해서 Lock 획득을 재시도한다.
- Lock 획득에 실패하면 Redisson은 특정 채널을 구독한다.
- Lock을 소유한 클라이언트가 Lock을 해제하면 Redis가 해당 이벤트를 구독 중인 클라이언트들에게 알린다.
- Lock이 해제되었다는 이벤트를 수신한 Redisson 클라이언트는 다시 Lock 획득을 시도한다.
이 방식은 Lock 획득에 실패하면 Redis에 지속적으로 Lock 획득 요청을 반복하는 Lettuce의 폴링 방식과 비교하여 Redis 서버 부하를 줄이고, 클라이언트의 네트워크 효율성을 높일 수 있다.
✅ 왜 분산락을 사용했을까?
동시성 제어
분산락을 사용해서 같은 사용자에게 중복으로 알림을 보내지 않도록 lock을 걸어서 중복 발송을 피할 수 있고, lock을 획득하지 못하면 대기하거나 실패 처리를 쉽게 구현할 수 있다.
- lock을 획득할 때 FCM Token을
key
로 설정한다. key
가 이미 존재한다면, FCM Token에 대해 lock을 획득하여 이미 발송을 진행 중이라는 의미가 된다. 따라서 설정된 60s TTL에 의해 key가 삭제되도록 기다린다.- lock을 획득하지 못했다면 해당 FCM Token에 대한 요청을 하지 않도록 요청 리스트에서 제거하고 중복 발송을 피하도록 했다.
✅ FCM 토큰 요청 최대 개수를 100으로 설정한 이유
limit
값은 알림 발송 API 호출에서 처리할 FCM 토큰의 최대 개수를 제한한 값이다.
FCM sendEachForMulticast
메서드는 최대 토큰량을 500개로 제한하고 있고, 한번에 너무 많은 토큰에 대해 요청이 들어오면 API 지연이 발생할 수 있다. 따라서 요청을 적당히 나누어 한 번에 처리 가능한 양으로 제한하고, Redis 분산락 충돌이 발생하지 않도록 조정한다.
- 응답 시간이
1.5s
라면,limit
값을 크게 설정해 병렬성을 높이는 것이 유리하다. - 현재 사용하는 운영 서버 AWS
t3.micro
인스턴스는 대역폭이 제한적이므로 작은limit
값으로 네트워크를 분산시키는 것이 좋다. - Redis가 처리 가능한 QPS(Queries Per Second)를 기준으로 적정값을 설정해야한다.
- 일반적으로 Redis의 t3.micro 환경에서는 초당 약 5,000~10,000 QPS를 처리할 수 있음
- 따라서 트래픽과 안정성을 고려하여
limit
100으로 설정하여 작은 락 요청과 적당한 병렬성을 유지한다.
💡 Bucket4j를 사용한 트래픽 조절(API Throttling)
대량 발송시 서버 부하를 줄이기 위해 트래픽 제한이 필요했다.
그래서 API Throttling으로 호출량을 조절하기로 했고, Bucket4j 라이브러리를 사용하기로 했다.
Bucket4j는 Token bucket 알고리즘을 기반으로 한 Rate Limiting(요청량 제한) 라이브러리로, 요청의 처리량을 효율적으로 제한하고 제어할 수 있다. Redis와 같은 분산 환경에서도 사용할 수 있고, 초/분당 요청 수 같이 다양한 단위로 요청량을 설정할 수 있어서 사용하게 되었다.
Bucket4j는 버킷과 토큰을 사용하는데 둘의 관계를 이해하려면, Token bucket 알고리즘의 작동 방식을 이해할 필요가 있다.
✅ 버킷과 토큰의 역할
- Bucket (버킷)
- 요청량을 관리하는 컨테이너 역할
- 특정한 용량(예:
BUCKET_CAPACITY
)과 리필 규칙(예: 초당TOKEN_REFILL_AMOUNT
)을 가지며, 이를 통해 요청이 제한된다. - 버킷은 설정된
BUCKET_CAPACITY
만큼 토큰으로 채워져 있다.
예를 들어,BUCKET_CAPACITY = 100
이면 초기에는 100개의 토큰이 버킷에 저장되고, 요청이 들어오면 토큰을 소비하는 방식으로 트래픽을 제한한다.
- Token (토큰)
- 요청을 처리할 수 있는 권한(리소스)의 단위
- 버킷에 저장된 토큰을 소비함으로써 요청을 허용하거나 거부한다.
- 버킷에 남은 토큰이 없으면 요청은 거부된다.
- Refill
- 일정 시간마다 몇 개의 Token을 충전할지 지정한다.
- 예를 들어,
초당 5개의 토큰
을 리필하도록 설정하면 매초 5개의 새로운 토큰이 버킷에 추가된다.
- Bandwidth (대역폭)
- 버킷의 최대 크기(
BUCKET_CAPACITY
)와 리필 규칙(Token 충전 주기와 개수)을 지정한다. - Bandwidth 설정을 통해 특정 시간 동안 허용할 수 있는 최대 요청량을 제한할 수 있다.
- 버킷의 최대 크기(
✅ 버킷과 토큰 설정 계산
FCM은 한 요청당 최대 500개의 토큰(디바이스 식별값)을 허용하므로, 한 번에 100개의 토큰에 대한 요청을 보낸다.
FCM 토큰에 대해 요청을 나누어 순차적으로 처리한다.
- 요청량 제한: 초당 100개 요청 허용, Bucket에 100개의 Token 충전 (최대 요청량 100)
- 재충전 속도: 초당 100 Token
- 토큰 소모량: 한 번의
sendMessage()
호출 시 1 Token 소모
4.3. APIRateLimiter
Bucket4j를 사용해서 버킷을 생성하고 토큰을 소모하는 코드는 아래와 같다.
요청자 IP에 대해 버킷을 생성하고, 해당 버킷에서 Token을 소비한다.
미리 설정된 BUCKET_CAPACITY
로 호출량을 조절할 수 있다.
@Component
class APIRateLimiter {
companion object {
const val BUCKET_CAPACITY = 100L
const val TOKEN_REFILL_AMOUNT = 100L
const val TOKEN_REFILL_SECONDS = 1L
}
private val buckets: ConcurrentMap<String, Bucket> = ConcurrentHashMap()
private val logger: Logger = LoggerFactory.getLogger(APIRateLimiter::class.java)
/**
* 요청자 IP 추출하여 API 키로 사용한다.
*
* @param request Servlet Request
* @return 요청자 IP 식별값
*/
fun getClientIP(request: HttpServletRequest): String? {
val ipHeaders = listOf(
"X-Forwarded-For",
"Proxy-Client-IP",
"WL-Proxy-Client-IP",
"HTTP_CLIENT_IP",
"HTTP_X_FORWARDED_FOR"
)
ipHeaders.forEach { header ->
val ip = request.getHeader(header)
if (ip?.isNotEmpty() == true) {
logger.info("$header : $ip")
return ip.split(",").first()
}
}
return request.remoteAddr.also { ip ->
logger.info("remoteAddr : $ip")
}
}
/**
* API 키에 해당하는 버킷을 가져오거나, 없을 경우 새로 생성한다.
*
* @param apiKey API 키
* @return 해당 API 키에 대응하는 버킷
*/
private fun getOrCreateBucket(apiKey: String?): Bucket {
return buckets.computeIfAbsent(apiKey) {
createBucket()
}
}
/**
* 버킷을 새로 생성한다.
* Bandwidth capacity: Bucket의 총 크기 5
* 리필 주기: Duration.ofSeconds, 1초마다 토큰을 충전
* refillIntervally: 요청량 제한, 1초에 5개 요청을 처리할 수 있는 limit
*
* @return 생성된 버킷
*/
private fun createBucket(): Bucket {
val refillDuration = Duration.ofSeconds(TOKEN_REFILL_SECONDS)
val limit = Bandwidth.builder().capacity(BUCKET_CAPACITY).refillIntervally(TOKEN_REFILL_AMOUNT, refillDuration).build()
return Bucket.builder().addLimit(limit).build()
}
/**
* API 키에 해당하는 버킷에서 토큰을 소비한다.
*
* @param apiKey API 키
* @return 토큰 소비 성공 여부
*/
fun tryConsume(apiKey: String?): Boolean {
val bucket = getOrCreateBucket(apiKey)
val consumed = bucket.tryConsume(1) // 한번에 소모할 토큰 지정
val now = LocalDateTime.now()
logger.info("API Key: $apiKey, Consumed: $consumed, Time: $now, Remain bucket Count : ${bucket.availableTokens}")
return consumed
}
}
4.4. sendMessages
- FCM api를 호출하여 실제로 메세지 발송하고
- 발송 결과에 따라 이력을 업데이트 한다.
/**
* FCM 알림 메세지를 다건 발송한다.
*/
private fun sendMessages(tokens: List<String>, schedule: NotificationSchedule): Boolean {
val message = makeMessage(tokens, schedule)
val now = LocalDateTime.now()
val scheduleId = schedule.id ?: error("schedule id is null")
val response = FirebaseMessaging.getInstance().sendEachForMulticast(message)
logger.info("[NOTI] FCM messages were sent successfully. " +
"Success count: ${response.successCount}, " +
"Fail count: ${response.failureCount}")
response.responses.forEachIndexed { i, res ->
updateNotificationHistory(NotificationHistoryUpdateRequest(scheduleId, tokens[i], now, res.isSuccessful))
}
return response.responses.all { it.isSuccessful }
}
/**
* 발송할 메세지를 생성한다.
* Android/APNs 구성 설정하고, 기기 등록 토큰 목록에 메시지를 멀티캐스트 한다.
*/
private fun makeMessage(tokens: List<String>, schedule: NotificationSchedule): MulticastMessage {
val notification = Notification.builder()
.setTitle(schedule.title)
.setBody(schedule.body)
.setImage(schedule.image)
.build()
return MulticastMessage.builder()
.addAllTokens(tokens)
.setNotification(notification)
.setAndroidConfig(setAndroidConfig(schedule))
.setApnsConfig(setApnsConfig(schedule))
.build()
}
sendEachForMulticast 메서드는 내부적으로 Messaging.sendeach()
API를 사용하여 모든 대상 수신자에게 지정된 메시지를 보낸다. 이 메서드는 매개변수로 MulticastMessage 타입의 메세지를 받기 때문에 makeMessage()
메소드에서 수신자 token
에 맞는 메세지를 생성한다.
MulticastMessage는 최대 500개의 토큰이 포함된 멀티캐스트 메시지여야 하기 때문에 호출시 요청량 조절이 필요하다.
발송 결과는 response로 반환 받을 수 있고, 1.성공하면 성공 처리 2.실패하면 실패 횟수 증가를 이력 테이블에 업데이트한다.
이렇게 해서 FCM을 활용한 푸시 알림 발송, 주기 설정을 위한 스케줄 발송에 성공하였다.
그런데 현재 구조에서는 문제점이 있는데, 바로 스케줄 정보를 서버 내부에 저장하고 있다는 것이다.
다시 한번 스케줄링 코드를 살펴보자.
@Component
class DynamicScheduler(
private var scheduler: ThreadPoolTaskScheduler,
) {
private val logger = LoggerFactory.getLogger(DynamicScheduler::class.java)
private val scheduledFutureMap: ConcurrentMap<Long, ScheduledFuture<*>> = ConcurrentHashMap()
init {
scheduler = ThreadPoolTaskScheduler()
scheduler.initialize()
}
fun registerScheduler(taskId:Long, task: Runnable, cron: String) {
stopScheduler(taskId)
val schedule = scheduler.schedule(task, CronTrigger(cron))
scheduledFutureMap[taskId] = schedule
logger.info("[SCHEDULE] start schedule taskId: $taskId, time: $cron")
}
fun stopScheduler(taskId: Long) {
scheduledFutureMap[taskId]?.let {
it.cancel(false)
scheduledFutureMap.remove(taskId)
logger.info("[SCHEDULE] stop schedule taskId: $taskId")
}
}
}
필드 ConcurrentMap<Long, ScheduledFuture<*>
>에 예약된 task의 스케줄 정보와 다음 스케줄링 작업 시간을 저장하고 있다.
그런데 만약 메세지 발송 도중에 배포로 인한 순단 혹은 서버에 문제가 발생한다면? 저장된 스케줄 정보가 메모리에서 날아가게 되고 중복 발송이 가능해진다는 문제가 있다.
따라서 해당 문제를 해결하기 위해 스케줄 관리를 변경할 필요가 있었다.
'backend > spring boot' 카테고리의 다른 글
IoC(제어의 역전), DI(의존성 주입) (0) | 2023.10.25 |
---|---|
[Spring Security] JwtTokenProvider 로 토큰 생성, 검증하기 (1) | 2023.04.19 |
[Spring Security] OncePerRequestFilter 로 Filter 구현 (0) | 2023.03.30 |
예외처리 Controller vs Service (0) | 2022.12.16 |
[SpringBoot] 개발 도구의 준비 : IntelliJ (2) | 2021.07.27 |
- Total
- Today
- Yesterday
- Git
- 스프링 스케줄링
- 티스토리챌린지
- 배열
- redisson 분산락
- addFilterBefore
- array
- Cannot construct instance of
- junit5
- spring boot 3
- checkout
- 스프링오류
- Java
- MongoDB
- JPA
- n+1
- Kotlin
- Linux
- port
- jvm warm-up 전략
- bucket4j
- Spring Security
- dto 클래스 생성자
- 오블완
- QueryDSL
- MultipleBagFetchException
- ChatGPT
- 자바 어플리케이션 실행 과정
- 추상클래스
- FetchJoin
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |