diff --git a/meian/Dockerfile b/meian/Dockerfile new file mode 100644 index 0000000..a2e0244 --- /dev/null +++ b/meian/Dockerfile @@ -0,0 +1,32 @@ +# 将官方 Python 运行时用作父镜像 +FROM python:3.10.13-slim-bullseye + +# 工作目录设置 +RUN mkdir -p /app +WORKDIR /app/meian + +# 将文件复制到容器中 +COPY btop ./sources.list.bullseye ./requirements.txt ./*.py /app/meian/ +COPY ./data /app/meian/data +COPY ./devices /app/meian/devices + +RUN export TZ=Asia/Shanghai \ + && ln -sf /usr/share/zoneinfo/$TZ /etc/localtime \ + && echo $TZ > /etc/timezone \ + && export http_proxy=$proxy_url && export https_proxy=$proxy_url \ + && mv sources.list.bullseye /etc/apt/sources.list \ + && apt-get update \ + && apt-get install -y iputils-ping vim net-tools telnet sqlite3 curl \ + && pip install --no-cache-dir -r requirements.txt -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com --timeout 1000 \ + && apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \ + && unset http_proxy \ + && unset https_proxy + +RUN echo "alias ll='ls -alh --color'" >> /root/.bashrc +RUN echo "alias ll='ls -alh --color'" >> /root/.profile + +# 0: 本地环境,1:测试容器环境,2: 生产容器环境,3: 本地开发环境 +ENV ENV_TYPE=2 + +#CMD ["sleep", "3600"] +CMD ["python", "app.py"] diff --git a/meian/Pipfile b/meian/Pipfile new file mode 100644 index 0000000..9063c92 --- /dev/null +++ b/meian/Pipfile @@ -0,0 +1,14 @@ +[[source]] +url = "https://mirrors.ustc.edu.cn/pypi/simple" +verify_ssl = false +name = "pip_conf_index_global" + +[packages] +paho-mqtt = "==1.6.1" +pydantic = "==2.5.2" +requests = "*" + +[dev-packages] + +[requires] +python_version = "3.10" diff --git a/meian/Pipfile.lock b/meian/Pipfile.lock new file mode 100644 index 0000000..f6b04cf --- /dev/null +++ b/meian/Pipfile.lock @@ -0,0 +1,293 @@ +{ + "_meta": { + "hash": { + "sha256": "236feffaf390225d7412befe5ca7393964d8a8083383e62a53dea908da63502b" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.10" + }, + "sources": [ + { + "name": "pip_conf_index_global", + "url": "https://mirrors.ustc.edu.cn/pypi/simple", + "verify_ssl": false + } + ] + }, + "default": { + "annotated-types": { + "hashes": [ + "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53", + "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89" + ], + "markers": "python_version >= '3.8'", + "version": "==0.7.0" + }, + "certifi": { + "hashes": [ + "sha256:922820b53db7a7257ffbda3f597266d435245903d80737e34f8a45ff3e3230d8", + "sha256:bec941d2aa8195e248a60b31ff9f0558284cf01a52591ceda73ea9afffd69fd9" + ], + "markers": "python_version >= '3.6'", + "version": "==2024.8.30" + }, + "charset-normalizer": { + "hashes": [ + "sha256:06435b539f889b1f6f4ac1758871aae42dc3a8c0e24ac9e60c2384973ad73027", + "sha256:06a81e93cd441c56a9b65d8e1d043daeb97a3d0856d177d5c90ba85acb3db087", + "sha256:0a55554a2fa0d408816b3b5cedf0045f4b8e1a6065aec45849de2d6f3f8e9786", + "sha256:0b2b64d2bb6d3fb9112bafa732def486049e63de9618b5843bcdd081d8144cd8", + "sha256:10955842570876604d404661fbccbc9c7e684caf432c09c715ec38fbae45ae09", + "sha256:122c7fa62b130ed55f8f285bfd56d5f4b4a5b503609d181f9ad85e55c89f4185", + "sha256:1ceae2f17a9c33cb48e3263960dc5fc8005351ee19db217e9b1bb15d28c02574", + "sha256:1d3193f4a680c64b4b6a9115943538edb896edc190f0b222e73761716519268e", + "sha256:1f79682fbe303db92bc2b1136016a38a42e835d932bab5b3b1bfcfbf0640e519", + "sha256:2127566c664442652f024c837091890cb1942c30937add288223dc895793f898", + "sha256:22afcb9f253dac0696b5a4be4a1c0f8762f8239e21b99680099abd9b2b1b2269", + "sha256:25baf083bf6f6b341f4121c2f3c548875ee6f5339300e08be3f2b2ba1721cdd3", + "sha256:2e81c7b9c8979ce92ed306c249d46894776a909505d8f5a4ba55b14206e3222f", + "sha256:3287761bc4ee9e33561a7e058c72ac0938c4f57fe49a09eae428fd88aafe7bb6", + "sha256:34d1c8da1e78d2e001f363791c98a272bb734000fcef47a491c1e3b0505657a8", + "sha256:37e55c8e51c236f95b033f6fb391d7d7970ba5fe7ff453dad675e88cf303377a", + "sha256:3d47fa203a7bd9c5b6cee4736ee84ca03b8ef23193c0d1ca99b5089f72645c73", + "sha256:3e4d1f6587322d2788836a99c69062fbb091331ec940e02d12d179c1d53e25fc", + "sha256:42cb296636fcc8b0644486d15c12376cb9fa75443e00fb25de0b8602e64c1714", + "sha256:45485e01ff4d3630ec0d9617310448a8702f70e9c01906b0d0118bdf9d124cf2", + "sha256:4a78b2b446bd7c934f5dcedc588903fb2f5eec172f3d29e52a9096a43722adfc", + "sha256:4ab2fe47fae9e0f9dee8c04187ce5d09f48eabe611be8259444906793ab7cbce", + "sha256:4d0d1650369165a14e14e1e47b372cfcb31d6ab44e6e33cb2d4e57265290044d", + "sha256:549a3a73da901d5bc3ce8d24e0600d1fa85524c10287f6004fbab87672bf3e1e", + "sha256:55086ee1064215781fff39a1af09518bc9255b50d6333f2e4c74ca09fac6a8f6", + "sha256:572c3763a264ba47b3cf708a44ce965d98555f618ca42c926a9c1616d8f34269", + "sha256:573f6eac48f4769d667c4442081b1794f52919e7edada77495aaed9236d13a96", + "sha256:5b4c145409bef602a690e7cfad0a15a55c13320ff7a3ad7ca59c13bb8ba4d45d", + "sha256:6463effa3186ea09411d50efc7d85360b38d5f09b870c48e4600f63af490e56a", + "sha256:65f6f63034100ead094b8744b3b97965785388f308a64cf8d7c34f2f2e5be0c4", + "sha256:663946639d296df6a2bb2aa51b60a2454ca1cb29835324c640dafb5ff2131a77", + "sha256:6897af51655e3691ff853668779c7bad41579facacf5fd7253b0133308cf000d", + "sha256:68d1f8a9e9e37c1223b656399be5d6b448dea850bed7d0f87a8311f1ff3dabb0", + "sha256:6ac7ffc7ad6d040517be39eb591cac5ff87416c2537df6ba3cba3bae290c0fed", + "sha256:6b3251890fff30ee142c44144871185dbe13b11bab478a88887a639655be1068", + "sha256:6c4caeef8fa63d06bd437cd4bdcf3ffefe6738fb1b25951440d80dc7df8c03ac", + "sha256:6ef1d82a3af9d3eecdba2321dc1b3c238245d890843e040e41e470ffa64c3e25", + "sha256:753f10e867343b4511128c6ed8c82f7bec3bd026875576dfd88483c5c73b2fd8", + "sha256:7cd13a2e3ddeed6913a65e66e94b51d80a041145a026c27e6bb76c31a853c6ab", + "sha256:7ed9e526742851e8d5cc9e6cf41427dfc6068d4f5a3bb03659444b4cabf6bc26", + "sha256:7f04c839ed0b6b98b1a7501a002144b76c18fb1c1850c8b98d458ac269e26ed2", + "sha256:802fe99cca7457642125a8a88a084cef28ff0cf9407060f7b93dca5aa25480db", + "sha256:80402cd6ee291dcb72644d6eac93785fe2c8b9cb30893c1af5b8fdd753b9d40f", + "sha256:8465322196c8b4d7ab6d1e049e4c5cb460d0394da4a27d23cc242fbf0034b6b5", + "sha256:86216b5cee4b06df986d214f664305142d9c76df9b6512be2738aa72a2048f99", + "sha256:87d1351268731db79e0f8e745d92493ee2841c974128ef629dc518b937d9194c", + "sha256:8bdb58ff7ba23002a4c5808d608e4e6c687175724f54a5dade5fa8c67b604e4d", + "sha256:8c622a5fe39a48f78944a87d4fb8a53ee07344641b0562c540d840748571b811", + "sha256:8d756e44e94489e49571086ef83b2bb8ce311e730092d2c34ca8f7d925cb20aa", + "sha256:8f4a014bc36d3c57402e2977dada34f9c12300af536839dc38c0beab8878f38a", + "sha256:9063e24fdb1e498ab71cb7419e24622516c4a04476b17a2dab57e8baa30d6e03", + "sha256:90d558489962fd4918143277a773316e56c72da56ec7aa3dc3dbbe20fdfed15b", + "sha256:923c0c831b7cfcb071580d3f46c4baf50f174be571576556269530f4bbd79d04", + "sha256:95f2a5796329323b8f0512e09dbb7a1860c46a39da62ecb2324f116fa8fdc85c", + "sha256:96b02a3dc4381e5494fad39be677abcb5e6634bf7b4fa83a6dd3112607547001", + "sha256:9f96df6923e21816da7e0ad3fd47dd8f94b2a5ce594e00677c0013018b813458", + "sha256:a10af20b82360ab00827f916a6058451b723b4e65030c5a18577c8b2de5b3389", + "sha256:a50aebfa173e157099939b17f18600f72f84eed3049e743b68ad15bd69b6bf99", + "sha256:a981a536974bbc7a512cf44ed14938cf01030a99e9b3a06dd59578882f06f985", + "sha256:a9a8e9031d613fd2009c182b69c7b2c1ef8239a0efb1df3f7c8da66d5dd3d537", + "sha256:ae5f4161f18c61806f411a13b0310bea87f987c7d2ecdbdaad0e94eb2e404238", + "sha256:aed38f6e4fb3f5d6bf81bfa990a07806be9d83cf7bacef998ab1a9bd660a581f", + "sha256:b01b88d45a6fcb69667cd6d2f7a9aeb4bf53760d7fc536bf679ec94fe9f3ff3d", + "sha256:b261ccdec7821281dade748d088bb6e9b69e6d15b30652b74cbbac25e280b796", + "sha256:b2b0a0c0517616b6869869f8c581d4eb2dd83a4d79e0ebcb7d373ef9956aeb0a", + "sha256:b4a23f61ce87adf89be746c8a8974fe1c823c891d8f86eb218bb957c924bb143", + "sha256:bd8f7df7d12c2db9fab40bdd87a7c09b1530128315d047a086fa3ae3435cb3a8", + "sha256:beb58fe5cdb101e3a055192ac291b7a21e3b7ef4f67fa1d74e331a7f2124341c", + "sha256:c002b4ffc0be611f0d9da932eb0f704fe2602a9a949d1f738e4c34c75b0863d5", + "sha256:c083af607d2515612056a31f0a8d9e0fcb5876b7bfc0abad3ecd275bc4ebc2d5", + "sha256:c180f51afb394e165eafe4ac2936a14bee3eb10debc9d9e4db8958fe36afe711", + "sha256:c235ebd9baae02f1b77bcea61bce332cb4331dc3617d254df3323aa01ab47bd4", + "sha256:cd70574b12bb8a4d2aaa0094515df2463cb429d8536cfb6c7ce983246983e5a6", + "sha256:d0eccceffcb53201b5bfebb52600a5fb483a20b61da9dbc885f8b103cbe7598c", + "sha256:d965bba47ddeec8cd560687584e88cf699fd28f192ceb452d1d7ee807c5597b7", + "sha256:db364eca23f876da6f9e16c9da0df51aa4f104a972735574842618b8c6d999d4", + "sha256:ddbb2551d7e0102e7252db79ba445cdab71b26640817ab1e3e3648dad515003b", + "sha256:deb6be0ac38ece9ba87dea880e438f25ca3eddfac8b002a2ec3d9183a454e8ae", + "sha256:e06ed3eb3218bc64786f7db41917d4e686cc4856944f53d5bdf83a6884432e12", + "sha256:e27ad930a842b4c5eb8ac0016b0a54f5aebbe679340c26101df33424142c143c", + "sha256:e537484df0d8f426ce2afb2d0f8e1c3d0b114b83f8850e5f2fbea0e797bd82ae", + "sha256:eb00ed941194665c332bf8e078baf037d6c35d7c4f3102ea2d4f16ca94a26dc8", + "sha256:eb6904c354526e758fda7167b33005998fb68c46fbc10e013ca97f21ca5c8887", + "sha256:eb8821e09e916165e160797a6c17edda0679379a4be5c716c260e836e122f54b", + "sha256:efcb3f6676480691518c177e3b465bcddf57cea040302f9f4e6e191af91174d4", + "sha256:f27273b60488abe721a075bcca6d7f3964f9f6f067c8c4c605743023d7d3944f", + "sha256:f30c3cb33b24454a82faecaf01b19c18562b1e89558fb6c56de4d9118a032fd5", + "sha256:fb69256e180cb6c8a894fee62b3afebae785babc1ee98b81cdf68bbca1987f33", + "sha256:fd1abc0d89e30cc4e02e4064dc67fcc51bd941eb395c502aac3ec19fab46b519", + "sha256:ff8fa367d09b717b2a17a052544193ad76cd49979c805768879cb63d9ca50561" + ], + "markers": "python_full_version >= '3.7.0'", + "version": "==3.3.2" + }, + "idna": { + "hashes": [ + "sha256:050b4e5baadcd44d760cedbd2b8e639f2ff89bbc7a5730fcc662954303377aac", + "sha256:d838c2c0ed6fced7693d5e8ab8e734d5f8fda53a039c0164afb0b82e771e3603" + ], + "markers": "python_version >= '3.6'", + "version": "==3.8" + }, + "paho-mqtt": { + "hashes": [ + "sha256:2a8291c81623aec00372b5a85558a372c747cbca8e9934dfe218638b8eefc26f" + ], + "index": "pip_conf_index_global", + "version": "==1.6.1" + }, + "pydantic": { + "hashes": [ + "sha256:80c50fb8e3dcecfddae1adbcc00ec5822918490c99ab31f6cf6140ca1c1429f0", + "sha256:ff177ba64c6faf73d7afa2e8cad38fd456c0dbe01c9954e71038001cd15a6edd" + ], + "index": "pip_conf_index_global", + "markers": "python_version >= '3.7'", + "version": "==2.5.2" + }, + "pydantic-core": { + "hashes": [ + "sha256:038c9f763e650712b899f983076ce783175397c848da04985658e7628cbe873b", + "sha256:074f3d86f081ce61414d2dc44901f4f83617329c6f3ab49d2bc6c96948b2c26b", + "sha256:079206491c435b60778cf2b0ee5fd645e61ffd6e70c47806c9ed51fc75af078d", + "sha256:09b0e985fbaf13e6b06a56d21694d12ebca6ce5414b9211edf6f17738d82b0f8", + "sha256:0f6116a558fd06d1b7c2902d1c4cf64a5bd49d67c3540e61eccca93f41418124", + "sha256:103ef8d5b58596a731b690112819501ba1db7a36f4ee99f7892c40da02c3e189", + "sha256:16e29bad40bcf97aac682a58861249ca9dcc57c3f6be22f506501833ddb8939c", + "sha256:206ed23aecd67c71daf5c02c3cd19c0501b01ef3cbf7782db9e4e051426b3d0d", + "sha256:2248485b0322c75aee7565d95ad0e16f1c67403a470d02f94da7344184be770f", + "sha256:27548e16c79702f1e03f5628589c6057c9ae17c95b4c449de3c66b589ead0520", + "sha256:2d0ae0d8670164e10accbeb31d5ad45adb71292032d0fdb9079912907f0085f4", + "sha256:3128e0bbc8c091ec4375a1828d6118bc20404883169ac95ffa8d983b293611e6", + "sha256:3387277f1bf659caf1724e1afe8ee7dbc9952a82d90f858ebb931880216ea955", + "sha256:34708cc82c330e303f4ce87758828ef6e457681b58ce0e921b6e97937dd1e2a3", + "sha256:35613015f0ba7e14c29ac6c2483a657ec740e5ac5758d993fdd5870b07a61d8b", + "sha256:3ad873900297bb36e4b6b3f7029d88ff9829ecdc15d5cf20161775ce12306f8a", + "sha256:40180930807ce806aa71eda5a5a5447abb6b6a3c0b4b3b1b1962651906484d68", + "sha256:439c9afe34638ace43a49bf72d201e0ffc1a800295bed8420c2a9ca8d5e3dbb3", + "sha256:45e95333b8418ded64745f14574aa9bfc212cb4fbeed7a687b0c6e53b5e188cd", + "sha256:4641e8ad4efb697f38a9b64ca0523b557c7931c5f84e0fd377a9a3b05121f0de", + "sha256:49b08aae5013640a3bfa25a8eebbd95638ec3f4b2eaf6ed82cf0c7047133f03b", + "sha256:4bc536201426451f06f044dfbf341c09f540b4ebdb9fd8d2c6164d733de5e634", + "sha256:4ce601907e99ea5b4adb807ded3570ea62186b17f88e271569144e8cca4409c7", + "sha256:4e40f2bd0d57dac3feb3a3aed50f17d83436c9e6b09b16af271b6230a2915459", + "sha256:4e47a76848f92529879ecfc417ff88a2806438f57be4a6a8bf2961e8f9ca9ec7", + "sha256:513b07e99c0a267b1d954243845d8a833758a6726a3b5d8948306e3fe14675e3", + "sha256:531f4b4252fac6ca476fbe0e6f60f16f5b65d3e6b583bc4d87645e4e5ddde331", + "sha256:57d52fa717ff445cb0a5ab5237db502e6be50809b43a596fb569630c665abddf", + "sha256:59986de5710ad9613ff61dd9b02bdd2f615f1a7052304b79cc8fa2eb4e336d2d", + "sha256:5baab5455c7a538ac7e8bf1feec4278a66436197592a9bed538160a2e7d11e36", + "sha256:5c7d5b5005f177764e96bd584d7bf28d6e26e96f2a541fdddb934c486e36fd59", + "sha256:60b7607753ba62cf0739177913b858140f11b8af72f22860c28eabb2f0a61937", + "sha256:615a0a4bff11c45eb3c1996ceed5bdaa2f7b432425253a7c2eed33bb86d80abc", + "sha256:61ea96a78378e3bd5a0be99b0e5ed00057b71f66115f5404d0dae4819f495093", + "sha256:652c1988019752138b974c28f43751528116bcceadad85f33a258869e641d753", + "sha256:6637560562134b0e17de333d18e69e312e0458ee4455bdad12c37100b7cad706", + "sha256:678265f7b14e138d9a541ddabbe033012a2953315739f8cfa6d754cc8063e8ca", + "sha256:699156034181e2ce106c89ddb4b6504c30db8caa86e0c30de47b3e0654543260", + "sha256:6b9ff467ffbab9110e80e8c8de3bcfce8e8b0fd5661ac44a09ae5901668ba997", + "sha256:6c327e9cd849b564b234da821236e6bcbe4f359a42ee05050dc79d8ed2a91588", + "sha256:6d30226dfc816dd0fdf120cae611dd2215117e4f9b124af8c60ab9093b6e8e71", + "sha256:6e227c40c02fd873c2a73a98c1280c10315cbebe26734c196ef4514776120aeb", + "sha256:6e4d090e73e0725b2904fdbdd8d73b8802ddd691ef9254577b708d413bf3006e", + "sha256:70f4b4851dbb500129681d04cc955be2a90b2248d69273a787dda120d5cf1f69", + "sha256:70f947628e074bb2526ba1b151cee10e4c3b9670af4dbb4d73bc8a89445916b5", + "sha256:774de879d212db5ce02dfbf5b0da9a0ea386aeba12b0b95674a4ce0593df3d07", + "sha256:77fa384d8e118b3077cccfcaf91bf83c31fe4dc850b5e6ee3dc14dc3d61bdba1", + "sha256:79e0a2cdbdc7af3f4aee3210b1172ab53d7ddb6a2d8c24119b5706e622b346d0", + "sha256:7e88f5696153dc516ba6e79f82cc4747e87027205f0e02390c21f7cb3bd8abfd", + "sha256:7f8210297b04e53bc3da35db08b7302a6a1f4889c79173af69b72ec9754796b8", + "sha256:81982d78a45d1e5396819bbb4ece1fadfe5f079335dd28c4ab3427cd95389944", + "sha256:823fcc638f67035137a5cd3f1584a4542d35a951c3cc68c6ead1df7dac825c26", + "sha256:853a2295c00f1d4429db4c0fb9475958543ee80cfd310814b5c0ef502de24dda", + "sha256:88e74ab0cdd84ad0614e2750f903bb0d610cc8af2cc17f72c28163acfcf372a4", + "sha256:8aa1768c151cf562a9992462239dfc356b3d1037cc5a3ac829bb7f3bda7cc1f9", + "sha256:8c8a8812fe6f43a3a5b054af6ac2d7b8605c7bcab2804a8a7d68b53f3cd86e00", + "sha256:95b15e855ae44f0c6341ceb74df61b606e11f1087e87dcb7482377374aac6abe", + "sha256:96581cfefa9123accc465a5fd0cc833ac4d75d55cc30b633b402e00e7ced00a6", + "sha256:9bd18fee0923ca10f9a3ff67d4851c9d3e22b7bc63d1eddc12f439f436f2aada", + "sha256:a33324437018bf6ba1bb0f921788788641439e0ed654b233285b9c69704c27b4", + "sha256:a6a16f4a527aae4f49c875da3cdc9508ac7eef26e7977952608610104244e1b7", + "sha256:a717aef6971208f0851a2420b075338e33083111d92041157bbe0e2713b37325", + "sha256:a71891847f0a73b1b9eb86d089baee301477abef45f7eaf303495cd1473613e4", + "sha256:aae7ea3a1c5bb40c93cad361b3e869b180ac174656120c42b9fadebf685d121b", + "sha256:ab1cdb0f14dc161ebc268c09db04d2c9e6f70027f3b42446fa11c153521c0e88", + "sha256:ab4ea451082e684198636565224bbb179575efc1658c48281b2c866bfd4ddf04", + "sha256:abf058be9517dc877227ec3223f0300034bd0e9f53aebd63cf4456c8cb1e0863", + "sha256:af36f36538418f3806048f3b242a1777e2540ff9efaa667c27da63d2749dbce0", + "sha256:b53e9ad053cd064f7e473a5f29b37fc4cc9dc6d35f341e6afc0155ea257fc911", + "sha256:b7851992faf25eac90bfcb7bfd19e1f5ffa00afd57daec8a0042e63c74a4551b", + "sha256:b9b759b77f5337b4ea024f03abc6464c9f35d9718de01cfe6bae9f2e139c397e", + "sha256:ba39688799094c75ea8a16a6b544eb57b5b0f3328697084f3f2790892510d144", + "sha256:ba6b6b3846cfc10fdb4c971980a954e49d447cd215ed5a77ec8190bc93dd7bc5", + "sha256:bb4c2eda937a5e74c38a41b33d8c77220380a388d689bcdb9b187cf6224c9720", + "sha256:c0b97ec434041827935044bbbe52b03d6018c2897349670ff8fe11ed24d1d4ab", + "sha256:c1452a1acdf914d194159439eb21e56b89aa903f2e1c65c60b9d874f9b950e5d", + "sha256:c2027d05c8aebe61d898d4cffd774840a9cb82ed356ba47a90d99ad768f39789", + "sha256:c2adbe22ab4babbca99c75c5d07aaf74f43c3195384ec07ccbd2f9e3bddaecec", + "sha256:c2d97e906b4ff36eb464d52a3bc7d720bd6261f64bc4bcdbcd2c557c02081ed2", + "sha256:c339dabd8ee15f8259ee0f202679b6324926e5bc9e9a40bf981ce77c038553db", + "sha256:c6eae413494a1c3f89055da7a5515f32e05ebc1a234c27674a6956755fb2236f", + "sha256:c949f04ecad823f81b1ba94e7d189d9dfb81edbb94ed3f8acfce41e682e48cef", + "sha256:c97bee68898f3f4344eb02fec316db93d9700fb1e6a5b760ffa20d71d9a46ce3", + "sha256:ca61d858e4107ce5e1330a74724fe757fc7135190eb5ce5c9d0191729f033209", + "sha256:cb4679d4c2b089e5ef89756bc73e1926745e995d76e11925e3e96a76d5fa51fc", + "sha256:cb774298da62aea5c80a89bd58c40205ab4c2abf4834453b5de207d59d2e1651", + "sha256:ccd4d5702bb90b84df13bd491be8d900b92016c5a455b7e14630ad7449eb03f8", + "sha256:cf9d3fe53b1ee360e2421be95e62ca9b3296bf3f2fb2d3b83ca49ad3f925835e", + "sha256:d2ae91f50ccc5810b2f1b6b858257c9ad2e08da70bf890dee02de1775a387c66", + "sha256:d37f8ec982ead9ba0a22a996129594938138a1503237b87318392a48882d50b7", + "sha256:d81e6987b27bc7d101c8597e1cd2bcaa2fee5e8e0f356735c7ed34368c471550", + "sha256:dcf4e6d85614f7a4956c2de5a56531f44efb973d2fe4a444d7251df5d5c4dcfd", + "sha256:de790a3b5aa2124b8b78ae5faa033937a72da8efe74b9231698b5a1dd9be3405", + "sha256:e47e9a08bcc04d20975b6434cc50bf82665fbc751bcce739d04a3120428f3e27", + "sha256:e60f112ac88db9261ad3a52032ea46388378034f3279c643499edb982536a093", + "sha256:e87fc540c6cac7f29ede02e0f989d4233f88ad439c5cdee56f693cc9c1c78077", + "sha256:eac5c82fc632c599f4639a5886f96867ffced74458c7db61bc9a66ccb8ee3113", + "sha256:ebb4e035e28f49b6f1a7032920bb9a0c064aedbbabe52c543343d39341a5b2a3", + "sha256:ec1e72d6412f7126eb7b2e3bfca42b15e6e389e1bc88ea0069d0cc1742f477c6", + "sha256:ef98ca7d5995a82f43ec0ab39c4caf6a9b994cb0b53648ff61716370eadc43cf", + "sha256:f0cbc7fff06a90bbd875cc201f94ef0ee3929dfbd5c55a06674b60857b8b85ed", + "sha256:f4791cf0f8c3104ac668797d8c514afb3431bc3305f5638add0ba1a5a37e0d88", + "sha256:f5e412d717366e0677ef767eac93566582518fe8be923361a5c204c1a62eaafe", + "sha256:fb2ed8b3fe4bf4506d6dab3b93b83bbc22237e230cba03866d561c3577517d18", + "sha256:fe0a5a1025eb797752136ac8b4fa21aa891e3d74fd340f864ff982d649691867" + ], + "markers": "python_version >= '3.7'", + "version": "==2.14.5" + }, + "requests": { + "hashes": [ + "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760", + "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6" + ], + "index": "pip_conf_index_global", + "markers": "python_version >= '3.8'", + "version": "==2.32.3" + }, + "typing-extensions": { + "hashes": [ + "sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d", + "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8" + ], + "markers": "python_version >= '3.8'", + "version": "==4.12.2" + }, + "urllib3": { + "hashes": [ + "sha256:a448b2f64d686155468037e1ace9f2d2199776e17f0a46610480d311f73e3472", + "sha256:dd505485549a7a552833da5e6063639d0d177c04f23bc3864e41e5dc5f612168" + ], + "markers": "python_version >= '3.8'", + "version": "==2.2.2" + } + }, + "develop": {} +} diff --git a/meian/app.py b/meian/app.py new file mode 100644 index 0000000..f34db90 --- /dev/null +++ b/meian/app.py @@ -0,0 +1,209 @@ +# -*- coding:utf-8 -*- +""" +@File : app +@Author : xuxingchen +@Version : 0.1.6 +@Contact : xuxingchen@sinochem.com +@Desc : 将中转mqtt与北向设备端进行连接 +""" +import base64 +import json +import os.path +import time +import traceback +from urllib import request + +import logger +from logger import Logger +from utils import generate_random_string +from devices.meian_db import SQLiteDatabaseEngine, BaseTable, DeviceTable, GatewayTable, UserInfoTable, \ + HeartBeatTable, RegisterTable, RecordTable +from devices.common_service import auto_service +from devices.common_model import UserData +from devices.yunfu_model import MeianProduct +from misc import create_mqtt_client +from config import DB_PATH, BROKER_0_HOST, BROKER_0_PORT, BROKER_0_API_PORT, BROKER_0_API_KEY, BROKER_0_API_SECRET, \ + BROKER_0_USERNAME, BROKER_0_PASSWD, BROKER_0_SSL, BROKER_1_HOST, BROKER_1_PORT, BROKER_1_SSL + + +def on_connect(client, userdata, _flags, rc): + Logger.connect(f"{userdata.client_id} mqtt connecting: {{rc: {rc}}}") + if userdata.topics: + _topics = [(topic, 0) for topic in userdata.topics] + client.subscribe(_topics) + Logger.debug(f"{userdata.client_id} subscribe topics: {userdata.topics}") + + +def on_message(_client, userdata: UserData, message): + try: + msg = json.loads(message.payload.decode().replace("\x00", "")) + if msg.get("dataType", False) and msg.get("dataType") != "heartBeat": + Logger.debug(f"💡 {userdata.client_id} 接收到新数据,执行 on_message 中 ...", log_path=None) + Logger.info(f"{message.topic}: {msg}") + userdata.set_topic(message.topic) + if userdata.table_handler is None: + _db = SQLiteDatabaseEngine(db_path=DB_PATH) + userdata.set_table_handler(BaseTable(_db.connection, _db.cursor)) + if userdata.token: + if "messageId" in msg.keys() and userdata.token == msg["messageId"]: # 兼容aiot平台 + userdata.set_status_add("status", True) + userdata.set_status_add("response", msg) + if "token" in msg.keys() and userdata.token == msg["token"] and userdata.code == msg["dataType"]: # 兼容美安 + userdata.set_status_add("status", True) + userdata.set_status_add("response", msg) + else: + auto_service(msg, userdata) + except Exception as e: + Logger.error(f"{type(e).__name__}, {e}") + if logger.DEBUG: + traceback.print_exc() + + +def on_disconnect(_client, _userdata, rc): + Logger.disconnect(f"Break mqtt connection! {{rc: {rc}}}") + + +def gen_gateway_info(th): + # 构建每个网关应当订阅的主题列表 + info = {} + # 获取网关信息 + gateway_list = GatewayTable.get_registered_gateway(th) + assert gateway_list is not None, "请优先完善网关配置表" + for gateway_item in gateway_list: + sub_aiot_id_list = json.loads(GatewayTable.get_registered_sub_aiot_id(th, gateway_item[0])[0]) + # 追加每个网关下每个子设备的主题 + if sub_aiot_id_list: + info[gateway_item[0]] = { + "sct": gateway_item[1], + "topics": [ + f"/jmlink/{aiot_id}/tml/service/call" for aiot_id in sub_aiot_id_list + ] + } + else: + info[gateway_item[0]] = { + "sct": gateway_item[1], + "topics": [] + } + return info + + +def check_ids(ids, host, port, key, secret): + """检查每个id对应的客户端认证""" + assert ids is not None, "数据库中无设备数据" + id_list = [_id[0] for _id in ids] + url = f"http://{host}:{port}/api/v5/authentication/password_based:built_in_database/users" + headers = { + "Content-Type": "application/json", + "Authorization": "Basic " + base64.b64encode((key + ":" + secret).encode()).decode() + } + for project_code in id_list: + body_data = { + "user_id": f"{project_code}@meian", + "password": MeianProduct.secret, + "is_superuser": False + } + body_json = json.dumps(body_data).encode('utf-8') + req = request.Request(url, data=body_json, headers=headers) + try: + with request.urlopen(req) as f: + response = json.loads(f.read().decode()) + status_code = f.getcode() + if status_code == 201: + if "user_id" in response: + Logger.init(f"为项目{project_code}新增用户") + except Exception as e: + if "HTTP Error 409" in str(e): # 用户已存在 + pass + else: + Logger.error(f"{type(e).__name__}, {e}") + if logger.DEBUG: + traceback.print_exc() + + +if __name__ == '__main__': + db = SQLiteDatabaseEngine(db_path=DB_PATH) # 此时数据表中应当已具备数据 + table_handler = BaseTable(db.connection, db.cursor) + # 数据库自检初始化 + Logger.init("数据库执行初始化校验 ...") + GatewayTable.check(table_handler) + DeviceTable.check(table_handler) + HeartBeatTable.check(table_handler) + RegisterTable.check(table_handler) + UserInfoTable.check(table_handler) + RecordTable.check(table_handler) + Logger.init("数据库完成初始化校验 ✅") + + # 设备mqtt认证客户端初始化,确保每个设备的账户都存在 + Logger.init("中转mqtt认证客户端账户执行初始化校验 ...") + ids = GatewayTable.get_project_code_list(table_handler) + check_ids(ids, BROKER_0_HOST, BROKER_0_API_PORT, BROKER_0_API_KEY, BROKER_0_API_SECRET) + Logger.init("中转mqtt认证客户端账户完成初始化校验 ✅") + + # 1. 获取待订阅的主题 + topics = json.loads(DeviceTable.get_topics(table_handler)[1]) # 0: subscribe, 1: publish + # 根据现有的设备表信息结合网关表信息构建aiot平台服务调用的主题 + gateway_info = gen_gateway_info(table_handler) + + client_dict = {} + # 2. 构建客户端 + user_data_0 = UserData() + user_data_0.set_topics(topics) # 用于连接时订阅 + user_data_0.set_status_add("client", 0) + user_data_0.set_client_id(f"mqtt-python-{generate_random_string()}") + user_data_0.set_clients(client_dict) + client_0 = create_mqtt_client( + broker_host=BROKER_0_HOST, + broker_port=BROKER_0_PORT, + userdata=user_data_0, + on_connect=on_connect, + on_message=on_message, + client_id=user_data_0.client_id, + username=BROKER_0_USERNAME, + password=BROKER_0_PASSWD, + ssl_flag=BROKER_0_SSL + ) + client_0.loop_start() + client_dict["center"] = [client_0, user_data_0] + + for gateway_id in gateway_info.keys(): + user_data_1 = UserData() + user_data_1.set_topics(gateway_info[gateway_id]["topics"]) # 用于连接时订阅 + user_data_1.set_status_add("client", 1) + user_data_1.set_status_add("last_timestamp", time.time()) + user_data_1.set_status_add("gateway_id", gateway_id) + user_data_1.set_client_id(f"device|1|1|{gateway_id}") + client_1 = create_mqtt_client( + broker_host=BROKER_1_HOST, + broker_port=BROKER_1_PORT, + userdata=user_data_1, + on_connect=on_connect, + on_disconnect=on_disconnect, + on_message=on_message, + client_id=user_data_1.client_id, + username=gateway_id, + password=gateway_info[gateway_id]["sct"], + ssl_flag=BROKER_1_SSL + ) + client_1.loop_start() + client_dict[gateway_id] = [client_1, user_data_1] + user_data_1.set_clients(client_dict) + + while True: + time.sleep(86400) + t = str(int((time.time() - 86400) * 1000)) + UserInfoTable.delete_redundancy_1(table_handler, t) + UserInfoTable.delete_redundancy_0(table_handler, t) + # 每7天清除一次日志文件 + creation_time = os.path.getctime(logger.LOGGER_PATH) + days_since_creation = (time.time() - creation_time) / (60 * 60 * 24) + if os.path.exists(logger.LOGGER_PATH) and days_since_creation >= 7: + try: + f0 = open(logger.LOGGER_PATH, "r", encoding="utf8") + f1 = open(f"{logger.LOGGER_PATH}.old", "w", encoding="utf8") + f1.write(f0.read()) + f1.close() + f0.close() + os.remove(logger.LOGGER_PATH) + print(f"日志文件 {logger.LOGGER_PATH} 完成重置") + except Exception as e: + print(f"日志文件 {logger.LOGGER_PATH} 重置失败: {e}") diff --git a/meian/btop b/meian/btop new file mode 100644 index 0000000..bfce5e9 Binary files /dev/null and b/meian/btop differ diff --git a/meian/config.py b/meian/config.py new file mode 100644 index 0000000..988bb55 --- /dev/null +++ b/meian/config.py @@ -0,0 +1,47 @@ +# -*- coding:utf-8 -*- +""" +@Author : xuxingchen +@Contact : xuxingchen@sinochem.com +@Desc : +""" +import os +import json + +import logger + + +ENV_TYPE = int(os.environ.get("ENV_TYPE", 3)) # 0: 本地环境,1:测试容器环境,2: 生产容器环境,3: 本地开发环境 + +SLEEP_TIME = 0.03 if ENV_TYPE != 0 else 1 # 循环等待时间,正式环境30ms检测一次,开发环境1000ms检测一次 +TIMEOUT_SECOND = 5 if ENV_TYPE != 0 else 10 # 等待超时时间,正式环境5s,开发环境10s +DB_PATH = "./data/_meian.db" if ENV_TYPE != 2 else "./data/meian.db" +GATEWAY_CONFIG_PATH = "./data/_gateways.json" if ENV_TYPE != 2 else "./data/gateways.json" +CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "config.json") +ENV_INFO = json.load(open(CONFIG_PATH, "r", encoding="utf8")) +BACKEND_ID = "320566c*********200" +BACKEND_SECRET = "807aa9d**********b1188d85ff6" +BACKEND_URL = "https://iot-api.******.com" if ENV_TYPE == 2 else "https://iot-api-test.********.com" + +LOGGER_DEBUG = os.environ.get("LOGGER_DEBUG", ENV_INFO[ENV_TYPE]["LOGGER_DEBUG"]) +logger.DEBUG = False if LOGGER_DEBUG.upper() == "FALSE" else True +DEBUG = logger.DEBUG +logger.LOGGER_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "run.log") + +# project_code - project_id +with open(GATEWAY_CONFIG_PATH, "r", encoding="utf8") as f: + GATEWAY_CONFIG = json.load(f) + +# 0 mqtt +BROKER_0_HOST = os.environ.get("BROKER_0_HOST", ENV_INFO[ENV_TYPE]["BROKER_0_HOST"]) +BROKER_0_PORT = int(os.environ.get("BROKER_0_PORT", ENV_INFO[ENV_TYPE]["BROKER_0_PORT"])) +BROKER_0_API_PORT = int(os.environ.get("BROKER_0_API_PORT", ENV_INFO[ENV_TYPE]["BROKER_0_API_PORT"])) +BROKER_0_SSL = False +BROKER_0_USERNAME = os.environ.get("BROKER_0_USERNAME", ENV_INFO[ENV_TYPE]["BROKER_0_USERNAME"]) +BROKER_0_PASSWD = os.environ.get("BROKER_0_PASSWD", ENV_INFO[ENV_TYPE]["BROKER_0_PASSWD"]) +BROKER_0_API_KEY = os.environ.get("BROKER_0_API_KEY", ENV_INFO[ENV_TYPE]["BROKER_0_API_KEY"]) +BROKER_0_API_SECRET = os.environ.get("BROKER_0_API_SECRET", ENV_INFO[ENV_TYPE]["BROKER_0_API_SECRET"]) + +# 1 aiot +BROKER_1_HOST = os.environ.get("BROKER_1_HOST", ENV_INFO[ENV_TYPE]["BROKER_1_HOST"]) +BROKER_1_PORT = int(os.environ.get("BROKER_1_PORT", ENV_INFO[ENV_TYPE]["BROKER_1_PORT"])) +BROKER_1_SSL = False diff --git a/meian/data/_devices.csv b/meian/data/_devices.csv new file mode 100644 index 0000000..465c5d4 --- /dev/null +++ b/meian/data/_devices.csv @@ -0,0 +1,5 @@ +项目编码CODE, 设备ID, 订阅主题, 发布主题, 平台ID +YFCS, ma17qt7k, YFCS, jinmao-test, 278349 +YFCS, test01, YFCS, jinmao, 278436 +YFTEST, test02, TEST, jinmao, 278442 +YFTEST, test03, TEST, jinmao, 278443 diff --git a/meian/data/_gateways.json b/meian/data/_gateways.json new file mode 100644 index 0000000..8f8dbbe --- /dev/null +++ b/meian/data/_gateways.json @@ -0,0 +1,4 @@ +{ + "YFCS": ["278350", "b4e092284d978aff", "08e1e4a44ab35ee28491565fa993094a"], + "YFTEST": ["278440", "9ab43b7c6ed78192", ""] +} \ No newline at end of file diff --git a/meian/data/_users.csv b/meian/data/_users.csv new file mode 100644 index 0000000..433dc0f --- /dev/null +++ b/meian/data/_users.csv @@ -0,0 +1,2 @@ +用户ID, 用户姓名, 用户类型, 设备ID, face_url +71499f561**********45be0ec9514, 21栋201, 业主, ma17qt7k, "http://new-aiot-prod-public.oss-cn-beijing.aliyuncs.com/10********76c01/ad1***********8e221710?x-oss-process=image/resize,m_pad,h_960,w_540,color_FFFFFF" diff --git a/meian/devices/common_model.py b/meian/devices/common_model.py new file mode 100644 index 0000000..b1a35fb --- /dev/null +++ b/meian/devices/common_model.py @@ -0,0 +1,114 @@ +# -*- coding:utf-8 -*- +""" +@File : common_model +@Author : xuxingchen +@Version : 1.0 +@Contact : xuxingchen@sinochem.com +@Desc : 公用的数据结构 +""" +import threading +from enum import Enum +from typing import Optional + +from pydantic import BaseModel + + +# 错误码 +class ErrorCode(Enum): + SUCCESS = (0, "全部完成下发") + PART_SUCCESS = (10, "部分完成下发") + FAILURE = (20, "一个都没下发成功") + INPUT_TYPE_ERROR = (30, "入参数据类型异常") + NEVER_POST_FACE = (40, "用户未下发过人脸") + UNKNOWN = (90, "未知异常") + + def __new__(cls, code, message): + obj = object.__new__(cls) + obj._value_ = code + obj.message = message + return obj + + +class BaseInfo(BaseModel): + def check(self): + for attr in self.__dict__.keys(): + # if property can be null, default value should not be set to None + if self.__dict__[attr] is None: + raise ValueError(f"{attr} not allowed to be set to None") + + +class UserData: + def __init__(self): + self.topic: Optional[str] = None + self.topics: list = [] + self.table_handler = None + self.code = None + self.token = None + self.status: dict = {} + self.clients: dict = {} + self.client_id = None + self.lock = threading.Lock() # 添加一个锁用于线程同步 + + def set_topic(self, value: str): + with self.lock: + self.topic = value + + def set_topics(self, value: list): + with self.lock: + self.topics = value + + def set_table_handler(self, value): + with self.lock: + self.table_handler = value + + def set_code(self, value): + with self.lock: + self.code = value + + def set_token(self, value): + with self.lock: + self.token = value + + def set_status_add(self, key, value): + with self.lock: + self.status[key] = value + + def set_status_remove(self, key): + with self.lock: + if self.status and key in self.status.keys(): + self.status.pop(key) + + def get_status(self, key): + if self.status and key in self.status.keys(): + return self.status[key] + + def set_clients(self, value: dict): + with self.lock: + self.clients = value + + def set_client_add(self, key, value): + with self.lock: + self.clients[key] = value + + def set_client_id(self, key): + with self.lock: + self.client_id = key + + +class Project(BaseModel): + project_name: str + device_id: str + subscribe_topic: str + publish_topic: str + gateway_id: str + gateway_sct: str + register_type: int + + +class UserInfo(BaseModel): + user_id: str + device_id: str + name: str + user_type: int # 0:业主 1:访客 + qrcode: str = None # 当使用二维码时,该值就是美安设备对应的userid + face_url: str = None diff --git a/meian/devices/common_service.py b/meian/devices/common_service.py new file mode 100644 index 0000000..750d048 --- /dev/null +++ b/meian/devices/common_service.py @@ -0,0 +1,730 @@ +# -*- coding:utf-8 -*- +""" +@File : common_service +@Author : xuxingchen +@Version : 2.0 +@Contact : xuxingchen@sinochem.com +@Desc : 业务逻辑服务层 +""" +import json +import time +import traceback +import requests + +import devices.meian_model as meian_data_entity +import devices.yunfu_model as yunfu_data_entity + +import logger +from logger import Logger, speed_ms +from devices.meian_db import DeviceTable, RegisterTable, GatewayTable, HeartBeatTable, UserInfoTable, RecordTable +from devices.common_model import ErrorCode, UserData, UserInfo +from utils import generate_token, to_obj, datetime_to_timestamp, generate_time_token, encode_to_base64, \ + extract_building_unit, extract_number +from config import TIMEOUT_SECOND, SLEEP_TIME, BACKEND_URL, BACKEND_ID, BACKEND_SECRET, GATEWAY_CONFIG + + +class BaseService: + def __init__(self): + self.table_handler = None + self.topic = None + self.clients = None + + def set_meta(self, userdata: UserData): + self.table_handler = userdata.table_handler + self.topic = userdata.topic + self.clients = userdata.clients + + def handle(self, **kwargs): + """must be `override`""" + pass + + +def get_owner_house_floor(user_id, project_code: str, device_position_desc: str) -> int: + """根据用户ID获取用户房产信息,提取楼层信息并返回 + + 1. 该功能只支持业主使用,访客查询不到,默认返回 + """ + token_url = f"{BACKEND_URL}/v2/accesskey_auth" + owner_info_url = f"{BACKEND_URL}/v3/realty-master-data/owners/{user_id}" + house_info_url = f"{BACKEND_URL}/v3/realty-master-data/houses" + + # get token + data = {'id': BACKEND_ID, 'secret': BACKEND_SECRET} + token_response = requests.post(token_url, json=data) + if token_response.status_code == 200: + logger.Logger.debug(f"token 请求成功,响应内容:{token_response.json()}") + token = token_response.json()["access_token"] + else: + logger.Logger.error(f"token 请求失败,状态码: {token_response.status_code}") + raise Exception(f"token 请求失败,状态码: {token_response.status_code}") + # get house ids + headers = { + "Accept": "application/json", + "Access-Token": token, + "Content-Type": "application/json;charset=UTF-8" + } + owner_info_response = requests.get(owner_info_url, headers=headers) + if owner_info_response.status_code == 200: + owner_info = owner_info_response.json() + logger.Logger.debug(f"owner_info 请求成功,响应内容:{owner_info}") + # 根据 project_code 获取 project_id + if GATEWAY_CONFIG[project_code][2] == "" or owner_info["data"]["project_id"] == GATEWAY_CONFIG[project_code][2]: + house_ids = owner_info["data"]["house_ids"] + else: + logger.Logger.error(f"project id 不匹配," + f"预置:{GATEWAY_CONFIG[project_code][2]}, 用户信息所属:{owner_info['data']['project_id']}") + raise Exception(f"project id 不匹配") + elif owner_info_response.status_code == 404 and owner_info_response.json()["msg"] == "OWNER_NOT_EXISTS": + logger.Logger.warn(f"owner_info 请求成功,但用户ID不存在,非住户") + return 1 + else: + logger.Logger.error(f"token 请求失败,状态码: {token_response.status_code}") + raise Exception(f"token 请求失败,状态码: {token_response.status_code}") + + # 结合设备注册备注 获取楼层信息 + data = { + "query": { + "id": {"$in": house_ids} + } + } + house_info_response = requests.post(house_info_url, json=data, headers=headers) + if house_info_response.status_code == 200: + logger.Logger.debug(f"house_info 请求成功,响应内容:{house_info_response.json()}") + building_number, unit_number = extract_building_unit(device_position_desc) + house_info = house_info_response.json()["data"]["list"] + floor = 1 + for house_item in house_info: + building_name = extract_number(house_item["building_name"]).zfill(2) + unit_name = extract_number(house_item["unit_name"]).zfill(2) + if building_name == building_number and unit_name == unit_number: + floor = int(house_item["floor_name"]) + break + else: + logger.Logger.error(f"house_info 请求失败,状态码: {token_response.status_code}") + raise Exception(f"house_info 请求失败,状态码: {token_response.status_code}") + + return floor + + +class Services: + """业务逻辑入口""" + + class HeartBeatService(BaseService): + def handle(self, msg_obj: meian_data_entity.HeartBeat): + # Logger.debug("设备心跳已接收,正在执行上线 ...", log_path=None) + last_time = HeartBeatTable.get_last_time(self.table_handler, msg_obj.device_id) + # 若无心跳记录或者距离上次心跳记录时间达到5分钟 + if (last_time is None or + (time.time() - float( + HeartBeatTable.get_last_time(self.table_handler, msg_obj.device_id))) > (60. * 5.)): + status = HeartBeatTable.update(self.table_handler, msg_obj, self.topic) + if status: + # 执行上线操作 + project_code = DeviceTable.get_project_code(self.table_handler, msg_obj.device_id) + if project_code: + gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code) + if gateway_id is None: + Logger.error("网关不存在") + return -1 + aiot_id = DeviceTable.get_aiot_id(self.table_handler, msg_obj.device_id) + if aiot_id is None: + Logger.warn("设备未注册,不执行上线操作") + return -1 + topic = f"/jmlink/{aiot_id}/comm/online" + topic_resp = f"/jmlink/{aiot_id}/comm/online_resp" + s = time.time() + # 获取客户端 + client_1 = self.clients[gateway_id][0] + userdata = self.clients[gateway_id][1] + userdata.set_status_add("status", False) + userdata.set_status_add("start_timestamp", time.time()) + try: + # 订阅回传 + client_1.subscribe(topic_resp) + Logger.debug(f"subscribe topics: {topic_resp}") + # 发布事件 + token = generate_token() + userdata.set_token(token) + online_json = json.dumps(yunfu_data_entity.Online(messageId=token).__dict__) + client_1.publish(topic, online_json) + while True: + if userdata.status["status"]: + # 拿到结果后写入到数据库中更新数据 + if "code" in userdata.status["response"].keys(): + error_code = userdata.status["response"]["code"] + if error_code == 0: + Logger.info(f"设备 - {aiot_id} 完成上线") + else: + Logger.error( + f"{topic_resp} 返回错误码: {error_code}, " + f"{yunfu_data_entity.error_code[error_code]}") + break + if time.time() - userdata.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出 + Logger.debug( + f"等待回复超时: {speed_ms(userdata.status['start_timestamp'])}ms") + break + # Logger.debug(f"{userdata.status} waiting for {topic_resp} ... ") + time.sleep(SLEEP_TIME) + finally: + # 属性复位 + client_1.unsubscribe(topic_resp) + # Logger.debug(f"移除订阅: {topic_resp}") + userdata.set_token(None) + userdata.set_status_remove("response") + # Logger.debug(f"{speed_ms(s)}ms") + else: + Logger.warn("上线请求非法,默认无操作") + else: + Logger.debug("未达到重新上线阈值,默认无操作") + pass + + class RegisterService(BaseService): + def handle(self, msg_obj: meian_data_entity.Register): + Logger.debug("RegisterService Handle ...") + status = RegisterTable.update(self.table_handler, msg_obj, self.topic) + if status: + if not DeviceTable.get_device_register_type(self.table_handler, msg_obj.device_id): + Logger.debug("Execute register ...") + project_code = DeviceTable.get_project_code(self.table_handler, msg_obj.device_id) + if project_code: + gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code) + if gateway_id is None: + Logger.error("网关不存在") + return -1 + topic = f"/jmlink/{gateway_id}/comm/sub/register" + topic_resp = f"/jmlink/{gateway_id}/comm/sub/register_resp" + # 将连接信息(网关&网关secret)和回传信息发送给回传监听进程 + s = time.time() + # 获取客户端 + client_1 = self.clients[gateway_id][0] + userdata = self.clients[gateway_id][1] + userdata.set_status_add("status", False) + userdata.set_status_add("start_timestamp", time.time()) + try: + # 订阅回传 + client_1.subscribe(topic_resp) + Logger.debug(f"subscribe topics: {topic_resp}") + # 发布事件 + token = generate_token() + userdata.set_token(token) + register_json = json.dumps(yunfu_data_entity.CommRegister( + messageId=token, + params=yunfu_data_entity.RegisterParam( + deviceName=msg_obj.device_id, + displayName=f"美安-{msg_obj.device_id}-{msg_obj.device_position_desc}" + ).__dict__ + ).__dict__) + Logger.debug(register_json) + client_1.publish(topic, register_json) + while True: + if userdata.status["status"]: + # 拿到结果后写入到数据库中更新数据 + if "data" in userdata.status["response"].keys(): + resp = userdata.status["response"]["data"][0] + if resp["code"] == 0: + DeviceTable.update_aiot_id(self.table_handler, resp["deviceName"], + resp["deviceId"]) + Logger.info(f"设备 - {msg_obj.device_id} 完成注册") + + # 每次完成注册后刷新订阅信息 + Logger.debug(f"刷新订阅列表中 ...") + sub_aiot_id_list = json.loads( + GatewayTable.get_registered_sub_aiot_id(self.table_handler, gateway_id)[ + 0]) + current_topics = set( + [f"/jmlink/{aiot_id}/tml/service/call" for aiot_id in sub_aiot_id_list]) + last_topics = set(userdata.topics) + topics_to_subscribe = current_topics - last_topics + topics_to_unsubscribe = last_topics - current_topics + if topics_to_subscribe or topics_to_unsubscribe: + # 订阅缺少的主题 + for topic in list(topics_to_subscribe): + client_1.subscribe(topic) + Logger.debug(f"新增订阅: {topic}") + # 取消订阅多余的主题 + for topic in list(topics_to_unsubscribe): + client_1.unsubscribe(topic) + Logger.debug(f"移除订阅: {topic}") + else: + Logger.debug(f"订阅列表无变化") + Logger.debug(f"订阅列表刷新完成 ✅") + else: + Logger.error( + f"{topic_resp} 返回错误码: {resp['code']}, " + f"{yunfu_data_entity.error_code[resp['code']]}") + break + if time.time() - userdata.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出 + Logger.debug( + f"等待回复超时: {speed_ms(userdata.status['start_timestamp'])}ms") + break + # Logger.debug(f"{userdata.status} waiting for {topic_resp} ... ") + time.sleep(SLEEP_TIME) # 实际运行中设为0.1,100ms检测一次 + finally: + client_1.unsubscribe(topic_resp) + Logger.debug(f"移除订阅: {topic_resp}") + userdata.set_token(None) + userdata.set_status_remove("response") + Logger.debug(f"{speed_ms(s)}ms") + else: + Logger.error("网关配置表中不存在设备关联的项目信息") + else: + Logger.debug("设备已注册过,默认无操作") + + class PushRtAccessRecordService(BaseService): + def handle(self, msg_obj: meian_data_entity.PushRtAccessRecord): + Logger.debug("Execute open_event ...") + # 若存在通行记录,但人员信息表中不存在人员信息,则将通行记录本地保存留用 + if msg_obj.user_id.startswith("old-"): # 历史遗留在设备中的人员id应当以old-开头进行插入 + try: + RecordTable.add(self.table_handler, msg_obj) + Logger.info("通行事件本地记录成功") + return 1 + except Exception as e: + Logger.error(f"{type(e).__name__}, {e}") + if logger.DEBUG: + traceback.print_exc() + # 查询子设备ID + aiot_id = DeviceTable.get_aiot_id(self.table_handler, msg_obj.device_id) + if aiot_id is None: + Logger.warn(f"Device - {aiot_id} is not registered") + return -1 + topic = f"/jmlink/{aiot_id}/tml/event/post" + topic_resp = f"/jmlink/{aiot_id}/tml/event/post_resp" + project_code = DeviceTable.get_project_code(self.table_handler, msg_obj.device_id) + if project_code: + gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code) + # 将连接信息(网关&网关secret)和回传信息发送给回传监听进程 + s = time.time() + # 获取客户端 + client_1 = self.clients[gateway_id][0] + userdata = self.clients[gateway_id][1] + userdata.set_status_add("status", False) + userdata.set_status_add("start_timestamp", time.time()) + # 订阅回传 + client_1.subscribe(topic_resp) + Logger.debug(f"subscribe topics: {topic_resp}") + try: + # 数据转换 + open_event = yunfu_data_entity.OpenEvent() + open_event.datetime = str(datetime_to_timestamp(msg_obj.time)) + open_event.type = yunfu_data_entity.OpenEventType.__dict__[msg_obj.access_mode.upper()].value + open_event.certificate_type = yunfu_data_entity.OpenEventCertificateType.__dict__[ + msg_obj.access_mode.upper()].value + # 若是二维码通信,根据二维码获取用户信息,若是人脸通信,根据userid获取用户信息 + if msg_obj.access_mode == "qrCode": + user = UserInfoTable.get_user_by_qrcode(self.table_handler, msg_obj.user_id, msg_obj.device_id) + open_event.code = user[0] + open_event.certificate = user[0] + name = user[1] + else: + name = UserInfoTable.get_name(self.table_handler, msg_obj.user_id, msg_obj.device_id) + open_event.code = msg_obj.user_id + open_event.certificate = msg_obj.user_id + open_event.name = name + open_event.check() + token = generate_token() + userdata.set_token(token) + event_json = json.dumps(yunfu_data_entity.EventPost( + messageId=token, eventCode="open_event", + params=open_event.__dict__ + ).__dict__) + # 发布事件 + client_1.publish(topic, event_json) + # 等待回传 + while True: + if userdata.status["status"]: + if "code" in userdata.status["response"].keys(): + if userdata.status["response"]["code"] != 0: + error_code = userdata.status['response']['errorCode'] + Logger.error(f"{topic_resp} 返回错误码: {error_code}, " + f"{meian_data_entity.error_code[error_code]}") + else: + Logger.debug("通行事件推送成功") + break + if time.time() - userdata.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出 + Logger.error("等待回复超时") + break + # Logger.debug(f"{userdata.status} waiting for {topic_resp} ... ") + time.sleep(SLEEP_TIME) + finally: + # 属性复位 + client_1.unsubscribe(topic_resp) + Logger.debug(f"移除订阅: {topic_resp}") + userdata.set_token(None) + userdata.set_status_remove("response") + Logger.debug(f"{speed_ms(s)}ms") + + class GetQrCodeService(BaseService): + """获取访客二维码 + + 1. 将接受的用户信息记录追加或更新到本地DB + 2. 根据时间戳和访客id生成一个token字符串,作为唯一识别id + 3. 将生成的二维码下置给指定设备 + 4. 将唯一识别id作为二维码字符串返回给平台 + """ + + def handle(self, msg: dict): + Logger.debug("Execute Get QrCode ...") + code = ErrorCode.SUCCESS + device_id = DeviceTable.get_device_id(self.table_handler, self.topic.split("/tml")[0].split("link/")[1]) + project_code = DeviceTable.get_project_code(self.table_handler, device_id) + gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code) + success_aiot_list = [] + aiot_ids = [] + qrcode = None + try: + # 根据时间戳和访客id生成一个token字符串,作为唯一识别id + qrcode = generate_time_token(msg["params"]["user_id"]) + # 将生成的二维码下置给指定设备 + aiot_ids = json.loads(msg["params"]["device_ids"]) + s = time.time() + for aiot_id in aiot_ids: # 一般情况 aiot_ids 里应该只有一个值 + # 查询对应aiot_id实际的美安设备id和订阅主题 + device_id = DeviceTable.get_device_id(self.table_handler, aiot_id) + if device_id is None: + continue + + # 将接受的用户信息记录追加或更新到本地DB + user_type = 0 if msg["serviceCode"] != "get_visitor_qrcode" else 1 # 业主0 访客1,身份类型在插入后就不会再被更新 + device_position_desc = RegisterTable.get_device_position_desc(self.table_handler, device_id) + if user_type == 1 or device_position_desc is None: + floor = 1 + else: + floor = get_owner_house_floor(msg["params"]["user_id"], project_code, device_position_desc) + user_info = UserInfo( + user_id=msg["params"]["user_id"], + device_id=device_id, + name=msg["params"]["name"], + user_type=user_type + ) + UserInfoTable.update(self.table_handler, user_info) + + topic, topic_resp = DeviceTable.get_device_topic(self.table_handler, device_id) + # 获取客户端 + client_0 = self.clients["center"][0] + userdata_0 = self.clients["center"][1] + userdata_0.set_status_add("status", False) + userdata_0.set_status_add("start_timestamp", time.time()) + try: + Logger.debug(f"subscribe topics: {topic_resp}") + # 构建消息体 + token = generate_token() # 正式运行应使用 generate_token() + userdata_0.set_token(token) + userdata_0.set_code("qrCodeDownState") + qrcode_json = json.dumps(meian_data_entity.QrCodeInfo( + dataType="qrCodeInfo", + deviceId=device_id, + token=token, + userId=qrcode, + qrCode=encode_to_base64(qrcode), + floor=floor + ).__dict__) + # 发布事件 + client_0.publish(topic, qrcode_json) + # 等待回传 + while True: + if userdata_0.status["status"]: + if "errorCode" in userdata_0.status["response"].keys(): + if userdata_0.status["response"]["errorCode"] != 0: + error_code = userdata_0.status['response']['errorCode'] + Logger.error(f"{topic_resp} 返回错误码: {error_code}, " + f"{meian_data_entity.error_code[error_code]}") + else: + Logger.info(f"设备 - {device_id} 二维码 - {qrcode} 完成下置") + UserInfoTable.update_qrcode(self.table_handler, + user_info.user_id, device_id, qrcode) + success_aiot_list.append(aiot_id) + break + if time.time() - userdata_0.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出 + Logger.error("等待回复超时") + break + # Logger.debug(f"{userdata_0.status} waiting for {topic_resp} ... ") + time.sleep(SLEEP_TIME) + finally: + userdata_0.set_token(None) + userdata_0.set_code(None) + userdata_0.set_status_remove("response") + Logger.debug(f"{speed_ms(s)}ms") + except Exception as e: + Logger.error(f"{type(e).__name__}, {e}") + if type(e).__name__ == "TypeError": + code = ErrorCode.INPUT_TYPE_ERROR + else: + code = ErrorCode.UNKNOWN + + # 将唯一识别id作为二维码字符串返回给平台 + # 获取客户端 + client_1 = self.clients[gateway_id][0] + userdata = self.clients[gateway_id][1] + userdata.set_status_add("status", False) + userdata.set_status_add("start_timestamp", time.time()) + try: + if code == ErrorCode.SUCCESS: + if set(success_aiot_list) == set(aiot_ids): # 全部完成下发 + Logger.debug("指令完成下发") + elif success_aiot_list: # 完成一部分下发 + code = ErrorCode.PART_SUCCESS + Logger.warn(f"部分成功下发: {success_aiot_list}") + UserInfoTable.update_qrcode(self.table_handler, msg["params"]["user_id"], project_code, qrcode) + elif len(success_aiot_list) == 0: # 一个都没下发 + code = ErrorCode.FAILURE + Logger.error("无指令完成下发") + aiot_topic = self.topic + "_resp" + return_json = json.dumps(yunfu_data_entity.EventPostResp( + messageId=msg["messageId"], + requestTime=msg["time"], + serviceCode=msg["serviceCode"], + data=yunfu_data_entity.QrCodeResp(code=code.value, message=code.message, qrcode=qrcode).__dict__ + ).__dict__) + client_1.publish(aiot_topic, return_json) + finally: + userdata.set_token(None) + userdata.set_status_remove("response") + + class AddFaceService(BaseService): + def handle(self, msg: dict): + Logger.debug("Execute Add Face ...") + code = ErrorCode.SUCCESS + device_id = DeviceTable.get_device_id(self.table_handler, self.topic.split("/tml")[0].split("link/")[1]) + project_code = DeviceTable.get_project_code(self.table_handler, device_id) + gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code) + success_aiot_list = [] + aiot_ids = json.loads(msg["params"]["device_ids"]) + user_id = msg["params"]["user_id"] + face_url = f"{msg['params']['face_url']}?x-oss-process=image/resize,m_pad,h_960,w_540,color_FFFFFF" + try: + # 将人脸下置给指定设备 + s = time.time() + for aiot_id in aiot_ids: + # 查询对应aiot_id实际的美安设备id和订阅主题 + device_id = DeviceTable.get_device_id(self.table_handler, aiot_id) + if device_id is None: + continue + + # 将接受的用户信息记录追加或更新到本地DB + user_info = UserInfo( + user_id=user_id, + device_id=device_id, + name=msg["params"]["name"], + user_type=0 + ) + UserInfoTable.update(self.table_handler, user_info) + + topic, topic_resp = DeviceTable.get_device_topic(self.table_handler, device_id) + # 获取客户端 + client_0 = self.clients["center"][0] + userdata_0 = self.clients["center"][1] + userdata_0.set_status_add("status", False) + userdata_0.set_status_add("start_timestamp", time.time()) + try: + Logger.debug(f"subscribe topics: {topic_resp}") + # 构建消息体 + token = generate_token() # 正式运行应使用 generate_token() + userdata_0.set_token(token) + userdata_0.set_code("faceDownState") + device_position_desc = RegisterTable.get_device_position_desc(self.table_handler, device_id) + if device_position_desc is None: + floor = 1 + else: + floor = get_owner_house_floor(user_id, project_code, device_position_desc) + face_json = json.dumps(meian_data_entity.FaceInfo( + dataType="faceInfo", + deviceId=device_id, + token=token, + userId=user_id, + faceUrl=face_url, + floor=floor + ).__dict__) + # 发布事件 + client_0.publish(topic, face_json) + Logger.debug(f"{face_json} ==> {topic}") + # 等待回传 + while True: + if userdata_0.status["status"]: + if "errorCode" in userdata_0.status["response"].keys(): + if userdata_0.status["response"]["errorCode"] != 0: + error_code = userdata_0.status['response']['errorCode'] + Logger.error(f"{topic_resp} 返回错误码: {error_code}, " + f"{meian_data_entity.error_code[error_code]}") + else: + Logger.info(f"设备 - {device_id} 用户({user_id}) 人脸完成下置") + UserInfoTable.update_face_url(self.table_handler, user_id, device_id, face_url) + success_aiot_list.append(aiot_id) + break + if time.time() - userdata_0.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出 + Logger.error("等待回复超时") + break + # Logger.debug(f"{userdata_0.status} waiting for {topic_resp} ... ") + time.sleep(SLEEP_TIME) + finally: + userdata_0.set_token(None) + userdata_0.set_code(None) + userdata_0.set_status_remove("response") + Logger.debug(f"{speed_ms(s)}ms") + except Exception as e: + Logger.error(f"{type(e).__name__}, {e}") + if type(e).__name__ == "TypeError": + code = ErrorCode.INPUT_TYPE_ERROR + else: + code = ErrorCode.UNKNOWN + + # 将下置结果返回给平台 + # 获取客户端 + client_1 = self.clients[gateway_id][0] + userdata = self.clients[gateway_id][1] + userdata.set_status_add("status", False) + userdata.set_status_add("start_timestamp", time.time()) + try: + if code == ErrorCode.SUCCESS: + if set(success_aiot_list) == set(aiot_ids): # 全部完成下发 + Logger.info("指令完成下发") + elif success_aiot_list: # 完成一部分下发 + code = ErrorCode.PART_SUCCESS + Logger.warn(f"部分成功下发: {success_aiot_list}") + else: # 一个都没下发 + code = ErrorCode.FAILURE + Logger.error("无指令完成下发") + aiot_topic = self.topic + "_resp" + return_json = json.dumps(yunfu_data_entity.EventPostResp( + messageId=msg["messageId"], + requestTime=msg["time"], + serviceCode=msg["serviceCode"], + data=yunfu_data_entity.AddFaceResp(code=code.value, message=code.message).__dict__ + ).__dict__) + client_1.publish(aiot_topic, return_json) + finally: + userdata.set_token(None) + userdata.set_status_remove("response") + + class DeleteFaceService(BaseService): + def handle(self, msg: dict): + Logger.debug("Execute Delete Face ...") + code = ErrorCode.SUCCESS + device_id = DeviceTable.get_device_id(self.table_handler, self.topic.split("/tml")[0].split("link/")[1]) + project_code = DeviceTable.get_project_code(self.table_handler, device_id) + gateway_id, _ = GatewayTable.get_gateway(self.table_handler, project_code) + success_aiot_list = [] + aiot_ids = json.loads(msg["params"]["device_ids"]) + user_id = msg["params"]["user_id"] + try: + s = time.time() + # 将人脸删除指令下发指定设备 + for aiot_id in aiot_ids: + # 查询对应aiot_id实际的美安设备id和订阅主题 + device_id = DeviceTable.get_device_id(self.table_handler, aiot_id) + if device_id is None: + continue + # 本地DB判断人脸信息是否存在 + if UserInfoTable.exists_face_url(self.table_handler, user_id, device_id): + topic, topic_resp = DeviceTable.get_device_topic(self.table_handler, device_id) + # 获取客户端 + client_0 = self.clients["center"][0] + userdata_0 = self.clients["center"][1] + userdata_0.set_status_add("status", False) + userdata_0.set_status_add("start_timestamp", time.time()) + try: + Logger.debug(f"subscribe topics: {topic_resp}") + # 构建消息体 + token = generate_token() # 正式运行应使用 generate_token() + userdata_0.set_token(token) + userdata_0.set_code("deleteUserState") + face_json = json.dumps(meian_data_entity.DeleteUser( + dataType="deleteUser", + deviceId=device_id, + token=token, + userId=user_id + ).__dict__) + # 发布事件 + client_0.publish(topic, face_json) + # 等待回传 + while True: + if userdata_0.status["status"]: + if "errorCode" in userdata_0.status["response"].keys(): + if userdata_0.status["response"]["errorCode"] != 0: + error_code = userdata_0.status['response']['errorCode'] + Logger.error(f"{topic_resp} 返回错误码: {error_code}, " + f"{meian_data_entity.error_code[error_code]}") + else: + Logger.info(f"设备 - {device_id} 移除用户({user_id})人脸") + UserInfoTable.update_face_url(self.table_handler, user_id, device_id, None) + success_aiot_list.append(aiot_id) + break + if time.time() - userdata_0.status["start_timestamp"] > TIMEOUT_SECOND: # 超时跳出 + Logger.error("等待回复超时") + break + # Logger.debug(f"{userdata_0.status} waiting for {topic_resp} ... ") + time.sleep(SLEEP_TIME) + finally: + userdata_0.set_token(None) + userdata_0.set_code(None) + userdata_0.set_status_remove("response") + else: + code = ErrorCode.NEVER_POST_FACE + Logger.warn(f"本地数据库中未发现下发过人脸信息,默认无操作") + Logger.debug(f"{speed_ms(s)}ms") + except Exception as e: + Logger.error(f"{type(e).__name__}, {e}") + if type(e).__name__ == "TypeError": + code = ErrorCode.INPUT_TYPE_ERROR + else: + code = ErrorCode.UNKNOWN + + # 将下置结果返回给平台 + # 获取客户端 + client_1 = self.clients[gateway_id][0] + userdata = self.clients[gateway_id][1] + userdata.set_status_add("status", False) + userdata.set_status_add("start_timestamp", time.time()) + try: + if code == ErrorCode.SUCCESS: + if set(success_aiot_list) == set(aiot_ids): # 全部完成下发 + Logger.info("指令完成下发") + elif success_aiot_list: # 完成一部分下发 + code = ErrorCode.PART_SUCCESS + Logger.warn(f"部分成功下发: {success_aiot_list}") + else: # 一个都没下发 + code = ErrorCode.FAILURE + Logger.error("无指令完成下发") + aiot_topic = self.topic + "_resp" + return_json = json.dumps(yunfu_data_entity.EventPostResp( + messageId=msg["messageId"], + requestTime=msg["time"], + serviceCode=msg["serviceCode"], + data=yunfu_data_entity.DelFaceResp(code=code.value, message=code.message).__dict__ + ).__dict__) + client_1.publish(aiot_topic, return_json) + finally: + userdata.set_token(None) + userdata.set_status_remove("response") + + +def auto_service(msg: dict, userdata: UserData): + """跟据dataType自动化加载不同的服务层""" + if "dataType" in msg.keys(): + entity_name = msg["dataType"][0].upper() + msg["dataType"][1:] + if entity_name in meian_data_entity.__dict__.keys(): + Logger.debug(f"Target service name: {entity_name}") + # 转为服务层数据实体 + entity_type = meian_data_entity.__dict__[entity_name] + msg_obj = to_obj(msg, entity_type) + # 构建服务层实体并进行调用 + service_name = entity_name + "Service" + if service_name in Services.__dict__.keys(): + servicer = Services.__dict__[service_name]() + servicer.set_meta(userdata) + servicer.handle(msg_obj) + elif "serviceCode" in msg.keys() and "params" in msg.keys(): # 校验是aiot平台服务调用下发 + service_map = { + "get_owner_qrcode": "GetQrCodeService", + "get_visitor_qrcode": "GetQrCodeService", + "add_face": "AddFaceService", + "del_face": "DeleteFaceService" + } + # 构建服务层实体并进行调用 + if msg["serviceCode"] in service_map.keys(): + service_name = service_map[msg["serviceCode"]] + servicer = Services.__dict__[service_name]() + servicer.set_meta(userdata) + servicer.handle(msg) + else: + pass diff --git a/meian/devices/meian_db.py b/meian/devices/meian_db.py new file mode 100644 index 0000000..f5bd694 --- /dev/null +++ b/meian/devices/meian_db.py @@ -0,0 +1,702 @@ +# -*- coding:utf-8 -*- +""" +@File : meian_db +@Author : xuxingchen +@Version : 1.0 +@Contact : xuxingchen@sinochem.com +@Desc : meian database crud +""" +import csv +import json +import os.path +import sqlite3 +import time +import traceback + +from logger import Logger, new_dc +from datetime import datetime +from devices.meian_model import HeartBeat, Register, PushRtAccessRecord +from devices.common_model import UserInfo +from utils import datetime_to_timestamp +from config import ENV_TYPE, DEBUG + + +class SQLiteDatabaseEngine: + def __init__(self, db_path: str = "demo.db") -> None: + self.sqlite3 = sqlite3 + self.db_path = db_path + self.connection = None + self.cursor = None + self.connect() + + def __del__(self): + # self.disconnect() + pass + + def connect(self): + """连接SQLite 数据库(如果数据库不存在则会自动创建)""" + self.connection = self.sqlite3.connect(self.db_path) + self.cursor = self.connection.cursor() + Logger.init(new_dc(f"🔗 SQLite - {self.db_path} has connect successfully! 🔗", "[1;32m")) + + def disconnect(self): + try: + self.cursor.close() + self.connection.close() + except Exception as e: + if type(e).__name__ != "ProgrammingError": + Logger.error(f"{type(e).__name__}, {e}") + Logger.info(new_dc(f"🔌 Disconnect from SQLite - {self.db_path}! 🔌", "[1m")) + + def exist(self, table_name): + self.cursor.execute( + f"SELECT name FROM sqlite_master WHERE type='table' AND name=?", + (table_name,), + ) + result = self.cursor.fetchone() + if result: + return True + else: + return False + + +class BaseTable: + def __init__(self, connection=None, cursor=None): + self.connection = connection + self.cursor = cursor + + def set(self, connection, cursor): + self.connection = connection + self.cursor = cursor + + def execute(self, sql: str, params: tuple = ()): + self.cursor.execute(sql, params) + self.connection.commit() + + def executemany(self, sql: str, params: list[tuple]): + self.cursor.executemany(sql, params) + self.connection.commit() + + def query(self, sql: str, params: tuple = ()): + self.cursor.execute(sql, params) + + +class DeviceTable(BaseTable): + @staticmethod + def check(table_handler: BaseTable): + """检测是否存在当前表,并根据devices.csv开始数据初始化""" + table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='device'") + if table_handler.cursor.fetchone() is None: + table_handler.execute( + f""" + CREATE TABLE device ( + device_id TEXT, + project_code TEXT, + subscribe_topic TEXT, + publish_topic TEXT, + aiot_id TEXT NULL, + register_type INTEGER default 0, + PRIMARY KEY (device_id) + ) + """ + ) + init_config_path = os.path.join(os.path.dirname((os.path.abspath("__file__"))), "data", + "_devices.csv" if ENV_TYPE != 2 else "devices.csv") + if os.path.exists(init_config_path): + with open(init_config_path, newline='', encoding="utf8") as csvfile: + csvreader = csv.reader(csvfile) + head = next(csvreader) + data = [] + if len(head) == 5: + for row in csvreader: + register_type = 0 if row[4] == '' else 1 + data.append(tuple([i.strip() for i in row] + [register_type])) + + table_handler.executemany( + f""" + INSERT INTO device + (project_code, device_id, subscribe_topic, publish_topic, aiot_id, register_type) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (device_id) DO NOTHING + """, + data + ) + + @staticmethod + def get_topics(table_handler): + table_handler.query( + """ + SELECT ( + SELECT json_group_array(subscribe_topic) + FROM ( + SELECT DISTINCT subscribe_topic + FROM device + ) + ) AS subscribe_topics, + ( + SELECT json_group_array(publish_topic) + FROM ( + SELECT DISTINCT publish_topic + FROM device + ) + ) AS publish_topics; + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res[0] + else: + return None + + @staticmethod + def check_device_id(table_handler, topic, device_id): + """device in `publish_topic` and `device_id` column""" + table_handler.query( + f""" + SELECT DISTINCT device_id from device where publish_topic = '{topic}' and device_id = '{device_id}' + """ + ) + if len(table_handler.cursor.fetchall()) > 0: + return True + else: + return False + + @staticmethod + def get_device_topic(table_handler, device_id): + """获取对应设备的订阅主题""" + table_handler.query( + f""" + SELECT subscribe_topic, publish_topic from device where device_id = '{device_id}' + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res[0] + else: + return None + + @staticmethod + def get_device_register_type(table_handler, device_id): + table_handler.query( + f""" + SELECT register_type from device where device_id = '{device_id}' + """ + ) + res = table_handler.cursor.fetchall() + if res and res[0][0] == 1: + return True + else: + return False + + @staticmethod + def get_aiot_id(table_handler, device_id): + table_handler.query( + f""" + SELECT aiot_id from device where device_id = '{device_id}' and register_type = 1 + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res[0][0] + else: + return None + + @staticmethod + def get_device_id(table_handler, aiot_id): + table_handler.query( + f""" + SELECT device_id from device where aiot_id = '{aiot_id}' and register_type = 1 + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res[0][0] + else: + return None + + @staticmethod + def get_device_ids(table_handler): + """获取所有设备的ID""" + table_handler.query( + f""" + SELECT device_id from device + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res + else: + return None + + @staticmethod + def get_project_code(table_handler, device_id): + table_handler.query( + f""" + SELECT project_code from device where device_id = '{device_id}' + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res[0][0] + else: + return None + + @staticmethod + def update_aiot_id(table_handler: BaseTable, device_id, aiot_id): + table_handler.execute( + f""" + UPDATE device SET aiot_id = '{aiot_id}', register_type = 1 WHERE device_id = '{device_id}' + """ + ) + + +class GatewayTable(BaseTable): + @staticmethod + def check(table_handler: BaseTable): + """检测是否存在当前表,并根据gateway.json开始数据初始化""" + table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='gateway'") + if table_handler.cursor.fetchone() is None: + table_handler.execute( + f""" + CREATE TABLE gateway ( + gateway_id TEXT NULL, + project_code TEXT, + gateway_sct TEXT NULL, + register_type INTEGER default 0, + PRIMARY KEY (gateway_id) + ) + """ + ) + init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data", + "_gateways.json" if ENV_TYPE != 2 else "gateways.json") + if os.path.exists(init_config_path): + with open(init_config_path, "r", encoding="utf8") as f: + try: + gateway_config = json.load(f) + data = [] + for project_code, gateway_info in gateway_config.items(): + data.append((project_code, gateway_info[0], gateway_info[1], 1)) + table_handler.executemany( + f""" + INSERT INTO gateway (project_code, gateway_id, gateway_sct, register_type) VALUES (?, ?, ?, ?) + ON CONFLICT (gateway_id) DO NOTHING + """, + data + ) + except Exception as e: + Logger.error(f"{type(e).__name__}, {e}") + if DEBUG: + traceback.print_exc() + + @staticmethod + def get_gateway(table_handler, project_code): + table_handler.query( + f""" + SELECT gateway_id, gateway_sct from gateway where project_code = '{project_code}' + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res[0] + else: + return None, None + + @staticmethod + def get_registered_gateway(table_handler): + table_handler.query( + f""" + SELECT gateway_id, gateway_sct from gateway where register_type = 1 + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res + else: + return None + + @staticmethod + def get_project_code_list(table_handler): + table_handler.query( + f""" + SELECT DISTINCT project_code from gateway + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res + else: + return [] + + @staticmethod + def get_registered_sub_aiot_id(table_handler, gateway_id): + """查询已注册网关下的所有子设备在aiot平台的设备id""" + table_handler.query( + f""" + SELECT json_group_array(aiot_id) FROM device WHERE project_code IN + (SELECT DISTINCT project_code FROM gateway + WHERE register_type = 1 and gateway_id = '{gateway_id}') + AND register_type = 1 + """ + ) + res = table_handler.cursor.fetchall() + if res: + return res[0] + else: + return None + + +class HeartBeatTable(BaseTable): + @staticmethod + def check(table_handler: BaseTable): + table_handler.execute( + """ + CREATE TABLE IF NOT EXISTS heart_beat ( + device_id TEXT, + factory_id TEXT, + last_heart_beat TEXT, + PRIMARY KEY (device_id) + ) + """ + ) + + @staticmethod + def delete(table_handler: BaseTable, obj): + table_handler.execute("DELETE FROM heart_beat WHERE device_id=?", (obj.device_id,)) + + @staticmethod + def update(table_handler: BaseTable, obj: HeartBeat, topic: str): + if DeviceTable.check_device_id(table_handler, topic, obj.device_id): + time_stamp = str(int(time.time())) + table_handler.execute( + """ + INSERT INTO heart_beat (device_id, factory_id, last_heart_beat) + VALUES (?, ?, ?) + ON CONFLICT (device_id) + DO UPDATE SET + last_heart_beat=? + """, + (obj.device_id, obj.factory_id, time_stamp, time_stamp), + ) + return True + else: + if DEBUG: + Logger.warn( + f"device_id - {obj.device_id} is invalid in {topic}, operation was not performed" + ) + return False + + @staticmethod + def get_last_time(table_handler: BaseTable, device_id): + table_handler.query( + """ + SELECT last_heart_beat + FROM heart_beat + WHERE device_id = ? + """, + (device_id,), + ) + res = table_handler.cursor.fetchall() + if res: + return res[0][0] + else: + return None + + +class RegisterTable(BaseTable): + @staticmethod + def check(table_handler: BaseTable): + table_handler.execute( + """ + CREATE TABLE IF NOT EXISTS register ( + device_id TEXT, + factory_id TEXT, + device_type TEXT, + device_position_code TEXT, + device_position_desc TEXT, + last_register_timestamp TEXT, + PRIMARY KEY (device_id) + ) + """ + ) + + def drop(self): + self.execute("drop table register") + + def delete(self, obj): + self.execute( + "DELETE FROM register WHERE device_id=? and factory_id=?", + ( + obj.device_id, + obj.factory_id, + ), + ) + + def insert(self, obj, topic: str): + if DeviceTable.check_device_id(self, topic, obj.device_id): + time_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + self.execute( + """ + INSERT INTO register + (device_id, factory_id, device_type, device_position_code, device_position_desc, + last_register_timestamp) + VALUES (?, ?, ?, ?, ?, ?) + """, + ( + obj.device_id, + obj.factory_id, + obj.device_type, + obj.device_position_code, + obj.device_position_desc, + time_stamp, + ), + ) + return True + else: + if DEBUG: + Logger.warn( + f"device_id - {obj.device_id} is invalid in {topic}, operation was not performed" + ) + return False + + @staticmethod + def update(table_handler, obj: Register, topic: str): + if DeviceTable.check_device_id(table_handler, topic, obj.device_id): + time_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + table_handler.execute( + """ + INSERT INTO register (device_id, factory_id, device_type, device_position_code, device_position_desc, + last_register_timestamp) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (device_id) + DO UPDATE SET + device_type=?, device_position_code=?, device_position_desc=?, last_register_timestamp=? + """, + ( + obj.device_id, + obj.factory_id, + obj.device_type, + obj.device_position_code, + obj.device_position_desc, + time_stamp, + obj.device_type, + obj.device_position_code, + obj.device_position_desc, + time_stamp, + ), + ) + return True + else: + if DEBUG: + Logger.warn( + f"device_id - {obj.device_id} is invalid in {topic}, operation was not performed" + ) + return False + + @staticmethod + def get_device_position_desc(table_handler: BaseTable, device_id: str): + """根据device_id获取设备的空间描述信息""" + table_handler.query( + """ + SELECT device_position_desc + FROM register + WHERE device_id = ? + """, + (device_id,), + ) + res = table_handler.cursor.fetchall() + if res: + return res[0][0] + else: + return None + + +class UserInfoTable(BaseTable): + @staticmethod + def check(table_handler: BaseTable): + table_handler.query("SELECT name FROM sqlite_master WHERE type='table' AND name='user_info'") + if table_handler.cursor.fetchone() is None: + table_handler.execute( + f""" + CREATE TABLE user_info ( + user_id TEXT, + device_id TEXT, + name TEXT, + user_type INTEGER, + qrcode TEXT NULL, + face_url TEXT NULL, + create_timestamp TEXT, + update_timestamp TEXT, + PRIMARY KEY (user_id, device_id) + ) + """ + ) + table_handler.execute( + f""" + CREATE INDEX idx_user_info_qrcode ON user_info(qrcode); + """ + ) + init_config_path = os.path.join(os.path.dirname(os.path.abspath("__file__")), "data", + "_users.csv" if ENV_TYPE != 2 else "users.csv") + if os.path.exists(init_config_path): + with open(init_config_path, newline='', encoding="utf8") as csvfile: + csvreader = csv.reader(csvfile) + head = next(csvreader) + data = [] + if len(head) == 4: + for row in csvreader: + user_id = row[0].strip() + name = row[1].strip() + user_type = 0 if row[2].strip() == "业主" else 1 + timestamp = str(int(time.time() * 1000)) + device_id = row[3].strip() + data.append((user_id, name, user_type, timestamp, timestamp, device_id)) + table_handler.executemany( + f""" + INSERT INTO user_info + (user_id, name, user_type, create_timestamp, update_timestamp, device_id) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (user_id, device_id) DO NOTHING + """, + data + ) + elif len(head) == 5: + for row in csvreader: + user_id = row[0].strip() + name = row[1].strip() + user_type = 0 if row[2].strip() == "业主" else 1 + timestamp = str(int(time.time() * 1000)) + device_id = row[3].strip() + face_url = row[4].strip() + data.append((user_id, name, user_type, timestamp, timestamp, device_id, face_url)) + table_handler.executemany( + f""" + INSERT INTO user_info + (user_id, name, user_type, create_timestamp, update_timestamp, device_id, face_url) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (user_id, device_id) DO NOTHING + """, + data + ) + + @staticmethod + def delete_redundancy_1(table_handler, timestamp): + """用于在每天清理无效的冗余访客数据""" + table_handler.execute(f"DELETE FROM user_info WHERE user_type=1 and update_timestamp < {timestamp}") + + @staticmethod + def delete_redundancy_0(table_handler, timestamp): + """用于在每天清理无效的冗余业主数据""" + table_handler.execute(f"DELETE FROM user_info WHERE user_type=0 " + f"and (face_url is NULL or face_url = '') and update_timestamp < {timestamp}") + + @staticmethod + def update(table_handler, obj: UserInfo): + time_stamp = str(int(time.time() * 1000)) + table_handler.execute( + """ + INSERT INTO user_info (user_id, name, user_type, create_timestamp, update_timestamp, device_id) + VALUES (?, ?, ?, ?, ?, ?) + ON CONFLICT (user_id, device_id) + DO UPDATE SET + name=?, update_timestamp=? + """, + (obj.user_id, obj.name, obj.user_type, time_stamp, time_stamp, obj.device_id, + obj.name, time_stamp), + ) + + @staticmethod + def select_all(table_handler): + table_handler.execute("SELECT * FROM user_info") + return table_handler.cursor.fetchall() + + @staticmethod + def get_name(table_handler, user_id, device_id): + table_handler.query( + """ + SELECT name + FROM user_info + WHERE user_id = ? AND device_id = ? + """, + (user_id, device_id), + ) + res = table_handler.cursor.fetchall() + if res: + return res[0][0] + else: + return None + + @staticmethod + def get_user_by_qrcode(table_handler, qrcode, device_id): + table_handler.query( + """ + SELECT user_id, name + FROM user_info + WHERE qrcode = ? AND device_id = ? + """, + (qrcode, device_id), + ) + res = table_handler.cursor.fetchall() + if res: + return res[0] + else: + return None + + @staticmethod + def update_qrcode(table_handler, user_id, device_id, qrcode): + time_stamp = str(int(time.time() * 1000)) + table_handler.execute( + f""" + UPDATE user_info SET qrcode = ?, update_timestamp = ? WHERE user_id = ? and device_id = ? + """, + (qrcode, time_stamp, user_id, device_id) + ) + + @staticmethod + def update_face_url(table_handler, user_id, device_id, face_url): + time_stamp = str(int(time.time() * 1000)) + table_handler.execute( + f""" + UPDATE user_info SET face_url = ?, update_timestamp = ? WHERE user_id = ? and device_id = ? + """, + (face_url, time_stamp, user_id, device_id) + ) + + @staticmethod + def exists_face_url(table_handler, user_id, device_id): + """判断人脸地址是否存在且不为空""" + table_handler.query( + f""" + SELECT face_url FROM user_info WHERE user_id = ? AND device_id = ? + AND face_url IS NOT NULL AND face_url != '' + """, + (user_id, device_id) + ) + res = table_handler.cursor.fetchall() + if res and len(res[0]) > 0: + return True + else: + return False + + +class RecordTable(BaseTable): + @staticmethod + def check(table_handler: BaseTable): + table_handler.execute( + """ + CREATE TABLE IF NOT EXISTS record ( + user_id TEXT, + device_id TEXT, + record_datetime TEXT + ) + """ + ) + + @staticmethod + def add(table_handler: BaseTable, obj: PushRtAccessRecord): + table_handler.execute( + """ + INSERT INTO record (user_id, device_id, record_datetime) + VALUES (?, ?, ?) + """, + (obj.user_id, obj.device_id, str(datetime_to_timestamp(obj.time))), + ) diff --git a/meian/devices/meian_model.py b/meian/devices/meian_model.py new file mode 100644 index 0000000..0bc4d05 --- /dev/null +++ b/meian/devices/meian_model.py @@ -0,0 +1,98 @@ +# -*- coding:utf-8 -*- +""" +@File : meian_model +@Author : xuxingchen +@Version : 1.0 +@Contact : xuxingchen@sinochem.com +@Desc : Data Entity +""" + +from pydantic import BaseModel + +# 错误码 +error_code = { + 0: "成功", + -1: "图片下载超时", + -2: "图片下载失败", + -3: "用户ID不存在", + -4: "参数无效", + -5: "注册失败", + -6: "用户ID已存在", + -7: "无效人脸", + -8: "内部错误", + -9: "图片解码失败,图像尺寸宽高像素应为540*960", + -10: "添加人脸失败", + -11: "人脸图像无特征", + -12: "查询离线通行记录失败", + -13: "设备忙", + -14: "rf_id_not_exist", + -15: "rf_id_existed", + -16: "add_rf_id_failed or rm_rf_id_failed", + -17: "get_rf_id_reader_mode_failed", + -18: "set_rf_id_reader_mode_failed", + -19: "添加人脸操作不支持", +} + + +class BaseInfo(BaseModel): + data_type: str = None + + def check(self): + for attr in self.__dict__.keys(): + # if property can be null, default value should not be set to None + if self.__dict__[attr] is None: + raise ValueError(f"{attr} not allowed to be set to None") + + +class BaseRequest(BaseInfo): + device_id: str = None + token: str = None + + +class BaseResponse(BaseInfo): + factory_id: str = None + error_code: int = None + token: str = None + + +class HeartBeat(BaseInfo): + device_id: str = None + factory_id: str = None + + +class PushRtAccessRecord(HeartBeat): + time: str = None + user_id: str = None + access_mode: str = None + + +class Register(BaseRequest): + factory_id: str = None # 厂商唯一标识 + device_type: int = None # 0:面板机,1:梯控机 + device_position_code: str = None + device_position_desc: str = None + + +class FaceInfo(BaseModel): + dataType: str + deviceId: str + token: str + userId: str + faceUrl: str + floor: int + + +class QrCodeInfo(BaseModel): + dataType: str + deviceId: str + token: str + userId: str + qrCode: str + floor: int + + +class DeleteUser(BaseModel): + dataType: str + deviceId: str + token: str + userId: str diff --git a/meian/devices/yunfu_model.py b/meian/devices/yunfu_model.py new file mode 100644 index 0000000..2e738c9 --- /dev/null +++ b/meian/devices/yunfu_model.py @@ -0,0 +1,137 @@ +# -*- coding:utf-8 -*- +""" +@File : yunfu_model +@Author : xuxingchen +@Version : 1.0.1 +@Contact : xuxingchen@sinochem.com +@Desc : 云服的数据模型 +""" +import os +import time +from typing import Optional + +from pydantic import BaseModel, field_validator, Field +from enum import Enum + +from utils import get_sign + +error_code = { + 0: "成功", + 401: "参数错误,原因可能是:必填参数缺失或格式错误", + 402: "云端错误。请稍后再试", + 403: "mq解析失败", + 404: "发送kafka异常", + 602: "通过deviceUuid获取redis中DeviceInfoCacheVO缓存失败", + 40014: "设备不存在" +} + + +class MeianProduct: + if int(os.environ.get("ENV_TYPE", 0)) != 2: + id: str = "4bb9a56bfe9*********2ee99bac6d" + secret: str = "9b0159********abe57c10ea665ddd" + else: + id: str = "a2ce31a40b********230ccc860e" + secret: str = "03970a**********28b84e8437" + + +class OpenEventType(Enum): + # 此处的取名应当与美安设备的accessMode能够一一对应 + FACE = 8 + QRCODE = 10 + OFFLINE = 11 + + +class OpenEventCertificateType(Enum): + # 此处的取名应当与美安设备的accessMode能够一一对应 + FACE = 2 + PASSWORD = 3 + QRCODE = 4 + + +class BaseInfo(BaseModel): + def check(self): + for attr in self.__dict__.keys(): + # if property can be null, default value should not be set to None + if self.__dict__[attr] is None: + raise ValueError(f"{attr} not allowed to be set to None") + + +class CommRegister(BaseInfo): + messageId: str = None + version: str = "1.0" + time: int = int(time.time() * 1000) + params: dict = None + + +class EventPost(BaseInfo): + messageId: str = None + version: str = "1.0" + time: int = int(time.time() * 1000) + ext: dict = { + "ack": 1 + } + eventCode: str = None + params: dict = None + + +class EventPostResp(BaseInfo): + messageId: str + requestTime: int + time: int = int(time.time() * 1000) + code: int = 0 + message: str = "success" + serviceCode: str + data: dict = None + + +class OpenEvent(BaseInfo): + type: OpenEventType | int = None + user_picture: str = "default" + code: str = "default" + name: str = None + certificate_type: OpenEventCertificateType | int = None + certificate: str = None + result: int = 1 + error_code: int = 1 + enter_type: int = 1 + datetime: str = None + + +class RegisterParam(BaseInfo): + productId: str = MeianProduct.id + deviceName: str = None + displayName: str = None + sign: Optional[str] = Field(default=None, validate_default=True) + + @field_validator("sign") + def gen_sign(cls, value, values): + productId = values.data.get("productId") + deviceName = values.data.get("deviceName") + return get_sign(f"deviceName{deviceName}productId{productId}", MeianProduct.secret) + + +class Online(BaseInfo): + messageId: str + version: str = "1.0" + time: int = int(time.time() * 1000) + cleanSession: str = "true" + productModel: str = "default" + chipModel: str = "default" + otaVersion: str = "" + + +class QrCodeResp(BaseInfo): + code: int + message: str + qrcode: str + + +class AddFaceResp(BaseInfo): + code: int + message: str + + +class DelFaceResp(BaseInfo): + code: int + message: str diff --git a/meian/logger.py b/meian/logger.py new file mode 100644 index 0000000..a73e659 --- /dev/null +++ b/meian/logger.py @@ -0,0 +1,137 @@ +# -*- coding:utf-8 -*- +""" +@File : logger +@Author : xuxingchen +@Version : 1.0 +@Contact : xuxingchen@sinochem.com +@Desc : None +""" +import time + +DEBUG = None +LOGGER_PATH = None + + +def log(text: str, log_path: str = None): + """打印日志""" + log_path = log_path if log_path else LOGGER_PATH + log_line = '[{}] {}'.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), text) + if log_path: + log = open(log_path, 'a', encoding='utf8') + log.write("{}\n".format(log_line)) + log.close() + print(log_line) + + +def log_plus(text: str, log_path: str = None, prefix_text: str = None, suffix_text: str = None): + """加强版打印日志,预置了不同文字颜色""" + log_path = log_path if log_path else LOGGER_PATH + if prefix_text: + log_line_start = f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}] {prefix_text}" + else: + log_line_start = f"[{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}]" + if suffix_text: + log_line = f"{log_line_start} {text} {suffix_text}" + else: + log_line = f"{log_line_start} {text}" + if log_path: + with open(log_path, 'a', encoding='utf8') as log: + log.write("{}\n".format(log_line)) + log.close() + print(log_line) + + +def new_dc(msgs: str | float | int, fore_color: str = "", back_color: str = "") -> str: + """给文本上色 + + fore_color格式如下: + [{显示方式};{前景色};{背景色}m + + 显示方式 + 0(默认值)、1(高亮)、22(非粗体)、4(下划线)、24(非下划线)、 5(闪烁)、25(非闪烁)、7(反显)、27(非反显) + 前景色 + 30(黑色)、31(红色)、32(绿色)、 33(黄色)、34(蓝色)、35(洋 红)、36(青色)、37(白色) + 背景色 + 40(黑色)、41(红色)、42(绿色)、 43(黄色)、44(蓝色)、45(洋 红)、46(青色)、47(白色) + 如: + 高亮绿色红色背景 [1;32;41m + 默认-绿字体 [32m + """ + if fore_color == "": + fore_color = '[32m' # 默认绿色 + return "\033{}{}{}\033[0m".format(back_color, fore_color, str(msgs)) + + +def log_speed_ms(start_time: float, suffix: str = "", prefix: str = "", number_color: str = "", decimal: int = 4): + """控制台打印消耗的毫秒时间""" + log(f"{suffix}用时:{new_dc(str(round((time.time() - start_time) * 1000, decimal)), number_color)}ms{prefix}") + + +def speed_ms(start_time: float, decimal: int = 4): + """消耗的毫秒时间""" + return round((time.time() - start_time) * 1000, decimal) + + +class Logger: + """对控制台日志输出的二次封装,对部分常用日志进行预置""" + + @staticmethod + def debug(text: str, log_path: str = None): + """预置的debug形式的log""" + log_path = log_path if log_path else LOGGER_PATH + if isinstance(DEBUG, bool) and DEBUG: + log_plus(text, log_path, f"[{new_dc('INFO-DEBUG', '[34m')}]") + + @staticmethod + def info(text: str, log_path: str = None): + """预置的info形式的log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(text, log_path, f"[{new_dc('INFO', '[34m')}]") + + @staticmethod + def error(text: str, log_path: str = None): + """预置的error形式的log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(text, log_path, f"[{new_dc('ERROR', '[1;31m')}]") + + @staticmethod + def warn(text: str, log_path: str = None): + """预置的warn形式的log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(text, log_path, f"[{new_dc('WARN', '[1;33m')}]") + + @staticmethod + def init(text: str, log_path: str = None): + """预置的error形式的log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(text, log_path, f"[{new_dc('INIT', '[35m')}]") + + @staticmethod + def title(text: str, log_path: str = None): + """预置title形式的显目log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(new_dc(text, '[1m'), log_path, "🚀", "🚀") + + @staticmethod + def complete(text: str, log_path: str = None): + """预置complete形式的显目log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(new_dc(text, '[1;32m'), log_path, "✅", "✅") + + @staticmethod + def remove(text: str, log_path: str = None): + """预置remove形式的显目log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(new_dc(text, '[1m'), log_path, "🚮", "🚮") + + @staticmethod + def connect(text: str, log_path: str = None): + """预置connect形式的显目log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(new_dc(text, '[1;32m'), log_path, "🔗", "🔗") + + @staticmethod + def disconnect(text: str, log_path: str = None): + """预置disconnect形式的显目log""" + log_path = log_path if log_path else LOGGER_PATH + log_plus(new_dc(text, '[1m'), log_path, "🚫", "🚫") diff --git a/meian/misc.py b/meian/misc.py new file mode 100644 index 0000000..3ecabb8 --- /dev/null +++ b/meian/misc.py @@ -0,0 +1,48 @@ +# -*- coding:utf-8 -*- +""" +@Author : xuxingchen +@Contact : xuxingchen@sinochem.com +@Desc : some misc +""" +import ssl + +import paho.mqtt.client as mqtt + +from utils import get_ssl_certificate +from devices.common_model import UserData + + +def create_mqtt_client(broker_host, + broker_port, + userdata: UserData, + on_message=None, + on_connect=None, + on_disconnect=None, + client_id: str = "", + username: str = "", + password: str = "", + ssl_flag: bool = False): + if client_id != "": + client = mqtt.Client(client_id=client_id) + else: + client = mqtt.Client() + client.user_data_set(userdata) + if on_connect: + client.on_connect = on_connect + if on_disconnect: + client.on_disconnect = on_disconnect + if on_message: + client.on_message = on_message + if ssl_flag: + ssl_crt = get_ssl_certificate(broker_host, broker_port) + tls_context = ssl.create_default_context() + tls_context.verify_mode = ssl.CERT_REQUIRED + tls_context.load_verify_locations(cadata=ssl_crt) + try: + client.tls_set_context(context=tls_context) + client.tls_insecure_set(False) + except ValueError: + pass + client.username_pw_set(username, password) + client.connect(broker_host, broker_port) + return client diff --git a/meian/readme.md b/meian/readme.md new file mode 100644 index 0000000..07e4e01 --- /dev/null +++ b/meian/readme.md @@ -0,0 +1,63 @@ +# 初始化配置及持久化文件 + +> - `./data/gateway{}.json` 手动维护, 存储已经完成注册上线虚拟网关信息 +> - `./data/devices.csv` 手动维护, 存储每个设备的所属项目编码、设备ID以及人为设置的主题信息 +> - `./data/users.csv` 手动维护, 存储初始的用户信息 +> - `./data/meian.db` 自动维护, 存储程序初始化及运行过程中的数据 + +# 可拓展性 + +> devices.meian_model +> - 完成创建的数据模型,可拓展更多的数据模型 +> - 不同体系的数据模型应当存储在不同的文件中,如{factory}_model +> - 每个用于转换的数据模型都应该继承一个具有check函数的BaseInfo类,用于校验数据的有效性 + +> devices.meian_db +> - 实现与数据库的对接逻辑,可拓展不同表的处理逻辑 +> - 每个表class应当继承BaseTable再进行个性化拓展 +> - 在完成拓展后需要在TableHandle进行路由逻辑补充 + +> devices.meian_service +> - 业务逻辑层,可拓展不同的业务处理逻辑 +> - 拓展的每个服务应当最追加到Services中,并对BaseService进行继承,实现handle方法,实现拿到数据后的**处理逻辑** +> - 完成拓展后,若对BaseService进行了修改则需在auto_service中进行路由逻辑补充 + +# 生产部署手册 + +**注意事项:** +> - 完成 devices.csv 的信息收集 +> - 根据 devices.csv 的项目编码一一手动完成网关设备的注册上线并记录返回的ID与token保存存到gateway.json中 +> - 若有初始化人员信息需求则补充users.csv,若无则不做操作或删除该文件 +> - 每次重启需保证meian.db的数据备份,保持其唯一性 + +环境变量 + +| 环境变量 | 参考值 | 备注 | +|----------------------|-------------------------------------|--------------------------------------| +| ENV_TYPE | 0: local;1: test-svc;2: product-svc | 用于控制环境变量的动态选择,可统一控制以下所有变量,但优先级低于环境变量 | +| LOGGER_DEBUG | False\True | 用于控制日志的丰富程度 | +| BROKER_0_HOST, | iot-broker-test.*******.com | 云端中转mqtt的访问地址 | +| BROKER_0_PORT, | 31883 | 云端中转mqtt的请求端口 | +| BROKER_0_API_PORT, | 38083 | 云端中转mqtt的api请求端口 | +| BROKER_0_USERNAME, | center@meian | 云端中转mqtt的mqtt账号 | +| BROKER_0_PASSWD, | 28b8db7******a5f2 | 云端中转mqtt的mqtt密码 | +| BROKER_0_API_KEY, | fe705******b707 | 云端中转mqtt的api请求key | +| BROKER_0_API_SECRET, | yNsTO21******iRHByI | 云端中转mqtt的api请求secret | +| BROKER_1_HOST, | iot-broker-test.*******.com | 云服aiot平台的mqtt请求地址 | +| BROKER_1_PORT, | 1883 | 云服aiot平台的mqtt请求端口 | + +# 常用查询sql + +```sql +-- 查询长时间离线的设备信息 +select heart_beat.device_id, device_position_desc, last_heart_beat from heart_beat LEFT JOIN register ON register.device_id=heart_beat.device_id LEFT JOIN device ON device.device_id=heart_beat.device_id where project_code = 'WF-TYJMY' AND last_heart_beat < '1720576800'; + +-- 查询某个项目的设备注册状态 +select * from device where project_code = 'NSJMW'; +``` + +# 常用日志过滤 + +```bash +tail -f data/run.log |grep -v "heartBeat\|心跳\|上线阈值\|新数据\|HeartBeat\|移除订阅\|ms\|subscribe" +``` \ No newline at end of file diff --git a/meian/requirements.txt b/meian/requirements.txt new file mode 100644 index 0000000..d15c2e1 --- /dev/null +++ b/meian/requirements.txt @@ -0,0 +1,3 @@ +paho-mqtt==1.6.1 +pydantic==2.5.2 +requests \ No newline at end of file diff --git a/meian/sources.list.bullseye b/meian/sources.list.bullseye new file mode 100644 index 0000000..f105887 --- /dev/null +++ b/meian/sources.list.bullseye @@ -0,0 +1,4 @@ +deb https://mirrors.aliyun.com/debian/ bullseye main non-free contrib +# deb https://mirrors.aliyun.com/debian-security/ bullseye-security main +# deb https://mirrors.aliyun.com/debian/ bullseye-updates main non-free contrib +# deb https://mirrors.aliyun.com/debian/ bullseye-backports main non-free contrib \ No newline at end of file diff --git a/meian/utils.py b/meian/utils.py new file mode 100644 index 0000000..83257aa --- /dev/null +++ b/meian/utils.py @@ -0,0 +1,175 @@ +# -*- coding:utf-8 -*- +""" +@File : utils.py +@Author : xuxingchen +@Version : 1.0 +@Contact : xuxingchen@sinochem.com +@Desc : None +""" +import hashlib +from datetime import datetime +import hmac +import base64 +import re +import ssl +import string +import time +import uuid +import random +import socket + + +def extract_number(text: str) -> str: + """从描述文字中提取数据信息""" + number_match = re.search(r'(\d+)', text) + number = number_match.group(1) if number_match else "" + return number + + +def extract_building_unit(text: str) -> tuple[str, str]: + """从空间描述文字中提取从楼栋和单元信息""" + # 提取楼栋数字 + building_match = re.search(r'(\d+)号楼', text) + building_number = building_match.group(1).zfill(2) if building_match else "" + # 提取单元数字 + unit_match = re.search(r'(\d+)单元', text) + unit_number = unit_match.group(1).zfill(2) if unit_match else "" + return building_number, unit_number + + +def get_sign(data, key): + hmac_md5 = hmac.new(key.encode('utf-8'), data.encode("utf8"), "MD5") + return hmac_md5.hexdigest() + + +def encode_to_base64(data: str) -> str: + """将 字符串 编码为 base64 字符串""" + encoded_data = base64.b64encode(data.encode("utf8")) + return encoded_data.decode("utf8") + + +def to_snake_case(name: str) -> str: + """将驼峰命名转换为蛇命名""" + s1 = re.sub('([a-z])([A-Z])', r'\1_\2', name) + return re.sub('([A-Z0-9]+)([A-Z])', r'\1_\2', s1).lower() + + +def to_obj(data: dict, obj_class: type): + """将字典数据转换为dataclass对象""" + obj = obj_class() + for key, value in data.items(): + attr_name = to_snake_case(key) + if hasattr(obj, attr_name): + setattr(obj, attr_name, value) + obj.check() + return obj + + +def generate_token(): + token_part = str(uuid.uuid4()).replace('-', '') + timestamp_part = str(int(time.time())) + token = token_part[:16] + timestamp_part + token_part[16:] + return token + + +def generate_time_token(data: str): + timestamp = str(int(time.time())) + data = timestamp + data + # 对连接后的字符串进行哈希 + token = hashlib.sha256(data.encode()).hexdigest() + return token[:32] + + +def generate_random_string(length: int = 6): + characters = string.ascii_letters + string.digits + random_string = ''.join(random.choice(characters) for _ in range(length)) + return random_string + + +def get_ssl_certificate(domain: str, port: int): + # 建立一个加密连接 + context = ssl.create_default_context() + with socket.create_connection((domain, port)) as sock: + with context.wrap_socket(sock, server_hostname=domain) as ssock: + # 获取证书 + cert = ssl.DER_cert_to_PEM_cert(ssock.getpeercert(True)) + return cert + + +def save_certificate_to_file(cert, filename): + with open(filename, 'w') as file: + file.write(cert) + + +def datetime_to_timestamp(datetime_str): + # 将日期时间字符串解析为 datetime 对象 + dt = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S') + # 将 datetime 对象转换为指定格式的字符串 + formatted_dt = dt.strftime('%Y-%m-%dT%H:%M:%S.000Z') + return formatted_dt + + +def encrypt_string(data: str, key: str): + # 待加密数据和加密密钥转为字节 + data_bytes = data.encode('utf-8') + key_bytes = key.encode('utf-8') + # 使用key_bytes对data_bytes每个字节按位异或 + encrypted_data = bytearray() + for i in range(len(data_bytes)): + encrypted_data.append(data_bytes[i] ^ key_bytes[i % len(key_bytes)]) + # 使用Base64编码以便于存储和传输 + return base64.b64encode(encrypted_data).decode('utf-8') + + +def decrypt_string(data: str, key: str): + # 使用Base64解码获取原始加密数据 + encrypted_data = base64.b64decode(data) + key_bytes = key.encode('utf-8') + # 使用key_bytes对encrypted_data每个字节按位异或 解密 + decrypted_data = bytearray() + for i in range(len(encrypted_data)): + decrypted_data.append(encrypted_data[i] ^ key_bytes[i % len(key_bytes)]) + return decrypted_data.decode('utf-8') + + +if __name__ == '__main__': + # 定义密钥和要计算哈希的消息 + # pid = "4bb9a56bf*********e99bac6d" # sub + # psc = "9b0159706*********0ea665ddd" # sub + pid = "ef8dba7*********42938e04" + psc = "b0b6114771***********b861c58bf7c" + deviceName = "SKHZJMF" + message = f'deviceName{deviceName}productId{pid}' + print(get_sign(message, psc)) + print(time.time() * 1000) + + encrypt_number = encrypt_string("18136222222", "meian") + print(encrypt_number) + print(decrypt_string(encrypt_number, "meian")) + + from devices.meian_model import Register + + register = to_obj({ + "dataType": "register", + "deviceId": "ma17qt7k", + "devicePositionCode": "YFCS", + "devicePositionDesc": "\u4e91\u670d\u6d4b\u8bd5", + "deviceType": 1, + "factoryId": "MeianTech", + "token": "1" + }, Register) + + print(encode_to_base64("hello")) + + print(register) + print(register.__dict__) + + print("\u4e91\u670d\u6d4b\u8bd5") + + print(generate_token()) + print(generate_time_token("langzihan")) + + print(datetime_to_timestamp("2024-04-08 15:19:55")) + + # cert = get_ssl_certificate("iot-broker-test.*********.com", 38883) + # save_certificate_to_file(cert, "*********.crt")