[ZeroMQ] Using ZMQ

Hα ყҽσɳɠ·2020년 3월 14일
0

project2

목록 보기
4/10
post-thumbnail

지난 포스팅에서는 ZeroMQ의 설치 방법과 간단한 코드를 실행시키는 방법에 대해 알아보았다. 이번 포스팅에서는 ZeroMQ를 이용한 오픈소스 프로그램을 다운받아 실행시켜보도록 하겠다.

🔈해당 프로그램의 코드와 코드리뷰는 ZMQ 가이드북을 참고하였으며, 앞서 소개를 다룬 포스팅 하단, Git 저장소에서 다운 받을 수 있습니다.


슈퍼컴퓨터 시뮬레이터 프로그램

이 슈퍼 컴퓨터 프로그램은 일반적인 병렬 컴퓨터와 같은 일을 수행할 것이다.

이 프로그램은 입력된 작업을 병렬로 처리할 수 있는 ventilator 와 프로세스 실제 작업을 수행하는 worker, worker 프로세스로부터 결과를 수집하고 종합하는 sink, 이렇게 3개의 프로그램으로 이루어져 있다.
여기서 worker들은 어려운 수학을 연산하는 GPU(그래픽 처리 장치)를 사용하는 superfast 상자 안에서 빠른 속도로 연산 작업을 실행한다. ventilator는 100 task를 생성하며 각각의 메시지마다 worker들에게 일정 시간(milliseconds)동안 sleep을 하게 한다.

ventilator를 실행시키면, 다음과 같이 "Press Enter when the workers are ready:"라는 문장이 출력되고, 입력을 기다리는 것을 확인할 수 있다.

worker들을 컴파일하여 준비시킨 후, Enter를 입력하여 ventilator를 실행 시킨다. 실행되고 있을 때는 "Sending tasks to workers ..."라는 문구가 출력되는 코드이며, 아래 사진에 보이는 마지막 줄은 worker들이 task를 끝마친 후, 실행 화면에 표시된 모습이다. ventilator는 total cost를 출력한 후, 종료된다.

sink를 실행시키면 다음과 같이 ..........이 총 10개의 task로 나누어 수행되며 worker의 동작이 끝난 후, "Total elapsed time:"에 경과시간을 출력한다.

worker를 1개로 두고, 프로그램을 실행시켜본 결과이다.
worker

Ventilator

sink

worker를 2개로 두고, 프로그램을 실행시켜본 결과이다.
worker1

worker2

첫번째 실행 커맨드에 따른 출력 결과는 worker가 1개일 때고, 두번째 실행 결과가 worker가 2개일 때이다. 직접 처리하는 과정을 출력해보면, task들이 절반만 수행되고 있음을 볼 수 있다.
두번째 사진에서 2번째 worker가 일하고 있는 것을 볼 수 있다.

ventilator

worker가 2개일때의 ventilator이다. 1개일때보다, total cost가 줄었음을 확인 할 수 있다.

sink

sink는 100개의 작업을 수집한 후 전체 처리시간이 얼마인지 계산을 한다. 따라서 worker가 여러 개라면 실제 병렬로 처리되는 것을 확인할 수 있다.
worker가 1개일때보다 절반 이상의 감소된 시간을 출력한다.

다음은worker를 1개, 2개, 4개로 두고, 프로그램을 실행시켜본 결과이다.

sink 출력 결과 worker 출력 결과

비동기/분산 처리를 통해 위와 같이 Total elpased time과 하나의 worker가 수행해야 하는 task의 수가 확연히 감소하고 있는 것을 볼 수 있다.

worker 1개일 떄와 worker 4개일 때의 비교


Code

  1. Ventilator
//  Binds PUSH socket to tcp://localhost:5557
//  Sends batch of tasks to workers via that socket

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <iostream>
#include <zmq.hpp>

#define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0))

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to send messages on
    zmq::socket_t  sender(context, ZMQ_PUSH);
    sender.bind("tcp://*:5557");

    std::cout << "Press Enter when the workers are ready: " << std::endl;
    getchar ();
    std::cout << "Sending tasks to workers...\n" << std::endl;

    //  The first message is "0" and signals start of batch
    zmq::socket_t sink(context, ZMQ_PUSH);
    sink.connect("tcp://localhost:5558");
    zmq::message_t message(2);
    memcpy(message.data(), "0", 1);
    sink.send(message);

    /*  Initialize random number generator */
    srandom ((unsigned) time (NULL));

    /*  Send 100 tasks */
    int task_nbr;
    int total_msec = 0;     
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {
        int workload;
        workload = within (100) + 1;
        total_msec += workload;

        message.rebuild(10);
        memset(message.data(), '\0', 10);
        sprintf ((char *) message.data(), "%d", workload);
        sender.send(message);
    }
    std::cout << "Total expected cost: " << total_msec << " msec" << std::endl;
    sleep (1);              

    return 0;
}
  1. worker
//  Connects PULL socket to tcp://localhost:5557
//  Collects workloads from ventilator via that socket
//  Connects PUSH socket to tcp://localhost:5558
//  Sends results to sink via that socket

#include "zhelpers.hpp"
#include <string>

int main (int argc, char *argv[])
{
    zmq::context_t context(1);

    //  Socket to receive messages on
    zmq::socket_t receiver(context, ZMQ_PULL);
    receiver.connect("tcp://localhost:5557");

    //  Socket to send messages to
    zmq::socket_t sender(context, ZMQ_PUSH);
    sender.connect("tcp://localhost:5558");

    //  Process tasks forever
    while (1) {

        zmq::message_t message;
        int workload;          

        receiver.recv(&message);
        std::string smessage(static_cast<char*>(message.data()), message.size());

        std::istringstream iss(smessage);
        iss >> workload;

        s_sleep(workload);

        //  Send results to sink
        message.rebuild();
        sender.send(message);

        std::cout << "." << std::flush;
    }
    return 0;
}
  1. sink
