用 Node.js 把 Mosquitto MQTT 服务器上的消息存入 MySQL

首先需要安装相关的包(cnpm 是 npm 的国内淘宝镜像,安装方法请参考前文):

#cnpm install mysql -g

#cnpm install mqtt -g

#cnpm install pm2 -g

如果运行代码遇到模块没找到的错误,需要运行

#cnpm link mqtt

第一次运行代码可以用

#node app_mqtt2mysql.js 来执行。如果没有任何错误输出,说明代码正常工作了。按 Ctrl-C 退出。

#pm2 start app_mqtt2mysql.js -i 1 -n mqtt

就可以让程序长驻内存。

#pm2 list 可以看到程序的运行状态。

查看 Mosquitto 的日志可以看到,只要有消息 publish 到服务器上 VV/ 下的任何主题,YJ-MQTT 都会收到,并把消息插入数据库;查看数据库表即可验证。

下面是完整的源代码:

# cat app_mqtt2mysql.js

// Node core module, used below to read the CA certificate from disk.
var fs = require('fs');

// MQTT.js client library.
var mqtt = require('mqtt');

// Topic filter: '#' is the multi-level wildcard, so this matches every topic under VV/.
var Topic = 'VV/#';
var Broker_URL = 'mqtt://mqtt.yj777.cn';
// CA certificate, read synchronously once at startup (throws if the file is missing).
var caFile = fs.readFileSync('/etc/mosquitto/ssl/ca-cert.pem');

// Database
var Database_URL = '127.0.0.1';

// Options handed to mqtt.connect() together with Broker_URL.
var options = {
  clientId: 'YJ-MQTT',
  protocol: 'mqtts', // the encrypted protocol name has to be written as "mqtts"
  protocolId: 'MQIsdp',
  protocolVersion: 3,
  // NOTE(review): this pins the TLS 1.0 method; recent Node.js releases refuse
  // TLS 1.0 by default - confirm the broker and Node version still accept it.
  secureProtocol: 'TLSv1_method',

  // The MQTT server is configured with "require_certificate false", so only
  // the CA certificate is supplied here (no client key / certificate).
  ca: caFile,
  // NOTE(review): false disables verification of the broker certificate, so
  // the CA above is not enforced - confirm this is intentional.
  rejectUnauthorized: false,
  port: 8883,
  username: 'username',
  password: 'Passw0rd',
  keepalive: 60
  // To use a client certificate, add:
  // key: KEY,
  // cert: CERT,
};

// Open the broker connection and wire each client event to its handler.
// The handlers are function declarations further down and are hoisted.
var client = mqtt.connect(Broker_URL, options);
client.on('connect', mqtt_connect);
client.on('reconnect', mqtt_reconnect);
client.on('error', mqtt_error);
client.on('message', mqtt_messsageReceived);
client.on('close', mqtt_close);

/**
 * 'connect' event handler.
 * Subscribes the client to Topic; the outcome is reported to mqtt_subscribe.
 */
function mqtt_connect() {
  var onSubscribed = mqtt_subscribe;
  client.subscribe(Topic, onSubscribed);
}

/**
 * Callback for client.subscribe().
 * Reports a failed subscription on stderr; only announces success when the
 * subscribe call actually succeeded.
 *
 * @param {Error|null} err - set when the subscription failed
 * @param {Array} granted - grant list passed by the client (unused)
 */
function mqtt_subscribe(err, granted) {
  if (err) {
    console.error(err);
    return;
  }
  console.log("Subscribed to " + Topic);
}

/**
 * 'reconnect' event handler.
 *
 * Intentionally does NOT call mqtt.connect() again: the existing client is
 * already re-establishing its own connection when this event fires. Creating
 * a second client here would leave the new one without any event handlers
 * (so it would never subscribe or store messages) and, because both share the
 * same clientId, the two connections would keep displacing each other on the
 * broker.
 *
 * @param {*} err - kept for interface compatibility; unused
 */
function mqtt_reconnect(err) {
  // Nothing to do: the client instance handles the reconnect itself.
}

/**
 * 'error' event handler.
 * Writes the error to stderr so connection problems are visible instead of
 * being dropped silently.
 *
 * @param {Error} err - error reported by the MQTT client
 */
function mqtt_error(err) {
  if (err) {
    console.error(err);
  }
}

/**
 * Placeholder publish callback; deliberately empty.
 */
function after_publish() {}

/**
 * 'message' event handler: receives a message from the MQTT broker.
 * Converts the payload to a string, strips one trailing newline and one
 * leading '@', and stores the result through insert_message. Payloads that
 * are empty after stripping are ignored.
 *
 * @param {string} topic - topic the message was published on
 * @param {Buffer} message - raw payload
 * @param {object} packet - packet object from the client, passed through unchanged
 */
function mqtt_messsageReceived(topic, message, packet) {
  var message_str = message.toString(); // convert byte array to string
  message_str = message_str.replace(/\n$/, ''); // remove trailing new line
  message_str = message_str.replace(/^@/, ''); // remove leading @
  if (message_str) insert_message(topic, message_str, packet);
}

/**
 * 'close' event handler; deliberately empty.
 */
function mqtt_close() {}

////////////////////////////////////////////////////
///////////////////// MYSQL ////////////////////////
////////////////////////////////////////////////////
var mysql = require('mysql');
// Create the single connection used for every insert.
var connection = mysql.createConnection({
  host: Database_URL,
  user: "db_user",
  password: "db_pass",
  database: "db_name"
});

// Connect at startup; a failure here is thrown so the process stops instead
// of running without a database.
connection.connect(function(err) {
  if (err) throw err;
});

/**
 * Inserts one row (clientID, topic, message) into the messages table.
 *
 * The statement is built with mysql.format(), so the table/column names
 * (?? placeholders) and the values (? placeholders) are escaped by the
 * driver rather than concatenated into the SQL text.
 *
 * @param {string} topic - topic the message arrived on
 * @param {string} message_str - message text to store
 * @param {object} packet - packet object from the MQTT client (unused)
 * @throws rethrows the driver error from the query callback when the insert fails
 */
function insert_message(topic, message_str, packet) {
  var clientID = ""; // always stored as an empty string
  var message = message_str;
  var sql = "INSERT INTO ?? (??,??,??) VALUES (?,?,?)";
  var params = ['tablename', 'clientID', 'topic', 'message', clientID, topic, message];
  sql = mysql.format(sql, params);

  connection.query(sql, function (error, results) {
    // Rethrown on purpose: a failed insert is treated as fatal.
    if (error) throw error;
    console.log("Message added: " + message_str);
  });
}

/**
 * Splits a string into an array of substrings on commas.
 *
 * @param {string} message_str - text to split
 * @returns {string[]} the comma-separated parts
 */
function extract_string(message_str) {
  var message_arr = message_str.split(",");
  return message_arr;
}

// Delimiter counted by countInstances.
var delimiter = ",";

/**
 * Counts how many times the delimiter occurs in a string.
 *
 * @param {string} message_str - text to inspect
 * @returns {number} number of delimiter occurrences (0 when there are none)
 */
function countInstances(message_str) {
  var substrings = message_str.split(delimiter);
  // n delimiters split the text into n + 1 pieces.
  return substrings.length - 1;
}

// Disable Console Log
// console.log = function() {}