项目需求
平台通过http下发json格式的消息
程序收到消息后,根据消息中的日期和时间戳到对应的目录检索是否存在符合要求的文件(例如落在某个时间段内的视频片段)
检索到之后 将视频片段拼接成一个完整的视频文件
如果没有检索到,则根据时间戳判断是继续等待,还是直接返回"未检索到"
头文件
#ifndef QMCY_LED_H
#define QMCY_LED_H
#include <iostream>
#include <atomic>
#include<vector>
#include<unordered_map>
#include<map>
#include<tuple>
#include <thread>
#include <memory>
#include <mutex>
#include <vector>
#include <semaphore.h>
#include <signal.h>
#include <condition_variable>
#include "zlog.h"
#include "httplib.h"
#include "json.hpp"
#include "BlockQueue.hpp"
#include "BlackBox.h"
#define HTTP_PORT 8081
#define HTTP_IP "0.0.0.0"
#define MAX_THREADS 2
#define MAX_QUEUE_SIZE 4
#define DEVICE_VERSION "VIDEO FILETER 1.0"
#define HTTP_SEARCH "/api/search"
/// Identifiers carried by INNER_MSG through the internal message queue.
enum MSG_ID
{
    MSG_SET_CONTENT = 0,      ///< A search request; MainTask hands it to DispatchTask.
    MSG_UPDATE_LED_LIST = 1,  ///< Not produced or consumed by the code visible in this file.
    MSG_EXIT_LED = 2          ///< Makes MainTask leave its loop.
};
/// Message exchanged between the HTTP handler, the main task loop and the
/// worker pool.
///
/// Scalar members carry defaults so that a message built from a request that
/// omits optional fields never holds an indeterminate value.
struct INNER_MSG
{
    MSG_ID msg_id = MSG_SET_CONTENT;  ///< What the receiver should do with this message.
    std::string taskid;               ///< "taskid" field of the search request.
    std::string date;                 ///< "date" field; used as the directory that is searched.
    std::string time;                 ///< "time" field; parsed as HH:MM:SS by GetFiles.
    int total_time = 10;              ///< "total_time" field; 10 is the fallback ParseSearch applies.
};
/// Basic runtime configuration of the application.
///
/// The *_name members hold the names of the environment variables that carry
/// the corresponding value; the remaining members hold the resolved values.
/// Numeric and boolean members are zero-initialised so that range checks on a
/// freshly constructed object never read indeterminate values. (A named
/// struct is used because an unnamed typedef'd struct may not carry default
/// member initializers.)
struct AppBasicInfo
{
    std::string server_ip_name;    ///< Environment variable name holding the server IP.
    std::string server_port_name;  ///< Environment variable name holding the server port.
    std::string local_port_name;   ///< Environment variable name holding the local listen port.
    std::string server_ip;         ///< Resolved server IP.
    std::string local_ip;          ///< Not written by the code visible in this file.
    std::string device_id;         ///< Taken from the "sn" field of the configuration file.
    int server_port = 0;           ///< Resolved server port; 0 means "not set yet".
    int local_port = 0;            ///< Resolved local listen port; 0 means "not set yet".
    bool debug_mode = false;       ///< Not written by the code visible in this file.
    bool encrypt_mode = false;     ///< Not written by the code visible in this file.
};
/**
 * @brief Singleton accessor: hands out one shared instance of Typename.
 *
 * The instance is created with `new Typename` on the first call to GetHANA()
 * and kept in a function-local static shared_ptr.
 */
template<class Typename>
class HANA
{
public:
    /**
     * @brief Returns the shared pointer to the singleton instance.
     *
     * Every call also prints the current use count and the address of the
     * static shared_ptr to std::cout.
     */
    static std::shared_ptr<Typename> GetHANA()
    {
        static std::shared_ptr<Typename> instance(new Typename);

        std::cout << __func__ << " " << instance.use_count() << std::endl;
        std::cout << __func__ << "address " << &instance << std::endl;

        return instance;
    }

private:
    // Declared but never defined: HANA itself is not meant to be instantiated.
    HANA();
};
/// Counting semaphore.
///
/// Built on POSIX sem_t when HAVE_SEM is defined, otherwise on a mutex and a
/// condition variable. The object is not copyable in either configuration.
class semaphore {
public:
    /// @param initial Number of wait() calls that may succeed before any post().
    explicit semaphore(size_t initial = 0) {
#if defined(HAVE_SEM)
        sem_init(&_sem, 0, static_cast<unsigned int>(initial));
#else
        // Honour the requested initial count (it used to be forced to 0).
        _count = initial;
#endif
    }

