๐ŸŒˆ [Section4] 8. [Spring WebFlux] Project Reactor

ํ˜„์ฃผยท2022๋…„ 11์›” 30์ผ
0

bootcamp

๋ชฉ๋ก ๋ณด๊ธฐ
62/71

๐Ÿ“• ์˜ค๋Š˜ ๋ฐฐ์šด ๋‚ด์šฉ!

  • Reactor (Project Reactor)
  • ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ (Marble Diagram)
  • ์Šค์ผ€์ค„๋Ÿฌ (Scheduler)
  • Operators

โœ๏ธ Reactor (Project Reactor)

  • ๋ฆฌ์•กํ‹ฐ๋ธŒ ์ŠคํŠธ๋ฆผ์ฆˆ ํ‘œ์ค€ ์‚ฌ์–‘์„ ๊ตฌํ˜„ํ•œ ๊ตฌํ˜„์ฒด ์ค‘ ํ•˜๋‚˜

  • ๋ฆฌ์•กํ‹ฐ๋ธŒํ•œ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์œผ๋กœ ๋™์ž‘ํ•˜๋Š”๋ฐ ์žˆ์–ด ํ•ต์‹ฌ์ ์ธ ์—ญํ• ์„ ๋‹ด๋‹นํ•˜๋Š” ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์„ ์œ„ํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ

[์ฐธ๊ณ ]
https://projectreactor.io/docs/core/release/api/
https://sjh836.tistory.com/185

โœ” Reactor ํŠน์ง•

๊ณต์‹ ํ™ˆํŽ˜์ด์ง€

  • Non-Blocking ํ†ต์‹ 
    โžœ ์“ฐ๋ ˆ๋“œ๊ฐ€ ์ฐจ๋‹จ๋˜์ง€ ์•Š์Œ

  • MSA(Microservice Architecture) ๊ตฌ์กฐ์— ์ ํ•ฉํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ
    ( MSA ๊ธฐ๋ฐ˜ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜๋“ค์€ ์š”์ฒญ ์“ฐ๋ ˆ๋“œ๊ฐ€ ์ฐจ๋‹จ๋˜๋Š” Blocking ํ†ต์‹ ์„ ์‚ฌ์šฉํ•˜๊ธฐ์—๋Š” ๋ฌด๋ฆฌ๊ฐ€ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— Non-Blocking ํ†ต์‹ ์„ ์™„๋ฒฝํžˆ ์ง€์›ํ•˜๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์—ฌ์•ผ ํ•˜๊ธฐ ๋•Œ๋ฌธ )

  • Operator๋กœ ์‹œ์ž‘ํ•ด์„œ Operator๋กœ ๋๋‚œ๋‹ค !

  • Mono[0|1]์™€ Flux[N]์ด๋ผ๋Š” ๋‘ ๊ฐ€์ง€ Publisher ํƒ€์ž… ์ œ๊ณต

    • Mono[0|1]์—์„œ 0๊ณผ 1
      โžœ 0๊ฑด or 1๊ฑด์˜ ๋ฐ์ดํ„ฐ๋ฅผ emit ํ•  ์ˆ˜ ์žˆ์Œ์„ ์˜๋ฏธ
    • Flux[N]์—์„œ N
      โžœ ์—ฌ๋Ÿฌ ๊ฑด์˜ ๋ฐ์ดํ„ฐ๋ฅผ emitํ•  ์ˆ˜ ์žˆ์Œ์„ ์˜๋ฏธ
  • Backpressure ์ „๋žต

    • Subscriber์˜ ์ฒ˜๋ฆฌ ์†๋„๊ฐ€ Publihser์˜ emit ์†๋„๋ฅผ ๋”ฐ๋ผ๊ฐ€์ง€ ๋ชปํ•  ๋•Œ ์ ์ ˆํ•˜๊ฒŒ ์ œ์–ดํ•˜๋Š” ์ „๋žต

    • DROP ์ „๋žต
      โžœ ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋ฉด, ์ดํ›„์— ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋Š” DROPํ•˜๋Š” ์ „๋žต

    • LATEST ์ „๋žต
      โžœ ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋ฉด, ๊ฐ€์žฅ ์ตœ๊ทผ์— emit๋œ ๋ฐ์ดํ„ฐ๋งŒ ๋‚จ๊ธฐ๊ณ  ํ๊ธฐํ•˜๋Š” ์ „๋žต

      ๐Ÿ’ก LATEST ์ „๋žต์ด DROP ์ „๋žต์ด๋ž‘ ๋น„์Šทํ•œ๋ฐ
      DROP์€ Downstream์—์„œ ๋ฒ„ํผ๋ฅผ ๋ชจ๋‘ ์ฒ˜๋ฆฌํ•  ๋•Œ ๊นŒ์ง€ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋“ค์„ ๋“ค์–ด์˜ฌ ๋•Œ๋งˆ๋‹ค ํ•˜๋‚˜ํ•˜๋‚˜ DROPํ•˜๋‹ค๊ฐ€, ๋ฒ„ํผ๊ฐ€ ๋น„๋ฉด ๊ทธ ๋•Œ emit๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฐจ๋ก€๋กœ ๋‹ค์‹œ ๋ฒ„ํผ์— ๋„ฃ๋Š” ์ „๋žต์ด๊ณ 
      โ €
      LATEST๋Š” Downstream์—์„œ ๋ฒ„ํผ๋ฅผ ๋ชจ๋‘ ์ฒ˜๋ฆฌํ•  ๋•Œ ๊นŒ์ง€ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋“ค์„ ๊ทธ๋Œ€๋กœ ๋Œ€๊ธฐ์—ด๋กœ ๋ชจ์•„๋†จ๋‹ค๊ฐ€, ๋ฒ„ํผ๊ฐ€ ๋น„๋ฉด ๊ทธ ๋•Œ ๋งˆ์ง€๋ง‰์œผ๋กœ(์ตœ๊ทผ์—) emit๋œ ๋ฐ์ดํ„ฐ๋งŒ ๋‚จ๊ธฐ๊ณ  ๋‚˜๋จธ์ง€ ๋Œ€๊ธฐ์—ด์˜ ๋ฐ์ดํ„ฐ๋“ค์„ ํ•œ๋ฒˆ์— ํ๊ธฐํ•˜๋Š” ์ „๋žต์ž„

    • BUFFER ์ „๋žต

      • BUFFER DROP LATEST
        โžœ ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋ฉด, ๋ฒ„ํผ ์•ˆ์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ ์ค‘ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ฑ„์›Œ์ง„ ๋ฐ์ดํ„ฐ๋ฅผ DROPํ•˜๋Š” ์ „๋žต

      • BUFFER DROP OLDEST
        โžœ ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“ ์ฐจ๋ฉด, ๋ฒ„ํผ ์•ˆ์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ ์ค‘ ๊ฐ€์žฅ ์˜ค๋ž˜๋œ ๋ฐ์ดํ„ฐ๋ฅผ DROPํ•˜๋Š” ์ „๋žต

