StandaloneConfiguration XREADGROUP BLOCK 검증 테스트
배경
멀티플렉싱 단일 커넥션 테스트에서
기존 RedisStaticMasterReplicaConfiguration 경유 XREADGROUP BLOCK이
ElastiCache Valkey 엔진에서 ERR [ENGINE] Invalid command로 거부되는 것을 확인하였다.
StaticMasterReplicaConnection의 라우팅/핸드셰이크 과정이 원인으로 추정되므로,
라우팅 레이어가 없는 RedisStandaloneConfiguration으로 Master에 직접 연결하면
XREADGROUP BLOCK이 정상 동작하는지 검증한다.
테스트 환경
| 항목 | 값 |
|---|---|
| Redis | Valkey 7.2.6 on Amazon ElastiCache (ap-northeast-2) |
| 접속 경로 | 로컬 VPN 경유 (127.0.0.1:40198 → 개발 ElastiCache) |
| 클라이언트 | Lettuce 6.4.2 (Spring Boot 3.4.3 / Spring Data Redis) |
| 기존 ConnectionFactory | RedisStaticMasterReplicaConfiguration + ReadFrom.REPLICA_PREFERRED + RESP2 |
| 테스트 ConnectionFactory | RedisStandaloneConfiguration + LettucePoolingClientConfiguration + RESP2 |
핵심 검증 포인트
RedisStandaloneConfiguration으로 연결하면 StatefulRedisMasterReplicaConnection 라우팅 레이어를
거치지 않고 StatefulRedisConnection으로 직접 연결된다.
이 경로에서 XREADGROUP BLOCK이 ElastiCache Valkey에서 정상 동작하는지 확인한다.
기존 (실패):
LettuceConnectionFactory
└── StatefulRedisMasterReplicaConnection (라우팅 레이어)
└── XREADGROUP BLOCK → ERR [ENGINE] Invalid command
검증 대상 (성공 기대):
LettuceConnectionFactory
└── StatefulRedisConnection (직접 연결)
└── XREADGROUP BLOCK → 정상 동작?
테스트 구성
TestConfiguration
package com.musinsapayments.prepay.application.prepay.admin.campaign.infrastructure.redis.stream
import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI
import io.lettuce.core.protocol.ProtocolVersion
import io.lettuce.core.resource.DefaultClientResources
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration
import org.springframework.data.redis.core.StringRedisTemplate
import java.time.Duration
/**
* StandaloneConfiguration + 커넥션 풀로 Master에 직접 연결하는 테스트 설정.
*
* CLIENT SETINFO를 비활성화하여 ElastiCache Valkey와의 핸드셰이크 호환성 문제를 회피한다.
* RedisURI에 libraryName/libraryVersion을 빈 문자열로 설정하면 CLIENT SETINFO가 전송되지 않는다.
*
* RedisConnectionFactory를 직접 빈으로 등록하면 기존 redisConnectFactory와 충돌하므로,
* StandaloneStreamContext로 래핑하여 단일 빈으로 노출한다.
*/
@TestConfiguration
class StandaloneConnectionTestConfig {
@Bean
fun standaloneStreamContext(
@Value("\${redis.master.host}") host: String,
@Value("\${redis.master.port}") port: Int,
): StandaloneStreamContext {
val standaloneConfig = RedisStandaloneConfiguration(host, port)
val poolConfig = GenericObjectPoolConfig<Any>().apply {
maxTotal = 40
maxIdle = 20
minIdle = 0
}
val clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.clientOptions(
ClientOptions.builder()
.autoReconnect(true)
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.protocolVersion(ProtocolVersion.RESP2)
.build(),
)
.commandTimeout(Duration.ofSeconds(10))
.build()
val connectionFactory = LettuceConnectionFactory(standaloneConfig, clientConfig).apply {
afterPropertiesSet()
}
// CLIENT SETINFO를 비활성화한 별도 RedisClient 생성 (비교용)
val redisUri = RedisURI.builder()
.withHost(host)
.withPort(port)
.withLibraryName("")
.withLibraryVersion("")
.withTimeout(Duration.ofSeconds(10))
.build()
val clientOptions = ClientOptions.builder()
.autoReconnect(true)
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.REJECT_COMMANDS)
.protocolVersion(ProtocolVersion.RESP2)
.build()
val noSetInfoClient = RedisClient.create(DefaultClientResources.create(), redisUri).apply {
options = clientOptions
}
val redisTemplate = StringRedisTemplate(connectionFactory)
return StandaloneStreamContext(connectionFactory, redisTemplate, noSetInfoClient)
}
data class StandaloneStreamContext(
val connectionFactory: LettuceConnectionFactory,
val redisTemplate: StringRedisTemplate,
val noSetInfoClient: RedisClient,
)
}
기존 멀티플렉싱 테스트(StreamMultiplexingTestConfig)와의 차이:
| 항목 | 멀티플렉싱 테스트 | Standalone 테스트 |
|---|---|---|
| Configuration | RedisStaticMasterReplicaConfiguration (기존 redisConnectFactory 재사용) |
RedisStandaloneConfiguration (신규 생성) |
| Connection Pool | 없음 (멀티플렉싱) | LettucePoolingClientConfiguration + commons-pool2 |
| ReadFrom | REPLICA_PREFERRED |
없음 (Standalone은 라우팅 불필요) |
| 커넥션 타입 | StatefulRedisMasterReplicaConnection |
StatefulRedisConnection |
시나리오 1: Stream 명령 호환성 진단
멀티플렉싱 테스트와 동일한 진단을 수행하여 비교한다.
@Test
@Order(1)
fun `진단 - StandaloneConfiguration에서 Stream 명령 호환성`() {
val streamOps = redisTemplate.opsForStream<String, String>()
val results = mutableListOf<String>()
// Step 1: ConnectionFactory 타입 확인
results.add("ConnectionFactory 타입: ${connectionFactory.javaClass.name}")
results.add("StandaloneConfiguration 사용: ${connectionFactory.standaloneConfiguration != null}")
// Step 2: XADD
try {
val recordId = streamOps.add(
StreamRecords.string(mapOf("diag" to "1")).withStreamKey(streamKey),
)
results.add("XADD 성공: $recordId")
} catch (e: Exception) {
results.add("XADD 실패: ${e.message}")
}
// Step 3: XLEN
try {
val len = streamOps.size(streamKey)
results.add("XLEN 결과: $len")
} catch (e: Exception) {
results.add("XLEN 실패: ${e.message}")
}
// Step 4: XREADGROUP (non-block)
try {
val messages = streamOps.read(
Consumer.from(consumerGroup, "diag-consumer"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
)
results.add("XREADGROUP (non-block) 수신: ${messages?.size}건")
} catch (e: Exception) {
results.add("XREADGROUP (non-block) 실패: ${e.cause?.message ?: e.message}")
}
// Step 5: XREADGROUP BLOCK 100ms (핵심 검증)
try {
streamOps.read(
Consumer.from(consumerGroup, "diag-consumer-block"),
StreamReadOptions.empty().count(10).block(Duration.ofMillis(100)),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
)
results.add("XREADGROUP BLOCK (Lettuce) 성공")
} catch (e: Exception) {
results.add("XREADGROUP BLOCK (Lettuce) 실패: ${e.cause?.message ?: e.message}")
}
// ... (raw execute, noSetInfoClient 등 추가 진단 단계는 실제 테스트 코드 참고)
// Step 7: SET / GET / DEL
try {
redisTemplate.opsForValue().set("test:standalone:diag", "ok")
val value = redisTemplate.opsForValue().get("test:standalone:diag")
redisTemplate.delete("test:standalone:diag")
results.add("SET/GET/DEL 성공 (value=$value)")
} catch (e: Exception) {
results.add("SET/GET/DEL 실패: ${e.message}")
}
println("\n═══ StandaloneConfiguration 진단 결과 ═══")
results.forEach { println(it) }
println("══════════════════════════════════════════\n")
assertThat(results.any { it.contains("XADD 성공") })
.withFailMessage("XADD가 실패하면 Stream 자체를 사용할 수 없음")
.isTrue()
}
기대 결과:
| 명령 | StaticMasterReplica (기존) | Standalone (검증 대상) |
|---|---|---|
XADD |
OK | OK |
XLEN |
OK | OK |
XREADGROUP (non-block) |
OK | OK |
XREADGROUP BLOCK |
ERR | OK (기대) |
SET / GET / DEL |
OK | OK |
시나리오 2: Blocking polling 메시지 수신
XREADGROUP BLOCK이 동작한다면, StreamMessageListenerContainer에서 pollTimeout > 0으로
blocking polling 메시지 수신이 가능한지 확인한다.
@Test
@Order(2)
fun `Blocking polling으로 메시지 수신`() {
val options = StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(2000)) // XREADGROUP BLOCK 2000
.batchSize(10)
.build()
val container = StreamMessageListenerContainer
.create(connectionFactory, options)
val received = CopyOnWriteArrayList<MapRecord<String, String, String>>()
container.receive(
Consumer.from(consumerGroup, "consumer-0"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
StreamListener { message -> received.add(message) },
)
container.start()
// 메시지 10건 발행
val streamOps = redisTemplate.opsForStream<String, String>()
repeat(10) { i ->
streamOps.add(
StreamRecords.string(mapOf("index" to "$i")).withStreamKey(streamKey),
)
}
// blocking polling이므로 pollTimeout 내 수신
Thread.sleep(5000)
println("═══ Blocking polling 결과 ═══")
println("수신된 메시지: ${received.size}건 / 10건")
println("═════════════════════════════")
assertThat(received).hasSize(10)
container.stop()
}
시나리오 3: Blocking polling 중 다른 Redis 명령 응답시간
BLOCK 대기 중 동일 ConnectionFactory(Pool)에서 다른 Redis 명령이 정상 동작하는지 확인한다. 커넥션 풀을 사용하므로 BLOCK이 다른 명령을 차단하지 않아야 한다.
@Test
@Order(3)
fun `Blocking polling 중 다른 Redis 명령 응답시간`() {
val options = StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(2000))
.batchSize(10)
.build()
val container = StreamMessageListenerContainer
.create(connectionFactory, options)
container.receive(
Consumer.from(consumerGroup, "consumer-0"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
StreamListener { /* no-op */ },
)
container.start()
Thread.sleep(500)
// BLOCK polling 중 다른 Redis 명령 실행 (10회 측정)
val latencies = (1..10).map {
val start = System.currentTimeMillis()
redisTemplate.opsForValue().set("test:standalone:ping", "pong")
val value = redisTemplate.opsForValue().get("test:standalone:ping")
val elapsed = System.currentTimeMillis() - start
assertThat(value).isEqualTo("pong")
elapsed
}
println("═══ Blocking polling 중 Redis 명령 응답시간 ═══")
println("평균: ${latencies.average().toLong()}ms")
println("최대: ${latencies.max()}ms")
println("최소: ${latencies.min()}ms")
println("전체: $latencies")
if (latencies.average() > 100) {
println("⚠️ BLOCK polling이 다른 명령에 영향을 줌")
} else {
println("✅ BLOCK polling이 다른 명령에 영향 없음 (Pool 분리 정상)")
}
println("═══════════════════════════════════════════════")
container.stop()
redisTemplate.delete("test:standalone:ping")
}
시나리오 4: 다중 Subscription blocking polling
Container 1개에 N개 Subscription을 등록하고 blocking polling으로 메시지 수신 및 성능을 측정한다. 멀티플렉싱 테스트의 non-blocking 결과와 비교한다.
@ParameterizedTest
@ValueSource(ints = [1, 2, 4, 8, 16, 32])
@Order(4)
fun `Subscription N개에서 blocking polling 메시지 수신 + 다른 명령 응답시간`(subscriptionCount: Int) {
val options = StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(2000)) // XREADGROUP BLOCK 2000
.batchSize(10)
.build()
val container = StreamMessageListenerContainer
.create(connectionFactory, options)
val received = AtomicInteger(0)
repeat(subscriptionCount) { i ->
container.receive(
Consumer.from(consumerGroup, "consumer-$i"),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
StreamListener { received.incrementAndGet() },
)
}
container.start()
Thread.sleep(1000)
// 메시지 100건 발행
val streamOps = redisTemplate.opsForStream<String, String>()
repeat(100) { i ->
streamOps.add(
StreamRecords.string(mapOf("index" to "$i")).withStreamKey(streamKey),
)
}
// 다른 Redis 명령 응답시간 측정 (10회)
val latencies = (1..10).map {
val start = System.currentTimeMillis()
redisTemplate.opsForValue().set("test:standalone:latency", "check")
redisTemplate.opsForValue().get("test:standalone:latency")
System.currentTimeMillis() - start
}
Thread.sleep(5000)
println(
"""
|═══ Subscription ${subscriptionCount}개 결과 (blocking polling) ═══
|수신 메시지: ${received.get()}건 / 100건
|Redis 명령 응답시간:
| 평균: ${latencies.average().toLong()}ms
| 최대: ${latencies.max()}ms
| 최소: ${latencies.min()}ms
| 전체: $latencies
|═══════════════════════════════════════════════════
""".trimMargin(),
)
container.stop()
redisTemplate.delete("test:standalone:latency")
}
비교 기준 (멀티플렉싱 non-blocking 결과):
| Subscription 수 | non-blocking SET+GET 평균 | blocking SET+GET 평균 (기대) |
|---|---|---|
| 1 | 107ms | < 107ms |
| 32 | 704ms | < 704ms |
커넥션 풀 사용 시 BLOCK이 별도 커넥션에서 실행되므로, Subscription 수 증가에 따른 다른 명령 응답시간 저하가 크게 줄어들 것으로 기대한다.
판정 기준
| 케이스 | 조건 | 판정 |
|---|---|---|
| Case A: BLOCK 정상 동작 + 성능 양호 | XREADGROUP BLOCK 성공 + 32 sub에서 SET+GET < 300ms |
StandaloneConfiguration 채택 |
| Case B: BLOCK 정상 동작 + 성능 저하 | XREADGROUP BLOCK 성공 + 32 sub에서 SET+GET >= 300ms |
Pool 사이징 조정 후 재검증 |
| Case C: BLOCK 여전히 실패 | ERR [ENGINE] Invalid command 동일 |
non-blocking polling 또는 다른 대안 검토 |
테스트 코드 위치
src/test/kotlin/.../campaign/infrastructure/redis/stream/
├── StreamMultiplexingTestConfig.kt ← 기존 (멀티플렉싱)
├── SingleSubscriptionTest.kt ← 기존
├── MultipleSubscriptionTest.kt ← 기존
├── CoexistenceTest.kt ← 기존
├── StandaloneConnectionTestConfig.kt ← 신규 (Standalone + Pool)
└── StandaloneBlockTest.kt ← 신규 (BLOCK 검증)
재현 방법
# 로컬 VPN으로 개발 Redis 연결 후
./gradlew test --tests "*StandaloneBlockTest" --info