ag游戏庄闲|官方网站
!实时搜索: 盒子论坛 | 注册用户 | 修改信息 | 退出
检举帖 | 全文检索 | 关闭广告 | 捐赠
技术论坛
?用户名
?密??码
自动登陆(30天有效)
忘了密码
≡技术区≡
DELPHI技术
移动应用开发
Web应用开发
数据库专区
报表专区
网络通讯
开源项目
论坛精华贴
≡发布区≡
发布代码
发布控件
文档资料
经典工具
≡事务区≡
网站意见
盒子之家
招聘应聘
信息交换
论坛信息
最新加入: sy1368364703
今日帖子: 12
在线用户: 20
导航: 论坛 -> DELPHI技术 斑竹:liumazi,sephil ?
作者:
男 glwang (glwang) ★☆☆☆☆ -
盒子活跃会员
2019/8/8 15:18:03
标题:
36分!求CSDN下载积分大富翁帮忙下载:基于Delphi的MQTT协议实现(使用INDY无三方控件) 浏览:351
加入我的收藏
楼主: 基于Delphi的MQTT协议实现(使用INDY无三方控件)??????
基于Delphi的MQTT协议实现(使用INDY无三方控件)使用方法参考:
?http://blog.tdiot.cc/?p=10&preview=true

https://download.csdn.net/download/oyaowang123/9621489

感谢!!
----------------------------------------------
-
作者:
男 wang_80919 (Flying Wang) ▲▲▲▲▲ -
普通会员
2019/8/8 15:20:12
1楼: 一堆人说?INDY?不好,你干嘛下载?
----------------------------------------------
(C)(P)Flying?Wang
作者:
男 earthsbest (全能中间件) ▲▲△△△ -
注册会员
2019/8/8 15:39:49
2楼: 好像就是这个?https://github.com/pjde/delphi-mqtt
----------------------------------------------
Delphi4Linux交流群:734515869?http://www.cnblogs.com/rtcmw
作者:
男 glwang (glwang) ★☆☆☆☆ -
盒子活跃会员
2019/8/8 17:37:28
3楼: 2楼?跟开源的那个不是一样的
----------------------------------------------
-
作者:
男 wang_80919 (Flying Wang) ▲▲▲▲▲ -
普通会员
2019/8/8 17:49:05
4楼: 我的意思是,由于有很多人不喜欢?INDY,所以也就没几个人乐意下载。
----------------------------------------------
(C)(P)Flying?Wang
作者:
男 54993699 (54993699) ★☆☆☆☆ -
注册会员
2019/8/9 8:38:34
5楼: 下下来了,不知道怎么给你,?就一个PAS,我直接粘了
unit?utdMQTT;

interface

uses
??System.SysUtils,?System.Classes,IdBaseComponent,?IdComponent,?IdTCPConnection,?IdTCPClient,IdHashMessageDigest,IdGlobal,IdHash,
??System.StrUtils;