๐Ÿ’ก ๋งŒ์•ฝ, Publisher์—์„œ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๋Š์ž„์—†์ด emitํ•˜๋Š”๋ฐ Subscriber์˜ ์ฒ˜๋ฆฌ ์†๋„๊ฐ€ ๋Š๋ฆฌ๋‹ค๋ฉด,
๋Œ€๊ธฐํ•˜๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์ง€์†์ ์œผ๋กœ ์Œ“์ด๋Š” ๊ฒƒ์„ ๋ฐฉ์น˜ํ•˜๊ฒŒ๋˜๋ฉด ์˜ค๋ฒ„ํ”Œ๋กœ์šฐ๊ฐ€ ๋ฐœ์ƒํ•˜๊ฒŒ ๋˜๊ณ  ๊ฒฐ๊ตญ์—” ์‹œ์Šคํ…œ์ด ๋‹ค์šด๋  ๊ฒƒ
โžœ Backpressure ์ „๋žต์œผ๋กœ ์ด๋ฅผ ๋ฐฉ์ง€
โ €
[์ฐธ๊ณ ] https://projectreactor.io/docs/core/release/reference/#reactive.backpressure


โœ๏ธ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ (Marble Diagram)

  • ํ•˜๋‚˜์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ๋‹ค์ด์–ด๊ทธ๋žจ ์ƒ์—์„œ ์‹œ๊ฐ„์˜ ํ๋ฆ„์— ๋”ฐ๋ผ ์–ด๋–ป๊ฒŒ ๋ณ€ํ™”ํ•˜๋Š”์ง€ ๋ฐ์ดํ„ฐ์˜ ํ๋ฆ„์„ ํ‘œํ˜„ํ•œ ๊ทธ๋ฆผ

โžœ ๋ฆฌ์•กํ‹ฐ๋ธŒ ํ”„๋กœ๊ทธ๋ž˜๋ฐ์—์„œ๋Š” Operator์— ๋”ฐ๋ผ ๋ฐ์ดํ„ฐ ํ๋ฆ„์ด ๋‹ค์–‘ํ•˜๊ฒŒ ๋ณ€ํ™”ํ•˜๋Š”๋ฐ,
์ด ๋•Œ ๋ณต์žกํ•œ ๋ฐ์ดํ„ฐ์˜ ํ๋ฆ„์„ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ์„ ํ†ตํ•ด ์ข€ ๋” ์‰ฝ๊ฒŒ ์ดํ•ด ๊ฐ€๋Šฅ

  • Reactor์—์„œ ์ œ๊ณตํ•˜๋Š” ์ˆ˜๋งŽ์€ Operaotr์˜ ๋‚ด๋ถ€ ๋™์ž‘์„ ์กฐ๊ธˆ ๋” ์‰ฝ๊ฒŒ ์ดํ•ดํ•˜๊ณ ,
    ํ•ด๋‹น Operator๋ฅผ ์ ์ ˆํ•˜๊ฒŒ ์‚ฌ์šฉํ•˜๋Š”๋ฐ ๋„์›€์„ ์คŒ