    ~semaphore() {
#if defined(HAVE_SEM)
        sem_destroy(&_sem);
#endif
    }

    semaphore(const semaphore &) = delete;
    semaphore &operator=(const semaphore &) = delete;

    /// Increases the count by @p n and wakes waiting threads.
    void post(size_t n = 1) {
#if defined(HAVE_SEM)
        while (n--) {
            sem_post(&_sem);
        }
#else
        std::lock_guard<std::mutex> lock(_mutex);
        _count += n;
        if (n == 1) {
            _condition.notify_one();
        } else {
            // Several units became available: let every waiter re-check.
            _condition.notify_all();
        }
#endif
    }

    /// Blocks until the count is non-zero, then decrements it.
    void wait() {
#if defined(HAVE_SEM)
        sem_wait(&_sem);
#else
        std::unique_lock<std::mutex> lock(_mutex);
        // The predicate form also covers spurious wake-ups.
        _condition.wait(lock, [this] { return _count != 0; });
        --_count;
#endif
    }

private:
#if defined(HAVE_SEM)
    sem_t _sem;
#else
    size_t _count = 0;
    std::mutex _mutex;
    std::condition_variable _condition;
#endif
};
// Application object of the video search/concat service. It owns the HTTP
// server, the internal message queue, the worker pool and the three service
// threads (HTTP, main task loop, timer).
// The class derives from enable_shared_from_this; no code visible in this
// file calls shared_from_this().
// NOTE(review): member declaration order determines construction and
// destruction order; keep it in mind before reordering members.
class QMCY_APP:public std::enable_shared_from_this<QMCY_APP>
{
public:
using PTR = std::shared_ptr<QMCY_APP>;
//using LED_TABLE = std::unordered_map<std::string,std::shared_ptr<IModuleNVR>>;
QMCY_APP();
~QMCY_APP();
// Prints a fixed version string to std::cout.
void Version();
// Loads the JSON configuration file, resolves server ip / ports from the
// environment block `env`, and creates the HTTP client, the thread pool and
// the internal queue. Returns false when the config path is empty or the
// file cannot be opened.
bool Init(std::string &config_file, char *env[]);
// Sets the run flag and launches the task, HTTP and timer threads.
bool Start();
// Queues an exit message, shuts the pool down, stops the HTTP server and
// joins the threads.
bool Stop();
private:
// Incremented once per TimerTask iteration; Report() runs every 200 ticks.
uint64_t m_counter;
// True between Start() and Stop(); polled by TimerTask.
std::atomic<bool>m_run_flag;
AppBasicInfo m_basic_info;
std::thread m_http_thread;
std::thread m_task_thread;
std::thread m_timer_thread;
// Locked by Report().
std::mutex m_table_mutex;
httplib::Server m_http;
// Thread body: pops INNER_MSG items from m_inner_queue until an exit message arrives.
void MainTask();
// Thread body: registers the HTTP routes and runs the blocking listen() call.
void HttpTask();
// Thread body: one-second tick loop that periodically calls Report().
void TimerTask();
// Submits RealExecUnit(inner_msg) to the pool unless the pool reports it is full.
void DispatchTask(INNER_MSG inner_msg);
// Searches for matching clips and concatenates them with ffmpeg.
void RealExecUnit(INNER_MSG inner_msg);
// Periodic status report; the reporting body is currently disabled with #if 0.
void Report();
// Parses the JSON body of a search request and queues an INNER_MSG.
void ParseSearch(const std :: string & json);
// Returns the clip file names in `dir` whose timestamp lies around `time` (HH:MM:SS).
std::vector<std::string> GetFiles(std::string dir,std::string time);
// Parses the JSON configuration text into m_basic_info.
void ReadConfig(const std::string &json);
// Created in Init() from the resolved server ip and port.
std::shared_ptr<httplib::Client> m_http_client;
// Created in Init() with a capacity argument of 30.
std::shared_ptr<BlockQueue<INNER_MSG>> m_inner_queue;
// Created in Init() with MAX_THREADS and MAX_QUEUE_SIZE.
std::shared_ptr<BlackBox> m_thread_pool;
};
#endif
cpp文件
#include "QMCY_DUMP.h"
#include <sys/types.h>
#include <dirent.h>
#include <cctype>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <exception>
#include <string>
#include <thread>
// Exit signal for the process: posted by the "/Stop" HTTP route, by HttpTask
// when listen() fails, and by the SIGINT handler; main() blocks on it.
static semaphore sem;
// zlog category used by every zlog_info call in this file; assigned in main()
// from zlog_get_category("qmcy").
zlog_category_t *g_zlog;
/// Constructs an idle application object.
///
/// All scalar state is given a defined value here: the run flag starts as
/// false and m_basic_info is value-initialised, so Init() never range-checks
/// an indeterminate port number when the environment lacks the variables.
QMCY_APP::QMCY_APP()
    : m_counter(0),
      m_run_flag(false),
      m_basic_info()
{
    std::cout<<__func__<<std::endl;
}
/// Destructor: only traces its own invocation on std::cout.
QMCY_APP::~QMCY_APP()
{
    std::cout << __func__ << '\n' << std::flush;
}
/// Prints the fixed version banner to std::cout.
void QMCY_APP::Version()
{
    std::cout << "Version 1.0" << '\n' << std::flush;
}
/// Returns true when `name` ends with the literal suffix ".mp4".
static bool QmcyHasMp4Suffix(const std::string &name)
{
    static const std::string kSuffix = ".mp4";
    return name.size() >= kSuffix.size() &&
           name.compare(name.size() - kSuffix.size(), kSuffix.size(), kSuffix) == 0;
}

