MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例
使用IBM MQTTv3实现相关的发布订阅功能。
MQTTv3的发布消息的实现:
Java代码
[*]package com.etrip.mqttv3;
[*]
[*]import com.ibm.micro.client.mqttv3.MqttClient;
[*]import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
[*]import com.ibm.micro.client.mqttv3.MqttMessage;
[*]import com.ibm.micro.client.mqttv3.MqttTopic;
[*]/**
[*] * MQTTV3的发布消息类
[*] *
[*] * @author longgangbai
[*] */
[*]public class MQTTPub {
[*] public static void doTest(){
[*] try {
[*] MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub");
[*] MqttTopic topic = client.getTopic("tokudu/china");
[*] MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes());
[*] message.setQos(1);
[*] client.connect();
[*] while(true){
[*] MqttDeliveryToken token = topic.publish(message);
[*] while (!token.isComplete()){
[*] token.waitForCompletion(1000);
[*] }
[*] }
[*] } catch (Exception e) {
[*] e.printStackTrace();
[*] }
[*] }
[*]}
MQTTV3的订阅消息类
Java代码
[*]package com.etrip.mqttv3;
[*]import com.ibm.micro.client.mqttv3.MqttClient;
[*]import com.ibm.micro.client.mqttv3.MqttConnectOptions;
[*]/**
[*] * MQTTV3的订阅消息类
[*] *
[*] * @author longgangbai
[*] */
[*]public class MQTTSubsribe {
[*] public static String doTest() {
[*] try {
[*] //创建MqttClient
[*] MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");
[*] //回调处理类
[*] CallBack callback = new CallBack();
[*] client.setCallback(callback);
[*] //创建连接可选项信息
[*] MqttConnectOptions conOptions = new MqttConnectOptions();
[*] //
[*] conOptions.setCleanSession(false);
[*] //连接broker
[*] client.connect(conOptions);
[*] //发布相关的订阅
[*] client.subscribe("tokudu/china", 1);
[*] //client.disconnect();
[*] } catch (Exception e) {
[*] e.printStackTrace();
[*] return "failed";
[*] }
[*] return "success";
[*] }
[*]}
回调处理类处理订阅的消息类
Java代码
[*]package com.etrip.mqttv3;
[*]
[*]import com.ibm.micro.client.mqttv3.MqttCallback;
[*]import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
[*]import com.ibm.micro.client.mqttv3.MqttMessage;
[*]import com.ibm.micro.client.mqttv3.MqttTopic;
[*]/**
[*] * 回调处理类
[*] * 处理订阅的消息类
[*] *
[*] * @author longgangbai
[*] */
[*]public class CallBack implements MqttCallback {
[*]
[*] public CallBack() {
[*] }
[*] /**
[*] * 接收到信息的处理
[*] */
[*] public void messageArrived(MqttTopic topic, MqttMessage message) {
[*] try {
[*] System.out.println(" MQTTSubsribemessage.toString()"+message.toString());
[*] } catch (Exception e) {
[*] e.printStackTrace();
[*] }
[*] }
[*] public void connectionLost(Throwable cause) {
[*]
[*] }
[*] public void deliveryComplete(MqttDeliveryToken token) {
[*]
[*] }
[*]}
测试类:
Java代码
[*]package com.etrip.mqttv3;
[*]/**
[*] * MQTTV3的测试类
[*] *
[*] * @author longgangbai
[*] */
[*]public class MQTTMain {
[*] public static void main(String[] args) {
[*] //订阅消息的方法
[*] MQTTSubsribe.doTest();
[*] //发布消息的类
[*] MQTTPub.doTest();
[*]
[*] }
[*]}
页:
[1]