RabbitMQ

Doseok Kim·2023년 11월 15일

WEBPOS

목록 보기
2/2

I had planned to use RabbitMQ for printing and for sending signals to registers. This POS is web-based software, so it always needs some actions initiated from the server, such as a forced cashier logout or override, and printing receipts and labels. As far as I know, there are some JavaScript-based ways to do this, but these approaches need user interaction.
To avoid this, a service should run in the background and communicate with the backend. To do this, a message queue is one of the best solutions. I chose RabbitMQ, built an app on top of it, and it worked well.
I needed to add two requirements on top of its basic features: TLS and user authentication.
Adding TLS is quite simple and easy; it can be done with tls-gen. But adding authentication with a username and password was hard. I spent many hours figuring it out because I had totally forgotten how to do it.
I wrote RabbitMQ producer and consumer apps in Python and Go; they can be found in the sandbox directory.
I also wrote a Docker Compose YAML file along with the RabbitMQ configuration files shown below. I used the topic exchange type. It should be topic or routing, because there can be many registers and printers.
I haven't decided which inter-server protocol should be used between the backend and frontend apps. Possible candidates are REST API, gRPC, or CoAP. I recently found out that CoAP is a possibility.

# Docker Compose file that runs a single RabbitMQ broker (management image).
version: '3.7'

services:
  rabbit-service:
    image: rabbitmq:3-management-alpine
    hostname: rabbit-service
    ports:
      # Port 15672 is published for the management image's web UI/API.
      - "15672:15672"
      # Port 5671 matches listeners.ssl.default in the RabbitMQ config.
      - "5671:5671"
    volumes:
      # Broker configuration, definitions and plugin list from the project tree.
      - ./rabbitmq/conf.d:/etc/rabbitmq/conf.d/
      - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json
      - ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
      # Broker data and logs kept on the host.
      - ./storage/rabbitmq/data/:/var/lib/rabbitmq/
      - ./storage/rabbitmq/log/:/var/log/rabbitmq
      # CA and server certificate/key referenced by ssl_options.* in the config.
      - ./certs:/certs
rabbitmq config
# Definitions file to import; the compose file mounts it at this path.
management.load_definitions = /etc/rabbitmq/definitions.json

# No plain-TCP listener is configured; the only listener is TLS on 5671,
# which is the AMQP port published by the compose file.
listeners.tcp = none
listeners.ssl.default = 5671
# CA bundle, server certificate and server key, all read from the /certs
# volume mounted by the compose file.
ssl_options.cacertfile           = /certs/ca_certificate.pem
ssl_options.certfile             = /certs/server_macmini.local_certificate.pem
ssl_options.keyfile              = /certs/server_macmini.local_key.pem
# Presumably requires every client to present a certificate that validates
# against the CA above -- confirm against the RabbitMQ TLS guide.
ssl_options.verify               = verify_peer
ssl_options.fail_if_no_peer_cert = true

# NOTE(review): ssl_cert_login_from is presumably only consulted by the
# EXTERNAL mechanism (rabbitmq_auth_mechanism_ssl), which is not listed under
# auth_mechanisms below -- confirm whether this line has any effect.
ssl_cert_login_from = common_name
# NOTE(review): presumably the passphrase of the server key file. It is stored
# in plain text and is identical to default_pass below; consider rotating it
# and keeping it out of version control.
ssl_options.password = kim7795004

# NOTE(review): the documented forms appear to be `loopback_users = none` or
# `loopback_users.guest = false` -- confirm that this spelling is accepted.
loopback_users.guest = none
# Default virtual host and credentials. The same user name and password are
# hard-coded in the Python clients.
default_vhost = /
default_user = doseok
default_pass = kim7795004

# Only password-based mechanisms are listed here.
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN

