Publish/Subscribe messaging — protokol standar IoT untuk komunikasi antar device melalui broker.
MQTT (Message Queuing Telemetry Transport) adalah protokol messaging ringan yang dirancang untuk IoT. Menggunakan model Publish/Subscribe melalui perantara (Broker).
MQTT Pub/Sub: ESP32 publish sensor data → Broker → Flutter subscribe & terima
| Konsep | Penjelasan | Contoh |
|---|---|---|
| Broker | Server perantara yang menerima & meneruskan pesan | Mosquitto, HiveMQ Cloud |
| Topic | Alamat/kategori pesan (seperti channel) | home/sensor/suhu |
| Publish | Kirim pesan ke topic tertentu | ESP32 kirim suhu ke sensor/suhu |
| Subscribe | Dengarkan pesan dari topic tertentu | Flutter subscribe sensor/# |
| QoS 0 | Kirim sekali, tidak ada konfirmasi | Sensor data periodik |
| QoS 1 | Minimal 1x terkirim (bisa duplikat) | Kontrol aktuator |
| QoS 2 | Persis 1x terkirim (paling reliable) | Billing / payment |
| Wildcard # | Match semua sub-topic | sensor/# = sensor/suhu, sensor/cahaya, dll |
mosquitto.conf (Config):
# mosquitto.conf
listener 1883
allow_anonymous true
Terminal:
mosquitto -c mosquitto.conf -v
Install library: PubSubClient dan ArduinoJson (versi 7 atau lebih baru, karena kode memakai JsonDocument tanpa kapasitas) via Arduino Library Manager.
C++ (Arduino):
#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
// WiFi credentials (STA mode — the ESP32 joins an existing router).
const char* wifi_ssid = "NamaWiFi_Rumah";
const char* wifi_pass = "passwordwifi";
// MQTT broker address (IP of the laptop running Mosquitto) and TCP port.
const char* mqtt_server = "192.168.1.100";
const int mqtt_port = 1883;
// Pin driven by mqttCallback() and reported by publishStatus().
#define LED_PIN 2
// Analog pin sampled by publishSensor() for the "cahaya" reading.
#define LDR_PIN 34
// Network client handed to the MQTT client below.
WiFiClient espClient;
PubSubClient mqtt(espClient);
// millis() value recorded by loop() at the most recent sensor publish.
unsigned long lastPublish = 0;
// ========================
// Callback: pesan masuk
// ========================
void mqttCallback(char* topic, byte* payload, unsigned int length) {
// Parse JSON payload
JsonDocument doc;
deserializeJson(doc, payload, length);
Serial.printf("Terima [%s]: ", topic);
if (String(topic) == "aktuator/led") {
bool state = doc["state"];
digitalWrite(LED_PIN, state ? HIGH : LOW);
Serial.println(state ? "LED ON" : "LED OFF");
// Publish konfirmasi status
publishStatus();
}
}
// ========================
// Publish sensor data
// ========================
void publishSensor() {
JsonDocument doc;
doc["cahaya"] = analogRead(LDR_PIN);
doc["uptime"] = millis() / 1000;
String json;
serializeJson(doc, json);
mqtt.publish("sensor/cahaya", json.c_str());
}
void publishStatus() {
JsonDocument doc;
doc["led"] = digitalRead(LED_PIN) ? true : false;
String json;
serializeJson(doc, json);
mqtt.publish("status/led", json.c_str());
}
// ========================
// Connect ke broker
// ========================
void connectMQTT() {
while (!mqtt.connected()) {
Serial.print("Connecting MQTT...");
String clientId = "ESP32-" + String(random(0xffff), HEX);
if (mqtt.connect(clientId.c_str())) {
Serial.println("connected!");
// Subscribe ke topic kontrol
mqtt.subscribe("aktuator/led");
mqtt.subscribe("aktuator/relay");
// Publish status awal
publishStatus();
} else {
Serial.printf("failed (rc=%d), retry in 3s\n",
mqtt.state());
delay(3000);
}
}
}
void setup() {
Serial.begin(115200);
pinMode(LED_PIN, OUTPUT);
// Connect WiFi (STA mode)
WiFi.begin(wifi_ssid, wifi_pass);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.printf("\nWiFi OK. IP: %s\n",
WiFi.localIP().toString().c_str());
// Setup MQTT
mqtt.setServer(mqtt_server, mqtt_port);
mqtt.setCallback(mqttCallback);
connectMQTT();
}
// Main loop: keeps the MQTT session alive, services incoming messages and
// publishes a sensor reading once more than 2000 ms have passed since the
// previous one.
void loop() {
  if (!mqtt.connected()) {
    connectMQTT();
  }

  mqtt.loop();  // processes incoming messages

  // Not yet time for the next sensor publish.
  if (millis() - lastPublish <= 2000) {
    return;
  }

  lastPublish = millis();
  publishSensor();
}
Terminal:
flutter pub add mqtt_client
Dart:
import 'dart:convert';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
/// JSON-over-MQTT helper built on [MqttServerClient].
///
/// Call [connect] before anything else. While there is no live connection,
/// [subscribe], [publish] and [setLed] log a message and do nothing instead
/// of throwing.
class MqttService {
  /// Client of the current session; `null` before [connect] and after
  /// [disconnect].
  MqttServerClient? _client;

