I had planned to use RabbitMQ for printing and for sending signals to registers. There were some problems because this POS is web-based software and always needs some actions initiated from the server, such as a forced cashier logout or override, and printing receipts and labels. As far as I know, there are some JavaScript approaches, but they require user interaction.
To avoid this, a service should run in the background and communicate with the backends. To do this, a message queue is one of the best solutions. I chose RabbitMQ, built an app with it, and it worked well.
I need to add two requirements besides its basic features: TLS and user authentication.
Adding TLS is quite simple and easy. It can be done with tls-gen. But adding authentication with a username and password was hard. I spent many hours figuring it out because I had totally forgotten how to do it.
I wrote RabbitMQ producer and consumer apps in Python and Go, which can be found in the sandbox directory.
I also wrote a Docker Compose YAML file along with the RabbitMQ configuration files shown below. I used the topic exchange type. The exchange type should be topic or routing. There can be many registers and printers.
I haven't decided which inter-server protocol should be used between the backend and frontend apps. Possible candidates are REST API, gRPC, or CoAP. I recently found out that CoAP is a possibility.
version: '3.7'
services:
rabbit-service:
image: rabbitmq:3-management-alpine
hostname: rabbit-service
ports:
- "15672:15672"
- "5671:5671"
volumes:
- ./rabbitmq/conf.d:/etc/rabbitmq/conf.d/
- ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json
- ./rabbitmq/enabled_plugins:/etc/rabbitmq/enabled_plugins
- ./storage/rabbitmq/data/:/var/lib/rabbitmq/
- ./storage/rabbitmq/log/:/var/log/rabbitmq
- ./certs:/certs
# RabbitMQ broker settings in `key = value` form.
# Definitions are read from this JSON file, which the compose file mounts into the container.
management.load_definitions = /etc/rabbitmq/definitions.json
# No plain-TCP listener is configured; the only AMQP listener is TLS on port 5671.
listeners.tcp = none
listeners.ssl.default = 5671
# CA bundle plus the server certificate and private key used by the TLS listener.
ssl_options.cacertfile = /certs/ca_certificate.pem
ssl_options.certfile = /certs/server_macmini.local_certificate.pem
ssl_options.keyfile = /certs/server_macmini.local_key.pem
# Peer verification is on and a peer that presents no certificate is refused.
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# NOTE(review): ssl_cert_login_from is presumably only consulted by the EXTERNAL mechanism,
# which is not listed in auth_mechanisms below — confirm whether this line is still needed.
ssl_cert_login_from = common_name
# NOTE(review): plaintext secret (presumably the private-key passphrase) stored in config — confirm and move it out of the repository.
ssl_options.password = kim7795004
loopback_users.guest = none
default_vhost = /
default_user = doseok
# NOTE(review): plaintext default password stored in config; it is the same value as ssl_options.password above.
default_pass = kim7795004
# Authentication mechanisms offered to clients: the two username/password mechanisms only.
auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
log.console = true
{
"rabbit_version": "3.6.6",
"users": [
{
"name": "doseok",
"password_hash": "PR67qUrkPVsnaZhEXGwOqE1TtZka10ZkxAt/IHPJGgW8Pk3I",
"hashing_algorithm": "rabbit_password_hashing_sha256",
"tags": "admin"
}
],
"vhosts": [
{
"name": "/wpos"
}
],
"permissions": [
{
"user": "doseok",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "doseok",
"vhost": "/wpos",
"configure": ".*",
"write": ".*",
"read": ".*"
}
],
"topic_permissions": [
{
"user": "doseok",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "doseok",
"vhost": "/wpos",
"configure": ".*",
"write": ".*",
"read": ".*"
}
],
"parameters": [],
"policies": [],
"queues": [],
"exchanges": [],
"bindings": []
}
[rabbitmq_management,rabbitmq_prometheus,rabbitmq_auth_mechanism_ssl].
#!/usr/bin/env python
import pika
import sys
import logging
import ssl
def sslmain():
    """Publish one message to the ``topic_wpos`` topic exchange over TLS.

    Command line: ``<script> [routing_key message words...]``.

    * The routing key is ``sys.argv[1]`` when at least a key and one message
      word are supplied; otherwise it falls back to ``wpos.message``.
    * The body is the remaining words joined by spaces, or ``Hello World!``
      when none are supplied.

    The connection is opened with the client certificate under ``../certs``
    and with username/password credentials.
    """
    logging.basicConfig(level=logging.ERROR)  # use logging.INFO for more detail

    ca = "../certs/ca_certificate.pem"
    cert = "../certs/client_macmini.local_certificate.pem"
    key = "../certs/client_macmini.local_key.pem"

    # Verify the broker against the CA file and present the client certificate.
    context = ssl.create_default_context(cafile=ca)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_cert_chain(cert, key)
    ssl_options = pika.SSLOptions(context, "localhost")

    # NOTE(review): credentials are hard-coded in source — consider loading
    # them from the environment instead.
    credentials = pika.credentials.PlainCredentials(username='doseok', password='kim7795004')
    conn_params = pika.ConnectionParameters(
        host='localhost',
        port=5671,
        virtual_host='/',
        credentials=credentials,
        ssl_options=ssl_options
    )

    # The routing key is the first argument. Previously it was built from
    # sys.argv[2:], i.e. from the message words themselves, so the key was
    # always identical to the body whenever arguments were given.
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'wpos.message'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'

    # The context manager closes the connection on exit, so no explicit
    # conn.close() is needed (a second close on a closed connection raises).
    with pika.BlockingConnection(conn_params) as conn:
        channel = conn.channel()
        channel.exchange_declare(exchange='topic_wpos', exchange_type='topic')
        channel.basic_publish(exchange='topic_wpos', routing_key=routing_key, body=message)
        print(" [x] Sent %r:%r" % (routing_key, message))
