// main.cpp
  1. ////////////////////////////////////////////////////////////////////////////////////////////////////
  2. //
  3. #include <stdio.h>
  4. #include <signal.h>
  5. #include <linux/limits.h>
  6. #include <sys/socket.h>
  7. #include <arpa/inet.h>
  8. #include <linux/if_arp.h>
  9. #include <netinet/in.h>
  10. #include <ifaddrs.h>
  11. #include <sys/ioctl.h>
  12. #include <fcntl.h>
  13. #include <unistd.h>
  14. #include <getopt.h>
  15. #include <gfa/svc/common/strutil.h>
  16. #include <gfa/svc/common/fileutil.h>
  17. #include <gfa/svc/common/instance.h>
  18. #include <gfa/svc/common/processclock.h>
  19. #include <gfa/svc/common/debug.h>
  20. #include <gfa/svc/mqttcl/mqttclient.h>
  21. #include <gfa/svc/mqttcl/mqttcfg.h>
  22. #include <gfa/svc/mqttcl/mqttdbg.h>
  23. #include "projal.h"
  24. ////////////////////////////////////////////////////////////////////////////////////////////////////
  25. #if _ENABLE_MEM_TRACE
  26. #include <mcheck.h>
  /// Switches on malloc tracing (mtrace) when an instance is constructed.
  ///
  /// The constructor points the MALLOC_TRACE environment variable at the trace log
  /// and then calls mtrace(). The destructor deliberately does nothing: the matching
  /// muntrace() call is left commented out, as in the original code.
  class CMtrace
  {
  public:
      CMtrace(void)
      {
          // setenv() copies name and value, so a string literal is fine here.
          // putenv() takes a non-const char* (and keeps the pointer), which a string
          // literal does not convert to in C++11 and later.
          setenv("MALLOC_TRACE", "/home/wrk/share/config/services/Debug/Desktop_Qt_5_7_0_GCC_64bit/mqttcl/mtrace.log", 1);
          mtrace();
      }

      ~CMtrace(void)
      {
          // muntrace();
      }
  };
  38. #endif // _ENABLE_MEM_TRACE
  39. #if _TRACK_TIMES
  40. static CProcessClock g_pc; // file-local clock object, only compiled when _TRACK_TIMES is non-zero
  41. unsigned long g_nDbgCounter1 = 0; // debug counter with external linkage - NOTE(review): presumably updated from other translation units; confirm
  42. unsigned long g_nDbgCounter2 = 0; // see g_nDbgCounter1
  43. unsigned long g_nDbgCounter3 = 0; // see g_nDbgCounter1
  44. #endif // _TRACK_TIMES
  45. ////////////////////////////////////////////////////////////////////////////////////////////////////
  46. // app control
  // Identity of this process towards app control, and the apps it depends on.
  // _DEPENDENCIES is used as an appid_t bit mask (it is AND-ed / compared with app ids).
  #define _APPID                          GFA_APPCTRL_APPID_MQTTCL
  #define _APPNAME                        "MqttCl"
  #define _DEPENDENCIES                   ((appid_t)(GFA_APPCTRL_APPID_REMANENT))
  ////////////////////////////////////////////////////////////////////////////////////////////////////
  // Timing (milliseconds) and log file name.
  #define _UPDATE_INTERVAL_MS             100
  #define _RECONN_INTERVAL_MS             1000
  #define _LOGFILE_NAME                   "mqttcl.log"
  ////////////////////////////////////////////////////////////////////////////////////////////////////
  // Block / unblock the signals contained in the given sigset_t.
  #define _SIG_BLOCK(pset)                sigprocmask(SIG_BLOCK, (pset), NULL)
  #define _SIG_UNBLOCK(pset)              sigprocmask(SIG_UNBLOCK, (pset), NULL)
  // Millisecond conversions.
  // NOTE(review): _USEC_FROM_MSEC multiplies by _PC_NS_PER_US - presumably 1000, which is
  // also the number of microseconds per millisecond; confirm against processclock.h.
  #define _NSEC_FROM_MSEC(msec)           ((msec) * _PC_NS_PER_MS)
  #define _USEC_FROM_MSEC(msec)           ((msec) * _PC_NS_PER_US)
  // JSON keys understood in the payload of a CONTROL message.
  #define _TOPIC_CTRL_KEY_BINLE           "binLe"
  #define _TOPIC_CTRL_KEY_BINBE           "binBe"
  #define _TOPIC_CTRL_KEY_JSON            "json"
  #define _TOPIC_CTRL_KEY_PBUF            "pBuf"
  #define _TOPIC_CTRL_KEY_QOS             "qos"
  #define _TOPIC_CTRL_KEY_RETAIN          "retained"
  #define _TOPIC_CTRL_KEY_REM_RETAINED    "delRetained"
  // Command components used when building the subscription topics.
  #define _TOPIC_CMD_CTRL                 "CONTROL"
  #define _TOPIC_CMD_SET                  "SET"
  #define _TOPIC_CMD_STATUS               "STATUS"
  // Connect attempts tolerated before entering the error state: 30 for MOSQ_ERR_EAI, else 3.
  #define _CONNECT_MAX_RETRIES(rc)        (((rc) == MOSQ_ERR_EAI) ? 30 : 3)
  71. ////////////////////////////////////////////////////////////////////////////////////////////////////
  72. //
  // Kinds of control topics. The values are used as indices into the control topic
  // map (pCtrlMap / subCtrlMap), so their order must match the order of that map.
  enum _TOPIC_CTRL_CMD
  {
      TCC_Control  = 0,
      TCC_SetBinLe = 1,
      TCC_SetBinBe = 2,
      TCC_SetJson  = 3,
      TCC_SetPBuf  = 4,
      TCC_Status   = 5
  };
  typedef enum _TOPIC_CTRL_CMD TOPIC_CTRL_CMD;
  typedef TOPIC_CTRL_CMD *LPTOPIC_CTRL_CMD;
  typedef const TOPIC_CTRL_CMD *LPCTOPIC_CTRL_CMD;
  83. ////////////////////////////////////////////////////////////////////////////////////////////////////
  84. //
  // States of the MQTT client state machine driven by main().
  typedef enum _MQTT_CLIENT_STATES // when modifying, don't forget to adjust g_pszStateNames!!!
  {
      CLS_NotInit,
      CLS_SetUserAndPass,
      CLS_SetLastWill,
      CLS_SetTLS,
      CLS_Unconnected,
      CLS_Connect,
      CLS_Reconnect,
      CLS_Connecting,
      CLS_Connected,
      CLS_Subscribe,
      CLS_Subscribing,
      CLS_Subscribed,
      CLS_PublishConnect,
      CLS_ProcMsg,
      CLS_PublishDisconnect,
      CLS_Err,
      CLS_ShutDown,
      CLS_Unsubscribe,
      CLS_Disconnect,
      CLS_Cleanup,
      CLS_Paused,
      CLS_Exit
  }MQTT_CLIENT_STATES, *LPMQTT_CLIENT_STATES;
  typedef const MQTT_CLIENT_STATES *LPCMQTT_CLIENT_STATES;

  // Display names, indexed by MQTT_CLIENT_STATES. There must be exactly one entry per
  // enumerator, in the same order. The entry for CLS_SetUserAndPass was missing, which
  // shifted every following name by one and made CLS_Exit index past the end of the table.
  static const char *g_pszStateNames[] =
  {
      "Not Init",
      "Set User And Pass",
      "Set Last Will",
      "Set TLS",
      "Unconnected",
      "Connect",
      "Reconnect",
      "Connecting",
      "Connected",
      "Subscribe",
      "Subscribing",
      "Subscribed",
      "Publish Connect",
      "ProcMsg",
      "Publish Disconnect",
      "Error",
      "ShutDown",
      "Unsubscribe",
      "Disconnect",
      "Cleanup",
      "Paused",
      "Exit"
  };

  // Fail the build if the enum and the name table get out of sync again.
  static_assert(sizeof(g_pszStateNames) / sizeof(g_pszStateNames[0]) == static_cast<unsigned>(CLS_Exit) + 1u,
      "g_pszStateNames must have exactly one entry per MQTT_CLIENT_STATES value");

  // Returns the display name of the client state cs, or an empty string if cs is not
  // within CLS_NotInit..CLS_Exit. The returned pointer refers to a string literal.
  static const char * _GetClientStateString(MQTT_CLIENT_STATES cs)
  {
      if(cs >= CLS_NotInit && cs <= CLS_Exit)
          return g_pszStateNames[cs];
      return "";
  }
  141. ////////////////////////////////////////////////////////////////////////////////////////////////////
  142. //
  // One subscription topic of the control topic map.
  //   sTopic     - the subscription pattern (copied from the constructor argument)
  //   nVarOffset - length of the pattern minus 2; passed to CMqttMessage::GetTopic()
  //                by _ProcessIncoming to obtain the variable path of a received topic.
  // NOTE(review): for patterns shorter than two characters the subtraction wraps around
  // (size_t); every pattern built in main ends in "/#", so this presumably never occurs.
  struct _SUB_CTRL_TOPIC
  {
      _SUB_CTRL_TOPIC(const std::string &&s) : sTopic(s), nVarOffset(s.length() - 2)
      {
      }

      std::string sTopic;
      size_t nVarOffset;
  };
  typedef struct _SUB_CTRL_TOPIC SUB_CTRL_TOPIC;
  typedef SUB_CTRL_TOPIC *LPSUB_CTRL_TOPIC;
  typedef const SUB_CTRL_TOPIC *LPCSUB_CTRL_TOPIC;
  153. ////////////////////////////////////////////////////////////////////////////////////////////////////
  154. //
  155. //static std::string _CreateDeviceID(void);
  156. static volatile bool g_fRun = false; // main's processing loop runs while this is true
  157. static volatile bool g_fPauseImp = false; // implicit pause: true while not all _DEPENDENCIES are reported running
  158. static volatile bool g_fPauseCmd = false; // explicit pause: set by the 'Pause' control message, cleared by 'Resume'
  159. static volatile bool g_fZombie = false; // set while main runs its initialization sequence, cleared before the processing loop starts
  160. static appid_t g_nDepRunning = 0; // bit mask of dependency app ids currently in state GIAS_Running
  161. static sigset_t g_set; // signal set, filled in main via sigfillset()
  162. static CLogfile g_lf; // log file used by all functions in this file
  163. static int g_nLastSig = -1; // number of the last signal seen by _SigHandler, -1 if none
  164. static shm_t g_shmShadow; // local copy of the shared memory block (memcpy'd from the SHM pointer in main)
  165. static MQTT_CLIENT_STATES g_cs = CLS_NotInit; // current state of the client state machine
  166. static MQTT_CLIENT_STATES g_csLast = CLS_NotInit; // state saved by main before certain transitions (error, shutdown, retry)
  167. static bool g_bConnected = false; // set by _OnClientEvents on a successful connect notification
  168. static int g_nSubcribed = 0; // count of subscribe notifications received by _OnClientEvents (identifier spelling kept as is)
  169. static bool g_bIntr = false; // interruption request: set by _SigHandler and by the 'Stop' control message
  170. static int g_nErrRetries = 0; // failed connect/reconnect attempts; reset to 0 in state CLS_Connecting
  171. ////////////////////////////////////////////////////////////////////////////////////////////////////
  172. //
  173. static const char* _GetBaseDir(std::string &rstrBaseDir)
  174. {
  175. char szBaseDir[PATH_MAX];
  176. rstrBaseDir = ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
  177. rtrim(rstrBaseDir, "/");
  178. return rstrBaseDir.c_str();
  179. }
  180. ////////////////////////////////////////////////////////////////////////////////////////////////////
  181. //
  182. static void _SigHandler(int sig)
  183. {
  184. g_nLastSig = sig;
  185. g_bIntr = true;
  186. g_fPauseImp = g_fPauseCmd = g_fZombie = false;
  187. }
  188. ////////////////////////////////////////////////////////////////////////////////////////////////////
  189. //
  190. static MQTT_CLIENT_STATES _cl_usleep(unsigned int t, MQTT_CLIENT_STATES csNext)
  191. {
  192. if(usleep(t) < 0 && errno == EINTR)
  193. return CLS_ShutDown;
  194. return csNext;
  195. }
  196. ////////////////////////////////////////////////////////////////////////////////////////////////////
  197. //
  198. static void _ProcessIncoming(CMqttClient &rcl, CMqttVarTable &vt, CMqttClConfig &cfg, LPCSUB_CTRL_TOPIC pCtrlMap)
  199. {
  200. CMqttVar *pVar;
  201. CMqttMessage *pMsg;
  202. std::string sVarPath;
  203. while((pMsg = rcl.PopRcvMsg()))
  204. {
  205. if(pMsg->TopicMatchesSub(pCtrlMap[TCC_Control].sTopic.c_str()))
  206. {
  207. sVarPath = pMsg->GetTopic(pCtrlMap[TCC_Control].nVarOffset);
  208. if((pVar = vt.Find(sVarPath.c_str())))
  209. {
  210. bool bChanged = false;
  211. int nType, nQos;
  212. CJson_t jtRoot, jtVal;
  213. std::string err;
  214. uint32_t nMaskOn = 0, nMaskOff = 0;
  215. if(pMsg->GetPayloadAsJSON(jtRoot, err))
  216. {
  217. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_RETAIN, jtVal))
  218. {
  219. switch((nType = jtVal.Type()))
  220. {
  221. case JSON_TRUE:
  222. bChanged = pVar->SetRetained(true) || bChanged;
  223. break;
  224. case JSON_FALSE:
  225. bChanged = pVar->SetRetained(false) || bChanged;
  226. break;
  227. default:
  228. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_RETAIN, nType);
  229. break;
  230. }
  231. }
  232. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_REM_RETAINED, jtVal))
  233. {
  234. switch((nType = jtVal.Type()))
  235. {
  236. case JSON_TRUE:
  237. pVar->RemoveRetained(cfg.GetTopicPrefix(), nullptr, rcl.GetMsgQueueSnd(), true);
  238. break;
  239. case JSON_FALSE:
  240. g_lf.Warning("%s: command \"%s\":false has no effect!\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED);
  241. break;
  242. default:
  243. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_REM_RETAINED, nType);
  244. break;
  245. }
  246. }
  247. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_QOS, jtVal))
  248. {
  249. switch((nType = jtVal.Type()))
  250. {
  251. case JSON_INTEGER:
  252. nQos = (int)json_integer_value(jtVal);
  253. if(nQos < MQTTCL_MIN_QOS)
  254. g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MIN_QOS);
  255. else if(nQos > MQTTCL_MAX_QOS)
  256. g_lf.Warning("%s: Invalid value for QOS: %d! Value adjusted to %d\n", pMsg->GetTopic().c_str(), nQos, MQTTCL_MAX_QOS);
  257. bChanged = pVar->SetQoS(nQos) || bChanged;
  258. break;
  259. default:
  260. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_QOS, nType);
  261. break;
  262. }
  263. }
  264. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINLE, jtVal))
  265. {
  266. switch((nType = jtVal.Type()))
  267. {
  268. case JSON_TRUE:
  269. nMaskOn |= MQTT_VALUE_BINLE;
  270. break;
  271. case JSON_FALSE:
  272. nMaskOff |= MQTT_VALUE_BINLE;
  273. break;
  274. default:
  275. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINLE, nType);
  276. break;
  277. }
  278. }
  279. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_BINBE, jtVal))
  280. {
  281. switch((nType = jtVal.Type()))
  282. {
  283. case JSON_TRUE:
  284. nMaskOn |= MQTT_VALUE_BINBE;
  285. break;
  286. case JSON_FALSE:
  287. nMaskOff |= MQTT_VALUE_BINBE;
  288. break;
  289. default:
  290. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_BINBE, nType);
  291. break;
  292. }
  293. }
  294. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_JSON, jtVal))
  295. {
  296. switch((nType = jtVal.Type()))
  297. {
  298. case JSON_TRUE:
  299. nMaskOn |= MQTT_VALUE_JSON;
  300. break;
  301. case JSON_FALSE:
  302. nMaskOff |= MQTT_VALUE_JSON;
  303. break;
  304. default:
  305. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_JSON, nType);
  306. break;
  307. }
  308. }
  309. if(jtRoot.GetValue(_TOPIC_CTRL_KEY_PBUF, jtVal))
  310. {
  311. switch((nType = jtVal.Type()))
  312. {
  313. case JSON_TRUE:
  314. nMaskOn |= MQTT_VALUE_PBUF;
  315. break;
  316. case JSON_FALSE:
  317. nMaskOff |= MQTT_VALUE_PBUF;
  318. break;
  319. default:
  320. g_lf.Error("%s: Invalid data type for JSON key '%s': %d\n", pMsg->GetTopic().c_str(), _TOPIC_CTRL_KEY_PBUF, nType);
  321. break;
  322. }
  323. }
  324. if(nMaskOff)
  325. {
  326. if(pVar->DisablePublish(nMaskOff, &vt))
  327. {
  328. bChanged = true;
  329. }
  330. }
  331. if(nMaskOn)
  332. {
  333. if(pVar->EnablePublish(nMaskOn, &vt))
  334. {
  335. bChanged = true;
  336. }
  337. }
  338. #if _DUMP_ENABLED_VARS
  339. if(bChanged)
  340. vt.DumpPubEnabled();
  341. #endif // _DUMP_ENABLED_VARS
  342. }
  343. else
  344. {
  345. g_lf.Error("%s: JSON error: %s!\n", pMsg->GetTopic().c_str(), err.c_str());
  346. }
  347. }
  348. }
  349. else
  350. {
  351. int nLocks = 0;
  352. static const uint32_t nFormats[] =
  353. {
  354. 0,
  355. MQTT_VALUE_BINLE,
  356. MQTT_VALUE_BINBE,
  357. MQTT_VALUE_JSON,
  358. MQTT_VALUE_PBUF
  359. };
  360. for(int i = TCC_SetBinLe; i <= TCC_SetPBuf; i++)
  361. {
  362. if(pMsg->TopicMatchesSub(pCtrlMap[i].sTopic.c_str()))
  363. {
  364. sVarPath = pMsg->GetTopic(pCtrlMap[i].nVarOffset);
  365. if((pVar = vt.Find(sVarPath.c_str())))
  366. {
  367. if(pVar->PublishEnabled())
  368. {
  369. if(!pVar->SetShmValue(nFormats[i], pMsg, nLocks))
  370. {
  371. // !!!
  372. }
  373. }
  374. else
  375. {
  376. // !!!
  377. }
  378. break;
  379. }
  380. else
  381. {
  382. // !!!
  383. break;
  384. }
  385. }
  386. }
  387. }
  388. pMsg->Release();
  389. }
  390. }
  391. ////////////////////////////////////////////////////////////////////////////////////////////////////
  392. //
  393. static int _ProcessOutgoing(CMqttClient &rcl)
  394. {
  395. int nRet = 0;
  396. CMqttMessage *pMsg;
  397. while((pMsg = rcl.PopSndMsg()))
  398. {
  399. rcl.publish(pMsg);
  400. pMsg->Release();
  401. ++nRet;
  402. }
  403. return nRet;
  404. }
  405. ////////////////////////////////////////////////////////////////////////////////////////////////////
  406. //
  407. static int _Subscribe(CMqttClient &rcl, int nDefaultQOS, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
  408. {
  409. int nRet;
  410. for(size_t i = 0; i < nLenMap; i++)
  411. {
  412. if((nRet = rcl.subscribe(NULL, pCtrlMap[i].sTopic.c_str(), nDefaultQOS)) != MOSQ_ERR_SUCCESS)
  413. break;
  414. TRACE("Subscribed: '%s' - QOS: %d\n", pCtrlMap[i].sTopic.c_str(), nDefaultQOS);
  415. }
  416. return nRet;
  417. }
  418. static void _Unsubscribe(CMqttClient &rcl, LPCSUB_CTRL_TOPIC pCtrlMap, size_t nLenMap)
  419. {
  420. for(size_t i = 0; i < nLenMap; i++)
  421. {
  422. rcl.unsubscribe(NULL, pCtrlMap[i].sTopic.c_str());
  423. }
  424. }
  425. ////////////////////////////////////////////////////////////////////////////////////////////////////
  426. //
  427. static void _OnClientEvents(LPCMQTT_GENERIC_NOTIFICATION pntf, void *pctx)
  428. {
  429. switch(pntf->evt)
  430. {
  431. case NEVT_Log:
  432. if(pntf->log.level == MOSQ_LOG_ERR)
  433. {
  434. TRACE("[%d] - %s\n", pntf->log.level, pntf->log.str);
  435. g_lf.Error("%s\n", pntf->log.str);
  436. }
  437. break;
  438. case NEVT_Connect:
  439. if(pntf->con.rc == MOSQ_ERR_SUCCESS)
  440. g_bConnected = true;
  441. break;
  442. case NEVT_Disconnect:
  443. break;
  444. case NEVT_Subscribe:
  445. ++g_nSubcribed;
  446. break;
  447. case NEVT_Unsubscribe:
  448. break;
  449. default:
  450. break;
  451. }
  452. }
  453. ////////////////////////////////////////////////////////////////////////////////////////////////////
  454. static void _ProcessCtrlMessages(HAPPCTRL hAC, HAPPINFO hAI)
  455. {
  456. ctrlmsg_t nCtrlMsg;
  457. while(!g_bIntr && (nCtrlMsg = ::GfaIpcAppCtrlGetNextCtrlMsg(hAI)))
  458. {
  459. switch(nCtrlMsg)
  460. {
  461. case GFA_APPCTRL_CTRLMSG_STOP:
  462. g_bIntr = true;
  463. g_fPauseImp = false;
  464. g_fPauseCmd = false;
  465. g_fZombie = false;
  466. g_lf.Info("Received Control Message 'Stop'\n");
  467. break;
  468. case GFA_APPCTRL_CTRLMSG_PAUSE:
  469. if(!g_fPauseCmd)
  470. {
  471. g_fPauseCmd = true;
  472. if(!g_fPauseImp)
  473. {
  474. ::GfaIpcAppCtrlSetState(hAC, GIAS_Paused);
  475. g_lf.Info("Received Control Message 'Pause'\n");
  476. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
  477. TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Paused));
  478. }
  479. }
  480. break;
  481. case GFA_APPCTRL_CTRLMSG_RESUME:
  482. if(g_fPauseCmd)
  483. {
  484. g_fPauseCmd = false;
  485. if(!g_fPauseImp)
  486. {
  487. g_lf.Info("Received Control Message 'Resume'\n");
  488. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  489. ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
  490. TRACE("%-8s: State: %s\n", "Me", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  491. }
  492. }
  493. break;
  494. default:
  495. break;
  496. }
  497. }
  498. }
  499. ////////////////////////////////////////////////////////////////////////////////////////////////////
  500. static void _ProcessStateEvents(HAPPCTRL hAC, HAPPINFO hAI)
  501. {
  502. appid_t nAppIdSrc;
  503. bool fOldPaused = g_fPauseImp;
  504. char szDispName[128];
  505. while(!g_bIntr && (nAppIdSrc = ::GfaIpcAppCtrlGetNextStateEvtSrc(hAI)))
  506. {
  507. GfaIpcAppStates state = ::GfaIpcAppCtrlGetState(hAC, nAppIdSrc);
  508. GfaIpcAppCtrlGetDisplayName(hAC, nAppIdSrc, szDispName, sizeof(szDispName));
  509. TRACE("%-8s: State: %s\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  510. if(nAppIdSrc & _DEPENDENCIES)
  511. {
  512. if(state == GIAS_Running)
  513. {
  514. g_lf.Info("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  515. g_nDepRunning |= nAppIdSrc;
  516. }
  517. else
  518. {
  519. g_lf.Warning("%s -> %s.\n", szDispName, ::GfaIpcAppCtrlGetStateText(state));
  520. g_nDepRunning &= ~nAppIdSrc;
  521. }
  522. }
  523. }
  524. if(!g_bIntr)
  525. {
  526. g_fPauseImp = (g_nDepRunning != _DEPENDENCIES);
  527. if(!g_fPauseCmd && (fOldPaused != g_fPauseImp))
  528. {
  529. fOldPaused = g_fPauseImp;
  530. GfaIpcAppStates newState = g_fPauseImp ? GIAS_Paused : GIAS_Running;
  531. ::GfaIpcAppCtrlSetState(hAC, newState);
  532. if(g_fPauseImp)
  533. g_lf.Warning("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
  534. else
  535. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(newState));
  536. }
  537. }
  538. }
  539. static std::string _GetTopicPrefixString(const CMqttClConfig &cfg)
  540. {
  541. if(cfg.TopicPrefixDisabled())
  542. return "";
  543. return formatString("%s/", cfg.GetTopicPrefix());
  544. }
  545. ////////////////////////////////////////////////////////////////////////////////////////////////////
  546. ////////////////////////////////////////////////////////////////////////////////////////////////////
  547. ////////////////////////////////////////////////////////////////////////////////////////////////////
  548. //
  549. int main(int /*argc*/, char **/*argv*/)
  550. {
  551. int nRet = 0;
  552. CProcessInstance pi;
  553. std::string sDevID;
  554. char szLogFile[PATH_MAX];
  555. std::string strBaseDir;
  556. const char *pszBaseDir = NULL;
  557. HAPPCTRL hAC = NULL;
  558. HAPPINFO hAI;
  559. HSHM hShm = NULL;
  560. void *pShm = NULL;
  561. unsigned long long nUsecWorkTime = 0;
  562. int nTlsMode;
  563. CProcessClock pcWork;
  564. ////////////////////////////////////////////////////////////////////////////////////////////////
  565. // check for multiple instances
  566. if(!pi.LockInstance(UUID_SHM))
  567. {
  568. CLogfile::StdErr("Failed to start instance!\n");
  569. return -1;
  570. }
  571. ////////////////////////////////////////////////////////////////////////////////////////////////
  572. // configure signal handling
  573. struct sigaction sa;
  574. ::sigfillset(&g_set);
  575. sigaddset(&g_set, SIGUSR1);
  576. memset(&sa, 0, sizeof(sa));
  577. sa.sa_handler = _SigHandler;
  578. sigaction(SIGHUP, &sa, NULL); // handles user's terminal disconnect
  579. sigaction(SIGQUIT, &sa, NULL); // handles Ctrl + '\'
  580. sigaction(SIGTERM, &sa, NULL); // handles normal termination
  581. sigaction(SIGABRT, &sa, NULL); // handles abnormal termination (i.e. abort())
  582. sigaction(SIGINT, &sa, NULL); // handles Ctrl + 'C'
  583. sa.sa_handler = SIG_IGN;
  584. sigaction(SIGTSTP, &sa, NULL); // ignores Ctrl + 'Z'
  585. sigaction(SIGSTOP, &sa, NULL); // ignores Stop
  586. sigaction(SIGCONT, &sa, NULL); // ignores Continue
  587. sigaction(SIGCHLD, &sa, NULL); // ignores child process termination
  588. sigaction(0, &sa, NULL); // ignores shell termination
  589. do
  590. {
  591. g_fZombie = true;
  592. ////////////////////////////////////////////////////////////////////////////////////////////
  593. // get the base directory for output files
  594. if(!pszBaseDir)
  595. pszBaseDir = _GetBaseDir(strBaseDir);
  596. CLogfile::StdOut("Using base directory \"%s\".\n", pszBaseDir);
  597. ////////////////////////////////////////////////////////////////////////////////////////////
  598. // initialize log file
  599. sprintf(szLogFile, "%s/%s", pszBaseDir, _LOGFILE_NAME);
  600. if(!g_lf.Open(szLogFile))
  601. {
  602. CLogfile::StdErr("Failed to create/open log file!\n");
  603. nRet = -1;
  604. break;
  605. }
  606. g_lf.Info("Process started.\n");
  607. ////////////////////////////////////////////////////////////////////////////////////////////
  608. // initialize app control
  609. g_lf.Info("Acquire AppCtrl-Handle.\n");
  610. if(!(hAC = ::GfaIpcAppCtrlAcquire(_APPID, _APPNAME, _USEC_FROM_MSEC(_UPDATE_INTERVAL_MS), _USEC_FROM_MSEC(_RECONN_INTERVAL_MS) * 3)))
  611. {
  612. g_lf.Error("Failed to acquire AppCtrl-Handle!\n");
  613. break;
  614. }
  615. ::GfaIpcAppCtrlSetState(hAC, GIAS_Initializing);
  616. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Initializing));
  617. if(!::GfaIpcAppCtrlSubscribeStateEvents(hAC, _DEPENDENCIES))
  618. {
  619. g_lf.Error("Failed to subscribe state event notifications!\n");
  620. break;
  621. }
  622. ////////////////////////////////////////////////////////////////////////////////////////////
  623. // parse the config file
  624. CMqttClConfig cfg(UUID_SHM);
  625. #ifdef MQTTCL_CONFIG_FILE_PATH
  626. std::string strMqttCfg = MQTTCL_CONFIG_FILE_PATH;
  627. #else // MQTTCL_CONFIG_FILE_PATH
  628. char szBaseDir[PATH_MAX];
  629. ::GetAppDirectory(szBaseDir, sizeof(szBaseDir));
  630. std::string strMqttCfg = formatString("%s/cfg/mqttcl.cfg.json", szBaseDir);
  631. #endif // MQTTCL_CONFIG_FILE_PATH
  632. if(!cfg.LoadCfg(strMqttCfg.c_str(), g_lf))
  633. {
  634. nRet = -1;
  635. break;
  636. }
  637. nTlsMode = cfg.GetTLSMode();
  638. // TRACE("%s/%s\n", cfg.GetDeviceID(), cfg.GetShmID());
  639. std::string strTopicPrefix = _GetTopicPrefixString(cfg);
  640. ////////////////////////////////////////////////////////////////////////////////////////////
  641. // client control topic map
  642. const SUB_CTRL_TOPIC subCtrlMap[] =
  643. {
  644. formatString("%s%s/#", strTopicPrefix.c_str(), _TOPIC_CMD_CTRL),
  645. formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_BINLE, _TOPIC_CMD_SET),
  646. formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_BINBE, _TOPIC_CMD_SET),
  647. formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_JSON, _TOPIC_CMD_SET),
  648. formatString("%s%s/%s/#", strTopicPrefix.c_str(), MQTT_TOPIC_VALUE_PBUF, _TOPIC_CMD_SET),
  649. // formatString("%s%s/#", strTopicPrefix.c_str(), _TOPIC_CMD_STATUS)
  650. };
  651. ////////////////////////////////////////////////////////////////////////////////////////////
  652. if(!(hShm = ::acquire_shm(sizeof(shm_t), 1)))
  653. {
  654. g_lf.Error("GfaIpcAcquireSHM failed!\n");
  655. break;
  656. }
  657. g_lf.Info("Acquired SHM Handle.\n");
  658. if(!(pShm = ::GfaIpcAcquirePointer(hShm)))
  659. {
  660. g_lf.Error("GfaIpcAcquirePointer failed!\n");
  661. break;
  662. }
  663. g_lf.Info("Acquired SHM Pointer.\n");
  664. ::GfaIpcDumpSHMROT();
  665. memcpy(&g_shmShadow, (const shm_t*)pShm, sizeof(shm_t));
  666. ////////////////////////////////////////////////////////////////////////////////////////////
  667. int nErr, nLocked = 0, nNumConn = 0;
  668. bool bReconnect, bConnPending;
  669. std::string strErr;
  670. CMqttVarTable vtbl;
  671. CShm_t shm(pShm, &g_shmShadow, hShm, NULL, -1, 0, cfg.GetDefaultQOS(), cfg.GetDefaultRetain());
  672. shm.InitPath(NULL, NULL);
  673. shm.CreateMembersTable(vtbl);
  674. #if _DUMP_ENABLED_VARS
  675. vtbl.DumpPubEnabled();
  676. #endif // _DUMP_ENABLED_VARS
  677. ////////////////////////////////////////////////////////////////////////////////////////////
  678. CMqttClient mqttCl(cfg.GetDeviceID());
  679. mqttCl.SetClientEventCallback(_OnClientEvents, NEVT_Connect | NEVT_Disconnect | NEVT_Subscribe | NEVT_Unsubscribe | NEVT_Message | NEVT_Log, NULL);
  680. g_fZombie = false;
  681. g_fRun = true;
  682. ::GfaIpcAppCtrlSetState(hAC, GIAS_Running);
  683. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Running));
  684. while(g_fRun)
  685. {
  686. ////////////////////////////////////////////////////////////////////////////////////////
  687. // update app control info
  688. if((hAI = ::GfaIpcAppCtrlInfoUpdate(hAC, nUsecWorkTime)))
  689. {
  690. _ProcessCtrlMessages(hAC, hAI);
  691. _ProcessStateEvents(hAC, hAI);
  692. }
  693. if(g_fPauseImp || g_fPauseCmd)
  694. {
  695. if(g_cs < CLS_ShutDown)
  696. {
  697. g_csLast = g_cs;
  698. g_cs = CLS_ShutDown;
  699. }
  700. else
  701. {
  702. nUsecWorkTime = 0;
  703. }
  704. }
  705. pcWork.ClockTrigger();
  706. switch(g_cs)
  707. {
  708. case CLS_NotInit:
  709. if((nErr = CMqttClient::Init()) == MOSQ_ERR_SUCCESS)
  710. {
  711. g_cs = CLS_SetUserAndPass;
  712. }
  713. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  714. {
  715. g_csLast = g_cs;
  716. g_cs = CLS_Err;
  717. }
  718. break;
  719. case CLS_SetUserAndPass:
  720. if(cfg.HasUsernameAndPassword())
  721. {
  722. if((nErr = mqttCl.username_pw_set(cfg.GetUsername(), cfg.GetPassword())) != MOSQ_ERR_SUCCESS)
  723. {
  724. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  725. g_lf.Error("Error setting Username or Password: %s\n", strErr.c_str());
  726. }
  727. }
  728. g_cs = CLS_SetTLS;
  729. break;
  730. case CLS_SetTLS:
  731. if(nTlsMode == MQTTCL_TLS_MODE_CRT)
  732. {
  733. g_lf.Info("Using TLS with certificates.\n");
  734. if((nErr = mqttCl.tls_set(cfg.GetTlsCaCrtFile(), NULL, cfg.GetTlsClCrtFile(), cfg.GetTlsClKeyFile(), NULL)) != MOSQ_ERR_SUCCESS)
  735. {
  736. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  737. if(g_bIntr)
  738. {
  739. g_csLast = g_cs;
  740. g_cs = CLS_ShutDown;
  741. }
  742. else
  743. {
  744. g_csLast = g_cs;
  745. g_cs = CLS_Err;
  746. }
  747. }
  748. else
  749. {
  750. g_cs = CLS_SetLastWill;
  751. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  752. }
  753. }
  754. else if(nTlsMode == MQTTCL_TLS_MODE_PSK)
  755. {
  756. g_lf.Info("Using TLS with PSK.\n");
  757. if((nErr = mqttCl.tls_psk_set(cfg.GetTlsPSK(), cfg.GetTlsPSKCliID(), NULL)) != MOSQ_ERR_SUCCESS)
  758. {
  759. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  760. if(g_bIntr)
  761. {
  762. g_csLast = g_cs;
  763. g_cs = CLS_ShutDown;
  764. }
  765. else
  766. {
  767. g_csLast = g_cs;
  768. g_cs = CLS_Err;
  769. }
  770. }
  771. else
  772. {
  773. g_cs = CLS_SetLastWill;
  774. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  775. }
  776. }
  777. else
  778. {
  779. g_cs = CLS_SetLastWill;
  780. g_lf.Info("Connecting to broker @ %s:%u ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  781. }
  782. break;
  783. case CLS_SetLastWill:
  784. if(cfg.HasLastWill())
  785. {
  786. std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetLastWillTopic());
  787. if((nErr = mqttCl.will_set(strTopic.c_str(), cfg.GetLastWillMessageLength(), cfg.GetLastWillMessage(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain())) != MOSQ_ERR_SUCCESS)
  788. {
  789. CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr);
  790. }
  791. }
  792. g_cs = CLS_Unconnected;
  793. break;
  794. case CLS_Unconnected:
  795. if(g_bConnected)
  796. {
  797. g_bConnected = false;
  798. g_lf.Warning("Lost connection to broker @ %s:%u. Tying to reconnect ...\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  799. }
  800. if(!nNumConn)
  801. g_cs = CLS_Connect;
  802. else
  803. g_cs = CLS_Reconnect;
  804. break;
  805. case CLS_Connect:
  806. if((nErr = mqttCl.connect(cfg.GetBrokerAddr(), cfg.GetBrokerPort(), cfg.GetKeepAliveTime())) == MOSQ_ERR_SUCCESS)
  807. g_cs = CLS_Connecting;
  808. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  809. {
  810. TRACE("CLS_Connect: %s\n", strErr.c_str());
  811. if(bConnPending)
  812. g_cs = CLS_Connecting;
  813. else if(bReconnect)
  814. {
  815. g_csLast = g_cs;
  816. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  817. }
  818. else if(g_bIntr)
  819. {
  820. g_csLast = g_cs;
  821. g_cs = CLS_ShutDown;
  822. }
  823. else
  824. {
  825. if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
  826. {
  827. g_csLast = g_cs;
  828. g_cs = CLS_Err;
  829. }
  830. else
  831. {
  832. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  833. }
  834. }
  835. }
  836. break;
  837. case CLS_Reconnect:
  838. if((nErr = mqttCl.reconnect()) == MOSQ_ERR_SUCCESS)
  839. g_cs = CLS_Connecting;
  840. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  841. {
  842. TRACE("CLS_Reconnect: %s\n", strErr.c_str());
  843. if(bConnPending)
  844. g_cs = CLS_Connecting;
  845. else if(bReconnect)
  846. {
  847. g_csLast = g_cs;
  848. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  849. }
  850. else if(g_bIntr)
  851. {
  852. g_csLast = g_cs;
  853. g_cs = CLS_ShutDown;
  854. }
  855. else
  856. {
  857. if(++g_nErrRetries > _CONNECT_MAX_RETRIES(nErr))
  858. {
  859. g_csLast = g_cs;
  860. g_cs = CLS_Err;
  861. }
  862. else
  863. {
  864. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), g_cs);
  865. }
  866. }
  867. }
  868. break;
  869. case CLS_Connecting:
  870. g_nErrRetries = 0;
  871. if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  872. {
  873. TRACE("CLS_Connecting: %s\n", strErr.c_str());
  874. if(bReconnect)
  875. {
  876. g_csLast = g_cs;
  877. g_cs = _cl_usleep(_USEC_FROM_MSEC(_RECONN_INTERVAL_MS), CLS_Unconnected);
  878. }
  879. else if(g_bIntr)
  880. {
  881. g_csLast = g_cs;
  882. g_cs = CLS_ShutDown;
  883. }
  884. else if(!bConnPending)
  885. {
  886. g_csLast = g_cs;
  887. g_cs = CLS_Err;
  888. }
  889. }
  890. else if(g_bConnected)
  891. {
  892. g_cs = CLS_Connected;
  893. }
  894. break;
  895. case CLS_Connected:
  896. TRACE("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  897. g_lf.Info("Connected to broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  898. ++nNumConn;
  899. g_nSubcribed = 0;
  900. g_cs = CLS_Subscribe;
  901. break;
  902. case CLS_Subscribe:
  903. g_lf.Info("Subscribing control-topics ...\n");
  904. if((nErr = _Subscribe(mqttCl, cfg.GetDefaultQOS(), subCtrlMap, _COUNTOF(subCtrlMap))) == MOSQ_ERR_SUCCESS)
  905. g_cs = CLS_Subscribing;
  906. else if(!CMqttClient::EvalError(nErr, bReconnect, bConnPending, g_bIntr, strErr))
  907. {
  908. if(bConnPending)
  909. g_cs = CLS_Connecting;
  910. else if(bReconnect)
  911. g_cs = CLS_Unconnected;
  912. else if(g_bIntr)
  913. {
  914. g_csLast = g_cs;
  915. g_cs = CLS_ShutDown;
  916. }
  917. else
  918. {
  919. g_csLast = g_cs;
  920. g_cs = CLS_Err;
  921. }
  922. }
  923. break;
  924. case CLS_Subscribing:
  925. if(!mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  926. {
  927. if(bReconnect)
  928. g_cs = CLS_Unconnected;
  929. else if(bConnPending)
  930. g_cs = CLS_Connecting;
  931. else if(g_bIntr)
  932. {
  933. g_csLast = g_cs;
  934. g_cs = CLS_ShutDown;
  935. }
  936. else
  937. {
  938. g_csLast = g_cs;
  939. g_cs = CLS_Err;
  940. }
  941. }
  942. else if(g_nSubcribed == _COUNTOF(subCtrlMap))
  943. {
  944. g_cs = CLS_Subscribed;
  945. }
  946. break;
  947. case CLS_Subscribed:
  948. g_lf.Info("Subscriptions acknowledged.\n");
  949. g_lf.Info("Enter SHM processing loop ...\n");
  950. g_cs = CLS_PublishConnect;
  951. break;
  952. case CLS_PublishConnect:
  953. if(cfg.HasConnectMsg())
  954. {
  955. std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetConnectTopic());
  956. CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetConnectMessage(), cfg.GetConnectMessageLength(), cfg.GetConnectQOS(), cfg.GetConnectRetain());
  957. mqttCl.publish(pMsg);
  958. pMsg->Release();
  959. }
  960. g_cs = CLS_ProcMsg;
  961. break;
  962. case CLS_ProcMsg:
  963. if(mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bReconnect, bConnPending, g_bIntr, strErr))
  964. {
  965. #if _TRACK_TIMES
  966. std::string s1, s2;
  967. pc_time64_t elapsed;
  968. g_nDbgCounter1 = g_nDbgCounter2 = g_nDbgCounter3 = 0;
  969. g_pc.ClockTrigger();
  970. #endif // _TRACK_TIMES
  971. _ProcessIncoming(mqttCl, vtbl, cfg, subCtrlMap);
  972. #if _TRACK_TIMES
  973. if(g_nDbgCounter1)
  974. {
  975. elapsed = g_pc.ClockGetElapsed();
  976. s1 = CProcessClock::Interval2String(elapsed);
  977. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter1);
  978. TRACE("_ProcessIncoming (%lu variables): %s (%s per var)\n", g_nDbgCounter1, s1.c_str(), s2.c_str());
  979. }
  980. g_pc.ClockTrigger();
  981. #endif // _TRACK_TIMES
  982. _SIG_BLOCK(&g_set);
  983. vtbl.CheckShmAndPublish(cfg.GetTopicPrefix(), nullptr, mqttCl.GetMsgQueueSnd(), nLocked);
  984. _SIG_UNBLOCK(&g_set);
  985. #if _TRACK_TIMES
  986. g_nDbgCounter2 = mqttCl.GetMsgQueueSnd().Size();
  987. if(g_nDbgCounter2)
  988. {
  989. elapsed = g_pc.ClockGetElapsed();
  990. s1 = CProcessClock::Interval2String(elapsed);
  991. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter2);
  992. TRACE("CheckShmAndPublish (%lu variables): %s (%s per var)\n", g_nDbgCounter2, s1.c_str(), s2.c_str());
  993. g_pc.ClockTrigger();
  994. }
  995. g_nDbgCounter3 =
  996. #endif // _TRACK_TIMES
  997. _ProcessOutgoing(mqttCl);
  998. #if _TRACK_TIMES
  999. if(g_nDbgCounter3)
  1000. {
  1001. elapsed = g_pc.ClockGetElapsed();
  1002. s1 = CProcessClock::Interval2String(elapsed);
  1003. s2 = CProcessClock::Interval2String(elapsed / g_nDbgCounter3);
  1004. TRACE("_ProcessOutgoing (%lu variables): %s (%s per var)\n", g_nDbgCounter3, s1.c_str(), s2.c_str());
  1005. }
  1006. #endif // _TRACK_TIMES
  1007. }
  1008. else
  1009. {
  1010. if(bReconnect)
  1011. g_cs = CLS_Unconnected;
  1012. else if(bConnPending)
  1013. g_cs = CLS_Connecting;
  1014. else if(g_bIntr)
  1015. {
  1016. g_csLast = g_cs;
  1017. g_cs = CLS_PublishDisconnect;
  1018. }
  1019. else
  1020. {
  1021. g_csLast = g_cs;
  1022. g_cs = CLS_Err;
  1023. }
  1024. }
  1025. break;
  1026. case CLS_Err:
  1027. g_lf.Error("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
  1028. TRACE("[%s] - %s\n", _GetClientStateString(g_csLast), strErr.c_str());
  1029. g_fZombie = true;
  1030. g_cs = CLS_PublishDisconnect;
  1031. break;
  1032. case CLS_PublishDisconnect:
  1033. if(cfg.HasLastWillOnExit())
  1034. {
  1035. std::string strTopic = formatString("%s%s", strTopicPrefix.c_str(), cfg.GetLastWillTopic());
  1036. CMqttMessage *pMsg = CMqttMessage::CreateMessage(strTopic.c_str(), cfg.GetLastWillOnExitMessage(), cfg.GetLastWillOnExitMessageLength(), cfg.GetLastWillQOS(), cfg.GetLastWillRetain());
  1037. mqttCl.publish(pMsg);
  1038. pMsg->Release();
  1039. bool bDummy = false;
  1040. mqttCl.TimedLoop(_NSEC_FROM_MSEC(_UPDATE_INTERVAL_MS), nErr, bDummy, bDummy, bDummy, strErr);
  1041. }
  1042. g_cs = CLS_Disconnect;
  1043. break;
  1044. case CLS_ShutDown:
  1045. if(g_bIntr && g_nLastSig >= 0)
  1046. {
  1047. g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
  1048. g_nLastSig = -1;
  1049. }
  1050. g_lf.Info("Process shutting down ...\n");
  1051. if(g_csLast >= CLS_Subscribed)
  1052. g_cs = CLS_Unsubscribe;
  1053. else if(g_csLast >= CLS_Connected)
  1054. g_cs = CLS_Disconnect;
  1055. else if(g_csLast > CLS_NotInit)
  1056. g_cs = CLS_Cleanup;
  1057. else if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
  1058. g_cs = CLS_Paused;
  1059. else
  1060. g_cs = CLS_Exit;
  1061. break;
  1062. case CLS_Unsubscribe:
  1063. g_lf.Info("Unsubscribe control-topics.\n");
  1064. _Unsubscribe(mqttCl, subCtrlMap, _COUNTOF(subCtrlMap));
  1065. g_nSubcribed = 0;
  1066. g_cs = CLS_Disconnect;
  1067. break;
  1068. case CLS_Disconnect:
  1069. g_lf.Info("Disconnect from broker @ %s:%u.\n", cfg.GetBrokerAddr(), cfg.GetBrokerPort());
  1070. mqttCl.disconnect();
  1071. g_bConnected = false;
  1072. g_cs = CLS_Cleanup;
  1073. break;
  1074. case CLS_Cleanup:
  1075. g_lf.Info("Mqtt clean up.\n");
  1076. CMqttClient::Cleanup();
  1077. if((g_fPauseImp || g_fPauseCmd) && !g_bIntr)
  1078. g_cs = CLS_Paused;
  1079. else
  1080. g_cs = CLS_Exit;
  1081. break;
  1082. case CLS_Paused:
  1083. if(!g_fPauseImp && !g_fPauseCmd)
  1084. {
  1085. if(!g_bIntr)
  1086. g_cs = CLS_NotInit;
  1087. else
  1088. g_cs = CLS_Exit;
  1089. }
  1090. else
  1091. {
  1092. usleep(_USEC_FROM_MSEC(_UPDATE_INTERVAL_MS));
  1093. continue;
  1094. }
  1095. break;
  1096. case CLS_Exit:
  1097. g_fRun = false;
  1098. break;
  1099. }
  1100. nUsecWorkTime = pcWork.ClockGetElapsed() / 1000;
  1101. }
  1102. }
  1103. while(false);
  1104. ////////////////////////////////////////////////////////////////////////////////////////////////
  1105. if(hShm)
  1106. {
  1107. if(pShm)
  1108. {
  1109. g_lf.Info("Release SHM Pointer.\n");
  1110. ::GfaIpcReleasePointer(hShm, pShm);
  1111. }
  1112. g_lf.Info("Release SHM Handle.\n");
  1113. ::GfaIpcReleaseSHM(hShm);
  1114. }
  1115. if(g_fZombie)
  1116. {
  1117. if(hAC)
  1118. ::GfaIpcAppCtrlSetState(hAC, GIAS_Zombie);
  1119. TRACE("Enter Zombie state ...\n");
  1120. g_lf.Warning("Enter Zombie state ...\n");
  1121. g_lf.Flush();
  1122. pause();
  1123. if(g_nLastSig >= 0)
  1124. g_lf.Info("Received signal '%s'.\n", strsignal(g_nLastSig));
  1125. }
  1126. if(hAC)
  1127. {
  1128. g_lf.Info("Enter state %s ...\n", ::GfaIpcAppCtrlGetStateText(GIAS_Terminating));
  1129. ::GfaIpcAppCtrlSetState(hAC, GIAS_Terminating);
  1130. g_lf.Info("Releasing App Control ...\n");
  1131. ::GfaIpcAppCtrlRelease(hAC);
  1132. }
  1133. g_lf.Info("Process exit.\n\n");
  1134. g_lf.Close();
  1135. CLogfile::StdErr("MqttCl exit.\n");
  1136. return nRet;
  1137. }