# Console logging is switched on.
log.console = true
rabbitmq topic user definition
{
  "rabbit_version": "3.6.6",
  "users": [
    {
      "name": "doseok",
      "password_hash": "PR67qUrkPVsnaZhEXGwOqE1TtZka10ZkxAt/IHPJGgW8Pk3I",
      "hashing_algorithm": "rabbit_password_hashing_sha256",
      "tags": "admin"
    }
  ],
  "vhosts": [
    {
      "name": "/wpos"
    }
  ],
  "permissions": [
    {
      "user": "doseok",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "doseok",
      "vhost": "/wpos",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "topic_permissions": [
    {
      "user": "doseok",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    },
    {
      "user": "doseok",
      "vhost": "/wpos",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "parameters": [],
  "policies": [],
  "queues": [],
  "exchanges": [],
  "bindings": []
}
rabbitmq plugins
[rabbitmq_management,rabbitmq_prometheus,rabbitmq_auth_mechanism_ssl].
python producer
#!/usr/bin/env python
import pika
import sys
import logging
import ssl

def sslmain():
    """Publish one message to the ``topic_wpos`` topic exchange over TLS.

    Command line: ``[routing_key message_word ...]``

    * The routing key is ``sys.argv[1]`` when at least two arguments are
      given (a routing key followed by a message); otherwise
      ``'wpos.message'`` is used.
    * The message body is ``sys.argv[2:]`` joined with single spaces, or
      ``'Hello World!'`` when no message words are given.

    The connection targets ``localhost:5671`` on virtual host ``/`` and
    presents the client certificate found under ``../certs``.

    NOTE(review): the user name and password are hard-coded here; consider
    loading them from the environment instead.
    """
    logging.basicConfig(level=logging.ERROR)  # use logging.INFO for more detail
    ca = "../certs/ca_certificate.pem"
    cert = "../certs/client_macmini.local_certificate.pem"
    key = "../certs/client_macmini.local_key.pem"

    # Trust only the project CA and present the client certificate.
    context = ssl.create_default_context(cafile=ca)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_cert_chain(cert, key)
    # NOTE(review): "localhost" is passed as the server hostname for the TLS
    # handshake -- presumably the server certificate must be valid for it.
    ssl_options = pika.SSLOptions(context, "localhost")
    credentials = pika.credentials.PlainCredentials(username='doseok', password='kim7795004')
    conn_params = pika.ConnectionParameters(
        host='localhost',
        port=5671,
        virtual_host='/',
        credentials=credentials,
        ssl_options=ssl_options,
    )

    # The with-statement closes the connection on exit, so no explicit
    # close() call is needed inside the block.
    with pika.BlockingConnection(conn_params) as conn:
        channel = conn.channel()
        channel.exchange_declare(exchange='topic_wpos', exchange_type='topic')

        # The routing key comes from the first argument, not from the
        # message words.
        routing_key = sys.argv[1] if len(sys.argv) > 2 else 'wpos.message'
        message = ' '.join(sys.argv[2:]) or 'Hello World!'
        channel.basic_publish(exchange='topic_wpos', routing_key=routing_key, body=message)
        print(" [x] Sent %r:%r" % (routing_key, message))

def main():
    """Publish one message to ``topic_wpos`` over a plain (non-TLS) connection.

    Command line: ``[routing_key message_word ...]``

    * The routing key is ``sys.argv[1]`` when at least two arguments are
      given; otherwise ``'wpos.message'`` is used.
    * The message body is ``sys.argv[2:]`` joined with single spaces, or
      ``'Hello World!'`` when no message words are given.

    Connects to ``localhost`` with pika's default connection parameters.
    """
    # Using the connection as a context manager closes it even when
    # declaring the exchange or publishing raises.
    with pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) as connection:
        channel = connection.channel()
        channel.exchange_declare(exchange='topic_wpos', exchange_type='topic')

        # Honour the routing key given on the command line.
        routing_key = sys.argv[1] if len(sys.argv) > 2 else 'wpos.message'
        message = ' '.join(sys.argv[2:]) or 'Hello World!'
        channel.basic_publish(exchange='topic_wpos', routing_key=routing_key, body=message)
        print(" [x] Sent %r:%r" % (routing_key, message))


# Script entry point: publish through the TLS connection.
if __name__ == "__main__":
    sslmain()
python consumer
#!/usr/bin/env python
import pika
import sys
import logging
import ssl

def sslmain():
    """Consume from the ``topic_wpos`` exchange over TLS and print each delivery.

    Every command-line argument is used as a binding key; with no arguments
    the single key ``wpos.message`` is bound. Deliveries are auto-acknowledged
    and printed as ``routing_key:body``. The call blocks in
    ``start_consuming()``.
    """
    logging.basicConfig(level=logging.ERROR)  # logging.INFO for more detail

    def on_delivery(ch, method, properties, body):
        # Show the routing key and the raw body of the delivery.
        print(" [x] %r:%r" % (method.routing_key, body))

    # TLS context: trust the project CA and present the client certificate.
    tls_context = ssl.create_default_context(cafile="../certs/ca_certificate.pem")
    tls_context.verify_mode = ssl.CERT_REQUIRED
    tls_context.load_cert_chain(
        "../certs/client_macmini.local_certificate.pem",
        "../certs/client_macmini.local_key.pem",
    )
    tls_options = pika.SSLOptions(tls_context, "localhost")

    params = pika.ConnectionParameters(
        host='localhost',
        port=5671,
        virtual_host='/',
        credentials=pika.credentials.PlainCredentials(username='doseok', password='kim7795004'),
        ssl_options=tls_options,
    )

    with pika.BlockingConnection(params) as conn:
        channel = conn.channel()
        channel.exchange_declare(exchange='topic_wpos', exchange_type='topic')

        # Server-named exclusive queue, bound once per requested key.
        queue_name = channel.queue_declare('', exclusive=True).method.queue
        for binding_key in sys.argv[1:] or ["wpos.message"]:
            channel.queue_bind(exchange='topic_wpos', queue=queue_name, routing_key=binding_key)

        print(' [*] Waiting for logs. To exit press CTRL+C')

        channel.basic_consume(queue=queue_name, on_message_callback=on_delivery, auto_ack=True)
        channel.start_consuming()
        conn.close()



def main():
    """Consume from ``topic_wpos`` over a plain (non-TLS) connection.

    Command line: ``binding_key [binding_key ...]``. With no binding key a
    usage line is written to stderr and the process exits with status 1;
    this is checked before any connection is opened.

    Deliveries are auto-acknowledged and printed as ``routing_key:body``.
    The call blocks in ``start_consuming()``.
    """
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)

    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))

    # Using the connection as a context manager closes it when consuming
    # stops or an error is raised.
    with pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) as connection:
        channel = connection.channel()
        channel.exchange_declare(exchange='topic_wpos', exchange_type='topic')

        result = channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue

        # Bind to the exchange declared above.
        for binding_key in binding_keys:
            channel.queue_bind(
                exchange='topic_wpos', queue=queue_name, routing_key=binding_key)

        print(' [*] Waiting for logs. To exit press CTRL+C')

        channel.basic_consume(
            queue=queue_name, on_message_callback=callback, auto_ack=True)

        channel.start_consuming()