def main():
    """Publish one message to ``topic_wpos`` over a plain (non-TLS) connection.

    Command line: ``<script> [routing_key message words...]``. The routing key
    is ``sys.argv[1]`` when a key and at least one message word are supplied,
    otherwise ``wpos.message``; the body defaults to ``Hello World!``.
    """
    # The command-line routing key is honoured. Previously a second assignment
    # unconditionally overwrote it with 'wpos.message'.
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'wpos.message'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'

    # Context manager closes the connection even if declaring or publishing raises.
    with pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) as connection:
        channel = connection.channel()
        channel.exchange_declare(exchange='topic_wpos', exchange_type='topic')
        channel.basic_publish(exchange='topic_wpos', routing_key=routing_key, body=message)
        print(" [x] Sent %r:%r" % (routing_key, message))
# Script entry point: runs the TLS publisher; main() is not invoked here.
if __name__=="__main__":
    sslmain()
#!/usr/bin/env python
import pika
import sys
import logging
import ssl
def sslmain():
    """Consume messages from the ``topic_wpos`` topic exchange over TLS.

    Command line: ``<script> [binding_key]...``. Each argument is used as a
    binding key for a fresh exclusive, server-named queue; with no arguments
    the single key ``wpos.message`` is used. Every delivery is printed as
    ``routing_key:body``. Runs until interrupted with CTRL+C.
    """
    logging.basicConfig(level=logging.ERROR)  # use logging.INFO for more detail

    ca = "../certs/ca_certificate.pem"
    cert = "../certs/client_macmini.local_certificate.pem"
    key = "../certs/client_macmini.local_key.pem"

    # Verify the broker against the CA file and present the client certificate.
    context = ssl.create_default_context(cafile=ca)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_cert_chain(cert, key)
    ssl_options = pika.SSLOptions(context, "localhost")

    # NOTE(review): credentials are hard-coded in source — consider loading
    # them from the environment instead.
    credentials = pika.credentials.PlainCredentials(username='doseok', password='kim7795004')
    conn_params = pika.ConnectionParameters(
        host='localhost',
        port=5671,
        virtual_host='/',
        credentials=credentials,
        ssl_options=ssl_options
    )

    binding_keys = sys.argv[1:]
    if not binding_keys:
        binding_keys = ["wpos.message",]

    def callback(ch, method, properties, body):
        """Print the routing key and raw body of one delivery."""
        print(" [x] %r:%r" % (method.routing_key, body))

    # The context manager closes the connection on exit, so no explicit
    # conn.close() is needed (a second close on a closed connection raises).
    with pika.BlockingConnection(conn_params) as conn:
        channel = conn.channel()
        channel.exchange_declare(exchange='topic_wpos', exchange_type='topic')

        # Empty name: the broker generates the queue name; exclusive ties the
        # queue to this connection.
        result = channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue

        for binding_key in binding_keys:
            channel.queue_bind(exchange='topic_wpos', queue=queue_name, routing_key=binding_key)

        print(' [*] Waiting for logs. To exit press CTRL+C')
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # CTRL+C is the advertised way to quit: stop cleanly, no traceback.
            channel.stop_consuming()
