GraphQL Subscription With Federation Data Graph and Message Channel (feat. Redis, Kafka)

콜트·2023년 10월 26일
0

개요

  • 본 문서에서는 Apollo GraphQL 에서 지원하는 Federation 환경 에서 GraphQL Subscription 을 통해 Reactive(Server Push) 를 구현하는 방법을 기술한다.
  • 모든 예제 코드는 여기에 기재되어 있으니 참조할 것.
  • 참고로 본 문서에서 Kafka에 대한 구현은 따로 다루지 않으며, Redis에 대한 구현만 기재하였다. Kafka에 대한 샘플 구현이 궁금하다면 앞서 언급한 여기를 참조하도록 한다.

  • 본 문서에서 구현하는 Federation 환경에서의 Reactive 는 위 아키텍처를 기본 골자로 한다.
  • 이에 따라 필요한 구성요소는 아래와 같다.
    - GraphQL Federation Gateway
    - GraphQL API Server
    - GraphQL Subscription Server
    - Pub-Sub Channel (Redis, etc.)
    - Client for Receive Subscription Message (React App)

동작 방식

  1. 클라이언트(React App Browser) 실행시 아래 사진과 같이 websocket 이 연결됨.

해당 websocketGraphQL Subscription Server연결되어 추후에 Subscription Message 를 수신함.

  1. Postman 과 같은 API 클라이언트 프로그램을 이용하여 Gateway에 의해 Federated 된 GraphQL API 를 호출하면 GraphQL API Server 에서는 데이터를 적당히 처리한 다음 Pub-Sub Channel 에 Message를 발행한다.

  2. Subscription Server 에서는 Pub-Sub Channel 을 구독하고 있으며, 해당 메시지가 수신되면 적절히 소비하고 클라이언트(React App Browser)와 연결된 websocket을 통해 데이터를 내보낸다.

  3. 클라이언트(React App Browser)websocket으로부터 수신된 데이터를 이용해 화면을 갱신한다.

구현

GraphQL Federation 까지는 이전 단계에서 구현했으나, GraphQL API ServerGraphQL Subscription ServerPub-Sub Channel 로 연결해주어야 하므로 각각의 구현을 모두 기술하도록 한다.

A. GraphQL Subscription 을 지원하기 위해 GraphQL Subscription Server 를 구현한다.

A-1. 라이브러리 및 서버 구현 스펙 선택

편의상 이전 단계에서 진행했던 GraphQL API Server 와 동일한 것으로 선택한다.

A-2. Subscription 을 제공할 GraphQL Subscription ServerFederation 설정 및 Redis 설정에 대한 구현을 진행한다.

Federation 설정은 굳이 하지 않아도 상관없으나, 클라이언트에게 제공되는 GraphQL Schema에 포함되지 않는 불편함이 있으므로 함께 작성하도록 한다.

package com.example.subscriptionservice.config.graphql

import graphql.GraphQLError
import graphql.execution.DataFetcherExceptionHandler
import graphql.execution.DataFetcherExceptionHandlerParameters
import graphql.execution.DataFetcherExceptionHandlerResult
import graphql.language.SourceLocation

class CustomDataFetcherExceptionHandler : DataFetcherExceptionHandler {

    override fun onException(handlerParameters: DataFetcherExceptionHandlerParameters): DataFetcherExceptionHandlerResult {
        handlerParameters.exception.printStackTrace()
        val errors = when (val exception = handlerParameters.exception) {
            else -> {
                exception.printStackTrace()
                listOf(customGraphQLError(message = exception.localizedMessage ?: "your error message"))
            }
        }

        return DataFetcherExceptionHandlerResult.newResult()
            .errors(errors)
            .build()
    }

    private fun customGraphQLError(message: String): GraphQLError {
        return object : GraphQLError {
            override fun getMessage() = message

            override fun getLocations() = mutableListOf<SourceLocation>()

            override fun getErrorType() = null

            override fun getExtensions() = mapOf<String, Any>()
        }
    }
}
package com.example.subscriptionservice.config.graphql

import com.expediagroup.graphql.generator.annotations.GraphQLDescription
import com.expediagroup.graphql.generator.execution.FlowSubscriptionExecutionStrategy
import com.expediagroup.graphql.generator.federation.execution.FederatedTypeResolver
import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks
import com.expediagroup.graphql.server.Schema
import graphql.GraphQL
import graphql.execution.DataFetcherExceptionHandler
import graphql.schema.GraphQLSchema
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.*