{
??使用请保留此处信息。
??基于indy的MQTT协议实现
??部分代码参考:http://jamie.op-i.net/blog/code/mqtt-client-library-for-delphi/
??改成可跨平台的版本,去掉三方控件,简化代码量。
??作者博客:http://blog.tdiot.cc/
??如有疑问可到博客留言
}
type
??TMQTTMessageType?=?(
??????????Reserved0,??//0??Reserved
??????????CONNECT,?//??1??Client?request?to?connect?to?Broker
??????????CONNACK,?//??2??Connect?Acknowledgment
??????????PUBLISH,?//??3??Publish?message
??????????PUBACK,?//??4??Publish?Acknowledgment
??????????PUBREC,?//??5??Publish?Received?(assured?delivery?part?1)
??????????PUBREL,?//??6??Publish?Release?(assured?delivery?part?2)
??????????PUBCOMP,?//??7??Publish?Complete?(assured?delivery?part?3)
??????????SUBSCRIBE,?//??8??Client?Subscribe?request
??????????SUBACK,?//??9??Subscribe?Acknowledgment
??????????UNSUBSCRIBE,?//?10??Client?Unsubscribe?request
??????????UNSUBACK,?//?11??Unsubscribe?Acknowledgment
??????????PINGREQ,?//??12??PING?Request
??????????PINGRESP,?//??13??PING?Response
??????????DISCONNECT,?//?14??Client?is?Disconnecting
??????????Reserved15?//??15
????????);
??TRemainingLength?=?Array?of?Byte;???//剩余长度字段,可变,1到4字节
??TUTF8Text?=?Array?of?Byte;
type
??TMQTTMessage?=?Record
????FixedHeader:?Byte;
????RL:?TBytes;
????Data:?TBytes;
??End;
{事件}
TConnAckEvent?=?procedure?(Sender:?TObject;?sErrcode:string)?of?object;
TPublishEvent?=?procedure?(Sender:?TObject;?topic,?payload:?string)?of?object;
TPingRespEvent?=?procedure?(Sender:?TObject)?of?object;
TSubAckEvent?=?procedure?(Sender:?TObject;?MessageID:?integer;?GrantedQoS:?integer)?of?object;
TUnSubAckEvent?=?procedure?(Sender:?TObject;?MessageID:?integer)?of?object;
type
??TClientHandleThread=class(TThread)?//创建线程读取返回数据
??private
????{?Private?declarations?}
????servercmd:integer;
????serverMsg:string;
????FIdTCPClient:?TIdTCPClient;
????FCurrentData:?TMQTTMessage;
????FConnAckEvent:?TConnAckEvent;//连接返回事件
????FPublishEvent:?TPublishEvent;//收到发布信息事件
????FPingRespEvent:TPingRespEvent;//收到心跳应答包
????FSubAckEvent:TSubAckEvent;//收到订阅应答包

????function?BytesToStrLength(LengthBytes:?TBytes):?integer;
????function?RemainingLengthToInt(RLBytes:?TBytes):?Integer;
????procedure?AppendBytes(var?DestArray:?TBytes;const?NewBytes:?TBytes);
????procedure?HandleInput;
??protected
????procedure?Execute;?override;
??public
????constructor?Create(IdTCPClient:?TIdTCPClient);?overload;
????property?OnConnAck?:?TConnAckEvent?read?FConnAckEvent?write?FConnAckEvent;
????property?OnPublish?:?TPublishEvent?read?FPublishEvent?write?FPublishEvent;
????property?OnPingRespEvent?:?TPingRespEvent?read?FPingRespEvent?write?FPingRespEvent;
????property?OnSubAckEvent?:?TSubAckEvent?read?FSubAckEvent?write?FSubAckEvent;
??end;


type
??TTdMqtt?=?class(TComponent)
??private
????FActive:?Boolean;
????FClientID:?string;
????FUser,FPwd:string;
????FHost:?string;
????FPort:?Integer;
????FIdTCPClient:?TIdTCPClient;
????FReadThread:?TClientHandleThread;
????FConnAckEvent:?TConnAckEvent;//连接返回事件
????FPublishEvent:?TPublishEvent;//收到发布信息事件
????FPingRespEvent:TPingRespEvent;//收到心跳应答包
????FSubAckEvent:TSubAckEvent;//收到订阅应答包
????FMessageID:?integer;
????function?MD5(source:string):string;
????function?GetMessageID:?TBytes;
??protected
????function?GetIsDesignTime:?Boolean;
????function?GetIsLoading:?Boolean;
????property?IsLoading:?Boolean?read?GetIsLoading;
????property?IsDesignTime:?Boolean?read?GetIsDesignTime;
????procedure?SetActive(AValue:?Boolean);
????function?FixedHeader(MessageType:?TMQTTMessageType;?Dup,?Qos,Retain:?Word):?Byte;?//固定头第一个字节
????procedure?AppendArray(var?Dest:?TUTF8Text;?Source:?Array?of?Byte);
????function?StrToBytes(str:?String;?perpendLength:?boolean):?TUTF8Text;
????function?RemainingLength(x:?Integer):?TRemainingLength;?????//固定头第二个字节,动态长度1-4
????function?BuildCommand(FixedHeader:?Byte;?RemainL:?TRemainingLength;??VariableHead:?TBytes;?Payload:?Array?of?Byte):?TBytes;?//构建最终的发送数据包
????procedure?CopyIntoArray(var?DestArray:?Array?of?Byte;?SourceArray:?Array?of?Byte;?StartIndex:?integer);