โœ”๏ธ ๋งˆ๋ธ” (Marble)

  • ์›๋ž˜ ๋‹จ์–ด์˜ ์˜๋ฏธ๋Š” '๊ตฌ์Šฌ'
  • ํ•˜๋‚˜์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋‚˜ํƒ€๋ƒ„
    โžœ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ์ด ๊ตฌ์Šฌ(ํ•˜๋‚˜์˜ ๋ฐ์ดํ„ฐ)์ด ์–ด๋–ค ํ๋ฆ„์œผ๋กœ ๊ตด๋Ÿฌ๊ฐ€๋Š”์ง€ ํ‘œํ˜„ํ•œ ๊ฒƒ

[์ฐธ๊ณ ]
https://projectreactor.io/docs/core/release/reference/#howtoReadMarbles

โœ” Mono์˜ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ

( Reactor์˜ ๋ฐ์ดํ„ฐ ํƒ€์ž… ์ค‘ ํ•˜๋‚˜์ธ Mono๋ฅผ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ์œผ๋กœ ํ‘œํ˜„ํ•œ ๊ฒƒ )

  • ํ•œ๊ฐœ์˜ ๊ตฌ์Šฌ
    โžœ 0๊ฑด ๋˜๋Š” 1๊ฑด์˜ ๋ฐ์ดํ„ฐ๋งŒ emitํ•˜๋Š” Reactor ํƒ€์ž…์ž„์„ ์˜๋ฏธ

  • ์œ„ ์•„๋ž˜๋กœ ๋‘๊ฐœ์˜ ํƒ€์ž„ ๋ผ์ธ์ด ์žˆ๊ณ , ๋ชจ๋‘ ๋ฐ์ดํ„ฐ๊ฐ€ ํ˜๋Ÿฌ๊ฐ€๋Š” ์‹œ๊ฐ„์˜ ํ๋ฆ„์„ ํ‘œํ˜„
    ( ์™ผ โžœ ์˜ค ๋กœ ์‹œ๊ฐ„์ด ํ๋ฆ„ )

โ‘  ์›๋ณธ Mono(Original Mono)์—์„œ Sequence๊ฐ€ ์‹œ์ž‘๋˜๋Š” ๊ฒƒ์„ ํƒ€์ž„๋ผ์ธ์œผ๋กœ ํ‘œํ˜„
โ €
โ‘ก Mono์˜ Sequence์—์„œ ๋ฐ์ดํ„ฐ๊ฐ€ emit ๋˜๋Š” ๊ฒƒ์„ ํ‘œํ˜„
โ €
โ‘ข ์ˆ˜์ง ๋ง‰๋Œ€ ๋ฐ”๋Š” Mono์˜ Sequence๊ฐ€ ์ •์ƒ ์ข…๋ฃŒ๋˜์—ˆ๋‹ค๋Š” ๊ฒƒ์„ ์˜๋ฏธ
โ €
โ‘ฃ Mono์—์„œ ์ง€์›ํ•˜๋Š” ์–ด๋–ค Operator์—์„œ ์ž…๋ ฅ์œผ๋กœ ๋“ค์–ด์˜ค๋Š” ๊ตฌ์Šฌ ๋ชจ์–‘์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ๊ฐ€๊ณต ์ฒ˜๋ฆฌ๋˜๋Š” ๊ฒƒ์„ ํ‘œํ˜„
โ €
โ‘ค Operator์—์„œ ๊ฐ€๊ณต ์ฒ˜๋ฆฌ๋œ ๋ฐ์ดํ„ฐ๊ฐ€ Downstream์œผ๋กœ ์ „๋‹ฌ๋  ๋•Œ์˜ ํƒ€์ž„๋ผ์ธ
โ €
โ‘ฅ Mono์—์„œ emit๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ์ฒ˜๋ฆฌ๋˜๋Š” ๊ณผ์ •์— ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค๋ฉด X๋กœ ํ‘œ์‹œ
โ— |๋Š” ์ •์ƒ ์ข…๋ฃŒ / X๋Š” ์—๋Ÿฌ๋กœ ์ธํ•œ ๋น„์ •์ƒ ์ข…๋ฃŒ

โœ” Flux์˜ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ

( Reactor์˜ ๋ฐ์ดํ„ฐ ํƒ€์ž… ์ค‘ ํ•˜๋‚˜์ธ Flux๋ฅผ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ์œผ๋กœ ํ‘œํ˜„ํ•œ ๊ฒƒ )

  • Mono์™€๋Š” ๋‹ฌ๋ฆฌ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ๊ตฌ์Šฌ
    โžœ ์—ฌ๋Ÿฌ ๊ฐœ(0 โ€ฆ N)์˜ ๋ฐ์ดํ„ฐ๋ฅผ emitํ•˜๋Š” Reactor ํƒ€์ž…์ž„์„ ์˜๋ฏธ