def main():
    """Consume messages from ``topic_wpos`` over a plain (non-TLS) connection.

    Command line: ``<script> binding_key [binding_key]...``. At least one
    binding key is required; otherwise a usage line is written to stderr and
    the process exits with status 1. Every delivery is printed as
    ``routing_key:body``. Runs until interrupted with CTRL+C.
    """
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)

    def callback(ch, method, properties, body):
        """Print the routing key and raw body of one delivery."""
        print(" [x] %r:%r" % (method.routing_key, body))

    # Context manager closes the connection on every exit path.
    with pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) as connection:
        channel = connection.channel()
        channel.exchange_declare(exchange='topic_wpos', exchange_type='topic')

        result = channel.queue_declare('', exclusive=True)
        queue_name = result.method.queue

        for binding_key in binding_keys:
            # Bind to the exchange declared above. Previously this bound to
            # 'topic_logs', which this function never declares.
            channel.queue_bind(exchange='topic_wpos', queue=queue_name, routing_key=binding_key)

        print(' [*] Waiting for logs. To exit press CTRL+C')
        channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            # CTRL+C is the advertised way to quit: stop cleanly, no traceback.
            channel.stop_consuming()
# Script entry point: runs the TLS consumer; main() is not invoked here.
if __name__=="__main__":
    sslmain()
package main
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"log"
"math/rand"
"os"
"strings"
"time"
"github.com/joho/godotenv"
"github.com/streadway/amqp"
)
// Message is the JSON document this program publishes to the exchange.
// The struct tags fix the wire field names to "userid", "key" and "body".
type Message struct {
	UserID string `json:"userid" `
	Key    string `json:"key" `
	Body   string `json:"body" `
}
// charset is the alphabet (digits and upper-case ASCII letters) passed to
// getRandStr when main builds a message.
const charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

// key is a package-level string.
// NOTE(review): nothing in the visible code reads or writes it — confirm whether it is still needed.
var key string
// getRandStr returns a pseudo-random string of charlen characters, each drawn
// from the runes of charset. It returns "" when charset is empty or charlen is
// not positive.
//
// The output comes from math/rand seeded with the current time, so it must not
// be used for secrets or security tokens.
func getRandStr(charset string, charlen int) string {
	chars := []rune(charset)
	if len(chars) == 0 || charlen <= 0 {
		// rand.Intn panics when its argument is zero, so bail out before drawing.
		return ""
	}

	// A generator private to this call replaces rand.Seed, which is deprecated
	// and reseeded the process-wide source on every invocation.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))

	var b strings.Builder
	b.Grow(charlen)
	for i := 0; i < charlen; i++ {
		b.WriteRune(chars[rng.Intn(len(chars))])
	}
	return b.String()
}
// main loads configuration from the .env file and publishes one randomly
// generated Message to the broker. Any failure terminates the process with a
// logged error.
func main() {
	if err := godotenv.Load(); err != nil {
		log.Fatal("Error loading .env file")
	}
	// All work happens in a helper that returns an error, so its deferred
	// Close calls run before log.Fatal exits the process.
	if err := publishRandomMessage(); err != nil {
		log.Fatal(err)
	}
}