/// Converts an h/m/s triple to seconds since midnight; false when out of range.
static bool QmcySecondOfDay(int hour, int minute, int second, unsigned int &out)
{
    if (hour < 0 || hour > 23 || minute < 0 || minute > 59 || second < 0 || second > 59)
    {
        return false;
    }
    out = static_cast<unsigned int>(hour * 3600 + minute * 60 + second);
    return true;
}

/// Scans `dir` once and fills `table` with (second-of-day -> file name) for
/// every "HH-MM-SS-<sn>...mp4" entry. Returns false when the directory cannot
/// be opened; `table` is then empty.
static bool QmcyScanClips(const std::string &dir, std::map<unsigned int,std::string> &table)
{
    table.clear();

    DIR *dp = opendir(dir.c_str());
    if (dp == nullptr)
    {
        perror("dir\n");
        return false;
    }

    struct dirent *entry = nullptr;
    while ((entry = readdir(dp)) != nullptr)
    {
        const std::string filename = entry->d_name;
        if (filename == "." || filename == "..")
        {
            continue;
        }
        if (!QmcyHasMp4Suffix(filename))
        {
            printf("not a mp4 file\n");
            continue;
        }

        int hour = 0, minute = 0, second = 0, sn = 0;
        if (sscanf(filename.c_str(), "%02d-%02d-%02d-%d", &hour, &minute, &second, &sn) != 4)
        {
            continue;
        }

        unsigned int key = 0;
        if (QmcySecondOfDay(hour, minute, second, key))
        {
            // Same behaviour as before: the first file seen for a given second wins.
            table.insert(std::pair<unsigned int,std::string>(key, filename));
        }
    }

    closedir(dp);   // the handle used to leak on every pass
    return true;
}

