Redis Streams
목차
1. 개요
Redis Streams는 Redis 5.0에서 도입된 append-only 로그 데이터 구조이다. Apache Kafka의 로그 기반 메시징 모델에서 영감을 받았으며, Redis 내에서 경량 메시지 브로커 역할을 수행한다.
핵심 특성
| 특성 | 설명 |
|---|---|
| Append-only | 새로운 항목은 항상 끝에 추가되며, 기존 항목은 변경 불가 |
| 영속성 | 메시지가 Stream에 저장되어 소비 후에도 유지됨 |
| 고유 ID | 각 항목에 시간 기반의 고유 ID가 자동 부여됨 |
| Consumer Group | 여러 소비자가 메시지를 분산 처리할 수 있음 |
| 범위 조회 | ID 기반으로 특정 범위의 항목을 조회 가능 |
2. 기본 개념
2.1 Stream Entry 구조
각 Stream entry는 고유 ID와 하나 이상의 field-value 쌍으로 구성된다. Redis Hash와 유사한 구조이다.
Entry ID: 1526919030474-0
┌─────────────┬──────────┐
│ Field │ Value │
├─────────────┼──────────┤
│ name │ Sara │
│ surname │ OConnor │
└─────────────┴──────────┘
2.2 Entry ID
Entry ID는 <millisecondsTime>-<sequenceNumber> 형식이다.
- millisecondsTime: Redis 노드의 로컬 시간 (밀리초 단위 Unix timestamp)
- sequenceNumber: 동일 밀리초 내에서의 순번 (0부터 시작)
# 자동 생성 (권장)
> XADD mystream * name Sara
"1526919030474-0"
# 명시적 지정 (ID는 항상 증가해야 함)
> XADD mystream 1526919030474-55 message "Hello"
"1526919030474-55"
# 부분 ID (시퀀스 자동 생성, Redis 7.0+)
> XADD mystream 1526919030474-* message "World"
"1526919030474-56"
ID 규칙:
- ID는 항상 증가해야 한다 (이전 ID보다 큰 값만 허용)
- 최소 유효 ID:
0-1 - 특수 ID:
$(최신),-(최소),+(최대)
2.3 데이터 구조 내부 (Radix Tree)
Redis Streams는 내부적으로 Radix Tree(기수 트리)를 사용하여 항목을 저장한다. 이 구조 덕분에 ID 기반의 범위 조회가 효율적이며, 메모리 사용도 최적화된다.
3. 핵심 명령어
3.1 XADD - 항목 추가
Stream에 새로운 entry를 추가한다. Stream이 존재하지 않으면 자동 생성된다.
XADD key [NOMKSTREAM] [MAXLEN | MINID [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
# 기본 사용
> XADD mystream * name Sara surname OConnor
"1526919030474-0"
# 여러 필드 추가
> XADD mystream * field1 value1 field2 value2 field3 value3
"1526919030475-0"
# Stream 미존재 시 생성하지 않음 (Redis 6.2+)
> XADD nonexistent NOMKSTREAM * field value
(nil)
# 추가와 동시에 트리밍 (대략적)
> XADD mystream MAXLEN ~ 1000 * field value
| 옵션 | 설명 | 도입 버전 |
|---|---|---|
NOMKSTREAM |
Stream이 없으면 생성하지 않음 | 6.2 |
MAXLEN [= \| ~] N |
최대 N개 항목 유지 (초과 시 오래된 항목 제거) | 5.0 |
MINID [= \| ~] id |
지정 ID보다 작은 항목 제거 | 6.2 |
LIMIT count |
트리밍 시 검사할 최대 항목 수 | 6.2 |
- 시간 복잡도: O(1) (추가), O(N) (트리밍, N = 제거되는 항목 수)
3.2 XREAD - 항목 읽기
하나 이상의 Stream에서 항목을 읽는다. 여러 Stream을 동시에 읽을 수 있다.
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
# 비블로킹 - 처음부터 2개 읽기
> XREAD COUNT 2 STREAMS mystream 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
# 블로킹 - 새 항목이 올 때까지 최대 5초 대기
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
# 여러 Stream 동시 읽기
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
| 파라미터 | 설명 |
|---|---|
COUNT count |
각 Stream당 반환할 최대 항목 수 |
BLOCK milliseconds |
블로킹 대기 시간 (0 = 무한 대기) |
$ |
현재 시점 이후의 새 항목만 읽기 (첫 호출에만 사용) |
+ |
마지막 항목만 읽기 (Redis 7.4+) |
- 시간 복잡도: O(M) (M = 반환 항목 수)
주의:
$는 첫 번째 호출에서만 사용하고, 이후에는 마지막으로 받은 ID를 사용해야 한다. 그렇지 않으면 폴링 사이에 도착한 메시지가 누락될 수 있다.
3.3 XRANGE / XREVRANGE - 범위 조회
ID 범위로 항목을 조회한다.
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
# 전체 조회
> XRANGE mystream - +
# 특정 범위 조회
> XRANGE mystream 1526984818136-0 1526999352406-0
# 단일 항목 조회
> XRANGE mystream 1526984818136-0 1526984818136-0
# 페이지네이션 (exclusive range)
> XRANGE mystream (1526984818136-0 + COUNT 10
# 역순 조회
> XREVRANGE mystream + - COUNT 5
- 특수 ID:
-(최소 ID),+(최대 ID) - Exclusive Range: ID 앞에
(를 붙이면 해당 ID 제외 - 시간 복잡도: O(N) (N = 반환 항목 수)
3.4 XLEN - 항목 수 조회
> XLEN mystream
(integer) 3
- 시간 복잡도: O(1)
3.5 XDEL - 항목 삭제
> XDEL mystream 1526984818136-0
(integer) 1
4. Consumer Group
Consumer Group은 여러 소비자(Consumer)가 하나의 Stream을 분산 처리하기 위한 메커니즘이다. Kafka의 Consumer Group 개념과 유사하다.
4.1 핵심 개념
┌──────────────┐
│ Stream │
│ (mystream) │
└──────┬───────┘
│
┌──────────┴──────────┐
│ Consumer Group │
│ (mygroup) │
├─────────────────────┤
│ last_delivered_id │
│ PEL (Pending List) │
└──┬───────┬───────┬──┘
│ │ │
┌────┴──┐ ┌──┴───┐ ┌─┴─────┐
│ C1 │ │ C2 │ │ C3 │
│ (PEL) │ │(PEL) │ │ (PEL) │
└───────┘ └──────┘ └───────┘
동작 원리:
- 각 메시지는 그룹 내 단 하나의 Consumer에게만 전달됨
- 메시지는 처리 완료(ACK)될 때까지 PEL(Pending Entries List)에 유지됨
- Consumer가 실패하면 다른 Consumer가 미처리 메시지를 인수(Claim)할 수 있음
4.2 XGROUP CREATE - 그룹 생성
XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
# Stream 처음부터 읽기
> XGROUP CREATE mystream mygroup 0
# 새 항목부터 읽기
> XGROUP CREATE mystream mygroup $
# Stream이 없으면 자동 생성
> XGROUP CREATE mystream mygroup $ MKSTREAM
| 옵션 | 설명 |
|---|---|
0 |
Stream의 처음부터 읽기 시작 |
$ |
현재 시점 이후의 새 항목부터 읽기 |
MKSTREAM |
Stream이 없으면 길이 0으로 자동 생성 |
ENTRIESREAD N |
Lag 추적을 위한 읽은 항목 수 설정 (Redis 7.0+) |
4.3 XREADGROUP - 그룹으로 읽기
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
# 새 메시지 읽기
> XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >
# 블로킹으로 대기
> XREADGROUP GROUP mygroup consumer1 BLOCK 2000 COUNT 10 STREAMS mystream >
# Consumer의 pending 메시지 조회
> XREADGROUP GROUP mygroup consumer1 STREAMS mystream 0
| ID 값 | 동작 |
|---|---|
> |
아직 전달되지 않은 새 메시지만 수신 |
0 또는 특정 ID |
해당 Consumer의 pending 메시지만 조회 |
NOACK 옵션: PEL에 추가하지 않음. 메시지 손실을 허용할 수 있는 경우에 사용.
4.4 XACK - 처리 완료 확인
> XACK mystream mygroup 1526569495631-0
(integer) 1
# 여러 메시지 한 번에 확인
> XACK mystream mygroup id1 id2 id3
XACK는 PEL에서 메시지를 제거하여 메모리를 해제한다. 처리가 완료된 메시지는 반드시 ACK 해야 한다.
4.5 기본 처리 루프
시작 시:
# 1단계: 이전 크래시로 인한 미처리 메시지 복구
WHILE true
entries = XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream 0
IF entries가 비어있음
BREAK # pending 메시지 모두 처리 완료
FOR EACH message IN entries
process(message)
XACK mystream mygroup message.id
END
# 2단계: 새 메시지 처리
WHILE true
entries = XREADGROUP GROUP mygroup consumer1 BLOCK 2000 COUNT 10 STREAMS mystream >
IF entries == nil
CONTINUE # 타임아웃
FOR EACH message IN entries
process(message)
XACK mystream mygroup message.id
END
5. 메시지 신뢰성 보장
5.1 PEL (Pending Entries List)
PEL은 Consumer에게 전달되었지만 아직 ACK되지 않은 메시지들의 목록이다.
┌─ PEL Entry ────────────────────────────┐
│ Message ID: 1526569495631-0 │
│ Consumer: consumer1 │
│ Delivery Time: 1526569495631 │
│ Delivery Count: 1 │
└─────────────────────────────────────────┘
5.2 XPENDING - Pending 메시지 조회
# 요약 조회
> XPENDING mystream mygroup
1) (integer) 2 # 총 pending 메시지 수
2) 1526984818136-0 # 최소 ID
3) 1526984818137-0 # 최대 ID
4) 1) 1) "consumer1" # Consumer별 pending 수
2) "2"
# 상세 조회
> XPENDING mystream mygroup - + 10
1) 1) 1526984818136-0 # 메시지 ID
2) "consumer1" # Consumer
3) (integer) 196415 # 마지막 전달 후 경과 시간 (ms)
4) (integer) 1 # 전달 횟수
# 특정 시간 이상 유휴 상태인 메시지만 조회 (Redis 6.2+)
> XPENDING mystream mygroup IDLE 60000 - + 10
5.3 XCLAIM - 메시지 소유권 이전
실패한 Consumer의 메시지를 다른 Consumer가 인수받는다.
# 1시간 이상 유휴 상태인 메시지를 Alice에게 할당
> XCLAIM mystream mygroup Alice 3600000 1526569498055-0
1) 1) 1526569498055-0
2) 1) "message"
2) "orange"
| 옵션 | 설명 |
|---|---|
IDLE ms |
유휴 시간을 지정 값으로 설정 |
RETRYCOUNT count |
재시도 횟수 설정 |
FORCE |
PEL에 없는 ID도 강제 claim |
JUSTID |
ID만 반환, 재시도 횟수 미증가 |
5.4 XAUTOCLAIM - 자동 소유권 이전 (Redis 6.2+)
XPENDING + XCLAIM을 결합한 편의 명령어이다.
> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 25
1) "0-0" # 다음 호출 시 사용할 커서
2) 1) 1) "1609338752495-0" # claim된 메시지
2) 1) "field"
2) "value"
3) (empty array) # 삭제된 메시지 ID
SCAN 방식으로 동작하므로 대규모 PEL에서도 효율적이다.
5.5 장애 복구 패턴
Consumer 장애 감지 시:
1. XPENDING으로 실패한 Consumer의 pending 메시지 확인
2. XAUTOCLAIM으로 유휴 메시지를 다른 Consumer에게 이전
3. 전달 횟수(delivery count)가 임계값을 초과하면 Dead Letter Queue로 이동
6. Stream 관리
6.1 XTRIM - Stream 크기 조절
XTRIM key <MAXLEN | MINID> [= | ~] threshold [LIMIT count]
# 정확히 1000개만 유지
> XTRIM mystream MAXLEN 1000
# 대략적 트리밍 (성능 우선)
> XTRIM mystream MAXLEN ~ 1000
# 특정 ID 이전 항목 제거
> XTRIM mystream MINID 649085820
| 전략 | 설명 |
|---|---|
MAXLEN = N |
정확히 N개까지 트리밍 |
MAXLEN ~ N |
대략 N개까지 트리밍 (더 효율적) |
MINID = id |
지정 ID보다 작은 항목 제거 |
MINID ~ id |
대략적으로 지정 ID 이전 항목 제거 |
~(approximate) 사용 권장: 정확한 트리밍은 매 호출마다 전체를 검사해야 하지만, 대략적 트리밍은 매크로 노드 단위로 처리하여 훨씬 효율적이다.
6.2 XINFO - Stream 정보 조회
# Stream 기본 정보
> XINFO STREAM mystream
# Consumer Group 정보
> XINFO GROUPS mystream
# Consumer 정보
> XINFO CONSUMERS mystream mygroup
6.3 메모리 관리 전략
| 전략 | 설명 | 적합한 경우 |
|---|---|---|
XADD ... MAXLEN ~ |
추가 시 동시에 트리밍 | 실시간 처리, 크기 제한 필요 시 |
XTRIM MAXLEN ~ |
별도 배치로 트리밍 | 주기적 정리 |
XTRIM MINID ~ |
시간 기반 트리밍 | 보존 기간 기반 관리 |
| TTL (EXPIRE) | Stream 키 자체에 TTL 설정 | 임시 Stream |
7. Pub/Sub 및 List와의 비교
| 기능 | Redis Streams | Pub/Sub | List (BRPOP) |
|---|---|---|---|
| 메시지 영속성 | O (저장됨) | X (Fire & Forget) | X (소비 시 삭제) |
| Consumer Group | O | X | X |
| 다중 소비자 (Fan-out) | O | O | X |
| 분산 처리 | O (Consumer Group) | X | 부분적 (경쟁 소비) |
| 메시지 재읽기 | O (ID 기반) | X | X |
| 메시지 ACK | O | X | X |
| 범위 조회 | O | X | X |
| 블로킹 읽기 | O | O | O |
| 메시지 순서 보장 | O | O | O |
| Dead Letter 처리 | O (PEL + XCLAIM) | X | X |
사용 기준:
- Pub/Sub: 실시간 알림, 메시지 손실 허용, 구독자 모두 수신 필요
- List: 단순 작업 큐, 1:1 소비
- Streams: 메시지 영속성, 분산 처리, 신뢰성 보장이 필요한 메시지 큐
8. 활용 사례
8.1 이벤트 소싱 (Event Sourcing)
# 주문 이벤트 기록
> XADD order-events * type ORDER_CREATED orderId 12345 userId 100
> XADD order-events * type PAYMENT_COMPLETED orderId 12345 amount 50000
> XADD order-events * type ORDER_SHIPPED orderId 12345 trackingNo TRK001
# 특정 시점 이후 이벤트 조회
> XRANGE order-events 1526984818136-0 +
8.2 실시간 데이터 파이프라인
# Producer: 센서 데이터 수집
> XADD sensor:temp * deviceId D001 temperature 23.5 timestamp 1609459200
# Consumer Group으로 분산 처리
> XGROUP CREATE sensor:temp analytics-group 0 MKSTREAM
> XREADGROUP GROUP analytics-group worker1 COUNT 100 BLOCK 5000 STREAMS sensor:temp >
8.3 작업 큐 (Task Queue)
# 작업 등록
> XADD task-queue * type email to user@example.com subject "Welcome"
# 워커가 분산으로 처리
> XREADGROUP GROUP workers worker-1 COUNT 1 BLOCK 0 STREAMS task-queue >
# ... 처리 후 ACK
> XACK task-queue workers 1526569495631-0
8.4 활동 로그 / 감사 추적
# 사용자 활동 기록
> XADD audit-log * userId 100 action LOGIN ip 192.168.1.1
> XADD audit-log * userId 100 action UPDATE_PROFILE field email
# 시간 기반으로 보존 (7일)
> XTRIM audit-log MINID ~ <7일_전_timestamp>
9. Spring Data Redis 연동
Spring Data Redis 2.2부터 Redis Streams를 지원한다. 명령형(Imperative)과 반응형(Reactive) 두 가지 프로그래밍 모델을 모두 지원한다.
9.1 의존성
// build.gradle
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
9.2 메시지 Produce
RedisTemplate 사용
@RequiredArgsConstructor
@Service
public class StreamProducer {
private final RedisTemplate<String, Object> redisTemplate;
// Map 기반 레코드 추가
public RecordId send(String streamKey, Map<String, String> data) {
StringRecord record = StreamRecords.string(data)
.withStreamKey(streamKey);
return redisTemplate.opsForStream().add(record);
}
// Object 기반 레코드 추가
public RecordId send(String streamKey, Object payload) {
ObjectRecord<String, Object> record = StreamRecords.newRecord()
.in(streamKey)
.ofObject(payload);
return redisTemplate.opsForStream().add(record);
}
}
MAXLEN을 적용한 추가
StringRecord record = StreamRecords.string(data).withStreamKey("my-stream");
redisTemplate.opsForStream().add(record, StreamAddOptions.maxLen(1000).approximate());
9.3 메시지 Consume (동기)
// 최신 메시지 읽기
List<MapRecord<String, String, String>> messages = redisTemplate.opsForStream()
.read(StreamReadOptions.empty().count(10),
StreamOffset.latest("my-stream"));
// Consumer Group에서 읽기
List<MapRecord<String, String, String>> messages = redisTemplate.opsForStream()
.read(Consumer.from("my-group", "my-consumer"),
StreamReadOptions.empty().count(10),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()));
9.4 메시지 Consume (비동기) - StreamMessageListenerContainer
비동기 메시지 수신을 위한 핵심 컴포넌트이다. 스레드 관리와 런타임 구독 변경을 지원한다.
StreamListener 구현
@Slf4j
@Component
public class OrderEventListener implements StreamListener<String, MapRecord<String, String, String>> {
@Override
public void onMessage(MapRecord<String, String, String> message) {
log.info("Message ID: {}", message.getId());
log.info("Stream: {}", message.getStream());
log.info("Body: {}", message.getValue());
}
}
Container 설정
@Configuration
public class RedisStreamConfig {
@Bean
public StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer(
RedisConnectionFactory connectionFactory,
OrderEventListener listener) {
var options = StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.batchSize(10)
.build();
var container = StreamMessageListenerContainer.create(connectionFactory, options);
// 단순 구독 (Consumer Group 없이)
container.receive(
StreamOffset.fromStart("my-stream"),
listener
);
// Consumer Group으로 구독
container.receive(
Consumer.from("my-group", "my-consumer"),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
listener
);
// 자동 ACK로 구독
container.receiveAutoAck(
Consumer.from("my-group", "my-consumer"),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
listener
);
container.start();
return container;
}
}
9.5 Consumer Group 관리
@RequiredArgsConstructor
@Service
public class StreamGroupManager {
private final RedisTemplate<String, Object> redisTemplate;
// Consumer Group 생성
public void createGroup(String streamKey, String groupName) {
try {
redisTemplate.opsForStream()
.createGroup(streamKey, ReadOffset.from("0"), groupName);
} catch (RedisSystemException e) {
// BUSYGROUP: 이미 존재하는 그룹
log.warn("Consumer group already exists: {}", groupName);
}
}
// 메시지 ACK
public void acknowledge(String streamKey, String groupName, RecordId... recordIds) {
redisTemplate.opsForStream().acknowledge(streamKey, groupName, recordIds);
}
// Pending 메시지 조회
public PendingMessagesSummary pending(String streamKey, String groupName) {
return redisTemplate.opsForStream().pending(streamKey, groupName);
}
}
9.6 Object Mapping
Stream entry를 도메인 객체로 자동 매핑할 수 있다.
// 도메인 객체
@Data
public class OrderEvent {
private String orderId;
private String type;
private String userId;
}
// ObjectRecord로 추가
ObjectRecord<String, OrderEvent> record = StreamRecords.newRecord()
.in("order-events")
.ofObject(new OrderEvent("12345", "CREATED", "user1"));
redisTemplate.opsForStream().add(record);
// ObjectRecord로 읽기
List<ObjectRecord<String, OrderEvent>> records = redisTemplate.opsForStream()
.read(OrderEvent.class, StreamOffset.fromStart("order-events"));
ObjectRecord 기반 Listener
@Component
public class OrderEventObjectListener
implements StreamListener<String, ObjectRecord<String, OrderEvent>> {
@Override
public void onMessage(ObjectRecord<String, OrderEvent> message) {
OrderEvent event = message.getValue();
// event.getOrderId(), event.getType() ...
}
}
// Container에 ObjectMapper 설정
var options = StreamMessageListenerContainerOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.targetType(OrderEvent.class) // 자동 매핑
.build();
9.7 ReadOffset 전략
| ReadOffset | 단독 사용 | Consumer Group |
|---|---|---|
ReadOffset.latest() |
최신 메시지 읽기 | 최신 메시지 읽기 |
ReadOffset.from(id) |
특정 ID 이후 읽기 | 특정 ID 이후 읽기 |
ReadOffset.lastConsumed() |
지원 안 함 | 마지막 소비된 메시지 이후 (>) |
주의:
ReadOffset.latest()는 폴링 간격 사이에 도착한 메시지가 누락될 수 있다. 메시지 손실이 허용되지 않는 경우lastConsumed()또는 특정 ID를 사용해야 한다.
9.8 반응형 (Reactive) - StreamReceiver
ReactiveRedisConnectionFactory connectionFactory = ...;
var options = StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver =
StreamReceiver.create(connectionFactory, options);
Flux<MapRecord<String, String, String>> messages =
receiver.receive(StreamOffset.fromStart("my-stream"));
messages
.doOnNext(message -> {
log.info("ID: {}, Body: {}", message.getId(), message.getValue());
})
.subscribe();
반응형 모델은 Backpressure 기반의 수요 주도형 소비를 지원하여, 소비자의 처리 속도에 따라 메시지를 가져온다.