  /// Cancels the listener on the client's `updates` stream, if one is active.
  Future<void> Function()? _cancelUpdates;

  /// Called for every received message whose payload is a JSON object.
  Function(String topic, Map<String, dynamic> data)? onMessage;

  /// Whether the current client reports a connected state.
  bool get isConnected =>
      _client?.connectionStatus?.state == MqttConnectionState.connected;

  /// Connects to the MQTT broker at [broker]:[port].
  ///
  /// Any previous session is closed first so that its stream listener and
  /// socket are not leaked. Returns `true` once the client reports a
  /// connected state; on failure the client is disposed and `false` is
  /// returned.
  Future<bool> connect({
    String broker = '192.168.1.100',
    int port = 1883,
  }) async {
    disconnect();

    final client = MqttServerClient(broker, '')
      ..port = port
      ..logging(on: false)
      ..keepAlivePeriod = 30
      ..autoReconnect = true
      ..onDisconnected = _onDisconnected
      ..onConnected = _onConnected;

    // NOTE(review): a will QoS is set but no will topic/message is configured;
    // confirm whether a last-will message was intended.
    client.connectionMessage = MqttConnectMessage()
        .withClientIdentifier(
            'flutter_${DateTime.now().millisecondsSinceEpoch}')
        .startClean()
        .withWillQos(MqttQos.atLeastOnce);
    _client = client;

    try {
      await client.connect();
      if (isConnected) {
        _listenMessages(client);
        return true;
      }
    } catch (e) {
      print('MQTT connect error: $e');
    }

    // Connection failed: release the half-open client.
    disconnect();
    return false;
  }

  void _onConnected() {
    print('MQTT connected');
  }

  void _onDisconnected() {
    print('MQTT disconnected');
  }

  /// Returns the client when it is connected; otherwise logs that [action]
  /// was skipped and returns `null`.
  MqttServerClient? _connectedClient(String action) {
    final client = _client;
    if (client != null && isConnected) return client;
    print('MQTT not connected; cannot $action');
    return null;
  }

  /// Subscribes to [topic] with the given [qos].
  void subscribe(String topic, {MqttQos qos = MqttQos.atLeastOnce}) {
    _connectedClient('subscribe to $topic')?.subscribe(topic, qos);
  }

  /// Publishes [data], encoded as JSON, to [topic].
  void publish(String topic, Map<String, dynamic> data) {
    final client = _connectedClient('publish to $topic');
    if (client == null) return;

    final builder = MqttClientPayloadBuilder()..addString(jsonEncode(data));
    client.publishMessage(topic, MqttQos.atLeastOnce, builder.payload!);
  }

  /// Sends an LED control message on `aktuator/led`.
  void setLed(bool isOn) {
    publish('aktuator/led', {'state': isOn});
  }