# Script entry point: consume through the TLS connection.
if __name__ == "__main__":
    sslmain()
golang producer
package main

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"os"
	"strings"
	"time"

	"github.com/joho/godotenv"
	"github.com/streadway/amqp"
)

// Message is the JSON payload exchanged through RabbitMQ. It is encoded with
// the keys "userid", "key" and "body".
type Message struct {
	UserID string `json:"userid" `
	Key    string `json:"key" `
	Body   string `json:"body" `
}

// charset is the alphabet (digits followed by upper-case ASCII letters) that
// main passes to getRandStr.
const charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

// key is a package-level string; nothing in the visible source reads or
// writes it.
var key string

// randGen is the pseudo-random source used by getRandStr. It is seeded once
// when the package is initialised, so consecutive calls continue one sequence
// instead of restarting from a clock-derived seed each time.
//
// A *rand.Rand is not safe for concurrent use; getRandStr is only called from
// main in this program.
var randGen = rand.New(rand.NewSource(time.Now().UnixNano()))

// getRandStr returns a pseudo-random string of charlen characters, each drawn
// uniformly from the runes of charset.
//
// It returns "" when charset is empty or charlen is not positive. The output
// is not cryptographically secure and must not be used for secrets.
func getRandStr(charset string, charlen int) string {
	chars := []rune(charset)
	if len(chars) == 0 || charlen <= 0 {
		return ""
	}

	var b strings.Builder
	b.Grow(charlen) // exact for ASCII alphabets, a lower bound otherwise
	for i := 0; i < charlen; i++ {
		b.WriteRune(chars[randGen.Intn(len(chars))])
	}
	return b.String()
}

// main loads settings from the .env file and publishes one random message.
// Any failure terminates the process with a non-zero exit status.
func main() {
	if err := godotenv.Load(); err != nil {
		log.Fatal("Error loading .env file")
	}
	if err := publishRandomMessage(); err != nil {
		log.Fatal(err)
	}
}

