[Redis] Stream

Vorhandenheit ·2022년 10월 15일
0

Redis

목록 보기
4/5

[Redis] Stream

로그 데이터를 처리하기 위해서 새로 도입된 데이터 타입입니다.

레디스 스트림에서는 소비자(Consumer)를 지정해서 데이터를 읽을 수 있고,
그 소비자가 데이터를 제대로 처리했는지 확인하는 방법을 제공하며,
만약 제대로 처리하지 못했다면 다른 소비자에게 할당해서 처리하도록 하는 방법을 제공합니다.

다른 특징이 있다면, 레코드를 '소비자'가 가져옵니다.

1. 명령어

먼저 명령어를 보겠습니다.

XADD : 데이터 추가

XADD key ID field value [field2 value2...]

  • key : stream의 명칭을 가리킵니다
  • id 생성
    - (millisecondsTime) (sequenceNumber)로 구성됩니다.
  • 예시
xadd('task_stream', '*', 'task', 123)

결과는 밑에서 보겠습니다.

XLEN : 데이터 길이 조회

key 안에 담겨져 있는 ID 개수를 나타냅니다.

XLEN key

  • key : stream 명칭
  • 예시
xlen task_stream
=> (integer) 1

XRANGE : 데이터 조회

XRANGE key start end [Count] count

  • key : stream 명칭
  • 예시
xrange task_stream - +

XREAD : 데이터 읽기

XREAD [COUNT count][BLOCK milliseconds] STREAMS key

  • count : 읽어올 데이터 개수를 정합니다.
  • id : id를 가리킵니다.
  • Block : 새 데이터가 들어오기를 기다렸다 들어오면 읽습니다.
  • 예시
xread count 0 Block 0 STREAM task_stream 0

XDEL, XTRIM : 데이터 삭제

XDEL key ID

  • 예시
xdel task_stream 1526569495631-0

XTRIM key MAXLEN [~] count
이러헥 사용하면 메시지 개수를 10개로 유지합니다.

XGROUP : 소비자 그룹 만들기

레디스 키와 함께 소비자 그룹을 생성합니다.

XGROUP CREATE key group

  • 예시
xgroup create task_stream workers_group $

XREADGROUP : 소비자 그룹 데이터 엙기

현재 소비자가 처리해야할 모든 보류중 레코드를 읽습니다.

XREADGROUP GROUP group consumerName STREAMS key id

  • 예시
xreadgroup GROUP workers_group consumerName STREAMS task_stream 0
xreadgroup GROUP workers_group consumerName BLOCK 0 COUNT 1 STREAMS task_stream >

'BLOCK'을 추가하면, 빈 목록을 반환하는 대신 현재 사용 가능한 새 레코드가 없으면 호출이 차단되어야함을 나타냅니다. 0 은 무기한 대기를 나타냅니다

'>'은 이 소비자 그룹에서 아직 조회되지 않은 레코드를 가져오려 한다는 걸 알립니다.

XPENDING : 처리 여부 확인

XPENDING stream group

  • 예시
xpending task_stream workers_group

XACK : 처리 확정

XACK stream group id

  • 예시
xack taks_stream workers_group 1526569495631-0

XINFO CONSUMERS : 펜딩시간

스트림의 전체적인 정보를 확인합니다.

XINFO CONSUMERS stream group

xinfo consumers task_stream workers_group

3. 테스트

  • send
const Redis = require('ioredis')

const redisClient = new Redis()

const taskTest = [
    '데이터',
    '계속해서',
    '넣기',
]
async function main() {
    for (task of taskTest)
    await redisClient.xadd('test_stream', '*',
        'data', JSON.stringify(task))
    console.log('추가되었습니다')
}

setInterval(
    () => {
        main().catch(err => console.log(err))
    }, 1000 * 60
)
  • receive
const Redis = require('ioredis')


const redisClient = new Redis()

async function main () {
  await redisClient.xgroup('CREATE', 'test_stream', 'test_group', '$', "MKSTREAM") 
  // group 을 생성합니다.
  .catch(() => { console.log('group already exists')})

  const [[, records]] = await redisClient.xreadgroup( //구조 분해 할당
    'GROUP', 'test_group', 'data', 'STREAMS', 'test_stream', '0'
  )
    // 현재 소비자가 처리해야할 모든 보류중인 레코드를 읽음
    // 그런다음 읽고 싶은 읽고 싶은 스트림을 지정
  console.log(records) // 
 // group  key : value   // consumer
  while (true) {
    const [[, records]] = await redisClient.xreadgroup(
      'GROUP', 'test_group', 'data', 'BLOCK', '0', 'COUNT', '1', 'STREAMS', 'test_stream', '>'
    )
      // 실제 스트림에서 새로운 레코드에 대한 읽기를 시작
      // GROUP workers_group consumerName 읽기 작업에 사용할 소비자 그룹을 지정
      // 0 은 무기한 대기
    for (const [recordId, [, rawTask]] of records) {
      console.log(recordId, rawTask)
      await processAndAck(recordId, rawTask)
    }
  }
}

async function processAndAck( recordId) {
    await redisClient.xack('test_stream', 'wtest_group', recordId) // 데이터가 확실히 처리됐는지 확인
    .then(data => console.log(data))
}

main().catch(err => console.log(err))

4. 활용

const { createServer } = require('http');
const Redis = require('ioredis');
const staticHandler = require('serve-handler')
const ws = require('ws')


const redisClient = new Redis()
const redisClientXRead = new Redis()

const server = createServer((req, res) => {
  return staticHandler(req, res, {public : 'www'})
})

const wss = new ws.Server({ server })

wss.on('connection', async client => {
  
  client.on('message', msg => {
    redisClient.xadd('chat_stream', `${msg}`)
  }) // 메시지가 들어오면 해당 msg를 redis stream 으로 등록합니다.

  const logs = await redisClient.xrange('chat_stream') // streamd을 조회한 후

  for (const [, [, message]] of logs) {
    client.send(message)
  }
})

function broadcast (msg) {
  for (const client of wss.clients) {
    if (client.readyState === ws.OPEN) {
      client.send(msg)
    }
  }
}

let lastRecordId = '$'

async function processStreamMessages() {
  while (true) {
    const [[, records]] = await redisClientXRead.xread('BLOCK', '0', 'STREAMS', 'chat_stream', lastRecordId)
    // 새 레코드가 들어오기를 기다리고 있다 들어온다면 스트림을 읽고 id를 제공 후 메세지를 읽습니다.
    for (const [record, [, message]] of records) { 
      BroadcastChannel(message)
      lastRecordId = recordId
    }
  }
}

processStreamMessages().catch(err => console.log(err))

server.listen(8000)
profile
읽고 기록하고 고민하고 사용하고 개발하자!

0개의 댓글