Network Programming in Rust #4 - Adding multiple services

Migo · May 30, 2024

pingora

Pingora makes network programming highly customizable. For more information, read this article.

Design

In this post, I'll route traffic to a specific service based on the path prefix. Let's say I have the following services:

  • task
  • auth

So every time a user accesses http://your-host/task, you want all of that traffic to be routed to the task service. This could be expressed as follows:

"/task" => "task-service",
"/auth" => "auth-service"

The question is, how?



Defining single upstream service

Making a single upstream service is quite simple. Just pass a list of addresses to LoadBalancer::try_from_iter:

let mut service = LoadBalancer::try_from_iter(["0.0.0.0:44410", "0.0.0.0:44411"]).unwrap();

But there is a chance that one of the upstreams becomes unreachable for various reasons. You want the system to detect that and work around it, which can be done by adding a health check.

...
let hc = TcpHealthCheck::new();
service.set_health_check(hc);
service.health_check_frequency = Some(std::time::Duration::from_secs(1));
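
To make the idea concrete, here is a rough sketch of what a TCP health check boils down to: try to open a connection within a timeout and treat a failure as unhealthy. This is not Pingora's implementation; is_reachable and healthy_backends are hypothetical helpers that use only the standard library.

```rust
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

/// Hypothetical helper: returns true if a TCP connection to `addr`
/// can be established within `timeout`.
fn is_reachable(addr: &SocketAddr, timeout: Duration) -> bool {
    TcpStream::connect_timeout(addr, timeout).is_ok()
}

/// Hypothetical helper: keeps only the addresses that currently accept connections.
fn healthy_backends(addrs: &[SocketAddr], timeout: Duration) -> Vec<SocketAddr> {
    addrs
        .iter()
        .copied()
        .filter(|addr| is_reachable(addr, timeout))
        .collect()
}
```

With TcpHealthCheck, Pingora runs this kind of probe for you at the interval set in health_check_frequency, so you should not need to write it yourself.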

However, the health check is itself a service that runs at the interval you set (here, every second), so you need to wrap the load balancer in a background service:

let background = background_service("health check", service);

From there, you get the load balancer handle (an Arc<LoadBalancer>) that the proxy will use to pick upstreams:

let upstreams = background.task();

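Finally, the background service and the proxy service that uses the upstreams should be added to the server:

let mut server = Server::new(None).unwrap();
server.bootstrap();
server.add_service(background);
// `upstreams` is an Arc<LoadBalancer<_>>, not a service: give it to your ProxyHttp
// implementation (`lb`, e.g. the LB struct below) and register the proxy service instead.
let mut proxy = http_proxy_service(&server.configuration, lb);
proxy.add_tcp("0.0.0.0:6188"); // example listening address (assumption)
server.add_service(proxy);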

And run it.

server.run_forever();

Congratulations! You've got the basics down.



Multiple upstream services

Now we want to implement the design that routes traffic to different services depending on the path prefix. Although the implementation may vary, I would do something like this:


// Note: this assumes every service being registered needs a load balancer.
add_loadbalanced_service!(server,service_mapper,
        "/auth" => ["0.0.0.0:44410","0.0.0.0:44411"],
        "/task" => ["0.0.0.0:44447"]
);

What the macro does is the same as adding a single service with its own dedicated health check, repeated once per prefix:

macro_rules! add_loadbalanced_service {
    (
        $server:ident,
        $mapper:ident,
        $( $prefix:literal => $addr:expr ),*
    ) => {
        $(
            let mut service = LoadBalancer::try_from_iter($addr).unwrap();
            // Add a basic TCP health check so that a broken peer is never selected.
            // If that peer becomes healthy again, it is re-included in the
            // round robin within 1 second.
            let hc = TcpHealthCheck::new();
            service.set_health_check(hc);
            service.health_check_frequency = Some(std::time::Duration::from_secs(1));
            // Run the health check as a background service and keep the Arc handle for routing.
            let background = background_service("health check", service);
            let upstreams = background.task();
            $server.add_service(background);
            $mapper.insert($prefix, upstreams);
        )*
    };
}
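
If the repetition syntax looks dense, this stripped-down sketch shows the same `$( $prefix:literal => $addr:expr ),*` pattern without any Pingora types. route_table and build_routes are hypothetical names, and the map simply stores address lists instead of load balancers.

```rust
use std::collections::HashMap;

// Hypothetical macro: expands to one `insert` call per `prefix => addresses` pair.
// The optional `$(,)?` also accepts a trailing comma.
macro_rules! route_table {
    ($mapper:ident, $( $prefix:literal => $addrs:expr ),* $(,)?) => {
        $(
            $mapper.insert($prefix, $addrs.to_vec());
        )*
    };
}

/// Builds a prefix -> address list table using the macro above.
fn build_routes() -> HashMap<&'static str, Vec<&'static str>> {
    let mut routes = HashMap::new();
    route_table!(routes,
        "/auth" => ["0.0.0.0:44410", "0.0.0.0:44411"],
        "/task" => ["0.0.0.0:44447"],
    );
    routes
}
```

Each pair in the invocation becomes one copy of the body inside `$( ... )*`, which is exactly how the real macro creates one load balancer and one health check per prefix.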

LB struct

As keen readers may have noticed, the macro takes an identifier, mapper, which stores each upstream handle under its prefix ("/task", for example). Each service has its own prefix, and if no service matches the requested path, the proxy should return 404.

Let's see how it looks:

pub struct LB {
    service_mapper: HashMap<&'static str, Arc<LoadBalancer<RoundRobin>>>,
    path_mapper: Trie,
}

Okay, the service mapper makes sense, but what is the path mapper?

Path mapper

Essentially, you need a mechanism to match a given path against the registered prefixes as efficiently as possible. Since a proxy sits on the hot path of every request, I put some thought into making the matching operation fast, and settled on the trie data structure.

A trie is a tree whose root node represents the empty string, and each child node represents one specific character following its parent. So when I insert "and" and "ant", both words share the path root → a → n, and the n node has two children, d and t.

When searching, a single pass over the input string is enough: for each character it follows the matching child node, and it returns as soon as it reaches a node that ends a registered key. Take a look at the following code:

pub fn search(&self, s: &str) -> Option<String> {
    // Start at the root; `node` is re-pointed to a child at each step of the traversal.
    let mut node = &self.root;

    let mut matched_prefix = String::new();
    for ch in s.chars() {
        if let Some(next_node) = node.children.get(&ch) {
            matched_prefix.push(ch);
            node = next_node;
            if node.is_end_of_key {
                return Some(matched_prefix);
            }
        } else {
            break;
        }
    }
    None
}
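
The search method above relies on a node type and an insert method that are not shown. Here is a minimal, self-contained sketch of how they could look. The field names children and is_end_of_key come from the code above; TrieNode, Trie::new, and insert are my assumptions for illustration.

```rust
use std::collections::HashMap;

/// One node per character; `is_end_of_key` marks the end of a registered prefix.
#[derive(Default)]
struct TrieNode {
    children: HashMap<char, TrieNode>,
    is_end_of_key: bool,
}

#[derive(Default)]
pub struct Trie {
    root: TrieNode,
}

impl Trie {
    pub fn new() -> Self {
        Self::default()
    }

    /// Walks down from the root, creating missing nodes, and marks the last one.
    pub fn insert(&mut self, key: &str) {
        let mut node = &mut self.root;
        for ch in key.chars() {
            node = node.children.entry(ch).or_default();
        }
        node.is_end_of_key = true;
    }

    /// Returns the first (shortest) registered key that is a prefix of `s`.
    pub fn search(&self, s: &str) -> Option<String> {
        let mut node = &self.root;
        let mut matched_prefix = String::new();
        for ch in s.chars() {
            let Some(next_node) = node.children.get(&ch) else {
                break;
            };
            matched_prefix.push(ch);
            node = next_node;
            if node.is_end_of_key {
                return Some(matched_prefix);
            }
        }
        None
    }
}
```

After inserting "/task" and "/auth", searching for "/task/42" yields "/task" and searching for "/unknown" yields None. Note that the match is character-based, so "/tasks" also matches "/task"; if that matters for your routes, you would need an extra check on the segment boundary.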

Locating peer

That's enough about the data structure. With the Trie as a field of LB, upstream_peer can look like this:

    
async fn upstream_peer(
    &self,
    session: &mut Session,
    ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
    // Find the registered prefix that matches the request path.
    let path = session.req_header().uri.path();
    let matched_path = self.path_mapper.search(path);
    let Some(path) = matched_path else {
        return Err(Error::explain(HTTPStatus(404), "No matching path found"));
    };
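    let addr = self
        .service_mapper
        .get(path.as_str())
        // Every prefix in the trie is expected to have an entry in the service mapper.
        .ok_or_else(|| Error::explain(HTTPStatus(404), "No service registered for path"))?
        // The key does not matter for round robin; 256 bounds the number of selection attempts.
        .select(b"", 256)
        // None means no backend is available, e.g. all of them are down.
        .ok_or_else(|| Error::explain(HTTPStatus(502), "No healthy upstream available"))?;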


    let mut peer = Box::new(HttpPeer::new(addr, false, "".to_string()));
    peer.options.connection_timeout = Some(Duration::from_millis(100));
    Ok(peer)
}
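
For intuition about what select(b"", 256) does with a round-robin balancer, here is a simplified model written with only the standard library. It is a sketch under assumptions, not Pingora's code: Backend, RoundRobinPool, and the healthy flag are hypothetical, and the argument is modeled as the maximum number of candidates to try before giving up.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical backend record; `healthy` stands in for the health check result.
struct Backend {
    addr: &'static str,
    healthy: bool,
}

/// Hypothetical round-robin pool with a shared cursor.
struct RoundRobinPool {
    backends: Vec<Backend>,
    next: AtomicUsize,
}

impl RoundRobinPool {
    fn new(backends: Vec<Backend>) -> Self {
        Self { backends, next: AtomicUsize::new(0) }
    }

    /// Advances the cursor and returns the next healthy backend,
    /// trying at most `max_iterations` candidates.
    fn select(&self, max_iterations: usize) -> Option<&'static str> {
        if self.backends.is_empty() {
            return None;
        }
        for _ in 0..max_iterations {
            let i = self.next.fetch_add(1, Ordering::Relaxed) % self.backends.len();
            let backend = &self.backends[i];
            if backend.healthy {
                return Some(backend.addr);
            }
        }
        None
    }
}
```

When every backend is unhealthy the model returns None, which is the case the real upstream_peer has to turn into an error response rather than a panic.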

Done!
