Mosquittoに、Python Paho MQTT libraryを使ってアクセスする

あらかじめ起動しておいたMosquittoにPython Paho MQTT library (pkgsrc/net/py-paho-mqtt 2.1.0)を使って、Mosquittoの受信したデータを、InfluxDB 1 (pkgsrc/databases/influxdb)に保存する。 なぜInfluxDBに保存するのかと言うと、最終的にMosquittoで受信したデータをGrafana (pkgsrc/www/grafana)で表示するためである。

乱数をMosquittoに送信し、それを購読して、InfluxDBに保存する。

#!/usr/pkg/bin/python3.14

#
# Subscribe mosquitto MQTT server and store data into InfluxDB 1
#

import time
import random
import ssl

from paho.mqtt.enums import MQTTProtocolVersion
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

# Connection information for InfluxDB 1.x
INFLUXDB_HOSTNAME = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_DATABASE = 'mqtt001'
INFLUXDB_USERID = 'admin'
INFLUXDB_PASSWORD = 'ADMIN_PASSWORD'

# Connection information of MQTT
MQTT_BROKER = 'MYHOSTNAME'
MQTT_PORT = 8883
MQTT_TOPIC = 'topic/0001'

# Instantiate InfluxDB
influx_client = InfluxDBClient (
    host=INFLUXDB_HOSTNAME,
    port=INFLUXDB_PORT,
    username=INFLUXDB_USERID,
    password=INFLUXDB_PASSWORD,
    database=INFLUXDB_DATABASE
)

# Create database at first at once
influx_client.create_database(INFLUXDB_DATABASE)

# Callback function for MQTT subscription
def on_message (client, userdata, message):
    payload = message.payload.decode('utf-8')
    timestamp = int(time.time() * 1000000000) # in ns
    # debug print
    print (f'Received: {payload}')

    json_body = [
        {
            "measurement": "sensor_data",
            "tags": {
                "device": "dev001_reg700"
            },
            "fields": {
                "value": float(payload)
            },
            "time": timestamp
         }
    ]

    # Write data to InfluxDB
    influx_client.write_points(json_body)

# Create MQTT client
mqtt_client = mqtt.Client(
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    protocol=MQTTProtocolVersion.MQTTv5
)

# Assign callback function on receiving
mqtt_client.on_message = on_message

# Setup TLS connection
tls_context = ssl.create_default_context()
tls_context.verify_mode = ssl.CERT_REQUIRED
mqtt_client.tls_set_context(tls_context)

# Use user ID and password auth
PUBSUB_ID = 'user001'
PUBSUB_PW = 'password001'
mqtt_client.username_pw_set(
    username=PUBSUB_ID,
    password=PUBSUB_PW
)

# Connect to broker
mqtt_client.connect(
    host=MQTT_BROKER,
    port=MQTT_PORT, 
    keepalive=60
)


# Start to subscribe
mqtt_client.subscribe(topic=MQTT_TOPIC)
## debug print
print (f'Subscribe topic: {MQTT_TOPIC}')

# main loop in another thread
mqtt_client.loop_start()


# Testing publishing
while True:
    value = round(random.uniform(10, 30), 2)
    mqtt_client.publish(MQTT_TOPIC, str(value))
    print(f'Published: {value}')
    time.sleep(1)

Mosquittoを安全に利用する

はじめに

pkgsrc/net/mosquitto-2.1.2nb1を使う機会があったのだが、なるべく安全な方法で利用したいと思っていた。 TLS証明書を利用して暗号化された経路で通信し、接続にはユーザーIDとパスワードによるユーザー認証を利用し、ユーザー毎にアクセス制限を設定したい。 また、受信した情報はファイルに保存しておきたい。

設定は/usr/pkg/etc/mosquitto.confに書けば良い。

TLS証明書を利用する

Let's Encryptで取得したTLS証明書が使える。 以下のように設定すれば良い。 これは、pkgsrc/security/acmeshを使って取得したものだが、どのような方法で取得しても良い。