( โฌ‡๏ธ ์•„๋ž˜ โ‘  ~ โ‘ฅ์€ ๊ทธ๋ƒฅ ํ‘œ๋ฅผ ์ฝ๋Š” ๋ฐฉ๋ฒ•์ด๋ฏ€๋กœ Mono์™€ ๊ฐ™์Œ )
โ €
โ‘  ์›๋ณธ Flux์—์„œ Sequence๊ฐ€ ์‹œ์ž‘๋˜๋Š” ๊ฒƒ์„ ํƒ€์ž„๋ผ์ธ์œผ๋กœ ํ‘œํ˜„
โ €
โ‘ก Flux์˜ Sequence์—์„œ ๋ฐ์ดํ„ฐ๊ฐ€ emit ๋˜๋Š” ๊ฒƒ์„ ํ‘œํ˜„
โ €
โ‘ข ์ˆ˜์ง ๋ง‰๋Œ€ ๋ฐ”๋Š” Flux์˜ Sequence๊ฐ€ ์ •์ƒ ์ข…๋ฃŒ๋˜์—ˆ๋‹ค๋Š” ๊ฒƒ์„ ์˜๋ฏธ
โ €
โ‘ฃ Flux์—์„œ ์ง€์›ํ•˜๋Š” ์–ด๋–ค Operator์—์„œ ์ž…๋ ฅ์œผ๋กœ ๋“ค์–ด์˜ค๋Š” ๊ตฌ์Šฌ ๋ชจ์–‘์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ๊ฐ€๊ณต ์ฒ˜๋ฆฌ๋˜๋Š” ๊ฒƒ์„ ํ‘œํ˜„
โ €
โ‘ค Operator์—์„œ ๊ฐ€๊ณต ์ฒ˜๋ฆฌ๋œ ๋ฐ์ดํ„ฐ๊ฐ€ Downstream์œผ๋กœ ์ „๋‹ฌ๋  ๋•Œ์˜ ํƒ€์ž„๋ผ์ธ
โ €
โ‘ฅ Flux์—์„œ emit๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ์ฒ˜๋ฆฌ๋˜๋Š” ๊ณผ์ •์— ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค๋ฉด X๋กœ ํ‘œ์‹œ
โ— |๋Š” ์ •์ƒ ์ข…๋ฃŒ / X๋Š” ์—๋Ÿฌ๋กœ ์ธํ•œ ๋น„์ •์ƒ ์ข…๋ฃŒ

๐Ÿ’ก main ์“ฐ๋ ˆ๋“œ / ๋ฐ๋ชฌ ์“ฐ๋ ˆ๋“œ
โ €โ €
โœ”๏ธ main ์“ฐ๋ ˆ๋“œ
โžœ ์ฃผ ์“ฐ๋ ˆ๋“œ
โ €
โœ”๏ธ ๋ฐ๋ชฌ ์“ฐ๋ ˆ๋“œ
โžœ ๋ณด์กฐ ์“ฐ๋ ˆ๋“œ ( ์ฃผ ์“ฐ๋ ˆ๋“œ์— ์˜ํ–ฅ์„ ๋ฐ›๋Š” ์“ฐ๋ ˆ๋“œ )
โžœ ์ฃผ ์“ฐ๋ ˆ๋“œ๊ฐ€ ์ข…๋ฃŒ๋˜๋ฉด ๋”ฐ๋ผ์„œ ์ข…๋ฃŒ๋จ

โœ” map() Operator์˜ ๋งˆ๋ธ” ๋‹ค์ด์–ด๊ทธ๋žจ ์˜ˆ์‹œ

  • map() Operator
    โžœ ์ž…๋ ฅ์œผ๋กœ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐœ๋ฐœ์ž๊ฐ€ ๊ตฌํ˜„ํ•˜๋Š” ๋™์ž‘๋Œ€๋กœ ๋ณ€ํ™˜ํ•ด์„œ Downstream์œผ๋กœ ์ „๋‹ฌํ•˜๋Š” ์—ญํ• 

  • ์œ„ ๊ทธ๋ฆผ์„ ์‚ดํŽด๋ณด๋ฉด,
    ์œ„์˜ sequence์—์„œ x๋กœ ๋“ค์–ด๊ฐ„ ๋ฐ์ดํ„ฐ๊ฐ€ ๋™์ผํ•œ ์ƒ‰์˜ ๊ฒฐ๊ณผ๊ฐ’์œผ๋กœ ๋ณ€ํ™˜๋˜์–ด Downstream์œผ๋กœ ์ „๋‹ฌ๋˜๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ์Œ