  /// Forwards every incoming publish message of [client] to [onMessage].
  ///
  /// Payloads are decoded as UTF-8 JSON. Payloads that are not valid JSON
  /// objects, and errors thrown by the [onMessage] handler, are logged rather
  /// than silently dropped, and do not stop the remaining messages of the
  /// batch from being delivered.
  void _listenMessages(MqttServerClient client) {
    final subscription = client.updates?.listen((messages) {
      for (final received in messages) {
        final message = received.payload;
        if (message is! MqttPublishMessage) continue;
        final topic = received.topic;

        final Object? decoded;
        try {
          decoded = jsonDecode(utf8.decode(message.payload.message));
        } on FormatException catch (e) {
          print('MQTT [$topic] payload is not valid JSON: $e');
          continue;
        }
        if (decoded is! Map<String, dynamic>) {
          print('MQTT [$topic] payload is not a JSON object');
          continue;
        }

        try {
          onMessage?.call(topic, decoded);
        } catch (e) {
          print('MQTT [$topic] message handler failed: $e');
        }
      }
    });
    _cancelUpdates = subscription?.cancel;
  }

  /// Stops listening for messages and closes the connection, if any.
  void disconnect() {
    _cancelUpdates?.call();
    _cancelUpdates = null;
    _client?.disconnect();
    _client = null;
  }
}
Dart:
import 'package:flutter/material.dart';
/// Screen that shows the sensor readings, an LED switch and a log of the
/// MQTT messages received.
class MqttDashboard extends StatefulWidget {
  /// Creates the dashboard widget.
  const MqttDashboard({super.key});

  @override
  State<MqttDashboard> createState() {
    return _MqttDashboardState();
  }
}
/// State of [MqttDashboard].
///
/// Owns the [MqttService] connection, mirrors the latest values received on
/// `sensor/cahaya` and `status/led`, and keeps a short log of incoming
/// messages.
class _MqttDashboardState extends State<MqttDashboard> {
  /// Maximum number of payload characters shown per log line.
  static const int _logPreviewLength = 40;

  /// Maximum number of log lines kept.
  static const int _maxLogEntries = 20;

  final MqttService _mqtt = MqttService();
  bool _ledOn = false;
  int _cahaya = 0;
  int _uptime = 0;
  final List<String> _logs = [];

  @override
  void initState() {
    super.initState();
    _connectMqtt();
  }

  /// Converts a decoded JSON value to an `int`.
  ///
  /// Returns 0 for anything that is not a finite number, so an unexpected
  /// payload (a string, a double, a missing key) cannot throw a type error.
  static int _asInt(Object? value) =>
      value is num && value.isFinite ? value.toInt() : 0;

  /// Registers the message handler, connects, and subscribes to the sensor
  /// and status topics when the connection succeeds.
  Future<void> _connectMqtt() async {
    _mqtt.onMessage = _handleMessage;

    final ok = await _mqtt.connect();
    if (ok) {
      _mqtt.subscribe('sensor/#');
      _mqtt.subscribe('status/#');
    }
    // Rebuild so the connection badge and the switch reflect the result.
    if (mounted) setState(() {});
  }

  /// Logs one incoming message and applies it to the displayed values.
  void _handleMessage(String topic, Map<String, dynamic> data) {
    if (!mounted) return;

    final text = '$data';
    final preview = text.length > _logPreviewLength
        ? text.substring(0, _logPreviewLength)
        : text;

    setState(() {
      // Newest entry first; drop the oldest once the cap is exceeded.
      _logs.insert(0, '[$topic] $preview');
      if (_logs.length > _maxLogEntries) _logs.removeLast();

      if (topic == 'sensor/cahaya') {
        _cahaya = _asInt(data['cahaya']);
        _uptime = _asInt(data['uptime']);
      } else if (topic == 'status/led') {
        _ledOn = data['led'] == true;
      }
    });
  }

  /// Sends the LED command and updates the switch right away; a later
  /// `status/led` message overwrites the value.
  void _toggleLed(bool value) {
    _mqtt.setLed(value);
    setState(() => _ledOn = value);
  }