// publishRandomMessage connects to the broker over TLS with a client
// certificate, declares the configured exchange and publishes a single
// JSON-encoded Message whose Key and Body are random strings.
//
// Configuration is read from the WPOS_RABBITMQ_* environment variables:
// CACERT, CLICERT, CLIKEY, USERID, PASSWORD, HOST, PORT, EXCHANGE_NAME,
// EXCHANGE_TYPE and ROUTING. It returns the first error encountered.
func publishRandomMessage() error {
	caPath := os.Getenv("WPOS_RABBITMQ_CACERT")
	caCert, err := os.ReadFile(caPath)
	if err != nil {
		return fmt.Errorf("reading CA certificate: %w", err)
	}
	cert, err := tls.LoadX509KeyPair(os.Getenv("WPOS_RABBITMQ_CLICERT"), os.Getenv("WPOS_RABBITMQ_CLIKEY"))
	if err != nil {
		return fmt.Errorf("loading client key pair: %w", err)
	}
	rootCAs := x509.NewCertPool()
	if !rootCAs.AppendCertsFromPEM(caCert) {
		return fmt.Errorf("no usable CA certificate in %q", caPath)
	}
	tlsConf := &tls.Config{
		RootCAs:      rootCAs,
		Certificates: []tls.Certificate{cert},
	}

	// The original code referenced the undefined identifiers userid and
	// exchange_name; they are taken from the environment here.
	// NOTE(review): presumably the message's UserID is the broker user — confirm.
	userID := os.Getenv("WPOS_RABBITMQ_USERID")
	exchangeName := os.Getenv("WPOS_RABBITMQ_EXCHANGE_NAME")

	// NOTE(review): user and password are interpolated unescaped; characters
	// such as '@' or '/' in them would yield an invalid URI.
	url := fmt.Sprintf("amqps://%s:%s@%s:%s/",
		userID,
		os.Getenv("WPOS_RABBITMQ_PASSWORD"),
		os.Getenv("WPOS_RABBITMQ_HOST"),
		os.Getenv("WPOS_RABBITMQ_PORT"))
	conn, err := amqp.DialTLS(url, tlsConf)
	if err != nil {
		// Returning here matters: conn is nil on failure.
		return fmt.Errorf("dialing broker: %w", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("opening channel: %w", err)
	}
	defer ch.Close()

	err = ch.ExchangeDeclare(
		exchangeName, // name
		os.Getenv("WPOS_RABBITMQ_EXCHANGE_TYPE"), // type
		true,  // durable
		false, // auto-deleted
		false, // internal
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return fmt.Errorf("declaring exchange %q: %w", exchangeName, err)
	}

	msg := Message{UserID: userID, Key: getRandStr(charset, 16), Body: getRandStr(charset, 256)}
	body, err := json.Marshal(msg)
	if err != nil {
		return fmt.Errorf("encoding message: %w", err)
	}

	err = ch.Publish(
		exchangeName,                       // exchange
		os.Getenv("WPOS_RABBITMQ_ROUTING"), // routing key
		false,                              // mandatory
		false,                              // immediate
		amqp.Publishing{
			// The payload is JSON, so label it as such.
			ContentType: "application/json",
			Body:        body,
		})
	if err != nil {
		return fmt.Errorf("publishing message: %w", err)
	}

	log.Printf(" [x] Sent %s", body)
	return nil
}
package main
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"log"
"os"
"github.com/joho/godotenv"
"github.com/streadway/amqp"
)
// Message is the JSON document this program decodes from each delivery.
// The struct tags map the wire fields "userid", "key" and "body".
type Message struct {
	UserID string `json:"userid" `
	Key    string `json:"key" `
	Body   string `json:"body" `
}
const ()
// main loads configuration from the .env file and consumes messages from the
// broker until the delivery stream ends. Any failure terminates the process
// with a logged error.
func main() {
	if err := godotenv.Load(); err != nil {
		log.Fatal("Error loading .env file")
	}
	// All work happens in a helper that returns an error, so its deferred
	// Close calls run before log.Fatal exits the process.
	if err := consumeMessages(); err != nil {
		log.Fatal(err)
	}
}