certfile /var/db/mosquitto/.acme.sh/MYHOSTNAME.net_ecc/fullchain.cer
keyfile /var/db/mosquitto/.acme.sh/MYHOSTNAME_ecc/app4.ryoon.net.key
cafile /var/db/mosquitto/.acme.sh/MYHOSTNAME_ecc/ca.cer

接続にユーザーIDとパスワードによるユーザー認証を利用する

ユーザー毎にアクセス制限を設定するためにも、ユーザーIDとパスワードによりユーザー認証をした上で接続する必要がある。 これは、以下のように設定すれば良い。

password_file /var/db/mosquitto/mosquitto.passwd

この中身は、mosquitto_passwd(1)で生成すれば良い。 以下のようにユーザーIDとパスワードをmosquitto.passwdとして用意しておき、mosquitto_passwdコマンドでパスワードをハッシュ化したファイルに変換すれば良い。 ハッシュアルゴリズムも選択できるようだ。

$ cat mosquitto.passwd
user001:password001
user002:password002
$ mosquitto_passwd -U mosquitto.passwd
$ cat mosquitto.passwd
user001:$7$1000$qmp3mSmMucnEwhH3vLWRLquFBPYVQFdALWJwvihS7UI6d2cTKSma77hbTeYFSoJ+M/WTowwzUg7klW+7ClbNng==$YxIm/PFeBoiCN9xBvV9XR1XWK6VqIJrwE432iX7b5ygnoyKKMRSHf9rk/9Lw02Gp9FWfhwnPU8ugzPJUIDzImA==
user002:$7$1000$x8FPjLROciMpi7q7OIkV+cdzEf+Eo02kGMuPaWALcoS9Vt7G9J/um376TaGuSDOj76HSvL8HIla8HHQx7DkJGQ==$3V1pN+4ApuSBPtnyzxfSWSnUb01Q0EjQ6LmkAQb+VetHuxCkcJkzEhQyG2Y6F7MWgG1DqfyrJIWCF7pYqHuquQ==

ユーザー毎にアクセス制限を設定する

まず、アクセス制限の内容を読み込むように/usr/pkg/etc/mosquitto.confに以下のように設定する。

acl_file /var/db/mosquitto/mosquitto.acl

その上で、/var/db/mosquitto/mosquitto.aclを以下のように設定する。 user001は全てのトピックに読み書きでき、user002は全てのトピックに書き込みだけできるようにする。 データと投入するクライアントはuser002でアクセスして来ることを想定し、user001で接続することを想定している。 user001は書き込みはできなくても良いのだが。

$ cat /var/db/mosquitto/mosquitto.acl
user user001
topic readwrite #

user user002
topic write #

受信したデータをファイルに保存する

/var/db/mosquitto/mosquitto.dbに保存するように設定する。

autosave_interval 60
autosave_on_changes true
persistence true
persistence_file mosquitto.db
persistence_location /var/db/mosquitto

その他

他にも設定してある内容は以下のようだ。

# mosquitto daemonを動かすユーザー
user mosquitto
# 待ち受けるポート
istener 8883
# ログとしてsyslogに出力するレベル
log_type debug

NetBSDでPostgreSQLサーバーを動かす

NetBSDでPostgreSQLサーバーを動かすのだが、受け付ける接続数を多くしたい。 具体的には/usr/pkg/pgsql/data/postgresql.confでmax_connectionsに100を設定したい。

そうするには、kern.ipc.semmnikern.ipc.semmnsの数を増加させる必要があるようだ。

# vi /etc/sysctl.conf
kern.ipc.semmni=100
kern.ipc.semmns=512

この位に増やせば、以下のように設定できるようだ。

# vi /usr/pkg/pgsql/data/postgresql.conf
max_connections = 100

man pageによると、以下のように説明されている。

$ man 7 sysctl
kern.ipc.semmni
        Max number of number of semaphore identifiers.

kern.ipc.semmns
        Max number of number of semaphores in system.

Mosquittoに、Python Paho MQTT libraryを使ってアクセスする

あらかじめ起動しておいたMosquitto にPython Paho MQTT library (pkgsrc/net/py-paho-mqtt 2.1.0)を使って、Mosquittoの受信したデータを、InfluxDB 1 (pkgsrc/databases/influ...