โœ๏ธ ์Šค์ผ€์ค„๋Ÿฌ (Scheduler)

  • ์“ฐ๋ ˆ๋“œ ๊ด€๋ฆฌ์ž
    โžœ ์—ฌ๋Ÿฌ ์“ฐ๋ ˆ๋“œ๋“ค์„ ์†์‰ฝ๊ฒŒ ๊ด€๋ฆฌ
    โžœ ๋ณต์žกํ•œ ๋ฉ€ํ‹ฐ์“ฐ๋ ˆ๋”ฉ ํ”„๋กœ์„ธ์Šค๋ฅผ ๋‹จ์ˆœํ•˜๊ฒŒ ํ•ด์คŒ

  • Reactor Sequence ์ƒ์—์„œ ์ฒ˜๋ฆฌ๋˜๋Š” ๋™์ž‘๋“ค์„ ํ•˜๋‚˜ ์ด์ƒ์˜ ์“ฐ๋ ˆ๋“œ์—์„œ ๋™์ž‘ํ•˜๋„๋ก ๋ณ„๋„์˜ ์“ฐ๋ ˆ๋“œ๋ฅผ ์ œ๊ณตํ•ด ์ฃผ๋Š” ๊ฒƒ

Scheduler ์ฐธ๊ณ 

โœ” Scheduler ์ „์šฉ Operator

( ์ ์ ˆํ•œ ์ƒํ™ฉ์— ๋งž๋Š” ์“ฐ๋ ˆ๋“œ๋ฅผ ์ถ”๊ฐ€๋กœ ์ƒ์„ฑํ•˜๋Š” Operator )

โœ”๏ธ subscribeOn() Operator

  • ๋ฐ์ดํ„ฐ ์†Œ์Šค์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ emitํ•˜๋Š” ์›๋ณธ Publisher์˜ ์‹คํ–‰ ์“ฐ๋ ˆ๋“œ๋ฅผ ์ง€์ •ํ•˜๋Š” ์—ญํ• 

  • ๊ตฌ๋… ์ง ํ›„ ์‹คํ–‰๋˜๋Š” Operator ์ฒด์ธ์˜ ์‹คํ–‰ ์“ฐ๋ ˆ๋“œ๋ฅผ Scheduler๋กœ ์ง€์ •ํ•œ ์“ฐ๋ ˆ๋“œ๋กœ ๋ณ€๊ฒฝ

    โœ”๏ธ doOnSubscribe() Operator

    • ๊ตฌ๋… ๋ฐœ์ƒ ์ง ํ›„์— ํŠธ๋ฆฌ๊ฑฐ ๋˜๋Š” Operator
    • ๊ตฌ๋… ์ง ํ›„์— ์–ด๋–ค ๋™์ž‘์„ ์ˆ˜ํ–‰ํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด doOnSubscribe()์— ๋กœ์ง์„ ์ž‘์„ฑํ•˜๋ฉด ๋จ
    • ์ด ์•ž์— subscribeOn() Operator ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ doOnSubscribe()์˜ ๋กœ์ง์€ ์“ฐ๋ ˆ๋“œ๊ฐ€ ๋ฐ”๋€Œ์ง€ ์•Š๊ณ  main์—์„œ ์‹คํ–‰๋จ
    • ์ดํ›„ ๋ฐ์ดํ„ฐ๋ฅผ ์ƒ์„ฑํ•˜๊ณ  emitํ•˜๋Š” ๋‹จ๊ณ„๋ถ€ํ„ฐ๋Š” ๋‹ค๋ฅธ ์“ฐ๋ ˆ๋“œ์—์„œ ์‹คํ–‰
  • ์ฃผ๋กœ Schedulers.boundedElastic() ์‚ฌ์šฉ

โœ”๏ธ publishOn() Operator

  • ์ „๋‹ฌ ๋ฐ›์€ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€๊ณต ์ฒ˜๋ฆฌํ•˜๋Š” Operator ์•ž์— ์ถ”๊ฐ€ํ•ด์„œ ์‹คํ–‰ ์“ฐ๋ ˆ๋“œ๋ฅผ ๋ณ„๋„๋กœ ์ถ”๊ฐ€ํ•˜๋Š” ์—ญํ• 

  • publishOn() ๊ธฐ์ค€ Downstream ์“ฐ๋ ˆ๋“œ๊ฐ€ publishOn()์˜ Scheduler๋กœ ์ง€์ •ํ•œ ์“ฐ๋ ˆ๋“œ๋กœ ๋ณ€๊ฒฝ

    โœ”๏ธ doOnNext() Operator

    • doOnNext() ๋ฐ”๋กœ ์•ž์— ์œ„์น˜ํ•œ Operator๊ฐ€ ์‹คํ–‰๋  ๋•Œ, ํŠธ๋ฆฌ๊ฑฐ ๋˜๋Š” Operator
  • ์ฃผ๋กœ Schedulers.parallel() ์‚ฌ์šฉ