??public
????{?Public?declarations?}
????destructor?Destroy;?override;
????constructor?Create(AOwner:?TComponent);?override;
??published
????property?ClientID:string?read?FClientID?write?FClientID;
????property?Host:string?read?FHost?write?FHost;
????property?Port:Integer?read?FPort?write?FPort;
????property?Active:?Boolean?read?FActive?write?SetActive?default?False;
????property?User:string?read?FUser?write?FUser;
????property?Pwd:string?read?FPwd?write?FPwd;

????procedure?MQTTConnect;
????procedure?MQTTPing;
????procedure?MQTTPublish(sTopic,sPayload:string);
????procedure?MQTTSubscribe(sTopic:string);
????property?OnConnAck?:?TConnAckEvent?read?FConnAckEvent?write?FConnAckEvent;
????property?OnPublish?:?TPublishEvent?read?FPublishEvent?write?FPublishEvent;
????property?OnPingRespEvent?:?TPingRespEvent?read?FPingRespEvent?write?FPingRespEvent;
????property?OnSubAckEvent?:?TSubAckEvent?read?FSubAckEvent?write?FSubAckEvent;

??end;
procedure?Register;

implementation

procedure?Register;
begin
??RegisterComponents('Tndsoft',?[TTdMqtt]);
end;

{TClientHandleThread}
function?TClientHandleThread.BytesToStrLength(LengthBytes:?TBytes):?integer;
begin
??Assert(Length(LengthBytes)?=?2,?'UTF-8?Length?Bytes?preceeding?the?text?must?be?2?Bytes?in?Legnth');

??Result?:=?0;
??Result?:=?LengthBytes[0]?shl?8;
??Result?:=?Result?+?LengthBytes[1];
end;

function?TClientHandleThread.RemainingLengthToInt(RLBytes:?TBytes):?Integer;
var
??multi:?integer;
??i:?integer;
??digit:?Byte;
begin
??multi?:=?1;
??i?:=?0;
??Result?:=?0;

??digit?:=?RLBytes[i];
??repeat
????digit?:=?RLBytes[i];
????Result?:=?Result?+?(digit?and?127)?*?multi;
????multi?:=?multi?*?128;
????Inc(i);
??until?((digit?and?128)?=?0);
end;

procedure?TClientHandleThread.AppendBytes(var?DestArray:?TBytes;const?NewBytes:?TBytes);
var
??DestLen:?Integer;
begin
??DestLen?:=?Length(DestArray);
??SetLength(DestArray,?DestLen?+?Length(NewBytes));
??Move(NewBytes,?DestArray[DestLen],?Length(NewBytes));
end;

?procedure?TClientHandleThread.HandleInput;
var
??MessageType:?Byte;
??DataLen:?integer;
??QoS:?integer;
??sErrCode:string;
??Topic:?string;
??Payload:?String;
??iTopicLen,iPayloadLen:Integer;
??ResponseVH:?TBytes;
??ConnectReturn:?Integer;
??i:Integer;
??DestArray:TUTF8Text;
begin
??if?(FCurrentData.FixedHeader?<>?0)?then
????begin
??????MessageType?:=?FCurrentData.FixedHeader?shr?4;

