C++ Socket Scaling with Threads

Chung Hwan·2022년 6월 13일
1
post-thumbnail

Multiplexing에 대한 얘기가 아니다.

Socket programming 을 하는데 threading 을 한다고 하면 일단 multiplexing이 먼저 떠오른다. read(sockfd, buf, size(buf)) 콜을 하는 순간 해당 sockfdsize(buf) 만큼 입력이 들어오지 않으면 프로세스는 그대로 멈춰버린다. 기본적으로 read 는 blocking-mode 이기 때문이다. 그렇기 때문에 단순히 accept 후 read를 반복하는 반복문으로 서버를 만들면, 클라이언트가 write 를 하지 않을 경우 영원히 멈춰버린다. 클라이언트가 하나이면 별 문제 없겠지만, 여럿이라면 문제가 된다. 이를 해결하기 위한 대표적인 두가지 방법이 select, poll 등 non-blocking으로 read를 하는 것과 threading을 하는 것이다.

select 는 간단히 말해 descriptor의 set (FD_SET) 에서 어떤 descriptor에 변화(read/write)가 있었는 지를 확인한다. 아무것도 변화가 없었으면, 서버는 아무것도 하지 않고 넘어가 반복문을 다시 반복하면 된다. 서버의 socket descriptor에 변화가 있었다면, 새로운 클라이언트가 연결을 요청한 것이므로 accept를 하면 되고 이때 받은 클라이언트의 socket descriptor에서 변화가 있었다면 해당 descriptor를 read하면 된다. 간단하다. 클라이언트가 뭔가를 썼는지 안 썼는지 먼저 확인하고, 안 썼다면 read를 하지 않고 썼을 때만 read를 하는 것이다. read를 할 때는 클라이언트가 반드시 무언가를 썼다는 것이 보장되므로 read에서 서버가 멈춰버리는 일을 방지할 수 있다.

Threading은 이것보다도 더 단순하다. 그냥 반복문을 여러개의 thread가 돌게 하는 거다. 하나의 thread가 read에서 멈추어도, 다른 thread가 클라이언트를 받으면 된다. 당연하게도 서버가 한 번에 처리할 수 있는 클라이언트의 수는 thread 개수에 의존적이 된다. Thread는 하드웨어 자원의 제한을 받으므로, 한정적인 수의 클라이언트만 처리할 수 있다.

그럼 무슨 얘기를 할거냐

Multiplexing 얘기가 아니라고 해놓고 multiplexing 얘기만 한참했다. 본론으로 돌아와서, 오늘은 non-blocking I/O로 multiplexing이 이미 구현이 되어있는 상황에서 threading을 통한 scaling을 해보려고 한다. 마치 ECS 에서 여러 개의 Fargate 인스턴스를 띄워놓고, 요청을 돌아가면서 처리하도록해 load balancing하는 것과 비슷하다. 요청을 처리하는 thread를 여러개 띄워놓고, 돌아가면서 요청을 처리하도록 할 것이다. 동시에 요청이 여러 개 오더라도 thread 개수만큼 병렬적으로 처리할 수 있으므로 퍼포먼스의 향상이 분명히 있을 것이다. 이것은 분명히 multiplexing과 다른 얘기다. 아무리 multiplex을 했다고 해도 반복문 하나로 서버를 돌리면 한번에 “처리"할 수 있는 요청은 하나다.

기본적인 아이디어는 이렇다.

핵심은 서버가 thread에 요청을 전달하는 것이다. 방법은 오래 고민하지 않았다. Queue를 쓰는 것이 적당해 보였다. Producer-consumer 구조로 만들면 될 것이다. thread마다 queue를 가지고 있고, 이를 계속 검사하며 무언가 들어오는 즉시 consume하도록 반복문을 돌린다. 서버는 요청을 받으면 돌아가면서 각 thread의 queue에 요청을 push 해준다. 만약 thread가 4개이고 요청이 8개가 왔다면 각 thread는 요청을 2개씩 처리하는 식이다.

ServerThread

Consumer이다. consume 함수를 보면 queue에 무언가 들어올 때까지 기다린 뒤 consume하는 것을 반복한다. 서버가 produce를 위해 invoke해야하는 push_request 함수도 있다. 서버는 이 함수를 통해 queue에 요청을 push한다. consume 함수 내부에서 pop된 요청은 handle_request 로 들어가 처리된다.

또한 server와 thread가 queue를 동시에 사용하면서 발생할 수 있는 동시성 이슈를 해결하기 위해 mutex를 활용했다.

초기화 시 thread를 생성하고 detach 시켜, 서버가 ServerThread 클래스를 생성했을 때 thread가 생성될 수 있도록 한다.

코드에서 관련 없는 부분은 모두 생략했다.

typedef std::pair<int, sockets::client_msg> request_data;