๐Ÿ’ก Operator ์ฒด์ธ๋งˆ๋‹ค ์‹คํ–‰ ์“ฐ๋ ˆ๋“œ๋ฅผ ๊ตฌ๋ถ„ํ•ด์„œ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•˜๋Š” ์ด์œ 
Spring WebFlux ๊ธฐ๋ฐ˜์˜ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์€ ์ ์€ ์ˆ˜์˜ ์“ฐ๋ ˆ๋“œ๋กœ ๋Œ€๋Ÿ‰์˜ ์š”์ฒญ์„ Non-Blocking ๋ฐฉ์‹์œผ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” ๊ตฌ์กฐ์ž„
์ด ๊ตฌ์กฐ๋Š” ์“ฐ๋ ˆ๋“œ๊ฐ€ ๋ณต์žกํ•œ ๊ณ„์‚ฐ์ด ํ•„์š”ํ•œ ์ž‘์—…์˜ ๊ฒฝ์šฐ, ์‘๋‹ต ์ง€์—ฐ์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Œ !
โžœ Scheduler๋ฅผ ํ†ตํ•ด ๋ณ„๋„์˜ ์“ฐ๋ ˆ๋“œ๋ฅผ ์ƒ์„ฑํ•œ ํ›„, ๋ณต์žกํ•œ ๊ณ„์‚ฐ์„ ์ˆ˜ํ–‰ํ•˜๋„๋ก ํ•จ

Scheduler ์ง€์› ์“ฐ๋ ˆ๋“œ ์œ ํ˜• ์ฐธ๊ณ 


โœ๏ธ Operators

  • Reactor์—์„œ ๊ฐ€์žฅ ์ค‘์š”ํ•œ ๊ตฌ์„ฑ์š”์†Œ

โœ” ์ƒํ™ฉ๋ณ„๋กœ ๋ถ„๋ฅ˜๋œ Operator ๋ชฉ๋ก

Which operator do I need? ์ฐธ๊ณ 

โ‘  ์ƒˆ๋กœ์šด Sequence๋ฅผ ์ƒ์„ฑ(Creating)ํ•˜๊ณ ์ž ํ•  ๊ฒฝ์šฐ

  • just()

  • โญ fromStream()
    โžœ Java์˜ Stream์„ ์ž…๋ ฅ์œผ๋กœ ์ „๋‹ฌ ๋ฐ›์•„ emitํ•˜๋Š” Operator

  • โญ fromIterable()
    โžœ Java์˜Iterable์„ ์ž…๋ ฅ์œผ๋กœ ์ „๋‹ฌ ๋ฐ›์•„ emitํ•˜๋Š” Operator
    ( List, Map, Set ๋“ฑ์˜ ์ปฌ๋ ‰์…˜์„ fromIterable()์˜ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์ „๋‹ฌ ๊ฐ€๋Šฅ )

  • fromArray()

  • range()

  • interval()

  • empty()

  • never()

  • defer()

  • using()

  • generate()

  • โญ create()
    โžœ ํ”„๋กœ๊ทธ๋ž˜๋ฐ ๋ฐฉ์‹์œผ๋กœ Signal ์ด๋ฒคํŠธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค๋Š” Operator
    โžœ ํ•œ ๋ฒˆ์— ์—ฌ๋Ÿฌ ๊ฑด์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋น„๋™๊ธฐ์ ์œผ๋กœ emitํ•  ์ˆ˜ ์žˆ๋Š” Operator

    โœ”๏ธ Signal
    โžœ Publisher๊ฐ€ ๋ฐœ์ƒ์‹œํ‚ค๋Š” ์ด๋ฒคํŠธ

