Accessing Mosquitto with the Python Paho MQTT library

Using the Python Paho MQTT library (pkgsrc/net/py-paho-mqtt 2.1.0), I connect to a Mosquitto broker that is already running and store the data it receives in InfluxDB 1 (pkgsrc/databases/influxdb). The reason for storing the data in InfluxDB is that I ultimately want to display the data received by Mosquitto in Grafana (pkgsrc/www/grafana).

The script publishes random numbers to Mosquitto, subscribes to the same topic, and stores each received value in InfluxDB.

#!/usr/pkg/bin/python3.14

#
# Subscribe to a Mosquitto MQTT broker and store the received data in InfluxDB 1
#

import time
import random
import ssl

from paho.mqtt.enums import MQTTProtocolVersion
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

# Connection information for InfluxDB 1.x
INFLUXDB_HOSTNAME = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_DATABASE = 'mqtt001'
INFLUXDB_USERID = 'admin'
INFLUXDB_PASSWORD = 'ADMIN_PASSWORD'

# Connection information for the MQTT broker
MQTT_BROKER = 'MYHOSTNAME'
MQTT_PORT = 8883
MQTT_TOPIC = 'topic/0001'

# Instantiate the InfluxDB client
influx_client = InfluxDBClient(
    host=INFLUXDB_HOSTNAME,
    port=INFLUXDB_PORT,
    username=INFLUXDB_USERID,
    password=INFLUXDB_PASSWORD,
    database=INFLUXDB_DATABASE
)

# Create the database on the first run (no effect if it already exists)
influx_client.create_database(INFLUXDB_DATABASE)

# Callback invoked for each message received on a subscribed topic
def on_message(client, userdata, message):
    payload = message.payload.decode('utf-8')
    timestamp = time.time_ns()  # integer nanoseconds since the epoch
    # debug print
    print(f'Received: {payload}')

    # Skip non-numeric payloads instead of raising inside the network thread
    try:
        value = float(payload)
    except ValueError:
        print(f'Ignored non-numeric payload: {payload}')
        return

    json_body = [
        {
            "measurement": "sensor_data",
            "tags": {
                "device": "dev001_reg700"
            },
            "fields": {
                "value": value
            },
            "time": timestamp
        }
    ]

    # Write data to InfluxDB
    influx_client.write_points(json_body)

# Create MQTT client
mqtt_client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    protocol=MQTTProtocolVersion.MQTTv5
)

# Register the callback for incoming messages
mqtt_client.on_message = on_message

# Set up TLS using the system's trusted CA certificates
tls_context = ssl.create_default_context()
tls_context.verify_mode = ssl.CERT_REQUIRED
mqtt_client.tls_set_context(tls_context)

# Authenticate with a user name and password
PUBSUB_ID = 'user001'
PUBSUB_PW = 'password001'
mqtt_client.username_pw_set(
    username=PUBSUB_ID,
    password=PUBSUB_PW
)

# Connect to broker
mqtt_client.connect(
    host=MQTT_BROKER,
    port=MQTT_PORT,
    keepalive=60
)


# Subscribe to the topic
mqtt_client.subscribe(topic=MQTT_TOPIC)
# debug print
print(f'Subscribe topic: {MQTT_TOPIC}')

# Run the network loop in a background thread
mqtt_client.loop_start()


# Publish a random test value once per second
while True:
    value = round(random.uniform(10, 30), 2)
    mqtt_client.publish(MQTT_TOPIC, str(value))
    print(f'Published: {value}')
    time.sleep(1)
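
The step that turns an MQTT payload into an InfluxDB point can also be factored into a small pure function, which makes it possible to check without a broker or a database. The following is only a sketch: `build_point` and its parameters are hypothetical names that are not part of the script above, and returning `None` for an unusable payload is just one possible policy.

```python
import time


def build_point(payload: bytes, device: str, measurement: str = "sensor_data"):
    """Return a one-element list of InfluxDB points, or None if the payload is not a number."""
    try:
        value = float(payload.decode("utf-8"))
    except ValueError:  # also covers UnicodeDecodeError, a ValueError subclass
        return None
    return [
        {
            "measurement": measurement,
            "tags": {"device": device},
            "fields": {"value": value},
            "time": time.time_ns(),  # integer nanoseconds since the epoch
        }
    ]


if __name__ == "__main__":
    print(build_point(b"23.45", "dev001_reg700"))
    print(build_point(b"not-a-number", "dev001_reg700"))
```

Inside `on_message`, the result could be passed to `influx_client.write_points()` whenever it is not `None`.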

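The script subscribes once, right after `connect()`. If the connection drops and the client reconnects, that subscription may not be restored, depending on the session settings. A common alternative is to subscribe from the `on_connect` callback. The sketch below assumes the paho-mqtt 2.x `VERSION2` callback signature; `make_on_connect` is a hypothetical helper name, not something provided by the library.

```python
def make_on_connect(topic):
    """Build an on_connect callback (paho-mqtt 2.x VERSION2 signature) that subscribes to `topic`."""
    def on_connect(client, userdata, flags, reason_code, properties):
        if reason_code.is_failure:
            print(f"Connection failed: {reason_code}")
            return
        # Runs after every successful (re)connection, so the subscription is renewed
        client.subscribe(topic)
        print(f"Subscribed to: {topic}")
    return on_connect


# Assumed usage with the script's names, set before calling connect():
#   mqtt_client.on_connect = make_on_connect(MQTT_TOPIC)
```

With this in place, the explicit `mqtt_client.subscribe()` call after `connect()` would no longer be needed.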