Il modello publisher-subscriber (pubblicazione-sottoscrizione) prevede lo scambio di messaggi tra applicazioni distribuite con un filtro sugli argomenti (topic) condivisi.
Le app che sono interessate ad un certo topic si sottoscrivono e da quel momento ricevono i messaggi afferenti a quell'argomento.
La comunicazione avviene con l'ausilio di un servizio, detto broker, che ha il compito di smistare i messaggi. Tutte le app, siano esse di pubblicazione o di sottoscrizione, sono in connessione esclusivamente con il broker, il quale riceve i messaggi pubblicati organizzati per topic e inoltra il messaggio ai sottoscrittori di ciascun topic.
Il broker può implementare meccanismi di autenticazione e quindi accettare la connessione solo dalle app autorizzate. Inoltre può implementare meccanismi di cifratura per rendere sicura la comunicazione.
Questo modello trova applicazioni in molti ambiti nei quali una stessa informazione deve essere condivisa con più destinatari, come per esempio per implementare database distribuiti o per la comunicazione nell'ambito dell'IoT (Internet of Things).
Esistono vari protocolli standard che implementano questo modello di comunicazione tra cui AMQP (Advanced Message Queuing Protocol) e MQTT(Message Queue Telemetry Transport) che si pongono al livello applicativo dello stack TCP/IP.
In questa pagina sono sviluppate tre distinte applicazioni che utilizzano il protocollo MQTT per simulare uno scenario realistico nell'ambito dello IoT: alcuni device distribuiti sul territorio rilevano la temperatura ambientale (azione simulata) e la comunicano ad una console di registrazione centralizzata; sempre in modo centralizzato vi è una console di amministrazione per regolare la frequenza di aggiornamento di ciascun device.
Le app sono implementate in Python ed utilizzano la libreria paho-mqtt per interfacciare il protocollo MQTT. Il broker utilizzato è pubblico (test.mosquitto.org) e non implementa meccanismi di sicurezza.
Per installare la libreria paho-mqtt da console utilizzare il comando:
pip install paho-mqtt
Il device
from paho.mqtt.client import Client
import threading
import time
import random
device = "7ec-"+input("Nome unico del device: 7ec-")
delta = int(input("Intervallo di aggiornamento in secondi: "))
# subscriber ---------------------------------------
def on_message(mqtt, userdata, message):
global delta
try:
payload = str(message.payload.decode())
delta = int(payload)
print("Ricevuto nuovo intervallo di aggiornamento:",str(message.topic), payload)
except Exception:
print("Delta:", payload)
print("Il delta impostato non è valido")
mqtt = Client(client_id = device)
mqtt.user_data_set(device) #opzionale
mqtt.on_message = on_message
try:
mqtt.connect("test.mosquitto.org")
print("Connessione al broker avvenuta con successo")
except Exception:
print("Connessione fallita")
exit(1)
rc, mid = mqtt.subscribe("7ecno/"+device, 1)
print("Sottoscrizione al topic: ", "7ecno/"+device," rc: ", rc)
# publisher ---------------------------------------
def pubblica(mqtt):
global delta
while True:
n = random.randint(-5, 45)
mqtt.publish("7ec/"+device, str(n))
print("Pubblicata temperatura", "7ec/"+device, str(n))
time.sleep(delta)
# threading ---------------------------------------
try:
t1 = threading.Thread(target=mqtt.loop_forever)
t2 = threading.Thread(target=pubblica, args=(mqtt,))
except Exception:
print("Creazione del thread fallita")
t1.start()
t2.start()
La console di registrazione
from paho.mqtt.client import Client
# subscriber ---------------------------------------
def on_message(mqtt, userdata, message):
payload = str(message.payload.decode())
print(str(message.topic), payload)
mqtt = Client()
mqtt.on_message = on_message
try:
mqtt.connect("test.mosquitto.org")
print("Connessione al broker avvenuta con successo")
except Exception:
print("Connessione fallita")
exit(1)
rc, mid = mqtt.subscribe("7ec/#", 1)
print("Sottoscrizione al topic: ", "7ec/# rc: ", rc)
mqtt.loop_forever()
La console di amministrazione
from paho.mqtt.client import Client
# publisher ---------------------------------------
mqtt = Client()
try:
mqtt.connect("test.mosquitto.org")
print("Connessione al broker avvenuta con successo")
except Exception:
print("Connessione fallita")
exit(1)
while True:
device = "7ec-"+input("Nome del device da aggiornare: 7ec-")
try:
delta = int(input("Intervallo di aggiornamento in secondi: "))
mqtt.publish("7ecno/"+device, str(delta))
print("Pubblicato 7ecno/"+device, str(delta),"\n")
except Exception:
print("Valore non valido")
Sito: 7ecnologie
Sezione: 13. Reti
Capitolo: 05. LAB
Paragrafo: 01. Programmazione
Approfondimento: 03. Python: publisher-subscriber
Indice dei capitoli: 00. Risorse - 01. Le telecomunicazioni - 02. Il modello OSI - 03. La suite TCP/IP - 04. Il cablaggio strutturato - 05. LAB - 07. Tutorial - 98. Esercizi
Indice dei paragrafi: 01. Programmazione - 02. Monitoraggio della rete - 03. Network simulator - 04. Configurazione dei device
Indice degli approfondimenti: 01. C language: client-server - 02. Python: client-server - 03. Python: publisher-subscriber