โ‘ก ๊ธฐ์กด Sequence์—์„œ ๋ณ€ํ™˜ ์ž‘์—…(Transforming)์ด ํ•„์š”ํ•œ ๊ฒฝ์šฐ

  • โญ map()
    โžœ ์•ˆ์—์„œ ๊ฐ€๊ณต์ฒ˜๋ฆฌ ํ•ด์คŒ

  • โญ flatMap()
    โžœ flatMap() ๋‚ด๋ถ€๋กœ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋งˆ๋‹ค ํ•˜๋‚˜์˜ ์ƒˆ๋กœ์šด Sequence๊ฐ€ ์ƒ์„ฑ๋จ
    ( ๋“ค์–ด์˜ค๋Š” Sequence๋Š” ํ•˜๋‚˜์ธ๋ฐ, ๋‚ด๋ถ€์—์„œ ์ž‘๋™๋˜๋Š” Sequence๋Š” ์—ฌ๋Ÿฌ๊ฐœ )
    โžœ flatMap() ๋‚ด๋ถ€์—์„œ ์ •์˜ํ•˜๋Š” Sequence = Inner Sequece
    โžœ But, ๊ฐ ์“ฐ๋ ˆ๋“œ์˜ ์ž‘์—…์˜ ์ฒ˜๋ฆฌ ์ˆœ์„œ๋ฅผ ๋ณด์žฅํ•˜์ง€ ์•Š์Œ ( ์ถœ๋ ฅ ๋žœ๋ค )

  • โญ concat()
    โžœ ์ž…๋ ฅ์œผ๋กœ ์ „๋‹ฌํ•˜๋Š” Publisher์˜ Sequence๋ฅผ ํ•˜๋‚˜๋กœ ์ด์–ด๋ถ™์—ฌ์„œ ์ฐจ๋ก€๋Œ€๋กœ ๋ฐ์ดํ„ฐ๋ฅผ emitํ•จ
    ( ์ˆœ์„œ๋Œ€๋กœ emit๋จ )

  • collectList()

  • collectMap()

  • merge()

  • โญ zip()
    โžœ ์ž…๋ ฅ์œผ๋กœ ์ „๋‹ฌ๋˜๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ Publisher Sequence์—์„œ emit๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฒฐํ•ฉํ•˜๋Š” Operator
    โžœ ๊ฐ๊ฐ์˜ Sequence์—์„œ emit๋˜๋Š” ๋ฐ์ดํ„ฐ ์ค‘ ๊ฐ™์€ ์ฐจ๋ก€(index)์˜ ๋ฐ์ดํ„ฐ๋“ค์ด ๊ฒฐํ•ฉ
    ( ๋‘ Sequence์˜ emit ์‹œ์ ์ด ๋งค๋ฒˆ ๋‹ค๋ฅด๋”๋ผ๋„ emit ์‹œ์ ์ด ๋Šฆ์€ ๋ฐ์ดํ„ฐ๊ฐ€ emit๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ ํ–ˆ๋‹ค๊ฐ€ ๋‘ ๊ฐœ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ „๋‹ฌ ๋ฐ›์•„ ๊ฒฐํ•ฉ )

    ๐Ÿ’ก ์—ฌ๊ธฐ์„œ์˜ '๊ฒฐํ•ฉ'
    โžœ ๊ฐ Publisher๊ฐ€ emitํ•˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ํ•˜๋‚˜์”ฉ ์ „๋‹ฌ ๋ฐ›์•„์„œ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ๋ฅผ ๋งŒ๋“  ํ›„์— Downstream์œผ๋กœ ์ „๋‹ฌํ•œ๋‹ค๋Š” ์˜๋ฏธ

  • then()

  • switchIfEmpty()
    โžœ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์œผ๋ฉด ๋‚ด๋ถ€์—์„œ ์ƒˆ Sequence๋ฅผ ๋งŒ๋“ค์–ด์„œ ๊ทธ Sequence์˜ ๊ฒฐ๊ณผ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฆฌํ„ดํ•  ์ˆ˜ ์žˆ๊ณ ,
    ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ์œผ๋ฉด ๊ทธ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฆฌํ„ด

  • and()

  • when()

โ‘ข Sequence ๋‚ด๋ถ€์˜ ๋™์ž‘์„ ํ™•์ธ(Peeking)ํ•˜๊ณ ์ž ํ•  ๊ฒฝ์šฐ

  • doOnSubscribe

  • โญ doOnNext()
    โžœ ๋ฐ์ดํ„ฐ emit ์‹œ ํŠธ๋ฆฌ๊ฑฐ ๋˜์–ด ๋ถ€์ˆ˜ ํšจ๊ณผ(side-effect)๋ฅผ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๋Š” Operator
    โžœ ๋ฆฌํ„ด๊ฐ’์ด ์—†์Œ
    โžœ ์ฃผ๋กœ ๋กœ๊น…(๋กœ๊ทธ๋ฅผ ๊ธฐ๋ก ๋˜๋Š” ์ถœ๋ ฅํ•˜๋Š” ์ž‘์—…)์— ์‚ฌ์šฉํ•˜์ง€๋งŒ,
    ๋ฐ์ดํ„ฐ๋ฅผ emitํ•˜๋ฉด์„œ ํ•„์š”ํ•œ ์ถ”๊ฐ€ ์ž‘์—…์ด ์žˆ๋‹ค๋ฉด doOnNext()์—์„œ ์ฒ˜๋ฆฌ

    โœ”๏ธ ๋ถ€์ˆ˜ ํšจ๊ณผ (side-effect)
    โžœ ์–ด๋–ค ๋™์ž‘์„ ์‹คํ–‰ํ•˜๋˜ ๋ฆฌํ„ด ๊ฐ’์ด ์—†๋Š” ๊ฒƒ

  • doOnError()

  • doOnCancel()

  • doFirst()

  • doOnRequest()

  • doOnTerminate()

  • doAfterTerminate()

  • doOnEach()

  • doFinally()

  • โญ log()
    โžœ Publisher์—์„œ ๋ฐœ์ƒํ•˜๋Š” Signal ์ด๋ฒคํŠธ ๋งˆ๋‹ค ๋กœ๊ทธ๋กœ ์ถœ๋ ฅํ•ด์ฃผ๋Š” ์—ญํ• 
    โžœ ์›ํ•˜๋Š” ๋งŒํผ ์ถ”๊ฐ€ ๊ฐ€๋Šฅ