// consumeMessages connects to the broker over TLS with a client certificate,
// declares the configured exchange, binds a fresh exclusive server-named queue
// to it with the configured routing key and logs every decoded Message.
//
// Configuration is read from the WPOS_RABBITMQ_* environment variables:
// CACERT, CLICERT, CLIKEY, USERID, PASSWORD, HOST, PORT, EXCHANGE_NAME,
// EXCHANGE_TYPE and ROUTING. It returns on the first setup error, or with an
// error once the delivery channel is closed.
func consumeMessages() error {
	caPath := os.Getenv("WPOS_RABBITMQ_CACERT")
	caCert, err := os.ReadFile(caPath)
	if err != nil {
		return fmt.Errorf("reading CA certificate: %w", err)
	}
	cert, err := tls.LoadX509KeyPair(os.Getenv("WPOS_RABBITMQ_CLICERT"), os.Getenv("WPOS_RABBITMQ_CLIKEY"))
	if err != nil {
		return fmt.Errorf("loading client key pair: %w", err)
	}
	rootCAs := x509.NewCertPool()
	if !rootCAs.AppendCertsFromPEM(caCert) {
		return fmt.Errorf("no usable CA certificate in %q", caPath)
	}
	tlsConf := &tls.Config{
		RootCAs:      rootCAs,
		Certificates: []tls.Certificate{cert},
	}

	exchangeName := os.Getenv("WPOS_RABBITMQ_EXCHANGE_NAME")
	routingKey := os.Getenv("WPOS_RABBITMQ_ROUTING")

	// The port variable was previously misspelled WPOS_RABBITQ_PORT, which
	// always produced an empty port in the URI.
	// NOTE(review): user and password are interpolated unescaped; characters
	// such as '@' or '/' in them would yield an invalid URI.
	url := fmt.Sprintf("amqps://%s:%s@%s:%s/",
		os.Getenv("WPOS_RABBITMQ_USERID"),
		os.Getenv("WPOS_RABBITMQ_PASSWORD"),
		os.Getenv("WPOS_RABBITMQ_HOST"),
		os.Getenv("WPOS_RABBITMQ_PORT"))
	conn, err := amqp.DialTLS(url, tlsConf)
	if err != nil {
		// Returning here matters: conn is nil on failure.
		return fmt.Errorf("dialing broker: %w", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("opening channel: %w", err)
	}
	defer ch.Close()

	err = ch.ExchangeDeclare(
		exchangeName, // name
		os.Getenv("WPOS_RABBITMQ_EXCHANGE_TYPE"), // type
		true,  // durable
		false, // auto-deleted
		false, // internal
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return fmt.Errorf("declaring exchange %q: %w", exchangeName, err)
	}

	q, err := ch.QueueDeclare(
		"",    // name (empty: the broker generates one)
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return fmt.Errorf("declaring queue: %w", err)
	}

	// Log the exchange actually used rather than a hard-coded name.
	log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, exchangeName, routingKey)
	err = ch.QueueBind(
		q.Name,       // queue name
		routingKey,   // routing key
		exchangeName, // exchange name
		false,
		nil)
	if err != nil {
		return fmt.Errorf("binding queue %q: %w", q.Name, err)
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto ack
		false,  // exclusive
		false,  // no local
		false,  // no wait
		nil,    // args
	)
	if err != nil {
		return fmt.Errorf("starting consumer: %w", err)
	}

	log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
	for d := range msgs {
		// A fresh value per delivery keeps fields from a previous message
		// from leaking into one that omits them.
		var m Message
		if err := json.Unmarshal(d.Body, &m); err != nil {
			log.Printf("skipping undecodable message: %v", err)
			continue
		}
		log.Printf(">%s", m.UserID)
		log.Printf(">>%s", m.Key)
		log.Printf(">>>%s", m.Body)
	}

	// The range loop ends only when the delivery channel is closed. Report it
	// instead of blocking forever on a channel nobody signals.
	return fmt.Errorf("delivery channel closed")
}