로그 데이터를 처리하기 위해서 새로 도입된 데이터 타입입니다.
레디스 스트림에서는 소비자(Consumer)를 지정해서 데이터를 읽을 수 있고,
그 소비자가 데이터를 제대로 처리했는지 확인하는 방법을 제공하며,
만약 제대로 처리하지 못했다면 다른 소비자에게 할당해서 처리하도록 하는 방법을 제공합니다.
다른 특징이 있다면, 레코드를 '소비자'가 가져옵니다.
먼저 명령어를 보겠습니다.
XADD key ID field value [field2 value2...]
- key : stream의 명칭을 가리킵니다
- id 생성
- (millisecondsTime) (sequenceNumber)로 구성됩니다.
xadd('task_stream', '*', 'task', 123)
결과는 밑에서 보겠습니다.
key 안에 담겨져 있는 ID 개수를 나타냅니다.
XLEN key
- key : stream 명칭
xlen task_stream
=> (integer) 1
XRANGE key start end [Count] count
xrange task_stream - +
XREAD [COUNT count][BLOCK milliseconds] STREAMS key
- count : 읽어올 데이터 개수를 정합니다.
- id : id를 가리킵니다.
- Block : 새 데이터가 들어오기를 기다렸다 들어오면 읽습니다.
xread count 0 Block 0 STREAM task_stream 0
XDEL key ID
xdel task_stream 1526569495631-0
XTRIM key MAXLEN [~] count
이러헥 사용하면 메시지 개수를 10개로 유지합니다.
레디스 키와 함께 소비자 그룹을 생성합니다.
XGROUP CREATE key group
xgroup create task_stream workers_group $
현재 소비자가 처리해야할 모든 보류중 레코드를 읽습니다.
XREADGROUP GROUP group consumerName STREAMS key id
xreadgroup GROUP workers_group consumerName STREAMS task_stream 0
xreadgroup GROUP workers_group consumerName BLOCK 0 COUNT 1 STREAMS task_stream >
'BLOCK'을 추가하면, 빈 목록을 반환하는 대신 현재 사용 가능한 새 레코드가 없으면 호출이 차단되어야함을 나타냅니다. 0 은 무기한 대기를 나타냅니다
'>'은 이 소비자 그룹에서 아직 조회되지 않은 레코드를 가져오려 한다는 걸 알립니다.
XPENDING stream group
xpending task_stream workers_group
XACK stream group id
xack taks_stream workers_group 1526569495631-0
스트림의 전체적인 정보를 확인합니다.
XINFO CONSUMERS stream group
xinfo consumers task_stream workers_group
const Redis = require('ioredis')
const redisClient = new Redis()
const taskTest = [
'데이터',
'계속해서',
'넣기',
]
async function main() {
for (task of taskTest)
await redisClient.xadd('test_stream', '*',
'data', JSON.stringify(task))
console.log('추가되었습니다')
}
setInterval(
() => {
main().catch(err => console.log(err))
}, 1000 * 60
)
const Redis = require('ioredis')
const redisClient = new Redis()
async function main () {
await redisClient.xgroup('CREATE', 'test_stream', 'test_group', '$', "MKSTREAM")
// group 을 생성합니다.
.catch(() => { console.log('group already exists')})
const [[, records]] = await redisClient.xreadgroup( //구조 분해 할당
'GROUP', 'test_group', 'data', 'STREAMS', 'test_stream', '0'
)
// 현재 소비자가 처리해야할 모든 보류중인 레코드를 읽음
// 그런다음 읽고 싶은 읽고 싶은 스트림을 지정
console.log(records) //
// group key : value // consumer
while (true) {
const [[, records]] = await redisClient.xreadgroup(
'GROUP', 'test_group', 'data', 'BLOCK', '0', 'COUNT', '1', 'STREAMS', 'test_stream', '>'
)
// 실제 스트림에서 새로운 레코드에 대한 읽기를 시작
// GROUP workers_group consumerName 읽기 작업에 사용할 소비자 그룹을 지정
// 0 은 무기한 대기
for (const [recordId, [, rawTask]] of records) {
console.log(recordId, rawTask)
await processAndAck(recordId, rawTask)
}
}
}
async function processAndAck( recordId) {
await redisClient.xack('test_stream', 'wtest_group', recordId) // 데이터가 확실히 처리됐는지 확인
.then(data => console.log(data))
}
main().catch(err => console.log(err))
const { createServer } = require('http');
const Redis = require('ioredis');
const staticHandler = require('serve-handler')
const ws = require('ws')
const redisClient = new Redis()
const redisClientXRead = new Redis()
const server = createServer((req, res) => {
return staticHandler(req, res, {public : 'www'})
})
const wss = new ws.Server({ server })
wss.on('connection', async client => {
client.on('message', msg => {
redisClient.xadd('chat_stream', `${msg}`)
}) // 메시지가 들어오면 해당 msg를 redis stream 으로 등록합니다.
const logs = await redisClient.xrange('chat_stream') // streamd을 조회한 후
for (const [, [, message]] of logs) {
client.send(message)
}
})
function broadcast (msg) {
for (const client of wss.clients) {
if (client.readyState === ws.OPEN) {
client.send(msg)
}
}
}
let lastRecordId = '$'
async function processStreamMessages() {
while (true) {
const [[, records]] = await redisClientXRead.xread('BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
// 새 레코드가 들어오기를 기다리고 있다 들어온다면 스트림을 읽고 id를 제공 후 메세지를 읽습니다.
for (const [record, [, message]] of records) {
BroadcastChannel(message)
lastRecordId = recordId
}
}
}
processStreamMessages().catch(err => console.log(err))
server.listen(8000)