โ‘ฃ Sequence์—์„œ ๋ฐ์ดํ„ฐ ํ•„ํ„ฐ๋ง(Filtering)์ด ํ•„์š”ํ•œ ๊ฒฝ์šฐ

  • โญ filter()
    โžœ ๋“ค์–ด์˜ค๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์–ด๋–ค ์กฐ๊ฑด์— ๋งž์ถฐ์„œ ํ•„ํ„ฐ๋ง

  • take()
    โžœ ํŒŒ๋ผ๋ฏธํ„ฐ์— ์ฒ˜๋ฆฌํ•˜๊ณ ์‹ถ์€ ๋ฐ์ดํ„ฐ์˜ ์ˆ˜๊ฐ€ ๋“ค์–ด๊ฐ

  • ignoreElements()

  • distinct()

  • โญ take()

  • next()

  • skip()

  • sample()

  • single()

โ‘ค ์—๋Ÿฌ๋ฅผ ์ฒ˜๋ฆฌ(Handling errors)ํ•˜๊ณ ์ž ํ•  ๊ฒฝ์šฐ

  • โญ error()
    โžœ Reactor Sequence ์ƒ์—์„œ ์˜๋„์ ์œผ๋กœ ์˜ˆ์™ธ๋ฅผ ๋˜์ ธ์„œ onError Signal ์ด๋ฒคํŠธ ๋ฐœ์ƒ์‹œํ‚ค๋Š”๋ฐ ์‚ฌ์šฉ
    ( throw ์™€ ๊ฐ™์Œ )

  • โญ timeout()
    โžœ ์ž…๋ ฅ์œผ๋กœ ์ฃผ์–ด์ง„ ์‹œ๊ฐ„๋™์•ˆ emit๋˜๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์œผ๋ฉด onError Signal ์ด๋ฒคํŠธ ๋ฐœ์ƒ ์‹œํ‚ด
    โžœ retry() Operator์™€ ํ•จ๊ป˜ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋งŽ์Œ
    ( ์ผ์ •ํ•œ ์‹œ๊ฐ„ ์•ˆ์— ์•ˆ์˜ค๋ฉด ๋‹ค์‹œ ์žฌ์‹œ๋„ ํ•ด๋ด๋ผ ! )

  • onErrorReturn()

  • onErrorResume()

  • onErrorMap()

  • doFinally()

  • โญ retry()
    โžœ Sequence ์ƒ์—์„œ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•  ๊ฒฝ์šฐ, ์ž…๋ ฅ์œผ๋กœ ์ฃผ์–ด์ง„ ์ˆซ์ž๋งŒํผ ์žฌ๊ตฌ๋…ํ•ด์„œ Sequence๋ฅผ ๋‹ค์‹œ ์‹œ์ž‘
    โžœ ํŒŒ๋ผ๋ฏธํ„ฐ์— ์žฌ์‹œ๋„ํ•  ์ˆ˜ ์žˆ๋Š” ํšŸ์ˆ˜๊ฐ€ ๋“ค์–ด๊ฐ
    โžœ timeout() Operator์™€ ํ•จ๊ป˜ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋งŽ์Œ


๐Ÿ˜œ ์‹ค์Šต

  • projects - be-template-reactor
    ( ๊ตฌํ˜„ ์‹ค์Šต๋ณด๋‹จ ์˜ˆ์‹œ์— ๊ฐ€๊นŒ์›€ )

๐ŸŒˆ ๋Š๋‚€์ 

์‚ฌ์‹ค ๋ญ”๊ฐ€ Stream์ด๋ž‘ ๋น„์Šทํ•ด์„œ ์ดํ•ด๋Š” ๊ทธ๋ž˜๋„ ์‰ฝ๊ธด ํ–ˆ๋Š”๋ฐ
Mono์™€ Flux๋ฅผ ๊ทธ๋ž˜์„œ ์–ธ์ œ ์–ด๋–ป๊ฒŒ ์จ์•ผ ํ•˜๋Š”์ง€๋Š” ์•„์ง ์ž˜ ๋ชจ๋ฅด๊ฒ ๋‹ค ! ใ…Žใ…Ž
๊ทธ๋ฆฌ๊ณ  Operator ๋„ˆ๋ฌด ๋งŽ์•„..
๋‚˜์ค‘์— ๋งˆ์ € ๊ณต๋ถ€ํ•ด์•ผ๊ฒ ๋‹ท

2๊ฐœ์˜ ๋Œ“๊ธ€

comment-user-thumbnail
2022๋…„ 12์›” 1์ผ

์ž˜ ๋ณด๊ณ ๊ฐ‘๋‹ˆ๋‹ค. ๋ฒค์น˜๋งˆํ‚นํ•ด์„œ ์ €๋„ ์ž˜ ์ •๋ฆฌํ•ด๋ด์•ผ๊ฒ ์–ด์š”. ๊ฐ™์ด ํ™”์ดํŒ…ํ•ด์—ฌ~!

1๊ฐœ์˜ ๋‹ต๊ธ€