?????if?(MessageType?=?Ord(CONNACK))?then
????????begin
??????????//?Check?if?we?were?given?a?Connect?Return?Code.
??????????ConnectReturn?:=?0;
??????????//?Any?return?code?except?0?is?an?Error
??????????if?((Length(FCurrentData.Data)?>?0)?and?(Length(FCurrentData.Data)???????????begin
??????????ConnectReturn?:=?FCurrentData.Data[1];
??????????case?ConnectReturn?of
??????????0:sErrCode:='连接已接受';
??????????1:sErrCode:='连接已拒绝,不支持的协议版本';
??????????2:sErrCode:='连接已拒绝,不合格的客户端标识符';
??????????3:sErrCode:='连接已拒绝,服务端不可用';
??????????4:sErrCode:='连接已拒绝,无效的用户名或密码';
??????????5:sErrCode:='连接已拒绝,未授权';
??????????end;
??????????end;
??????????if?Assigned(OnConnAck)?then?OnConnAck(Self,?sErrCode);
????????end
??????else
??????if?(MessageType?=?Ord(PUBLISH))?then
????????begin
??????????//?Read?the?Length?Bytes
??????????iTopicLen?:=?BytesToStrLength(Copy(FCurrentData.Data,?0,?2));
??????????//?Get?the?Topic
??????????SetLength(DestArray,0);

??????????SetLength(DestArray,iTopicLen);
??????????Move(FCurrentData.Data[2],?DestArray[0],?iTopicLen);
??????????Topic:=TEncoding.ANSI.GetString(Tbytes(DestArray));//StringOf(Tbytes(DestArray));

??????????//?Get?the?Payload

??????????//iPayloadLen:=(Length(FCurrentData.Data)?-?2?-?iTopicLen-2);??//此值也用数据中的长度值计算
??????????//iPayloadLen:=?BytesToStrLength(Copy(FCurrentData.Data,?iTopicLen+2,?2));
??????????{此处使用新方法获取长度。可变长度-2-iTopicLen}
??????????iPayloadLen:=?RemainingLengthToInt(FCurrentData.RL)-2-iTopicLen;



??????????SetLength(DestArray,iPayloadLen);
??????????Move(FCurrentData.Data[2+iTopicLen],?DestArray[0],?iPayloadLen);
??????????Payload:=TEncoding.ANSI.GetString(Tbytes(DestArray));

??????????if?Assigned(OnPublish)?then?OnPublish(Self,?Topic,Payload);


//??????????fMQTTForDelphi.Memo1.Lines.Add(Format('接收到消息.主体:%s?内容:%s',[Topic,Payload]));
????????end
??????else
??????if?(MessageType?=?Ord(SUBACK))?then
????????begin
??????????//?Reading?the?Message?ID
??????????ResponseVH?:=?Copy(FCurrentData.Data,?0,?2);
??????????DataLen?:=?BytesToStrLength(ResponseVH);
??????????//?Next?Read?the?Granted?QoS
??????????QoS?:=?0;
??????????if?(Length(FCurrentData.Data)?-?2)?>?0?then
??????????begin
??????????ResponseVH?:=?Copy(FCurrentData.Data,?2,?1);
??????????QoS?:=?ResponseVH[0];
??????????end;
??????????if?Assigned(OnSubAckEvent)?then?OnSubAckEvent(Self,?DataLen,QoS);
????????end
??????else
??????if?(MessageType?=?Ord(UNSUBACK))?then
????????begin
??????????//?Read?the?Message?ID?for?the?event?handler
??????????ResponseVH?:=?Copy(FCurrentData.Data,?0,?2);
??????????DataLen?:=?BytesToStrLength(ResponseVH);
//??????????fMQTTForDelphi.Memo1.Lines.Add(Format('收到取消订阅包。长度:%s',[DataLen]))
//??????????if?Assigned(OnUnSubAck)?then?OnUnSubAck(Self,?DataLen);
????????end
??????else
??????if?(MessageType?=?Ord(PINGRESP))?then
??????begin
?????????if?Assigned(OnPingRespEvent)?then?OnPingRespEvent(Self);
??????end;
????end;
end;

procedure?TClientHandleThread.Execute;
var
??CurrentMessage:TMQTTMessage;
??vBuffer:TIdBytes;
??Buffer:?TBytes;
??RLInt:?Integer;
??I:Integer;
begin
??while?not?Terminated?do
??begin
????if?not?FIdTCPClient.Connected?then
??????Terminate
????else
????try
??????CurrentMessage.FixedHeader?:=?0;
??????CurrentMessage.RL?:=?nil;
??????CurrentMessage.Data?:=?nil;
??????CurrentMessage.FixedHeader:=FIdTCPClient.IOHandler.ReadByte;??//读取FixedHdader
??????//读取剩余长度-编码过
??????SetLength(CurrentMessage.RL,1);
??????SetLength(Buffer,1);
??????CurrentMessage.RL[0]:=FIdTCPClient.IOHandler.ReadByte;??//读取剩余长度第一位
??????for?i?:=?1?to?3?do????//读取剩余长度其他位数,剩余长度为可变长度1-4.
??????begin
????????if?((?CurrentMessage.RL[i?-?1]?and?128)?<>?0)?then
????????begin
??????????Buffer[0]?:=?FIdTCPClient.IOHandler.ReadByte;
??????????AppendBytes(CurrentMessage.RL,?Buffer);
????????end
????????else
????????Break;
??????end;
??????//解码剩余长度}
??????RLInt?:=?RemainingLengthToInt(CurrentMessage.RL);
??????//将剩余长度的数据全部读出}
??????if?(RLInt?>?0)??then
??????begin
????????SetLength(CurrentMessage.Data,?0);
????????SetLength(CurrentMessage.Data,?RLInt);
????????FIdTCPClient.IOHandler.ReadBytes(vBuffer,RLInt,False);
????????CurrentMessage.Data:=TBytes(vBuffer);
//????????FPSocket^.RecvBufferEx(Pointer(CurrentMessage.Data),?RLInt,?1000);
??????end;
??????FCurrentData?:=?CurrentMessage;
??????Synchronize(HandleInput);
????except

????end;
??end;
end;

constructor?TClientHandleThread.Create(IdTCPClient:?TIdTCPClient);
begin
??inherited?Create;
??FIdTCPClient:=IdTCPClient;
end;

{TTDMQTT}

function?TTdMqtt.MD5(source:string):string;
var
??MyMD5:TIdHashMessageDigest5;
??Digest:T4x4LongWordRecord;
??ciphertext:string;
begin
??Result?:=?'';
??{$IFDEF?VER150}
????try
??????MyMD5?:=?TIdHashMessageDigest5.Create;
??????Digest?:=?MyMD5.HashValue(source);
??????ciphertext?:=?MyMD5.AsHex(Digest);
??????ciphertext?:=?UpperCase(ciphertext);
??????Result?:=?ciphertext;
????finally
??????MyMD5.Free;
????end;
??{$ELSE}
????try
??????MyMD5?:=?TIdHashMessageDigest5.Create;
??????ciphertext?:=?MyMD5.HashStringAsHex(source);
??????ciphertext?:=?UpperCase(ciphertext);
??????Result?:=?ciphertext;
????finally
??????MyMD5.Free;
????end;
??{$ENDIF}
end;

function?TTdMqtt.StrToBytes(str:?String;?perpendLength:?boolean):?TUTF8Text;
var
??i:?integer;
??ordStr:SmallInt;
??buf:TBytes;
begin
??buf:=TEncoding.ANSI.GetBytes(str);
??if?perpendLength?then
??begin
????SetLength(Result,?Length(buf)?+?2);
????Result[0]?:=?Length(buf)?div?256;
????Result[1]?:=?Length(buf)?mod?256;
????Move(buf[0],Result[2],Length(buf));
??end
??else
??begin
????SetLength(Result,?Length(buf));
????Move(buf[0],Result[0],Length(buf));
??end;
end;

procedure?TTdMqtt.CopyIntoArray(var?DestArray:?Array?of?Byte;?SourceArray:?Array?of?Byte;?StartIndex:?integer);
begin
??Assert(StartIndex?>=?0);
??Move(SourceArray[0],?DestArray[StartIndex],?Length(SourceArray));
end;

function?TTdMqtt.BuildCommand(FixedHeader:?Byte;?RemainL:?TRemainingLength;??VariableHead:?TBytes;?Payload:?Array?of?Byte):?TBytes;?//构建最终的发送数据包
var
??iNextIndex:?integer;
begin
??//?Attach?Fixed?Header?(1?byte)
??iNextIndex?:=?0;
??SetLength(Result,?1);
??Result[iNextIndex]?:=?FixedHeader;

??//?Attach?RemainingLength?(1-4?bytes)
??iNextIndex?:=?Length(Result);
??SetLength(Result,?Length(Result)?+?Length(RemainL));
??CopyIntoArray(Result,?RemainL,?iNextIndex);

??//?Attach?Variable?Head
??iNextIndex?:=?Length(Result);
??SetLength(Result,?Length(Result)?+?Length(VariableHead));
??CopyIntoArray(Result,?VariableHead,?iNextIndex);

??//?Attach?Payload.
??iNextIndex?:=?Length(Result);
??SetLength(Result,?Length(Result)?+?Length(Payload));
??CopyIntoArray(Result,?Payload,?iNextIndex);
end;

function?TTdMqtt.RemainingLength(x:?Integer):?TRemainingLength;?????//固定头第二个字节,动态长度1-4
var
??byteindex:?integer;
??digit:?integer;
begin
??SetLength(Result,?1);
??byteindex?:=?0;
??while?(x?>?0)?do
??begin
????digit?:=?x?mod?128;
????x?:=?x?div?128;
????if?x?>?0?then
????begin
??????digit?:=?digit?or?128;
????end;
????Result[byteindex]?:=?digit;
????if?x?>?0?then
????begin
??????inc(byteindex);
??????SetLength(Result,?Length(Result)?+?1);
????end;
??end;
end;

procedure?TTdMqtt.AppendArray(var?Dest:?TUTF8Text;?Source:?Array?of?Byte);
var
??DestLen:?Integer;
begin
??DestLen?:=?Length(Dest);
??SetLength(Dest,?DestLen?+?Length(Source));
??Move(Source,?Dest[DestLen],?Length(Source));
end;

function?TTdMqtt.FixedHeader(MessageType:?TMQTTMessageType;?Dup,?Qos,Retain:?Word):?Byte;?//固定头第一个字节
begin
??Result?:=?(Ord(MessageType)?*?16)?+?(Dup?*?8)?+?(Qos?*?2)?+?(Retain?*?1);
end;

function?TTdMqtt.GetIsDesignTime:?Boolean;
begin
??Result?:=?(csDesigning?in?ComponentState);
end;
function?TTdMqtt.GetIsLoading:?Boolean;
begin
??Result?:=?(csLoading?in?ComponentState);
end;

procedure?TTdMqtt.MQTTConnect;?//
var
??MqttData_FixedHead:Byte;???????//固定头
??MqttData_VariableHeader:TBytes;//可变头
??MqttData_RemainingLength:?TRemainingLength;//剩余长度
??Payload:?TUTF8Text;
??SendData:?TBytes;??????????//构件完成的最终需要发送的二进制数据
??function?VariableHeader_Connect(KeepAlive:?Word):?TBytes;
??const
????MQTT_PROTOCOL?=?'MQTT';
????MQTT_VERSION?=?4;//V3.1.1协议级别为4.非3了
??var
????Qos,?Retain:?word;
????iByteIndex:?integer;
????ProtoBytes:?TUTF8Text;
????ConnectFlag:Byte;//连接标志
??begin
????SetLength(Result,?10);????//长度为10
????iByteIndex?:=?0;
????ProtoBytes?:=?StrToBytes(MQTT_PROTOCOL,?true);
????CopyIntoArray(Result,?ProtoBytes,?iByteIndex);?????//协议名

????Inc(iByteIndex,?Length(ProtoBytes));
????Result[iByteIndex]?:=?MQTT_VERSION;??????????//版本号

????Inc(iByteIndex);
??//??asm
??//????mov?ConnectFlag,11000010B?//UserName,Pwd,CleanSession为1,其余均为0
??//??end;
????ConnectFlag:=194;

????Result[iByteIndex]?:=?ConnectFlag;//连接标志

????Inc(iByteIndex);
????Result[iByteIndex]?:=?0;??????????//保持连接时间第一位

????Inc(iByteIndex);
????Result[iByteIndex]?:=?KeepAlive;?//保持连接时间第二位
??end;
begin
??MqttData_FixedHead:=FixedHeader(CONNECT,0,0,0);
??MqttData_VariableHeader:=?VariableHeader_Connect(40);//构建可变头。参数单位秒,在此时间之内,客户端需要发送?PINGREQ?否则服务端将断开网络连接
??SetLength(Payload,?0);??//开始构建有效荷载,由于需要认证帐号密码,此处荷载内容为ID,USER,PWD
??AppendArray(Payload,?StrToBytes(FClientID,?true));?//id
??AppendArray(Payload,?StrToBytes(FUser,?true));?????//user
??AppendArray(Payload,?StrToBytes(FPwd,?true));?????//pwd
??MqttData_RemainingLength:=RemainingLength(Length(MqttData_VariableHeader)?+?Length(Payload));//计算剩余长度
??SendData:=BuildCommand(MqttData_FixedHead,?MqttData_RemainingLength,?MqttData_VariableHeader,?Payload);??//组包
??FIdTCPClient.Socket.Write(TIdBytes(SendData));??//发送
end;

procedure?TTdMqtt.MQTTPing;
var
??FH:?Byte;
??RL:?Byte;
??Data:?TBytes;
begin
??SetLength(Data,?2);
??FH?:=?FixedHeader(PINGREQ,?0,?0,?0);
??RL?:=?0;
??Data[0]?:=?FH;
??Data[1]?:=?RL;
??FIdTCPClient.Socket.Write(TIdBytes(Data));
end;


procedure?TTdMqtt.MQTTPublish(sTopic,sPayload:string);
var
??Data:?TBytes;
??FH:?Byte;
??RL:?TRemainingLength;
??VH:?TBytes;
??Payload:?TUTF8Text;
??function?VariableHeader_Publish(topic:?string):?TBytes;
??var
????BytesTopic:?TUTF8Text;
??begin
????BytesTopic?:=?StrToBytes(Topic,?true);
????SetLength(Result,?Length(BytesTopic));
????CopyIntoArray(Result,?BytesTopic,?0);
??end;
begin
??FH?:=?FixedHeader(PUBLISH,?0,?0,?1);??//保留消息设为1;对于发布者不定期发送状态消息这个场景,保留消息很有用。新的订阅者将会收到最近的状态。
??VH?:=?VariableHeader_Publish(sTopic);
??SetLength(Payload,?0);
??AppendArray(Payload,?StrToBytes(sPayload,?false));?//需要注意,此处长度的计算??。协议中:有效荷载长度:用固定包头中的剩余长度字段减去可变包头长度
??RL?:=?RemainingLength(Length(VH)?+?Length(Payload));
??Data?:=?BuildCommand(FH,?RL,?VH,?Payload);
??FIdTCPClient.Socket.Write(TIdBytes(Data));
end;


function?TTdMqtt.GetMessageID:?TBytes;
begin
??Assert((Self.FMessageID?>?Low(Word)),?'Message?ID?too?low');
??Assert((Self.FMessageID?
??{??Self.FMessageID?is?initialised?to?1?upon?TMQTTClient.Create
??The?Message?ID?is?a?16-bit?unsigned?integer,?which?typically?increases?by?exactly
??one?from?one?message?to?the?next,?but?is?not?required?to?do?so.
??The?two?bytes?of?the?Message?ID?are?ordered?as?MSB,?followed?by?LSB?(big-endian).}
??SetLength(Result,?2);
??Result[0]?:=?Hi(Self.FMessageID);
??Result[1]?:=?Lo(Self.FMessageID);
??Inc(Self.FMessageID);
end;

procedure?TTdMqtt.MQTTSubscribe(sTopic:string);
var
??Data:?TBytes;
??FH:?Byte;
??RL:?TRemainingLength;
??VH:?TBytes;
??Payload:?TUTF8Text;
??function?VariableHeader_Subscribe:?TBytes;
??begin
????Result?:=?GetMessageID;
??end;
begin
??FH?:=?FixedHeader(SUBSCRIBE,?0,?1,?0);
??VH?:=?VariableHeader_Subscribe;
??SetLength(Payload,?0);
??AppendArray(Payload,?StrToBytes(sTopic,?true));
??//?Append?a?new?Byte?to?Add?the?Requested?QoS?Level?for?that?Topic
??SetLength(Payload,?Length(Payload)?+?1);
??//?Always?Append?Requested?QoS?Level?0
??Payload[Length(Payload)?-?1]?:=?$0;
??RL?:=?RemainingLength(Length(VH)?+?Length(Payload));
??Data?:=?BuildCommand(FH,?RL,?VH,?Payload);
??FIdTCPClient.Socket.Write(TIdBytes(Data));
end;


procedure?TTdMqtt.SetActive(AValue:?Boolean);
begin
??//?At?design?time?we?just?set?the?value?and?save?it?for?run?time.
??//?During?loading?we?ignore?it?till?all?other?properties?are?set.
??//?Loaded?will?recall?it?to?toggle?it
??if?IsDesignTime?or?IsLoading?then
??begin
????FActive?:=?AValue;
??end
??else
??if?FActive?<>?AValue
??then
??begin
????if?AValue?then
????begin
??????try
????????FIdTCPClient.Host:=FHost;
????????FIdTCPClient.Port:=FPort;
????????FIdTCPClient.ReadTimeout:=5000;
????????FIdTCPClient.ConnectTimeout:=5000;
????????FIdTCPClient.Connect;
????????{发送MQTT连接}
????????MQTTConnect;
????????{创建读取线程}
????????FReadThread:=TClientHandleThread.Create(FIdTCPClient);
????????FReadThread.OnConnAck?:=?Self.OnConnAck;
????????FReadThread.OnPublish?:=?Self.OnPublish;
????????FReadThread.OnPingRespEvent:=Self.OnPingRespEvent;
????????FReadThread.OnSubAckEvent:=Self.OnSubAckEvent;
??????except
????????FActive?:=?True;
????????SetActive(False);?//?allow?descendants?to?clean?up
????????raise?Exception.Create('连接失败。');
??????end;
??????FActive?:=?True;
????end
????else
????begin
??????//?Must?set?to?False?here.?Shutdown()?implementations?call?property?setters?that?check?this
??????FActive?:=?False;
??????FIdTCPClient.Disconnect;
????end;
??end;
end;

destructor?TTdMqtt.Destroy;
begin
??FIdTCPClient.Free;
??inherited?Destroy;
end;

constructor?TTdMqtt.Create(AOwner:?TComponent);
var
??Guid:?TGUID;
begin
??FHost:='127.0.0.1';
??FPort:=1883;
??FUser:='ade';
??FPwd:='ade';
??FMessageID?:=?1;
??FIdTCPClient:=TIdTCPClient.Create(nil);
??CreateGUID(Guid);
??FClientID:='tdito.cc-'+LeftStr(MD5(GUIDToString(Guid)),16);
??inherited?Create(AOwner);
end;


end.



{
??导出:
??host,port
??心跳时间

??sub()
??pub
??ClientID
}
----------------------------------------------
-
作者:
男 glwang (glwang) ★☆☆☆☆ -
盒子活跃会员
2019/8/9 9:14:40
6楼: 感谢楼上5楼的兄弟,花了这么多积分,其实可以上传到盒子ftp的,再次感谢!
----------------------------------------------
-
作者:
男 keymark (keymark) ▲△△△△ -
注册会员
2019/8/9 11:55:06
7楼: 可以传附件了的。
大小限制:4.90M,扩展名限制:gif|jpg|png|doc|zip|rar|chm|pdf|mid|mp3|asf|rm|swf|txt
这多年过去了依然不支持
。pas??当然也是好事。最少文件名不会乱。?压个包就是了。
----------------------------------------------
m3u8播放器:DPlayer/hlsjs-p2p-engine/ckplayer/flashls-dev/sewise-player/http不能播https某些情况下dns服务:coredns/http服务:miniweb/!http://www.lib4dev.com/topics/delphi>http://www.lib4dev.com/topics/pascal?p=34&s=!http://www.lib4dev.com/topics/delphi
作者:
男 dmzn (dmzn) ★☆☆☆☆ -
盒子活跃会员
2019/8/9 12:08:30
8楼: 测试了几个MQTT客户端,包括:?sgcWebSocket-MQTT,TMS?MQTT,QDAC?MQTT
结论:?不稳定
后来用mosquitto库改了一个版本,相对稳定多了.
http://bbs.2ccc.com/topic.asp?topicid=566990
----------------------------------------------
生活愉快.
作者:
男 l_star (l.star) ★☆☆☆☆ -
普通会员
2019/8/9 13:11:53
9楼: https://github.com/pjde/delphi-mqtt

在生产环境中使用。
----------------------------------------------
-
作者:
男 glwang (glwang) ★☆☆☆☆ -
盒子活跃会员
2019/8/9 13:45:18
10楼: 楼上9楼用的啥版本Delphi,我用Delphi?10.3?编译不了。PS感谢8楼测试:sgcWebSocket-MQTT,TMS?MQTT,QDAC?MQTT。8楼的编译缺少文件:UManagerGroup,?UThreadPool,?ULibFun。
----------------------------------------------
-
作者:
男 l_star (l.star) ★☆☆☆☆ -
普通会员
2019/8/9 17:40:56
11楼: 7有源码自己改
----------------------------------------------
-
信息
登陆以后才能回复
Copyright ? 2CCC.Com 盒子论坛 v2.1 版权所有 页面执行35.15625毫秒 RSS