/// Collects the clip files in `dir` whose start time lies within +/-6 seconds
/// of `time`.
///
/// The directory is re-scanned once per second (at most 15 re-scans) until a
/// clip newer than the end of the window shows up, which indicates that the
/// clips covering the window have been written.
///
/// @param dir   Directory holding files named "HH-MM-SS-<sn>...mp4".
/// @param time  Requested time as "HH:MM:SS".
/// @return File names (without directory) inside the window, ordered by time;
///         empty when `time` cannot be parsed or nothing matches.
std::vector<std::string> QMCY_APP::GetFiles(const std::string dir,std::string time)
{
    const unsigned int kWindowSeconds = 6;
    const int kMaxRechecks = 15;

    std::vector<std::string> result;

    int hour = 0, minute = 0, second = 0;
    unsigned int keyword = 0;
    if (sscanf(time.c_str(), "%02d:%02d:%02d", &hour, &minute, &second) != 3 ||
        !QmcySecondOfDay(hour, minute, second, keyword))
    {
        // Without a valid reference time there is nothing to match against.
        std::cout<<"Invalid time:"<<time<<std::endl;
        return result;
    }
    std::cout<<"keyword is:"<<keyword<<std::endl;

    // Clamp the lower bound: keyword is unsigned, so keyword-6 would wrap
    // around for requests in the first seconds of the day.
    const unsigned int window_begin = keyword > kWindowSeconds ? keyword - kWindowSeconds : 0;
    const unsigned int window_end = keyword + kWindowSeconds;

    std::map<unsigned int,std::string> table;
    for (int check_times = 1; ; ++check_times)
    {
        result.clear();
        // A directory that cannot be opened is treated as an empty scan and
        // retried like any other pass.
        QmcyScanClips(dir, table);

        unsigned int last_item_time = 0;
        for (const auto &item : table)
        {
            if (item.first >= window_begin && item.first <= window_end)
            {
                std::cout<<"matched :"<<item.first<<"----->"<<item.second<<std::endl;
                result.push_back(item.second);
            }
            else
            {
                std::cout<<"Unmatched :"<<item.first<<"----->"<<item.second<<std::endl;
            }
            last_item_time = item.first;
        }

        std::cout<<"last_item_time:"<<last_item_time <<" check_times:"<<check_times<<std::endl;
        if (last_item_time > window_end || check_times > kMaxRechecks)
        {
            break;
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout<<"Recheck files!!!"<<std::endl;
    }

    return result;
}
/// Returns true when `text` is non-empty and consists only of letters,
/// digits, '-' and '_', i.e. it is safe to use as a single directory name and
/// inside a shell command line.
static bool QmcyIsSafePathComponent(const std::string &text)
{
    if (text.empty())
    {
        return false;
    }
    for (const char ch : text)
    {
        if (!std::isalnum(static_cast<unsigned char>(ch)) && ch != '-' && ch != '_')
        {
            return false;
        }
    }
    return true;
}

/// Executes one search task: finds the clips matching the requested date and
/// time and concatenates them into "<date>/<random>.mp4" with ffmpeg.
///
/// `inner_msg.date` arrives from the HTTP request and ends up in a shell
/// command, so it is validated against a strict character allow-list first.
///
/// @param inner_msg  Task description; `date` is the directory to search and
///                   `time` the reference time (HH:MM:SS).
void QMCY_APP::RealExecUnit(INNER_MSG inner_msg)
{
    zlog_info(g_zlog,"Date: [%s] time:[%s]",inner_msg.date.c_str(),inner_msg.time.c_str());

    if (!QmcyIsSafePathComponent(inner_msg.date))
    {
        std::cout<<"Invalid date, task rejected"<<std::endl;
        zlog_info(g_zlog,"Concat video rejected: invalid date [%s]",inner_msg.date.c_str());
        return;
    }

    const std::vector<std::string> files = GetFiles(inner_msg.date,inner_msg.time);
    if (files.empty())
    {
        std::cout<<"No match files"<<std::endl;
        zlog_info(g_zlog,"Concat video No match files!!!!");
        return;
    }

    // Build the input list for ffmpeg's concat demuxer.
    std::string filecontent;
    for (const auto &item : files)
    {
        // A quote or line break would corrupt the quoted list entry.
        if (item.find_first_of("'\r\n") != std::string::npos)
        {
            zlog_info(g_zlog,"Skip file with unsupported name:[%s]",item.c_str());
            continue;
        }
        std::cout<<"matched :"<<item<<std::endl;
        filecontent+="file ";
        filecontent+="'";
        filecontent+=inner_msg.date;
        filecontent+="/";
        filecontent+=item;
        filecontent+="'\r\n";
    }
    if (filecontent.empty())
    {
        std::cout<<"No match files"<<std::endl;
        zlog_info(g_zlog,"Concat video No match files!!!!");
        return;
    }
    zlog_info(g_zlog,"Search result:[%s]",filecontent.c_str());

    // The numeric name is used both for the temporary list file and for the
    // output video.
    const std::string filename = std::to_string(rand());
    FILE *fp = fopen(filename.c_str(),"w");
    if (fp == nullptr)
    {
        zlog_info(g_zlog,"Concat video failed: cannot create list file [%s]",filename.c_str());
        return;
    }
    const size_t written = fwrite(filecontent.c_str(),1,filecontent.size(),fp);
    fclose(fp);
    if (written != filecontent.size())
    {
        zlog_info(g_zlog,"Concat video failed: cannot write list file [%s]",filename.c_str());
        std::remove(filename.c_str());
        return;
    }

    //Use ffmpeg to concat two or more videos to one video
    //ffmpeg command : ./ffmpeg -f concat -safe 0 -i 363974492 -c:v copy -c:a aac 2023-06-29/363974492.mp4
    std::string ffmpeg_cmd = "./ffmpeg -f concat -safe 0 -i ";
    ffmpeg_cmd+= filename;
    ffmpeg_cmd+=" -c:v copy -c:a aac ";
    ffmpeg_cmd+=inner_msg.date;
    ffmpeg_cmd+="/";
    ffmpeg_cmd+=filename;
    ffmpeg_cmd+=".mp4";

    const int ret = system(ffmpeg_cmd.c_str());
    std::cout<<"ffmpeg:"<<ffmpeg_cmd<<" result:"<<ret<<std::endl;
    if (ret == 0)
    {
        zlog_info(g_zlog,"Concat videos successfully, output file is :[%s.mp4]",filename.c_str());
    }
    else
    {
        zlog_info(g_zlog,"Concat video failed!!!!");
    }

    // Remove the temporary list file without spawning a shell.
    std::remove(filename.c_str());
}
/// Hands a search task to the worker pool.
///
/// When the pool already reports MAX_QUEUE_SIZE or more tasks, the request is
/// dropped and only logged.
void QMCY_APP::DispatchTask(INNER_MSG inner_msg)
{
    const auto pending = m_thread_pool->get_tasks();

    if(pending < MAX_QUEUE_SIZE)
    {
        // The message is captured by value so the task owns its own copy.
        m_thread_pool->AddTask([this,inner_msg](){
            RealExecUnit(inner_msg);
        });
        return;
    }

    //Need to tell server this task can not be executed!!!!!
    std::cout<<"Task full add failed task size "<<pending<<std::endl;
    zlog_info(g_zlog,"System is busy can not handle this command!!!!!");
}
void QMCY_APP::MainTask()
{
INNER_MSG inner_msg;
while(true)
{
bool ret = m_inner_queue->pop_data(inner_msg);
if(ret )
{
if(MSG_EXIT_LED == inner_msg.msg_id)
{
std::cout<<"Get exit msg :"<<inner_msg.msg_id<<std::endl;
break;
}
else if(MSG_SET_CONTENT == inner_msg.msg_id)
{
DispatchTask(inner_msg);
}
}
}
std::cout<<__func__<<" exit!!!!"<<std::endl;
}
/// Periodic status report.
///
/// Only the table mutex is taken and an empty JSON value is created; the code
/// that builds and posts the report is compiled out with #if 0 and kept for
/// reference.
void QMCY_APP::Report()
{
    std::lock_guard<std::mutex> guard(m_table_mutex);
    jsonxx::json response;
#if 0
for(auto it=m_led_table.begin(); it!=m_led_table.end();it++)
{
jsonxx::json item;
auto led = it->second;
if(m_run_flag.load() == false || led == nullptr)
{
return;
}
auto result = led ->NVR_GetStatus();
item["bmsid"]= it->first;
item["status"]= result.first;
item["msg"]= result.second;
response.push_back(std::move(item));
}
auto output = response.dump();
if(output.size())
{
std::cout<<"Report content:"<<output<<std::endl;
if(auto res = m_http_client->Post("/qmcy",output,"application/json"))
{
if (res->status == 200)
{
}
else
{
auto err = res.error();
std::cout << "HTTP error: " << httplib::to_string(err) << std::endl;
}
}
else
{
std::cout<<"Report status to server failed!"<<std::endl;
//zlog_error(g_zlog,"Report status to server[%s:%d] failed!",pHandle->server_ip,pHandle->server_port);
}
}
#endif
}
/// Body of the timer thread: ticks once per second while the run flag is set
/// and calls Report() on every 200th tick.
void QMCY_APP::TimerTask()
{
    while(m_run_flag.load())
    {
        ++m_counter;

        const bool report_due = (m_counter % 200 == 0);
        // Re-check the flag so no report is started once Stop() was requested.
        if(report_due && m_run_flag.load())
        {
            Report();
        }

        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout<<__func__<<" exit!!!"<<std::endl;
}
void QMCY_APP::ParseSearch(const std :: string & json)
{
jsonxx::json result;
INNER_MSG inner_msg;
try
{
result = jsonxx::json::parse(json);
}
catch (jsonxx::json_parse_error & err)
{
std::cout<<"Json parse failed" <<std::endl;
return;
}
if(result["taskid"].is_string())
{
inner_msg.taskid = result["taskid"].as_string();
}
if(result["date"].is_string())
{
inner_msg.date = result["date"].as_string();
}
if(result["time"].is_string())
{
inner_msg.time = result["time"].as_string();
}
if(result["total_time"].is_integer())
{
inner_msg.total_time = result["total_time"].as_int();
if(inner_msg.total_time <1 || inner_msg.total_time >20)
{
inner_msg.total_time = 10;
}
}
inner_msg.msg_id = MSG_SET_CONTENT;
m_inner_queue->push_data(inner_msg);
}
void QMCY_APP::HttpTask()
{
m_http.Get("/hi", [](const httplib::Request &, httplib::Response &res) {
res.set_content("Hello World!", "text/plain");
});
m_http.Get("/led/status", [](const httplib::Request &, httplib::Response &res) {
res.set_content("Get led status World!", "text/plain");
});
m_http.Post("/Stop", [](const httplib::Request &req, httplib::Response &res) {
res.set_content("PostLEDConfig message!", "text/plain");
std::cout<<"APP exit message"<<std::endl;
sem.post();
});
m_http.Post(HTTP_SEARCH, [&](const httplib::Request &req, httplib::Response &res) {
res.set_content(HTTP_SEARCH, "text/plain");
//std::cout<<req.body<<std::endl;
ParseSearch(req.body);
});
std::cout<<__FUNCTION__<<std::endl;
auto ret = m_http.listen("0.0.0.0", m_basic_info.local_port);
if(ret == false)
{
std::cout<<"Http failed"<<std::endl;
sem.post();
}
else
{
std::cout<<"Http success"<<std::endl;
}
std::cout<<__func__<<" exit!!!"<<std::endl;
}
void QMCY_APP::ReadConfig(const std :: string & data)
{
jsonxx::json result;
try
{
result = jsonxx::json::parse(data);
}
catch (jsonxx::json_parse_error & err)
{
std::cout<<"ReadConfig Json parse failed" <<std::endl;
return;
}
if(result["sn"].is_string())
{
m_basic_info.device_id = result["sn"].as_string();
}
if(result["server_ip_name"].is_string())
{
m_basic_info.server_ip_name = result["server_ip_name"].as_string();
}
if(result["server_port_name"].is_string())
{
m_basic_info.server_port_name = result["server_port_name"].as_string();
}
if(result["local_port_name"].is_string())
{
m_basic_info.local_port_name = result["local_port_name"].as_string();
}
}
/// Splits one "KEY=VALUE" environment entry. Returns false when the entry has
/// no '=', an empty key or an empty value.
static bool QmcySplitEnvEntry(const char *entry, std::string &key, std::string &value)
{
    const std::string text = entry;
    const std::size_t eq = text.find('=');
    if (eq == std::string::npos || eq == 0 || eq + 1 >= text.size())
    {
        return false;
    }
    key = text.substr(0, eq);
    value = text.substr(eq + 1);
    return true;
}

/// Prepares the application for Start().
///
/// Steps: seed rand(), load the JSON configuration file, resolve the server
/// ip / server port / local port from the environment variables named in the
/// configuration, fall back to port 8888 for anything outside 1024..65535,
/// then create the HTTP client, the worker pool and the internal queue.
///
/// NOTE(review): an environment variable is now selected by exact key match
/// with the configured name. The old code matched any "KEY=VALUE" string that
/// merely contained the name (and every variable when the name was empty),
/// and copied the parts into 20/30-byte buffers.
///
/// @param config_file  Path of the JSON configuration file.
/// @param env          NULL-terminated environment block ("KEY=VALUE" strings).
/// @return false when the path is empty or the file cannot be opened.
bool QMCY_APP::Init(std::string &config_file, char *env[])
{
    srand(static_cast<unsigned>(time(0)));

    if(config_file.empty())
    {
        std::cout<<"config file is null"<<std::endl;
        return false;
    }

    FILE *fp = fopen(config_file.c_str(),"r");
    if(fp == nullptr)
    {
        return false;
    }

    // Read the whole file into a std::string; the length is tracked
    // explicitly, so no terminating NUL is required.
    std::string file_content;
    if(fseek(fp, 0, SEEK_END) == 0)
    {
        const long size = ftell(fp);
        if(size > 0 && fseek(fp, 0, SEEK_SET) == 0)
        {
            file_content.resize(static_cast<std::size_t>(size));
            const std::size_t got = fread(&file_content[0], 1, file_content.size(), fp);
            file_content.resize(got);
        }
    }
    fclose(fp);
    ReadConfig(file_content);

    // 0 is outside the accepted range, so a missing variable ends up on the
    // 8888 fallback below.
    int server_port = 0;
    int local_port = 0;
    for(int i = 0; env != nullptr && env[i] != nullptr; i++)
    {
        std::string key, value;
        if(!QmcySplitEnvEntry(env[i], key, value))
        {
            continue;
        }
        if(!m_basic_info.server_ip_name.empty() && key == m_basic_info.server_ip_name)
        {
            m_basic_info.server_ip = value;
        }
        if(!m_basic_info.server_port_name.empty() && key == m_basic_info.server_port_name)
        {
            server_port = atoi(value.c_str());
        }
        if(!m_basic_info.local_port_name.empty() && key == m_basic_info.local_port_name)
        {
            local_port = atoi(value.c_str());
        }
    }

    if(local_port<1024 || local_port>65535)
    {
        local_port = 8888;
    }
    if(server_port<1024 || server_port>65535)
    {
        server_port = 8888;
    }
    m_basic_info.local_port = local_port;
    m_basic_info.server_port = server_port;

    zlog_info(g_zlog,"APP run info:[server ip = %s,server port=%d, local port = %d]",m_basic_info.server_ip.c_str(),m_basic_info.server_port,m_basic_info.local_port);

    std::string posturi = "http://";
    posturi.append(m_basic_info.server_ip);
    posturi.append(":");
    posturi.append(std::to_string(m_basic_info.server_port));
    m_http_client = std::make_shared<httplib::Client>(posturi);

    m_thread_pool = std::make_shared<BlackBox>(MAX_THREADS,MAX_QUEUE_SIZE);
    m_thread_pool->init();
    m_inner_queue = std::make_shared<BlockQueue<INNER_MSG>>(30);
    return true;
}
/// Raises the run flag and launches the task, HTTP and timer threads, in that
/// order. Always returns true.
bool QMCY_APP::Start()
{
    m_run_flag.store(true);

    m_task_thread  = std::thread(&QMCY_APP::MainTask, this);
    m_http_thread  = std::thread(&QMCY_APP::HttpTask, this);
    m_timer_thread = std::thread(&QMCY_APP::TimerTask, this);

    zlog_info(g_zlog,"QMCY Video filter system successfully started");
    return true;
}
/// Shuts the application down: queues an exit message for MainTask, shuts the
/// worker pool down, clears the run flag, stops the HTTP server and joins the
/// service threads.
///
/// Every step is guarded, so calling Stop() twice, or before Init()/Start(),
/// neither throws from std::thread::join nor dereferences an empty pointer.
/// Always returns true.
bool QMCY_APP::Stop()
{
    std::cout<<__func__<<std::endl;

    if(m_inner_queue)
    {
        INNER_MSG inner_msg;
        inner_msg.msg_id = MSG_EXIT_LED;
        m_inner_queue->push_data(inner_msg);
    }
    if(m_thread_pool)
    {
        m_thread_pool->shutdown();
    }

    m_run_flag = false;
    m_http.stop();

    if(m_task_thread.joinable())
    {
        m_task_thread.join();
    }
    if(m_http_thread.joinable())
    {
        m_http_thread.join();
    }
    if(m_timer_thread.joinable())
    {
        m_timer_thread.join();
    }
    return true;
}
/// Writes a version banner (device version plus compiler build date/time) to
/// the zlog category and prints the build string to stdout.
void PrintLEDVersion()
{
    // __DATE__ / __TIME__ are expanded by the compiler at build time.
    const char build_stamp[] = "QMCY VIDEO FILTER (build in: " __DATE__ " " __TIME__ ")";

    zlog_info(g_zlog,"=====================================================================================\n");
    zlog_info(g_zlog," QMCY VIDEO FILTER system Version[%s], Date[%s] \n",DEVICE_VERSION, build_stamp);
    zlog_info(g_zlog,"=====================================================================================\n");

    printf("程序版本[%s]\n",build_stamp);
}
/// Program entry point.
///
/// Initialises zlog, creates the application singleton, runs it until the
/// exit semaphore is posted (SIGINT, the "/Stop" route or an HTTP listen
/// failure) and then stops it.
///
/// @return 0 on a normal shutdown, -1 when logging or application
///         initialisation fails.
int main(int argc,char *argv[], char *env[])
{
    (void)argc;
    (void)argv;

    auto rc = zlog_init("./zlog.conf");
    if (rc) {
        printf("init failed\n");
        // This used to be `return false`, i.e. exit status 0 (success).
        return -1;
    }
    g_zlog = zlog_get_category("qmcy");
    if (!g_zlog) {
        printf("get cat fail\n");
        zlog_fini();
        return -1;
    }

    std::string config = "config.json";
    std::shared_ptr<QMCY_APP> app = HANA<QMCY_APP>::GetHANA();
    if(app->Init(config,env) == false)
    {
        std::cout<<"APP init failed"<<std::endl;
        zlog_fini();
        return -1;
    }
    std::cout<<"APP init success"<<std::endl;
    app->Start();

    // Install the exit signal handler.
    // NOTE(review): std::cout and sem.post() are not async-signal-safe;
    // consider sigwait() or a self-pipe if this ever misbehaves.
    signal(SIGINT, [](int) {
        std::cout << "GET QMCY exit signal SIGINT:exit";
        signal(SIGINT, SIG_IGN);   // ignore further SIGINTs while shutting down
        sem.post();
    });

    sem.wait();
    app->Stop();
    std::cout<<"Exit QMCY APP system"<<std::endl;
    zlog_info(g_zlog,"Exit QMCY Video filter system ");
    return 0;
}
HTTP 服务使用 cpp-httplib(httplib.h)
项目中用到 线程池 等待队列 等