@Configuration
class GraphQLConfiguration {

    @GraphQLDescription("Sample GraphQL Schema")
    @Bean
    fun graphQLSchema(): Schema {
        return object : Schema {}
    }

    /**
     *
     * ```graphql
     * query {
     *   _service {
     *     sdl
     *   }
     * }
     * ```
     *
     * - 참고 : Federation 설정이 진행되면 위 쿼리를 이용해서 SuperGraph 에 SubGraph를 제공할 수 있게 됨.
     * - 참고 : 작성시점을 기준으로 제공되는 스키마 정보는 federation 버전이 2.5임. 그에 맞게 게이트웨이에서 수집하는 federation 버전도 수정해주어야 함.
     * @see com.expediagroup.graphql.generator.SchemaGenerator
     * @see com.expediagroup.graphql.generator.SchemaGeneratorConfig
     * @see com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks
     * @see com.expediagroup.graphql.generator.hooks.NoopSchemaGeneratorHooks
     * @see com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorHooks
     * @see com.expediagroup.graphql.server.spring.NonFederatedSchemaAutoConfiguration
     *
     * @see com.expediagroup.graphql.generator.federation.FederatedSchemaGenerator
     * @see com.expediagroup.graphql.generator.federation.FederatedSchemaGeneratorConfig
     * @see com.expediagroup.graphql.generator.federation.FederatedSchemaGeneratorHooks
     * @see com.expediagroup.graphql.generator.federation.FederatedSchemaGeneratorHooks.willBuildSchema // 이 함수에서 Federation 관련 설정이 진행됨.
     * @see com.expediagroup.graphql.generator.federation.types.SERVICE_OBJECT_TYPE // 상단에 명시된 쿼리 참조.
     */
    @Bean
    fun federatedSchemaGeneratorHooks(resolvers: Optional<List<FederatedTypeResolver>>): SchemaGeneratorHooks {
        return CustomFederationSchemaGeneratorHooks(resolvers.orElse(emptyList()))
    }

    /**
     * @see com.expediagroup.graphql.server.spring.GraphQLSchemaConfiguration
     * @see com.expediagroup.graphql.server.spring.SubscriptionAutoConfiguration
     * @see com.expediagroup.graphql.generator.execution.FlowSubscriptionExecutionStrategy
     * @see com.expediagroup.graphql.server.execution.subscription.GraphQLWebSocketServer
     * @see com.expediagroup.graphql.server.spring.SubscriptionGraphQLWsAutoConfiguration
     */
    @Bean
    fun graphQL(schema: GraphQLSchema?): GraphQL {
        val dataFetcherExceptionHandler = object : DataFetcherExceptionHandler {} // 기본 구현체 사용
//        val customDataFetcherExceptionHandler = CustomDataFetcherExceptionHandler() // 사용자 정의 구현체 사용
        return GraphQL.newGraphQL(schema)
            .subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy(dataFetcherExceptionHandler))
            .build()
    }
}
  • 이전에 작성했던 GraphQL API Server 와 차이점은 아래와 같다(66번 라인).
    - .subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy(dataFetcherExceptionHandler))
package com.example.subscriptionservice.config.graphql

import com.expediagroup.graphql.generator.federation.FederatedSchemaGeneratorHooks
import com.expediagroup.graphql.generator.federation.execution.FederatedTypeResolver

class CustomFederationSchemaGeneratorHooks(resolvers: List<FederatedTypeResolver>) :
    FederatedSchemaGeneratorHooks(resolvers) {
}
# application.yaml

logging:
  level:
    root: debug

graphql: # https://opensource.expediagroup.com/graphql-kotlin/docs/server/spring-server/spring-properties 참조.
  packages: # packages 항목을 제외한 나머지는 모두 기본값이 존재함. GraphQLConfigurationProperties 참조.
    - "com.example.subscriptionservice"
#  playground:
#    enabled: true
#    endpoint: playground
#  subscriptions:
#    endpoint: graphql
#  sdl:
#    enabled: true
#    endpoint: sdl

server:
  port: 8081