//  Task sink in C++
//  Binds PULL socket to tcp://localhost:5558
//  Collects results from workers via that socket

#include <zmq.hpp>
#include <time.h>
#include <sys/time.h>
#include <iostream>

int main (int argc, char *argv[])
{
    //  Prepare our context and socket
    zmq::context_t context(1);
    zmq::socket_t receiver(context,ZMQ_PULL);
    receiver.bind("tcp://*:5558");

    //  Wait for start of batch
    zmq::message_t message;
    receiver.recv(&message);

    /* Start our clock now */
    struct timeval tstart;
    gettimeofday (&tstart, NULL);

    /* Process 100 confirmations */
    int task_nbr;
    int total_msec = 0;    
    cost in msecs
    for (task_nbr = 0; task_nbr < 100; task_nbr++) {

        receiver.recv(&message);
        if (task_nbr % 10 == 0)
            std::cout << ":" << std::flush;
        else
            std::cout << "." << std::flush;
    }
    /*  Calculate and report duration of batch */
    struct timeval tend, tdiff;
    gettimeofday (&tend, NULL);

    if (tend.tv_usec < tstart.tv_usec) {
        tdiff.tv_sec = tend.tv_sec - tstart.tv_sec - 1;
        tdiff.tv_usec = 1000000 + tend.tv_usec - tstart.tv_usec;
    }
    else {
        tdiff.tv_sec = tend.tv_sec - tstart.tv_sec;
        tdiff.tv_usec = tend.tv_usec - tstart.tv_usec;
    }
    total_msec = tdiff.tv_sec * 1000 + tdiff.tv_usec / 1000;
    std::cout << "\nTotal elapsed time: " << total_msec << " msec\n" << std::endl;
    return 0;
}

Code review

✔️workerventilator의 아래에 연결되어 있고, sink와는 위에 연결되어 있다. 이 구조는 worker를 임의로 추가 할 수 있음을 의미한다. worker가 그것들의 endpoint에 바인딩되어 있다면 worker가 추가할 때마다 매번 ventilator와 sink가 변경하기 위해서 더 많은 endpoint이 필요하게 된다. 이 구조에서 ventilator와 sink는 stable한 부분이며 worker는 dynamic한 부분으로 가정한다.

✔️모든 worker는 시작을 동기화하여 실행된다. 이러한 동기화를 위한 batch signal 전송 기법은 ØMQ에서는 매우 일반적인 것이며, 대체할 다른 쉬운 솔루션이 없다. zmq_connect() 함수의 실행에는 특정한 시간이 소요된다. 따라서 worker들이 ventilator에 접속할 때, 처음 연결에 성공한 worker는 다른 worker가 연결되는 짧은 시간 동안 전체메시지를 받아버릴 수도 있다. 따라서, 시작을 동기화하지 않으면 프로그램이 병렬로 실행되지 않는다. ventilator에서 wait부분이 필요한 이유다.

✔️ventilator의 PUSH 소켓은 균등하게 worker (모든 worker가 연결되어 있는 상태라고 가정)에 작업을 분배한다. 이것은 load-balancing 이라고 한다.

✔️sink의 PULL소켓은 균등하게 worker로부터 결과를 수집하며, fair-queuing이라고 한다.


끝마치며 💻


봄학기 수업의 일환으로 오픈소스를 다루는 과제를 통해, ZMQ라는 고성능 비동기 message 라이브러리에 대해 공부해보는 시간을 갖게 되었다. 아직 네트워킹, 소켓 프로그래밍에 대한 지식이 많지 않아 ZMQ의 전체적인 구조와 다른 라이브러리와 비교하였을 때의 장점이 크게 와닿지 않았었는데, 예제를 직접 실행해보고 이를 이용한 프로그램 구조를 파악해봄으로써 ZMQ의 편리성과 강점을 이해할 수 있었다. 튜토리얼과 가이드북, ZMQ를 사용해본 개발자들의 리뷰등을 읽으며 ZMQ가 얼마나 효율적이고 강력한 지 알게 되었다.

설치와 환경설정을 하며 부딪힌 에러, 내부 구조와 코드를 이해하면서 겪은 궁금점들로 인해, 기존에 사용해봤던 flutter나 PyTorch, node등을 이용한 오픈소스 프로그램를 살펴보는 것보다 많은 시간을 소요하게 되었지만 아주 좋은 라이브러리를 알게 되어 유익하고, 알찬 시간이었다.

습득한 유용한 기술을 나의 개발에 실질적으로 이용할 수 있어야 온전히 습득한 것이라 생각한다. 이번 기회를 통해 ZMQ를 이용한 프로그램을 실행시켜보고 내부 코드를 살펴보며 이해한 내용을 실제 개발에 적용시켜보고 싶다. 추후 애플리케이션 개발시, 네트워크를 통한 메시징과 관련된 작업을 하게 된다면 꼭 ZMQ를 이용해보아야겠다! 👩🏻‍💻🍒


profile
𝑯𝒐𝒏𝒆𝒔𝒕𝒚 𝑰𝒏𝒕𝒆𝒈𝒓𝒊𝒕𝒚 𝑬𝒙𝒄𝒆𝒍𝒍𝒆𝒏𝒄𝒆

0개의 댓글