
본 글에서는 AWS VPC Traffic Mirroring을 위해 필요한 replay handler를 구현해둔 AWS의 예제 코드를 분석합니다.
예제 코드를 분석하기 전에 AWS VPC Traffic Mirroring에 대해 간단히 살펴보겠습니다. 자세한 내용은 Mirror production traffic to test environment with VPC Traffic Mirroring을 읽어보시길 권장드립니다.
트래픽을 테스트 방법으로 분산과 미러링이 있습니다.

미러링을 이용해 트래픽을 테스트 하려면 어플리케이션 API가 멱등해 데이터 중복을 유발하지 않아야 합니다.
트래픽 미러링을 위한 오픈소스 솔루션(nginx mirroring capability, k8s gateway api)도 존재합니다. 하지만 이러한 방법들은 앞단에 프록시를 둬야하는 추가 작업이 필요합니다.
반면 AWS의 VPC Traffic Mirroring은 EC2 환경의 ENI(Elastic Network Interface)를 활용하기 때문에 별도의 컴포넌트 추가 없이 대역외 트래픽을 복제해 줍니다.

이 코드는 Replay handler에 대한 구현으로, Production environment에서 mirroring한 트래픽을 처리해 다시 Test environment에 전달합니다.
패킷 캡쳐와 관련된 코드의 일부분 입니다.
func main() {
var handle *pcap.Handle
// Set up pcap packet capture
handle, err = pcap.OpenLive("vxlan0", 8951, true, pcap.BlockForever)
if err != nil {
log.Fatal(err)
}
// Set up BPF filter
BPFFilter := fmt.Sprintf("%s%d", "tcp and dst port ", *reqPort)
if err := handle.SetBPFFilter(BPFFilter); err != nil {
log.Fatal(err)
}
// Read in packets, pass to assembler.
packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
packets := packetSource.Packets()
for {
select {
case packet := <-packets:
tcp := packet.TransportLayer().(*layers.TCP)
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
}
}
AWS VPC request mirroring은 Production environment에 들어온 요청의 L2 프레임데이터를 미러링하는 기술입니다. handle이 읽을 수 있는 데이터가 vxlan0으로 전달되기 까지의 과정은 다음과 같습니다.

|Ethernet 헤더 | IP 헤더 | UDP 헤더 | vxlan 헤더 | Frame |형태의 프레임입니다.
|Ethernet 헤더 | IP 헤더 | UDP 헤더 | vxlan 헤더 | Frame | 형태의 프레임을 전달받습니다.| vxlan 헤더 | Frame |형태로 데이터를 전달합니다.이러한 과정을 거쳐 vxlan0에 담기게 된 프레임을 pcap.OpenLive로 생성한 handle이 읽어들이게 됩니다.
// httpStreamFactory implements tcpassembly.StreamFactory
type httpStreamFactory struct{}
// httpStream will handle the actual decoding of http requests.
type httpStream struct {
net, transport gopacket.Flow
r tcpreader.ReaderStream
}
func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
hstream := &httpStream{
net: net,
transport: transport,
r: tcpreader.NewReaderStream(),
}
go hstream.run() // Important... we must guarantee that data from the reader stream is read.
// ReaderStream implements tcpassembly.Stream, so we can return a pointer to it.
return &hstream.r
}
func main() {
// Set up assembly
streamFactory := &httpStreamFactory{}
streamPool := tcpassembly.NewStreamPool(streamFactory)
assembler := tcpassembly.NewAssembler(streamPool)
for {
select {
case packet := <-packets:
assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)
case <-ticker:
// Every minute, flush connections that haven't seen activity in the past 1 minute.
assembler.FlushOlderThan(time.Now().Add(time.Minute * -1))
}
}
}
TCP를 재조립하는 구체적인 과정은
github.com/google/gopacket/tcpassembly패키지의 내부 로직에 의해 처리됩니다. 본 글에서 AssembleWithTimestamp가 TCP 스트림을 재조립하는 구체적인 내용은 다루지 않습니다.
func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
hstream := &httpStream{
net: net,
transport: transport,
r: tcpreader.NewReaderStream(),
}
go hstream.run() // Important... we must guarantee that data from the reader stream is read.
// ReaderStream implements tcpassembly.Stream, so we can return a pointer to it.
return &hstream.r
}
func (h *httpStream) run() {
buf := bufio.NewReader(&h.r)
for {
req, err := http.ReadRequest(buf)
if err == io.EOF {
// We must read until we see an EOF... very important!
return
} else if err != nil {
log.Println("Error reading stream", h.net, h.transport, ":", err)
} else {
reqSourceIP := h.net.Src().String()
reqDestionationPort := h.transport.Dst().String()
body, bErr := ioutil.ReadAll(req.Body)
if bErr != nil {
return
}
req.Body.Close()
go forwardRequest(req, reqSourceIP, reqDestionationPort, body)
}
}
}
for{} : 무한 루프를 통해 계속해서 들어오는 요청을 처리합니다.req, err := http.ReadRequest(buf) : 스트림을 http요청 단위로 파싱합니다.if err == io.EOF {return} : 상대가 연결을 닫았기 때문에 종료합니다.else if err != nil {} : 파싱 오류 발생 시 로그만 찍고 다음 요청을 처리합니다.body, bErr := ioutil.ReadAll(req.Body) : body를 모두 읽습니다.go forwardRequest(req, reqSourceIP, reqDestionationPort, body) : 새로운 고루틴으로 목적지로 요청을 전송합니다.func forwardRequest(req *http.Request, reqSourceIP string, reqDestionationPort string, body []byte) {
// if percentage flag is not 100, then a percentage of requests is skipped
if *fwdPerc != 100 {
var uintForSeed uint64
if *fwdBy == "" {
// if percentage-by is empty, then forward only a certain percentage of requests
var b [8]byte
_, err := crypto_rand.Read(b[:])
if err != nil {
log.Println("Error generating crypto random unit for seed", ":", err)
return
}
// uintForSeed is random
uintForSeed = binary.LittleEndian.Uint64(b[:])
} else {
// if percentage-by is not empty, then forward only requests from a certain percentage of headers/remoteaddresses
strForSeed := ""
if *fwdBy == "header" {
strForSeed = req.Header.Get(*fwdHeader)
} else {
strForSeed = reqSourceIP
}
crc64Table := crc64.MakeTable(0xC96C5795D7870F42)
// uintForSeed is derived from strForSeed
uintForSeed = crc64.Checksum([]byte(strForSeed), crc64Table)
}
// generate a consistent random number from the variable uintForSeed
math_rand.Seed(int64(uintForSeed))
randomPercent := math_rand.Float64() * 100
// skip a percentage of requests
if randomPercent > *fwdPerc {
return
}
}
// create a new url from the raw RequestURI sent by the client
url := fmt.Sprintf("%s%s", string(*fwdDestination), req.RequestURI)
// create a new HTTP request
forwardReq, err := http.NewRequest(req.Method, url, bytes.NewReader(body))
if err != nil {
return
}
// add headers to the new HTTP request
for header, values := range req.Header {
for _, value := range values {
forwardReq.Header.Add(header, value)
}
}
// Append to X-Forwarded-For the IP of the client or the IP of the latest proxy (if any proxies are in between)
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
forwardReq.Header.Add("X-Forwarded-For", reqSourceIP)
// The three following headers should contain 1 value only, i.e. the outermost port, protocol, and host
// https://tools.ietf.org/html/rfc7239#section-5.4
if forwardReq.Header.Get("X-Forwarded-Port") == "" {
forwardReq.Header.Set("X-Forwarded-Port", reqDestionationPort)
}
if forwardReq.Header.Get("X-Forwarded-Proto") == "" {
forwardReq.Header.Set("X-Forwarded-Proto", "http")
}
if forwardReq.Header.Get("X-Forwarded-Host") == "" {
forwardReq.Header.Set("X-Forwarded-Host", req.Host)
}
if *keepHostHeader {
forwardReq.Host = req.Host
}
// Execute the new HTTP request
httpClient := &http.Client{}
resp, rErr := httpClient.Do(forwardReq)
if rErr != nil {
// log.Println("Forward request error", ":", err)
return
}
defer resp.Body.Close()
}
url := fmt.Sprintf("%s%s", string(*fwdDestination), req.RequestURI) : 목적지 URL을 만듭니다.forwardReq, err := http.NewRequest(req.Method, url, bytes.NewReader(body)) : 새 요청을 생성합니다.