spring:
  data:
    redis:
      client-type: lettuce
      host: ${REDIS_HOST:localhost}
      port: ${REDIS_PORT:6379}
package com.example.subscriptionservice.config.redis

import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.data.redis.connection.*
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory

@Configuration
class RedisConfiguration {

    @Value("\${spring.data.redis.host}")
    private lateinit var redisHost: String

    @Value("\${spring.data.redis.port}")
    private lateinit var redisPort: String

    /**
     * ```
     * @Primary 처리 이유.
     * -> Parameter 0 of method reactiveStringRedisTemplate in org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration required a single bean, but 2 were found:
     * 	- connectionFactory: defined by method 'connectionFactory' in class path resource [com/example/subgraphservice/config/redis/RedisConfiguration.class]
     * 	- redisConnectionFactory: defined by method 'redisConnectionFactory' in class path resource [org/springframework/boot/autoconfigure/data/redis/LettuceConnectionConfiguration.class]
     * 	```
     *
     * @see org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration
     * @see org.springframework.data.redis.core.ReactiveRedisTemplate // ReactiveRedisConnectionFactory Bean 존재시 자동으로 Bean 등록.
     * @see org.springframework.data.redis.core.ReactiveStringRedisTemplate  // ReactiveRedisConnectionFactory Bean 존재시 자동으로 Bean 등록.
     * @see org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
     * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration // RedisConnectionFactory 타입 Bean이 존재하지 않을 경우 자동으로 redisConnectionFactory Bean을 등록함.
     */
    @Primary // 상기 doc에 기재된 내용 참조.
    @Bean
    fun connectionFactory(): ReactiveRedisConnectionFactory {
        return LettuceConnectionFactory(RedisStandaloneConfiguration(redisHost, redisPort.toInt()))
    }

    /**
     * ```java
     *  @Bean
     * 	@ConditionalOnMissingBean(RedisConnectionFactory.class)
     * 	LettuceConnectionFactory redisConnectionFactory(
     * 			ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
     * 			ClientResources clientResources) {
     * 		LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
     * 				getProperties().getLettuce().getPool());
     * 		return createLettuceConnectionFactory(clientConfig);
     * 	}
     * 	```
     *
     * 	@see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration // RedisConnectionFactory 타입 Bean이 존재하지 않을 경우 자동으로 redisConnectionFactory Bean을 등록함.
     */
    //    @Bean
    fun redisConnectionFactory(): RedisConnectionFactory {
        return LettuceConnectionFactory(RedisStandaloneConfiguration(redisHost, redisPort.toInt()))
    }
}
package com.example.subscriptionservice.config.redis

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer

@Configuration
class RedisMessagingConfiguration(
    private val connectionFactory: ReactiveRedisConnectionFactory,
) {

    @Bean
    fun redisMessageListenerContainer(): ReactiveRedisMessageListenerContainer {
        return ReactiveRedisMessageListenerContainer(connectionFactory)
    }
}

A-3. Redis 설정에 대한 구현을 바탕으로 GraphQL Subscription 에 대한 구현을 진행한다.

package com.example.subscriptionservice.domain.book.api

import com.expediagroup.graphql.generator.annotations.GraphQLDescription
import com.expediagroup.graphql.server.operations.Subscription
import org.springframework.data.redis.listener.ChannelTopic
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux

@Component
class BookGraphQLSubscription(
    private val redisMessageListenerContainer: ReactiveRedisMessageListenerContainer,
) : Subscription {


    companion object {
        private val channelTopic = ChannelTopic.of("pickupBook")
    }

    @GraphQLDescription("사용자가 책을 집을 경우 이에 대한 정보를 알린다")
    fun notifyPickupBook(): Flux<String> {
        return redisMessageListenerContainer.receive(channelTopic)
            .map { it.message }
            .doOnEach { message -> println("Message has received. message: $message") }
    }
}
  • GraphQL API 를 제공하기 위해서는 필수적으로 SchemaQuery 가 제공되어야 함에 유의할 것.

이후, 앞서 구현했던 GraphQL Federation GatewayGraphQL Subscription ServerFederation 연결도 GraphQL Subgraph Server 와 동일하게 진행한다.

B. Pub-Sub Channel 은 편의상 Redis Docker Container 를 사용한다.

docker build -t <your-image-tag> .
docker run --name <your-container-name> -d -p 6379:6379 <your-image-tag>