// producerTLSConfig builds the client TLS configuration from the files named
// by WPOS_RABBITMQ_CACERT, WPOS_RABBITMQ_CLICERT and WPOS_RABBITMQ_CLIKEY.
func producerTLSConfig() (*tls.Config, error) {
	caPath := os.Getenv("WPOS_RABBITMQ_CACERT")
	caCert, err := os.ReadFile(caPath)
	if err != nil {
		return nil, fmt.Errorf("reading CA certificate %q: %w", caPath, err)
	}

	rootCAs := x509.NewCertPool()
	// AppendCertsFromPEM reports false when nothing could be parsed; an empty
	// pool would make every server certificate fail verification later.
	if !rootCAs.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no PEM certificates found in %q", caPath)
	}

	cert, err := tls.LoadX509KeyPair(os.Getenv("WPOS_RABBITMQ_CLICERT"), os.Getenv("WPOS_RABBITMQ_CLIKEY"))
	if err != nil {
		return nil, fmt.Errorf("loading client key pair: %w", err)
	}

	return &tls.Config{
		RootCAs:      rootCAs,
		Certificates: []tls.Certificate{cert},
	}, nil
}

// publishRandomMessage connects to the broker over TLS, declares the
// configured exchange and publishes a single JSON-encoded Message with a
// random Key (16 characters) and Body (256 characters).
//
// It returns the first error encountered. The connection and channel are
// closed on return, including on error paths.
func publishRandomMessage() error {
	tlsConf, err := producerTLSConfig()
	if err != nil {
		return err
	}

	// NOTE(review): the broker login name is used as the message's UserID
	// because no other user identifier is visible here -- confirm this is
	// the intended sender id.
	userID := os.Getenv("WPOS_RABBITMQ_USERID")
	exchangeName := os.Getenv("WPOS_RABBITMQ_EXCHANGE_NAME")

	// NOTE(review): the credentials are interpolated into the URL unescaped;
	// a password containing URL-reserved characters would need escaping.
	url := fmt.Sprintf("amqps://%s:%s@%s:%s/",
		userID,
		os.Getenv("WPOS_RABBITMQ_PASSWORD"),
		os.Getenv("WPOS_RABBITMQ_HOST"),
		os.Getenv("WPOS_RABBITMQ_PORT"))

	conn, err := amqp.DialTLS(url, tlsConf)
	if err != nil {
		return fmt.Errorf("dialing RabbitMQ: %w", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("opening channel: %w", err)
	}
	defer ch.Close()

	err = ch.ExchangeDeclare(
		exchangeName,                             // name
		os.Getenv("WPOS_RABBITMQ_EXCHANGE_TYPE"), // type
		true,                                     // durable
		false,                                    // auto-deleted
		false,                                    // internal
		false,                                    // no-wait
		nil,                                      // arguments
	)
	if err != nil {
		return fmt.Errorf("declaring exchange %q: %w", exchangeName, err)
	}

	msg := Message{UserID: userID, Key: getRandStr(charset, 16), Body: getRandStr(charset, 256)}
	body, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("encoding message: %w", err)
	}

	err = ch.Publish(
		exchangeName,                       // exchange
		os.Getenv("WPOS_RABBITMQ_ROUTING"), // routing key
		false,                              // mandatory
		false,                              // immediate
		amqp.Publishing{
			// NOTE(review): the body is JSON, but the content type is left
			// as published before in case a consumer depends on it.
			ContentType: "text/plain",
			Body:        body,
		})
	if err != nil {
		return fmt.Errorf("publishing to exchange %q: %w", exchangeName, err)
	}

	log.Printf(" [x] Sent %s", body)
	return nil
}
golang consumer
package main

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"fmt"
	"log"
	"os"

	"github.com/joho/godotenv"
	"github.com/streadway/amqp"
)

// Message is the JSON payload received from RabbitMQ. It is decoded from the
// keys "userid", "key" and "body".
type Message struct {
	UserID string `json:"userid" `
	Key    string `json:"key" `
	Body   string `json:"body" `
}

// Empty constant block: the consumer declares no package-level constants.
const ()