class ServerThread
{
public:
  void handle_request(int clientfd, sockets::client_msg msg)
  {
    server::server_response response;
    for (const sockets::client_msg_OperationData op : msg.ops())
    {
      auto reply = response.add_reps();
      reply->set_op_id(op.op_id());
      switch (op.type())
      {
      case sockets::client_msg_OperationType_PUT:
        on_put(op, reply);
        break;
        // ...
      }
    }
    // ...
	
	  secure_send(clientfd, buf.get(), msg_size + length_size_field);
  }

  void push_request(int clientfd, sockets::client_msg msg)
  {
    std::unique_lock<std::mutex> lk(_mt);
    _queue.push(new request_data(clientfd, msg));
    _cv.notify_one();
  }

  void consume()
  {
    if (_debug)
    {
      fmt::print("[*] Thread {0} ready\n", id);
    }

    while (true)
    {
      std::unique_lock<std::mutex> lk(_mt);
      while (_queue.empty())
        _cv.wait(lk);

      if (_queue.empty())
        continue;

      auto ptr = _queue.front();
      auto [clientfd, msg] = *ptr;
      handle_request(clientfd, msg);
      _queue.pop();
    }
  }
  // ...
  ServerThread(int tid /*, ... */)
  {
    id = tid;
    // ...
    _thread = std::thread(&ServerThread::consume, this);
    _thread.detach();
  }
};

Server

Producer이다. run 함수를 보면 처음에 원하는 개수만큼 thread를 생성한 뒤, 요청을 받는 것을 반복한다. 위에 언급한 select 를 이용한 I/O multiplexing이 구현되어 있다. recv_request 를 보면 서버가 요청을 받은 뒤 쓰레드의 queue 요청을 push 한다. 모든 thread에 같은 확률로 돌아가며 요청을 주는, 가장 단순한 방법인 Round robin으로 구현했다.

class Server
{
public:
	// ...

	void recv_request(int clientfd)
	{
		auto [bytecount, buffer] = secure_recv(clientfd);
		// ...

		sockets::client_msg msg;
		if (!msg.ParseFromArray(buffer.get(), bytecount))
		// ...

		_t_index = (_t_index + 1) % _no_threads;
		_threads[_t_index]->push_request(clientfd, msg);
	}

	void run()
	{
		for (int i = 0; i < _no_threads; i++)
		{
			_threads[i] = new ServerThread(i, _db, _debug);
		}

		listen();

		_is_running = true;
		while (_is_running)
		{
			// ...

			fd_set fds;
			int state = _fdl.select_read(&fds, &timeout);
      // ...

			for (int i = 0; i < _fdl.size(); i++)
			{
				fd = _fdl.get(i);
				if (!_fdl.is_set(fd, &fds))
					continue;

				if (fd == _sockfd)
				{
					client_soc = accept();
				}
				else
				{
					recv_request(fd);
				}
			}
		}
	}
	// ...
};

Result

결과적으로 성능 개선은 아래와 같았다.

Achieved scaling 1->2 2.1270017715972447 and 2->4 1.8961979618466085.

thread를 1개에서 2개로 했을 때 성능은 2.12배가 되었고, 2개에서 4개로 했을 때는 1.89배가 되었다. 예상대로 thread 개수와 비례해서 처리속도가 빨라지는 것을 확인할 수 있다. 물론 레이턴시가 thread 내에서만 발생하는 것이 아니기 때문에 완벽히 비례하지는 않는다. thread 내부에서 mutex처럼 다른 thread를 기다려야 하는 요소가 있다면 더더욱 그렇다.

구현이 쉽진 않았지만 개념적으로는 굉장히 단순하고 기본적이다. 항상 완성된 프레임워크로 개발하지만 한 번쯤은 이 정도 수준의 (상대적으로) 로우레벨로 구현해보는 것도 좋은 경험인 것 같다.

Further

이번에는 thread마다 각자의 queue를 가지고 서버가 Round Robin 으로 넣어주는 방식을 썼는데, 서버는 단일 queue에 요청을 넣고 이를 subscribe하고 있는 ideal한 thread가 consume하는 방식으로 개선할 수 있을 것 같다. 요청마다 걸리는 시간에 차이가 클 수 있다면, 단순히 RR로 처리할 경우 ideal한 thread가 있음에도 busy한 thread에 요청이 push 되는 비효율적인 상황이 발생할 수 있다. 또한 지금 구현된 방식에서는 어떤 thread에 문제가 생긴 경우, 서버가 이를 인지하고 해당 thread에 push를 하지 않아야 한다. 하지만 thread가 능동적으로 요청을 단일 queue에서 꺼내가도록 하면, thread에 문제가 생길 경우 해당 thread는 consume을 하지 않을 것이므로 성능 상 저하는 생겨도 처리되지 못하는 요청이 발생하지는 않는다.

0개의 댓글