C. GraphQL Subscription ServerRedis 에 메시지를 발행할 수 있도록 설정 및 데이터 처리 구현을 진행한다.

C-1. Redis 설정을 구현한다.

package com.example.subgraphservice.config.redis

import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
import org.springframework.data.redis.connection.RedisConnectionFactory
import org.springframework.data.redis.connection.RedisStandaloneConfiguration
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory

@Configuration
class RedisConfiguration {

    @Value("\${spring.data.redis.host}")
    private lateinit var redisHost: String

    @Value("\${spring.data.redis.port}")
    private lateinit var redisPort: String

    /**
     * ```
     * @Primary 처리 이유.
     * -> Parameter 0 of method reactiveStringRedisTemplate in org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration required a single bean, but 2 were found:
     * 	- connectionFactory: defined by method 'connectionFactory' in class path resource [com/example/subgraphservice/config/redis/RedisConfiguration.class]
     * 	- redisConnectionFactory: defined by method 'redisConnectionFactory' in class path resource [org/springframework/boot/autoconfigure/data/redis/LettuceConnectionConfiguration.class]
     * 	```
     *
     * @see org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration
     * @see org.springframework.data.redis.core.ReactiveRedisTemplate // ReactiveRedisConnectionFactory Bean 존재시 자동으로 Bean 등록.
     * @see org.springframework.data.redis.core.ReactiveStringRedisTemplate  // ReactiveRedisConnectionFactory Bean 존재시 자동으로 Bean 등록.
     * @see org.springframework.data.redis.connection.ReactiveRedisConnectionFactory
     * @see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration // RedisConnectionFactory 타입 Bean이 존재하지 않을 경우 자동으로 redisConnectionFactory Bean을 등록함.
     */
    @Primary // 상기 doc에 기재된 내용 참조.
    @Bean
    fun connectionFactory(): ReactiveRedisConnectionFactory {
        return LettuceConnectionFactory(RedisStandaloneConfiguration(redisHost, redisPort.toInt()))
    }

        /**
         * ```java
         *  @Bean
         * 	@ConditionalOnMissingBean(RedisConnectionFactory.class)
         * 	LettuceConnectionFactory redisConnectionFactory(
         * 			ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
         * 			ClientResources clientResources) {
         * 		LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
         * 				getProperties().getLettuce().getPool());
         * 		return createLettuceConnectionFactory(clientConfig);
         * 	}
         * 	```
         *
         * 	@see org.springframework.boot.autoconfigure.data.redis.LettuceConnectionConfiguration // RedisConnectionFactory 타입 Bean이 존재하지 않을 경우 자동으로 redisConnectionFactory Bean을 등록함.
         */
    //    @Bean
    fun redisConnectionFactory(): RedisConnectionFactory {
        return LettuceConnectionFactory(RedisStandaloneConfiguration(redisHost, redisPort.toInt()))
    }
}

C-2. API 호출 시 데이터 처리 이후 Redis 에 메시지 발행을 구현한다.

package com.example.subgraphservice.domain.book.api

import com.example.subgraphservice.domain.book.datasource.BookDataSource
import com.expediagroup.graphql.generator.annotations.GraphQLDescription
import com.expediagroup.graphql.server.operations.Query
import org.springframework.data.redis.core.ReactiveStringRedisTemplate
import org.springframework.stereotype.Component

@Component
class BookGraphQLQuery(
    private val redisTemplate: ReactiveStringRedisTemplate,
) : Query {

    @GraphQLDescription("번호로 책을 집는다")
    fun pickupBook(number: Int): BookResponse {
        val book = BookDataSource.getBook(number)
        redisTemplate.convertAndSend(
            "pickupBook",
            "book has picked up with number: $number. it's title is [${book.title}]"
        ).subscribe()
        return BookResponse(book)
    }
}

object BookDataSource {

    private val books = mapOf(0 to "어린왕자", 1 to "보물섬", 2 to "피터팬")

    fun getBook(number: Int): Book {
        val title = books[number] ?: throw RuntimeException("번호에 해당하는 책의 정보를 찾지 못했습니다. number: $number")
        return Book(number, title)
    }
}

data class Book(
    val number: Int,
    val title: String,
)

D. React App