// main loads settings from the .env file and consumes messages until the
// delivery channel closes or the process is interrupted. Any failure
// terminates the process with a non-zero exit status.
func main() {
	if err := godotenv.Load(); err != nil {
		log.Fatal("Error loading .env file")
	}
	if err := consumeMessages(); err != nil {
		log.Fatal(err)
	}
}

// consumerTLSConfig builds the client TLS configuration from the files named
// by WPOS_RABBITMQ_CACERT, WPOS_RABBITMQ_CLICERT and WPOS_RABBITMQ_CLIKEY.
func consumerTLSConfig() (*tls.Config, error) {
	caPath := os.Getenv("WPOS_RABBITMQ_CACERT")
	caCert, err := os.ReadFile(caPath)
	if err != nil {
		return nil, fmt.Errorf("reading CA certificate %q: %w", caPath, err)
	}

	rootCAs := x509.NewCertPool()
	// AppendCertsFromPEM reports false when nothing could be parsed; an empty
	// pool would make every server certificate fail verification later.
	if !rootCAs.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no PEM certificates found in %q", caPath)
	}

	cert, err := tls.LoadX509KeyPair(os.Getenv("WPOS_RABBITMQ_CLICERT"), os.Getenv("WPOS_RABBITMQ_CLIKEY"))
	if err != nil {
		return nil, fmt.Errorf("loading client key pair: %w", err)
	}

	return &tls.Config{
		RootCAs:      rootCAs,
		Certificates: []tls.Certificate{cert},
	}, nil
}

// consumeMessages connects to the broker over TLS, declares the configured
// exchange, binds a server-named exclusive queue to it with the configured
// routing key and logs the fields of every JSON message received.
//
// It blocks while deliveries arrive and returns an error when set-up fails
// or when the delivery channel is closed. The connection and channel are
// closed on return.
func consumeMessages() error {
	tlsConf, err := consumerTLSConfig()
	if err != nil {
		return err
	}

	exchangeName := os.Getenv("WPOS_RABBITMQ_EXCHANGE_NAME")
	routingKey := os.Getenv("WPOS_RABBITMQ_ROUTING")

	// NOTE(review): the credentials are interpolated into the URL unescaped;
	// a password containing URL-reserved characters would need escaping.
	url := fmt.Sprintf("amqps://%s:%s@%s:%s/",
		os.Getenv("WPOS_RABBITMQ_USERID"),
		os.Getenv("WPOS_RABBITMQ_PASSWORD"),
		os.Getenv("WPOS_RABBITMQ_HOST"),
		os.Getenv("WPOS_RABBITMQ_PORT"))

	conn, err := amqp.DialTLS(url, tlsConf)
	if err != nil {
		return fmt.Errorf("dialing RabbitMQ: %w", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("opening channel: %w", err)
	}
	defer ch.Close()

	err = ch.ExchangeDeclare(
		exchangeName,                             // name
		os.Getenv("WPOS_RABBITMQ_EXCHANGE_TYPE"), // type
		true,                                     // durable
		false,                                    // auto-deleted
		false,                                    // internal
		false,                                    // no-wait
		nil,                                      // arguments
	)
	if err != nil {
		return fmt.Errorf("declaring exchange %q: %w", exchangeName, err)
	}

	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return fmt.Errorf("declaring queue: %w", err)
	}

	log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, exchangeName, routingKey)
	err = ch.QueueBind(
		q.Name,       // queue name
		routingKey,   // routing key
		exchangeName, // exchange name
		false,
		nil)
	if err != nil {
		return fmt.Errorf("binding queue %q to exchange %q: %w", q.Name, exchangeName, err)
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto ack
		false,  // exclusive
		false,  // no local
		false,  // no wait
		nil,    // args
	)
	if err != nil {
		return fmt.Errorf("starting consumer on queue %q: %w", q.Name, err)
	}

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")

	for d := range msgs {
		// A fresh value per delivery, so fields missing from one message do
		// not inherit values from the previous one.
		var m Message
		if err := json.Unmarshal(d.Body, &m); err != nil {
			log.Printf("skipping message that is not valid JSON: %v", err)
			continue
		}
		log.Printf(">%s", m.UserID)
		log.Printf(">>%s", m.Key)
		log.Printf(">>>%s", m.Body)
	}

	// The range loop only ends when the delivery channel has been closed.
	return fmt.Errorf("delivery channel for queue %q closed", q.Name)
}
profile
Fullstack Developer 입니다

0개의 댓글