あらかじめ起動しておいたMosquittoに、Python Paho MQTT library (pkgsrc/net/py-paho-mqtt 2.1.0)を使って接続し、Mosquittoが受信したデータをInfluxDB 1 (pkgsrc/databases/influxdb)に保存する。なぜInfluxDBに保存するのかと言うと、最終的にMosquittoで受信したデータをGrafana (pkgsrc/www/grafana)で表示するためである。
乱数をMosquittoに送信し、それを購読して、InfluxDBに保存する。
#!/usr/pkg/bin/python3.14
#
# Subscribe to a Mosquitto MQTT broker and store the received data in InfluxDB 1
#
import time
import random
import ssl
from paho.mqtt.enums import MQTTProtocolVersion
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
# --- MQTT broker settings -------------------------------------------------
MQTT_TOPIC = 'topic/0001'
MQTT_PORT = 8883
MQTT_BROKER = 'MYHOSTNAME'

# --- InfluxDB 1.x settings ------------------------------------------------
INFLUXDB_DATABASE = 'mqtt001'
INFLUXDB_HOSTNAME = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_USERID = 'admin'
INFLUXDB_PASSWORD = 'ADMIN_PASSWORD'

# Module-wide InfluxDB client; the MQTT message callback writes through it.
influx_client = InfluxDBClient(
    host=INFLUXDB_HOSTNAME,
    port=INFLUXDB_PORT,
    username=INFLUXDB_USERID,
    password=INFLUXDB_PASSWORD,
    database=INFLUXDB_DATABASE,
)

# Issue the database creation request once at start-up, before any point
# is written.
influx_client.create_database(INFLUXDB_DATABASE)
# Callback function for MQTT subscription
def on_message(client, userdata, message):
    """Store one received MQTT message in InfluxDB as a ``sensor_data`` point.

    The payload is expected to be the UTF-8 text form of a finite number.
    A payload that cannot be decoded, cannot be parsed as a float, or is
    NaN/infinite is reported and dropped instead of raising. paho-mqtt
    re-raises exceptions from callbacks by default (see
    ``Client.suppress_exceptions``), so one malformed message would
    otherwise take down the background network loop.

    Errors raised by ``influx_client.write_points`` are not caught here.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data registered on the client (unused).
        message: Received message; only its ``payload`` bytes are read.
    """
    import math  # local import: only needed for the finiteness check

    try:
        payload = message.payload.decode('utf-8')
    except UnicodeDecodeError as err:
        print(f'Ignored undecodable payload {message.payload!r}: {err}')
        return

    # debug print
    print(f'Received: {payload}')

    try:
        value = float(payload)
    except ValueError:
        print(f'Ignored non-numeric payload: {payload!r}')
        return

    # float() accepts "nan" and "inf"; such values cannot be stored in an
    # InfluxDB float field, so drop them before attempting the write.
    if not math.isfinite(value):
        print(f'Ignored non-finite payload: {payload!r}')
        return

    # time.time_ns() returns an exact integer nanosecond timestamp;
    # multiplying time.time() by 1e9 goes through a float and loses the
    # low-order digits.
    timestamp = time.time_ns()

    json_body = [
        {
            "measurement": "sensor_data",
            "tags": {
                "device": "dev001_reg700",
            },
            "fields": {
                "value": value,
            },
            "time": timestamp,
        },
    ]
    # Write data to InfluxDB
    influx_client.write_points(json_body)
# Callback function for (re)connection to the broker
def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to ``MQTT_TOPIC`` every time a connection is established.

    Subscribing from this callback, rather than once after ``connect()``,
    means the subscription is issued again after each automatic reconnect
    performed by the network loop.

    Args:
        client: MQTT client whose connection attempt completed.
        userdata: User data registered on the client (unused).
        flags: Connect flags reported by the broker (unused).
        reason_code: Result of the connection attempt.
        properties: MQTT v5 properties of the CONNACK (unused).
    """
    if reason_code.is_failure:
        print(f'Connection failed: {reason_code}')
        return
    # Start to subscribe
    client.subscribe(topic=MQTT_TOPIC)
    ## debug print
    print(f'Subscribe topic: {MQTT_TOPIC}')


# Create MQTT client
mqtt_client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    protocol=MQTTProtocolVersion.MQTTv5,
)

# Assign callback functions for connecting and receiving
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message

# Setup TLS connection
tls_context = ssl.create_default_context()
# Kept explicit so the requirement for a verified broker certificate is
# visible at a glance.
tls_context.verify_mode = ssl.CERT_REQUIRED
mqtt_client.tls_set_context(tls_context)

# Use user ID and password auth
PUBSUB_ID = 'user001'
PUBSUB_PW = 'password001'
mqtt_client.username_pw_set(
    username=PUBSUB_ID,
    password=PUBSUB_PW,
)

# Connect to broker
mqtt_client.connect(
    host=MQTT_BROKER,
    port=MQTT_PORT,
    keepalive=60,
)

# main loop in another thread
mqtt_client.loop_start()

# Testing publishing: send one random value per second until interrupted
try:
    while True:
        value = round(random.uniform(10, 30), 2)
        mqtt_client.publish(MQTT_TOPIC, str(value))
        print(f'Published: {value}')
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this test script.
    print('Interrupted, shutting down')
finally:
    # Disconnect from the broker and stop the background network thread.
    mqtt_client.disconnect()
    mqtt_client.loop_stop()
0 件のコメント:
コメントを投稿
注: コメントを投稿できるのは、このブログのメンバーだけです。