  @override
  void dispose() {
    // Detach the handler first so no callback reaches a disposed state.
    _mqtt.onMessage = null;
    _mqtt.disconnect();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    // NOTE: the connection state is read at build time only, so the badge and
    // the switch refresh when something else triggers a rebuild.
    final online = _mqtt.isConnected;

    return Scaffold(
      backgroundColor: const Color(0xFF0f172a),
      appBar: AppBar(
        title: const Text('MQTT Dashboard'),
        backgroundColor: const Color(0xFF1e293b),
        actions: [
          _badge(online),
          const SizedBox(width: 12),
        ],
      ),
      body: Padding(
        padding: const EdgeInsets.all(16),
        child: Column(
          children: [
            // Sensor cards
            Row(
              children: [
                _card('☀️ Cahaya', '$_cahaya', Colors.amber),
                const SizedBox(width: 12),
                _card('⏱️ Uptime', '${_uptime}s', Colors.cyan),
              ],
            ),
            const SizedBox(height: 24),
            // LED control
            SwitchListTile(
              title: Text(
                'LED: ${_ledOn ? "ON" : "OFF"}',
                style: const TextStyle(color: Colors.white),
              ),
              subtitle: const Text(
                'Topic: aktuator/led',
                style: TextStyle(color: Colors.grey),
              ),
              value: _ledOn,
              activeColor: Colors.amber,
              // Disabled while offline: publishing needs a live connection,
              // and the switch would otherwise show an unconfirmed state.
              onChanged: online ? _toggleLed : null,
              tileColor: const Color(0xFF1e293b),
              shape: RoundedRectangleBorder(
                borderRadius: BorderRadius.circular(12),
              ),
            ),
            const SizedBox(height: 24),
            // Message log
            Expanded(
              child: Container(
                width: double.infinity,
                padding: const EdgeInsets.all(12),
                decoration: BoxDecoration(
                  color: const Color(0xFF1e293b),
                  borderRadius: BorderRadius.circular(12),
                ),
                child: Column(
                  crossAxisAlignment: CrossAxisAlignment.start,
                  children: [
                    const Text(
                      '📋 MQTT Messages',
                      style: TextStyle(
                        color: Colors.white70,
                        fontWeight: FontWeight.bold,
                      ),
                    ),
                    const SizedBox(height: 8),
                    Expanded(
                      child: ListView.builder(
                        itemCount: _logs.length,
                        itemBuilder: (_, i) => Padding(
                          padding: const EdgeInsets.only(bottom: 2),
                          child: Text(
                            _logs[i],
                            style: const TextStyle(
                              color: Colors.green,
                              fontSize: 11,
                              fontFamily: 'monospace',
                            ),
                          ),
                        ),
                      ),
                    ),
                  ],
                ),
              ),
            ),
          ],
        ),
      ),
    );
  }

  /// Builds the connection indicator shown in the app bar.
  Widget _badge(bool online) {
    final color = online ? Colors.green : Colors.red;
    return Container(
      padding: const EdgeInsets.symmetric(horizontal: 10, vertical: 4),
      decoration: BoxDecoration(
        color: color.withValues(alpha: 0.2),
        borderRadius: BorderRadius.circular(12),
      ),
      child: Text(
        online ? 'MQTT ✓' : 'OFFLINE',
        style: TextStyle(
          color: color,
          fontSize: 12,
          fontWeight: FontWeight.bold,
        ),
      ),
    );
  }

  /// Builds one sensor card with a coloured [label] above a large [value].
  Widget _card(String label, String value, Color color) {
    return Expanded(
      child: Container(
        padding: const EdgeInsets.all(16),
        decoration: BoxDecoration(
          color: const Color(0xFF1e293b),
          borderRadius: BorderRadius.circular(12),
        ),
        child: Column(
          children: [
            Text(label, style: TextStyle(color: color, fontSize: 13)),
            const SizedBox(height: 8),
            Text(
              value,
              style: const TextStyle(
                color: Colors.white,
                fontSize: 24,
                fontWeight: FontWeight.bold,
              ),
            ),
          ],
        ),
      ),
    );
  }
}
| Fitur | USB Serial | Bluetooth | HTTP | WebSocket | MQTT |
|---|---|---|---|---|---|
| Jarak | Kabel | ~10m | WiFi range | WiFi range | Internet |
| Arah | 2-arah | 2-arah | Request only | 2-arah | Pub/Sub |
| Real-time | ✓ | ✓ | Polling | ✓ | ✓ |
| Multiple client | 1 | 1 (Classic) | ✓ | ✓ | ✓✓ |
| Internet | ✗ | ✗ | Optional | Optional | ✓ |
| Kompleksitas | Rendah | Sedang | Sedang | Sedang | Tinggi |
| Minggu | 5-6 | 7-10 | 11-12 | 13 | 14 |