D-1. GraphQL Subscription Server 가 제공하는 Subscription 을 사용하기 위해 React App을 구현한다.

// apollo.js
import {ApolloClient, HttpLink, InMemoryCache, split} from "@apollo/client";
import {createClient} from "graphql-ws";
import {getMainDefinition} from "@apollo/client/utilities";
import {GraphQLWsLink} from "@apollo/client/link/subscriptions";

const wsLink = new GraphQLWsLink(
  createClient({
    url: process.env.REACT_APP_SUBSCRIPTIONS_API_URL,
    connectionParams: () => {
      // simulate an auth token sent from the client over the WS connection
      const token = "some-token";
      return { ...(token && { token }) };
    }
  })
);

const httpLink = new HttpLink({
  uri: process.env.REACT_APP_GATEWAY_API_URL
});

const link = split(
  ({ query }) => {
    const definition = getMainDefinition(query);
    return (
      definition.kind === "OperationDefinition" &&
      definition.operation === "subscription"
    );
  },
  wsLink,
  httpLink
);

const client = new ApolloClient({
  cache: new InMemoryCache(),
  link
});

export default client;
// subscriptions.js
import {gql} from "@apollo/client";

export const NOTIFY_PICKUP_BOOK = gql`
  subscription NotifyPickupBook {
      notifyPickupBook
  }
`
// NotiAboutBookOfPickedUpPage.js
import React from "react";
import {useSubscription} from "@apollo/client";
import {NOTIFY_PICKUP_BOOK} from "../graphql/subscriptions";

function NotiAboutBookOfPickedUpPage() {
  const {data, loading} = useSubscription(NOTIFY_PICKUP_BOOK);

  if (loading) {
    return <p>Loading...</p>;
  }

  return (
      <div>
        <h2>Notification About Book of Picked Up</h2>
        {data && data.notifyPickupBook ? (
            <p>{data.notifyPickupBook}</p>
        ) : (
            <p>There is No Book Picked Up!</p>
        )}
      </div>
  );
}

export default NotiAboutBookOfPickedUpPage;
// App.js

import {ApolloProvider} from "@apollo/client";
import {BrowserRouter as Router, Route, Switch} from "react-router-dom";
import React from "react";
import client from "./graphql/apollo";

import "./App.css";
import NotiAboutBookOfPickedUpPage from "./pages/NotiAboutBookOfPickedUpPage";

function App() {
  return (
    <ApolloProvider client={client}>
      <Router>
        <Switch>
          <Route path="*" component={NotiAboutBookOfPickedUpPage} />
        </Switch>
      </Router>
    </ApolloProvider>
  );
}

export default App;
// index.js

import React from "react";
import ReactDOM from "react-dom";

import App from "./App";

import "./index.css";

ReactDOM.render(
  <React.StrictMode>
    <App />
  </React.StrictMode>,
  document.getElementById("root")
);
// .env

REACT_APP_GATEWAY_API_URL=http://localhost:4000/graphq
REACT_APP_SUBSCRIPTIONS_API_URL=ws://localhost:8081/subscriptions

실행

1. 의존 관계에 맞게 서버 및 클라이언트를 아래의 순서대로 순차적으로 실행한다.

  1. Pub-Sub Channel (Redis, etc.)
  2. GraphQL Subscription Server
  3. GraphQL API Server
  4. GraphQL Federation Gateway
  5. Client for Receive Subscription Message (React App)

2. 브라우저에 접속한다(http://localhost:3000)

GraphQL Subscription Serverwebsocket 으로 정상적으로 연결되었다면 아래와 같은 화면이 보일 것임.

3. API Client Tool (e.g. Postman)을 이용, 게이트웨이 엔드포인트를 통해 API 호출.

4. 브라우저에서 실시간으로 데이터 갱신되는 것 확인.

이로써 GraphQL Federation + Subscription 구현이 완료되었다.

참고 자료

Node.js

Apollo

Redis

GraphQL Kotlin (ExpediaGroup)

Spring Redis

Spring Kafka

Reactor Core

etc.

profile
개발 블로그이지만 꼭 개발 이야기만 쓰라는 법은 없으니, 그냥 쓰고 싶은 내용이면 뭐든 쓰려고 합니다. 코드는 깃허브에다 작성할 수도 있으니까요.

0개의 댓글