xuanxi 发表于 2015-10-4 14:22:01

MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例

  使用IBM MQTTv3实现相关的发布订阅功能
  MQTTv3的发布消息的实现:



Java代码
[*]package com.etrip.mqttv3;
[*]
[*]import com.ibm.micro.client.mqttv3.MqttClient;
[*]import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
[*]import com.ibm.micro.client.mqttv3.MqttMessage;
[*]import com.ibm.micro.client.mqttv3.MqttTopic;
[*]/**
[*] * MQTTV3的发布消息类
[*] *
[*] * @author longgangbai
[*] */
[*]public class MQTTPub {   
[*]    public static void doTest(){   
[*]      try {   
[*]            MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub");   
[*]            MqttTopic topic = client.getTopic("tokudu/china");   
[*]            MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes());   
[*]            message.setQos(1);   
[*]            client.connect();
[*]            while(true){
[*]                MqttDeliveryToken token = topic.publish(message);
[*]                while (!token.isComplete()){   
[*]                  token.waitForCompletion(1000);   
[*]                }
[*]            }
[*]      } catch (Exception e) {   
[*]            e.printStackTrace();
[*]      }
[*]    }
[*]}
  MQTTV3的订阅消息类



Java代码
[*]package com.etrip.mqttv3;
[*]import com.ibm.micro.client.mqttv3.MqttClient;
[*]import com.ibm.micro.client.mqttv3.MqttConnectOptions;
[*]/**
[*] * MQTTV3的订阅消息类
[*] *
[*] * @author longgangbai
[*] */
[*]public class MQTTSubsribe {   
[*]    public static String doTest() {   
[*]      try {   
[*]            //创建MqttClient
[*]            MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");   
[*]            //回调处理类
[*]            CallBack callback = new CallBack();   
[*]            client.setCallback(callback);
[*]            //创建连接可选项信息
[*]            MqttConnectOptions conOptions = new MqttConnectOptions();   
[*]            //
[*]            conOptions.setCleanSession(false);   
[*]            //连接broker
[*]            client.connect(conOptions);
[*]            //发布相关的订阅
[*]            client.subscribe("tokudu/china", 1);   
[*]            //client.disconnect();   
[*]      } catch (Exception e) {   
[*]            e.printStackTrace();
[*]            return "failed";   
[*]      }
[*]      return "success";   
[*]    }
[*]}
  回调处理类处理订阅的消息类
  



Java代码
[*]package com.etrip.mqttv3;
[*]
[*]import com.ibm.micro.client.mqttv3.MqttCallback;
[*]import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
[*]import com.ibm.micro.client.mqttv3.MqttMessage;
[*]import com.ibm.micro.client.mqttv3.MqttTopic;
[*]/**
[*] * 回调处理类
[*] * 处理订阅的消息类
[*] *
[*] * @author longgangbai
[*] */
[*]public class CallBack implements MqttCallback {   
[*]
[*]    public CallBack() {   
[*]    }
[*]    /**
[*]   * 接收到信息的处理
[*]   */
[*]    public void messageArrived(MqttTopic topic, MqttMessage message) {   
[*]      try {   
[*]            System.out.println(" MQTTSubsribemessage.toString()"+message.toString());
[*]      } catch (Exception e) {   
[*]            e.printStackTrace();
[*]      }
[*]    }
[*]    public void connectionLost(Throwable cause) {
[*]
[*]    }
[*]    public void deliveryComplete(MqttDeliveryToken token) {
[*]
[*]    }
[*]}
  
  
  测试类:



Java代码
[*]package com.etrip.mqttv3;
[*]/**
[*] * MQTTV3的测试类
[*] *
[*] * @author longgangbai
[*] */
[*]public class MQTTMain {
[*]    public static void main(String[] args) {
[*]      //订阅消息的方法
[*]      MQTTSubsribe.doTest();
[*]      //发布消息的类
[*]      MQTTPub.doTest();
[*]
[*]    }
[*]}
页: [1]
查看完整版本: MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例