请选择 进入手机版 | 继续访问电脑版
MSIPO技术圈 首页 IT技术 查看内容

通过FFmpeg 来筛选合并视频文件的项目记录

2023-07-13

项目需求

平台通过http下发json格式的消息

程序收到消息后，根据日期和时间戳到对应的目录中检索是否有符合要求的文件（比如某个时间段内的视频片段）。

检索到之后 将视频片段拼接成一个完整的视频文件 

没有检索到的话，根据时间戳判断是需要继续等待，还是直接返回“没有检索到”。

头文件

#ifndef QMCY_LED_H
#define QMCY_LED_H

#include <iostream>
#include <atomic>
#include<vector>
#include<unordered_map>
#include<map>

#include<tuple>
#include <thread>
#include <memory>
#include <mutex>
#include <vector>



#include <semaphore.h>
#include <signal.h>
#include <condition_variable>


#include "zlog.h"

#include "httplib.h"
#include "json.hpp"
#include "BlockQueue.hpp"
#include "BlackBox.h"



// Default HTTP port and bind address (not referenced by the code shown in
// this file; the listener uses the port resolved in QMCY_APP::Init).
#define HTTP_PORT	8081
#define HTTP_IP	"0.0.0.0"

// Worker-pool sizing handed to BlackBox in QMCY_APP::Init; MAX_QUEUE_SIZE is
// also the "busy" threshold checked in QMCY_APP::DispatchTask.
#define 		MAX_THREADS	2
#define 		MAX_QUEUE_SIZE 4

// Version string written to the log banner (typo "FILETER" corrected).
#define DEVICE_VERSION	"VIDEO FILTER 1.0"




// Route of the HTTP POST endpoint that accepts search requests.
#define HTTP_SEARCH	"/api/search"


/// Identifiers of the messages carried by the internal queue.
enum MSG_ID
{
	MSG_SET_CONTENT = 0,      ///< A search/concat request to dispatch.
	MSG_UPDATE_LED_LIST = 1,  ///< Not handled by the code shown in this file.
	MSG_EXIT_LED = 2          ///< Tells the main task loop to terminate.
};



/**
 * @brief Message exchanged through the internal BlockQueue.
 *
 * The scalar members carry default member initializers so that a
 * default-constructed message never holds indeterminate values when it is
 * copied into the queue.
 */
struct INNER_MSG
{
	MSG_ID msg_id = MSG_SET_CONTENT;  ///< Kind of message.

	std::string taskid;               ///< "taskid" field of the search request.
	std::string date;                 ///< "date" field; used as the directory to scan.
	std::string time;                 ///< "time" field; parsed as HH:MM:SS.
	int total_time = 0;               ///< "total_time" field; 0 until a request sets it.
};



/**
 * @brief Basic runtime configuration of the application.
 *
 * The *_name members hold names of environment variables (read from the JSON
 * config file); the remaining members hold the resolved values.
 *
 * Declared as a named struct (instead of a typedef of an unnamed struct) so
 * that the scalar members can carry default member initializers: QMCY_APP
 * range-checks the ports even when no environment variable provided them.
 */
struct AppBasicInfo {

	std::string server_ip_name;    ///< Env variable name holding the server IP.
	std::string server_port_name;  ///< Env variable name holding the server port.
	std::string local_port_name;   ///< Env variable name holding the local port.


	std::string server_ip;
	std::string local_ip;
	std::string device_id;         ///< Filled from the "sn" config entry.
	int server_port = 0;           ///< 0 means "not configured".
	int local_port = 0;            ///< 0 means "not configured".
	bool debug_mode = false;
	bool encrypt_mode = false;
};



/**
 * @brief Singleton accessor: one shared, default-constructed Typename per
 *        instantiation.
 */
template<class Typename>
class HANA
{
	public:
		/**
		 * @brief Returns the shared pointer to the singleton instance.
		 *
		 * The instance is created on the first call. Every call also prints
		 * the current use count and the address of the static pointer.
		 */
		static std::shared_ptr<Typename> GetHANA()
		{
			static std::shared_ptr<Typename> instance(new Typename);

			const auto owners = instance.use_count();
			std::cout << __func__ << " " << owners << std::endl;
			std::cout << __func__ << "address " << &instance << std::endl;

			return instance;
		}
	private:
		// Declared but never defined: HANA itself is not meant to be instantiated.
		HANA();
};




/**
 * @brief Counting semaphore.
 *
 * When HAVE_SEM is defined it wraps a POSIX sem_t; otherwise it is built from
 * a mutex, a condition variable and a counter.
 */
class semaphore {
public:
    /**
     * @brief Creates the semaphore.
     * @param initial Number of wait() calls that may succeed before any post().
     */
    explicit semaphore(size_t initial = 0) {
#if defined(HAVE_SEM)
        sem_init(&_sem, 0, initial);
#else
        // Honour the requested initial count (it used to be forced to 0,
        // which made the two build variants behave differently).
        _count = initial;
#endif
    }

    ~semaphore() {
#if defined(HAVE_SEM)
        sem_destroy(&_sem);
#endif
    }

    /**
     * @brief Increments the counter by @p n and wakes waiting threads.
     * @param n Number of units to release.
     */
    void post(size_t n = 1) {
#if defined(HAVE_SEM)
        while (n--) {
            sem_post(&_sem);
        }
#else
        std::unique_lock<std::recursive_mutex> lock(_mutex);
        _count += n;
        if (n == 1) {
            _condition.notify_one();
        } else {
            _condition.notify_all();
        }
#endif
    }

    /**
     * @brief Blocks until the counter is non-zero, then decrements it.
     */
    void wait() {
#if defined(HAVE_SEM)
        sem_wait(&_sem);
#else
        std::unique_lock<std::recursive_mutex> lock(_mutex);
        // Loop guards against spurious wake-ups.
        while (_count == 0) {
            _condition.wait(lock);
        }
        --_count;
#endif
    }

private:
#if defined(HAVE_SEM)
    sem_t _sem;
#else
    size_t _count;
    std::recursive_mutex _mutex;
    std::condition_variable_any _condition;
#endif
};



/**
 * @brief Application object of the video search/concat service.
 *
 * Owns the HTTP server, the internal message queue, the worker pool and the
 * three service threads (HTTP, main task loop, timer).
 */
class QMCY_APP:public std::enable_shared_from_this<QMCY_APP>
{
public:
	using PTR = std::shared_ptr<QMCY_APP>;
	//using LED_TABLE = std::unordered_map<std::string,std::shared_ptr<IModuleNVR>>;

	QMCY_APP();
	~QMCY_APP();

	/// Prints the version string to stdout.
	void Version();

	/// Loads the JSON config file, resolves settings from the environment
	/// block @p env and creates the HTTP client, worker pool and queue.
	/// Returns false when the config file name is empty or cannot be opened.
	bool Init(std::string &config_file, char *env[]);
	/// Sets the run flag and launches the task, HTTP and timer threads.
	bool Start();	
	/// Posts the exit message, shuts the pool and HTTP server down and joins the threads.
	bool  Stop();



private:
	uint64_t m_counter;              // incremented once per TimerTask iteration
	std::atomic<bool>m_run_flag;     // set in Start(), cleared in Stop(); polled by TimerTask
	AppBasicInfo m_basic_info;
	std::thread m_http_thread;
	std::thread m_task_thread;
	std::thread m_timer_thread;	


	std::mutex m_table_mutex;        // locked by Report()


	httplib::Server m_http;
	

	/// Pops messages from the internal queue until MSG_EXIT_LED arrives.
	void MainTask();
	/// Registers the HTTP routes and blocks in listen().
	void  HttpTask();	
	/// Loops once per second while the run flag is set; calls Report() every 200 iterations.
	void TimerTask();	

	/// Hands a request to the worker pool unless the pool already holds MAX_QUEUE_SIZE tasks.
	void DispatchTask(INNER_MSG inner_msg);
	/// Runs on a pool thread: looks up matching files and concatenates them with ffmpeg.
	void RealExecUnit(INNER_MSG inner_msg);
	
	void Report();

	/// Parses the JSON body of a search request and queues an INNER_MSG.
	void ParseSearch(const std :: string & json);

	/// Returns the names of the files in @p dir whose HH-MM-SS-N name lies
	/// within 6 seconds of @p time (HH:MM:SS).
	std::vector<std::string> GetFiles(std::string dir,std::string time);
	/// Parses the JSON config text and stores the "sn" and *_name entries.
	void ReadConfig(const std::string &json);	
	
	std::shared_ptr<httplib::Client> m_http_client;   // created in Init()

	std::shared_ptr<BlockQueue<INNER_MSG>>	m_inner_queue;	  // created in Init()
	
	std::shared_ptr<BlackBox> m_thread_pool;          // created in Init()
	
};


#endif 

cpp文件

#include "QMCY_DUMP.h"
#include <sys/types.h>
#include <dirent.h>



// Process-wide shutdown semaphore: main() blocks on it; the "/Stop" HTTP
// handler, a failed listen() and the SIGINT handler post it.
static semaphore sem;

// zlog category used by every zlog_info call; assigned in main().
zlog_category_t *g_zlog;


/// Zeroes the timer counter and traces the construction on stdout.
QMCY_APP::QMCY_APP()
	: m_counter(0)
{
	std::cout << __func__ << std::endl;
}


/// Traces the destruction on stdout; members clean themselves up.
QMCY_APP::~QMCY_APP()
{
	std::cout << __func__ << std::endl;
}


  /// Writes the fixed version line to stdout.
  void QMCY_APP::Version()
  {
  	const char *const banner = "Version 1.0";
  	std::cout << banner << std::endl;
  }






namespace {

/// Half-width, in seconds, of the match window around the requested time.
const unsigned int kMatchWindowSeconds = 6;

/// A rescan is scheduled only while the scan counter is <= this value.
const int kMaxRecheckTimes = 15;

/// Returns true when @p name ends with the literal extension ".mp4".
bool HasMp4Extension(const std::string &name)
{
	static const std::string ext = ".mp4";
	return name.size() >= ext.size() &&
	       name.compare(name.size() - ext.size(), ext.size(), ext) == 0;
}

/// Converts a wall-clock time to seconds since midnight; false when out of range.
bool ToSecondsOfDay(int hour, int minute, int second, unsigned int &out)
{
	if (hour < 0 || hour > 23 || minute < 0 || minute > 59 || second < 0 || second > 59)
	{
		return false;
	}
	out = static_cast<unsigned int>(hour * 3600 + minute * 60 + second);
	return true;
}

} // namespace

/**
 * @brief Collects the video files recorded around a given time of day.
 *
 * Scans @p dir for files named "HH-MM-SS-N....mp4" and returns, ordered by
 * time, the names whose timestamp lies within +/- 6 seconds of @p time.
 * While the newest file found is not yet later than the end of the window the
 * directory is rescanned once per second, for at most 16 scans in total
 * (presumably to wait for a recording still in progress - TODO confirm).
 *
 * @param dir  Directory to scan.
 * @param time Requested time of day formatted as "HH:MM:SS".
 * @return File names (without directory) inside the window; empty when
 *         @p time cannot be parsed or nothing matches.
 */
std::vector<std::string> QMCY_APP::GetFiles(const std::string dir,std::string time)
{
	std::vector<std::string> result;

	// Parse the request first: without a valid keyword nothing can match, and
	// the retry condition below would otherwise read an uninitialised value.
	int req_hour = 0, req_minute = 0, req_second = 0;
	unsigned int keyword = 0;
	if (sscanf(time.c_str(), "%02d:%02d:%02d", &req_hour, &req_minute, &req_second) != 3 ||
	    !ToSecondsOfDay(req_hour, req_minute, req_second, keyword))
	{
		std::cout<<"Invalid search time:"<<time<<std::endl;
		return result;
	}
	std::cout<<"keyword is:"<<keyword<<std::endl;

	for (int check_times = 1; ; ++check_times)
	{
		result.clear();

		// Ordered by seconds-of-day so the last entry is the newest file.
		std::map<unsigned int,std::string> table;

		DIR *dp = opendir(dir.c_str());
		if (dp == NULL)
		{
			// Treated as an empty listing; the directory may appear on a later scan.
			perror(dir.c_str());
		}
		else
		{
			struct dirent *entry = NULL;
			while ((entry = readdir(dp)) != NULL)
			{
				const std::string filename = entry->d_name;
				if (filename == "." || filename == "..")
				{
					continue;
				}

				if (!HasMp4Extension(filename))
				{
					printf("not a mp4 file\n");
					continue;
				}

				int hour = 0, minute = 0, second = 0, sn = 0;
				unsigned int key = 0;
				if (sscanf(filename.c_str(), "%02d-%02d-%02d-%d", &hour, &minute, &second, &sn) == 4 &&
				    ToSecondsOfDay(hour, minute, second, key))
				{
					table.insert(std::pair<unsigned int,std::string>(key, filename));
				}
			}
			// Release the handle on every scan; it used to leak once per recheck.
			closedir(dp);
		}

		unsigned int last_item_time = 0;
		for (const auto &item : table)
		{
			// Written with additions only so the lower bound cannot wrap
			// around for times within the first seconds after midnight.
			const bool in_window = item.first + kMatchWindowSeconds >= keyword &&
			                       item.first <= keyword + kMatchWindowSeconds;
			if (in_window)
			{
				std::cout<<"matched :"<<item.first<<"----->"<<item.second<<std::endl;
				result.push_back(item.second);
			}
			else
			{
				std::cout<<"Unmatched :"<<item.first<<"----->"<<item.second<<std::endl;
			}

			last_item_time = item.first;
		}

		std::cout<<"last_item_time:"<<last_item_time <<" check_times:"<<check_times<<std::endl;

		// Stop once a file later than the window exists or the retry budget is used up.
		if (last_item_time > keyword + kMatchWindowSeconds || check_times > kMaxRecheckTimes)
		{
			break;
		}

		std::this_thread::sleep_for(std::chrono::seconds(1));
		std::cout<<"Recheck files!!!"<<std::endl;
	}

	return result;
}

namespace {

/// Wraps @p text in single quotes, writing each embedded quote as '\'' - the
/// escaping understood by both the POSIX shell and ffmpeg's concat list parser.
std::string SingleQuoted(const std::string &text)
{
	std::string quoted = "'";
	for (const char c : text)
	{
		if (c == '\'')
		{
			quoted += "'\\''";
		}
		else
		{
			quoted += c;
		}
	}
	quoted += "'";
	return quoted;
}

} // namespace

/**
 * @brief Executes one search request on a worker thread.
 *
 * Looks up the clips matching the request via GetFiles(), writes an ffmpeg
 * concat list into a temporary file in the working directory, runs ffmpeg to
 * join the clips into "<date>/<random>.mp4" and removes the list file again.
 *
 * @param inner_msg Request; `date` is used as the clip directory and `time`
 *                  as the time of day to search for.
 */
void QMCY_APP::RealExecUnit(INNER_MSG inner_msg)
{
	zlog_info(g_zlog,"Date: [%s] time:[%s]",inner_msg.date.c_str(),inner_msg.time.c_str());

	// The date arrives in an HTTP request and ends up on a command line:
	// refuse values that are empty or would be read by ffmpeg as an option.
	if(inner_msg.date.empty() || inner_msg.date[0] == '-')
	{
		std::cout<<"Invalid date in request"<<std::endl;
		zlog_info(g_zlog,"Concat video rejected: invalid date");
		return;
	}

	std::vector<std::string> files = GetFiles(inner_msg.date,inner_msg.time);

	if(files.empty())
	{
		std::cout<<"No match files"<<std::endl;
		zlog_info(g_zlog,"Concat video No match files!!!!");
		return;
	}

	// One "file '<date>/<name>'" line per clip, in chronological order.
	std::string filecontent;
	for(const auto &item:files)
	{
		std::cout<<"matched :"<<item<<std::endl;
		filecontent+="file ";
		filecontent+=SingleQuoted(inner_msg.date + "/" + item);
		filecontent+="\r\n";
	}

	zlog_info(g_zlog,"Search result:[%s]",filecontent.c_str());

	// Temporary list file; its (numeric) name is also the output base name.
	std::string filename =std::to_string( rand());

	FILE * fp = fopen(filename.c_str(),"w");
	if(fp == NULL)
	{
		zlog_info(g_zlog,"Concat video failed!!!!");
		return;
	}
	const size_t written = fwrite(filecontent.c_str(),1,filecontent.size(),fp);
	fclose(fp);
	if(written != filecontent.size())
	{
		remove(filename.c_str());
		zlog_info(g_zlog,"Concat video failed!!!!");
		return;
	}

	//Use ffmpeg to concat two or more videos to one video
	//ffmpeg command : ./ffmpeg -f concat -safe 0 -i 363974492 -c:v copy -c:a aac 2023-06-29/363974492.mp4

	// Arguments derived from the request are single-quoted so that shell
	// metacharacters in the date cannot inject additional commands.
	std::string ffmpeg_cmd = "./ffmpeg -f concat -safe 0 -i ";
	ffmpeg_cmd+= SingleQuoted(filename);
	ffmpeg_cmd+=" -c:v copy -c:a aac ";
	ffmpeg_cmd+=SingleQuoted(inner_msg.date + "/" + filename + ".mp4");

	auto ret = system(ffmpeg_cmd.c_str());

	std::cout<<"ffmpeg:"<<ffmpeg_cmd<<" result:"<<ret<<std::endl;
	if(ret == 0)
	{
		zlog_info(g_zlog,"Concat videos successfully, output file is  :[%s.mp4]",filename.c_str());
	}
	else
	{
		zlog_info(g_zlog,"Concat video failed!!!!");
	}

	// Delete the list file directly instead of spawning "rm" through a shell.
	remove(filename.c_str());
}


/**
 * @brief Submits a request to the worker pool.
 *
 * The request is dropped (with a log entry) when the pool already reports
 * MAX_QUEUE_SIZE or more tasks.
 *
 * @param inner_msg Request to execute; copied into the queued task.
 */
void QMCY_APP::DispatchTask(INNER_MSG inner_msg)
{
	auto pending = m_thread_pool->get_tasks();

	if(pending < MAX_QUEUE_SIZE)
	{
		// The closure owns its own copy of the message.
		m_thread_pool->AddTask([this,inner_msg](){
			RealExecUnit(inner_msg);
		});
		return;
	}

	//Need to tell server this task can not be executed!!!!!
	std::cout<<"Task full   add failed  task size "<<pending<<std::endl;
	zlog_info(g_zlog,"System is busy  can not handle this command!!!!!");
}

void QMCY_APP::MainTask()
{
	INNER_MSG inner_msg;

	while(true)
	{
		bool ret = m_inner_queue->pop_data(inner_msg); 
		if(ret )
		{
			if(MSG_EXIT_LED == inner_msg.msg_id)
			{
				std::cout<<"Get exit msg :"<<inner_msg.msg_id<<std::endl;						
				break;
			}
			else if(MSG_SET_CONTENT == inner_msg.msg_id)
			{
				DispatchTask(inner_msg);
			}
		}
	}

	std::cout<<__func__<<" exit!!!!"<<std::endl;
}


/**
 * @brief Periodic status report hook, called from TimerTask().
 *
 * In the current build it only takes m_table_mutex and creates an empty JSON
 * value: the code that collects per-device status and posts it to the server
 * is compiled out by the `#if 0` block below.
 */
void QMCY_APP::Report()
{

	std::unique_lock<std::mutex> lock(m_table_mutex);

	jsonxx::json response ;
#if 0
	// Disabled: refers to m_led_table, which this class no longer declares.
	for(auto it=m_led_table.begin(); it!=m_led_table.end();it++)
	{

		jsonxx::json item;

		auto led   = it->second;

		if(m_run_flag.load() == false || led == nullptr)
		{
			return;
		}

		auto result = led ->NVR_GetStatus();

		item["bmsid"]= it->first;
		item["status"]= result.first;
		item["msg"]= result.second;
		
		response.push_back(std::move(item));

	}

	auto output = response.dump();
	if(output.size())
	{
		std::cout<<"Report content:"<<output<<std::endl;
		if(auto res = m_http_client->Post("/qmcy",output,"application/json"))
		{
			if (res->status == 200)
			{
				
			}
			else
			{
				auto err = res.error();
				std::cout << "HTTP error: " << httplib::to_string(err) << std::endl;
			}
	
		}
		else
		{
			std::cout<<"Report status to server  failed!"<<std::endl;
			//zlog_error(g_zlog,"Report status to server[%s:%d]  failed!",pHandle->server_ip,pHandle->server_port);
		}
	}
#endif	

}



/**
 * @brief Body of the timer thread.
 *
 * Ticks once per second while the run flag is set and triggers Report() on
 * every 200th tick.
 */
void QMCY_APP::TimerTask()
{
	const auto tick = std::chrono::seconds(1);

	while(m_run_flag.load())
	{
		++m_counter;

		const bool report_due = (m_counter % 200 == 0);
		// Re-check the flag so no report is started during shutdown.
		if(report_due && m_run_flag.load())
		{
			Report();
		}

		std::this_thread::sleep_for(tick);
	}

	std::cout<<__func__<<" exit!!!"<<std::endl;
}



/**
 * @brief Parses the JSON body of a search request and queues it.
 *
 * Recognised fields: "taskid", "date", "time" (strings) and "total_time"
 * (integer, accepted range 1..20, otherwise 10). Requests that cannot be
 * parsed, or that carry no date or no time, are dropped with a console
 * message instead of being queued.
 *
 * NOTE(review): indexing a parsed value that is not a JSON object may throw
 * inside jsonxx - confirm against the library and guard if necessary.
 *
 * @param json Raw request body.
 */
void QMCY_APP::ParseSearch(const std :: string & json)
{
	jsonxx::json result;

	try
	{
		result = jsonxx::json::parse(json);
	}
	catch (jsonxx::json_parse_error & err)
	{
		std::cout<<"Json parse failed" <<std::endl;
		return;
	}

	INNER_MSG inner_msg;
	inner_msg.msg_id = MSG_SET_CONTENT;
	// Same fallback as for out-of-range values, so the field is never left
	// uninitialised when the request omits it.
	inner_msg.total_time = 10;

	if(result["taskid"].is_string())
	{
		inner_msg.taskid = result["taskid"].as_string();
	}

	if(result["date"].is_string())
	{
		inner_msg.date = result["date"].as_string();
	}

	if(result["time"].is_string())
	{
		inner_msg.time = result["time"].as_string();
	}

	if(result["total_time"].is_integer())
	{
		inner_msg.total_time = result["total_time"].as_int();
		if(inner_msg.total_time <1 || inner_msg.total_time >20)
		{
			inner_msg.total_time = 10;
		}
	}

	// Without a directory and a time of day there is nothing to search for.
	if(inner_msg.date.empty() || inner_msg.time.empty())
	{
		std::cout<<"Search request ignored: date or time missing" <<std::endl;
		return;
	}

	m_inner_queue->push_data(inner_msg);
}




void QMCY_APP::HttpTask()
{

	m_http.Get("/hi", [](const httplib::Request &, httplib::Response &res) {
	  res.set_content("Hello World!", "text/plain");
	});


	m_http.Get("/led/status", [](const httplib::Request &, httplib::Response &res) {
	  res.set_content("Get led status World!", "text/plain");
	});


	m_http.Post("/Stop", [](const httplib::Request &req, httplib::Response &res) {
		  res.set_content("PostLEDConfig message!", "text/plain");
	  		std::cout<<"APP exit message"<<std::endl;
			sem.post();		
		});


	m_http.Post(HTTP_SEARCH, [&](const httplib::Request &req, httplib::Response &res) {
	  res.set_content(HTTP_SEARCH, "text/plain");
		//std::cout<<req.body<<std::endl;		

		ParseSearch(req.body);
	  
	});

	std::cout<<__FUNCTION__<<std::endl;


	auto ret =   m_http.listen("0.0.0.0", m_basic_info.local_port);
	if(ret == false)
	{
		std::cout<<"Http failed"<<std::endl;
		sem.post();		
	}
	else
	{
		std::cout<<"Http success"<<std::endl;
	}

	std::cout<<__func__<<" exit!!!"<<std::endl;

}



void QMCY_APP::ReadConfig(const std :: string & data)
{
	jsonxx::json result;

	try
	{
		result = jsonxx::json::parse(data); 		
	}
	catch (jsonxx::json_parse_error & err)
	{
		std::cout<<"ReadConfig Json parse failed" <<std::endl;			
		return;
	}

	if(result["sn"].is_string())
	{
		m_basic_info.device_id = result["sn"].as_string(); 	
	}

	if(result["server_ip_name"].is_string())
	{
		m_basic_info.server_ip_name = result["server_ip_name"].as_string(); 	
	}

	if(result["server_port_name"].is_string())
	{
		m_basic_info.server_port_name = result["server_port_name"].as_string(); 	
	}

	if(result["local_port_name"].is_string())
	{
		m_basic_info.local_port_name = result["local_port_name"].as_string(); 	
	}
	
}


/**
 * @brief Prepares the application for Start().
 *
 * Seeds rand(), loads the JSON config file, resolves server IP, server port
 * and local port from the environment variables named in the config, and
 * creates the HTTP client, the worker pool and the internal queue. Ports
 * outside 1024..65535 (including unset ones) fall back to 8888.
 *
 * An environment entry is used only when its name (the text before '=')
 * equals the configured variable name and its value is not empty.
 *
 * @param config_file Path of the JSON configuration file.
 * @param env         NULL-terminated environment block ("NAME=value" entries).
 * @return false when @p config_file is empty or cannot be opened, else true.
 */
bool QMCY_APP::Init(std::string &config_file, char *env[])
{
	unsigned seed = time(0);
	srand(seed);


	if(config_file.empty())
	{
		std::cout<<"config file is null"<<std::endl;
		return false;
	}

	FILE * fp = fopen(config_file.c_str(),"r");
	if(fp == NULL)
	{
		return false;
	}

	// Read the whole file into a std::string: the length is tracked
	// explicitly, so no terminating NUL is needed.
	std::string file_content;
	if(fseek(fp, 0, SEEK_END) == 0)
	{
		const long size = ftell(fp);
		if(size > 0 && fseek(fp, 0, SEEK_SET) == 0)
		{
			file_content.resize(static_cast<size_t>(size));
			const size_t got = fread(&file_content[0],1,file_content.size(),fp);
			file_content.resize(got);
		}
	}
	fclose(fp);

	ReadConfig(file_content);


	// Start from a defined state so the range checks below never read
	// indeterminate values when a variable is absent from the environment.
	m_basic_info.server_port = 0;
	m_basic_info.local_port = 0;

	for(int i = 0; env != nullptr && env[i] != nullptr; i++)
	{
		const std::string entry = env[i];

		const std::size_t eq = entry.find('=');
		if(eq == std::string::npos || eq == 0 || eq + 1 >= entry.size())
		{
			continue;
		}

		// std::string slices instead of sscanf into fixed-size buffers: long
		// entries can no longer overflow, and an empty configured name
		// matches nothing.
		const std::string key = entry.substr(0, eq);
		const std::string value = entry.substr(eq + 1);

		if(key == m_basic_info.server_ip_name)
		{
			m_basic_info.server_ip = value;
		}

		if(key == m_basic_info.server_port_name)
		{
			m_basic_info.server_port = atoi(value.c_str());
		}

		if(key == m_basic_info.local_port_name)
		{
			m_basic_info.local_port = atoi(value.c_str());
		}
	}


	if(m_basic_info.local_port<1024|| m_basic_info.local_port >65535 )
	{
		m_basic_info.local_port = 8888;
	}

	if(m_basic_info.server_port<1024|| m_basic_info.server_port >65535 )
	{
		m_basic_info.server_port = 8888;
	}



	zlog_info(g_zlog,"APP run info:[server ip = %s,server port=%d, local port = %d]",m_basic_info.server_ip.c_str(),m_basic_info.server_port,m_basic_info.local_port);


	std::string posturi = "http://";
	posturi.append(m_basic_info.server_ip);
	posturi.append(":");
	posturi.append(std::to_string(m_basic_info.server_port));


	m_http_client = std::make_shared<httplib::Client>(posturi);

	//m_log = HANA<LOG>::GetHANA();

	m_thread_pool = std::make_shared<BlackBox>(MAX_THREADS,MAX_QUEUE_SIZE);
	m_thread_pool->init();

	m_inner_queue = std::make_shared<BlockQueue<INNER_MSG>>(30);

	return true;

}



bool QMCY_APP::Start()
{

	m_run_flag = true;
	
	m_task_thread = std::thread(&QMCY_APP::MainTask,this);
	m_http_thread = std::thread(&QMCY_APP::HttpTask,this);	
	m_timer_thread = std::thread(&QMCY_APP::TimerTask,this);	



	//std::this_thread::sleep_for(std::chrono::seconds(1));

	zlog_info(g_zlog,"QMCY Video filter system successfully started");

	
	return true;
}


/**
 * @brief Shuts the application down and waits for its threads.
 *
 * Queues the exit message for the task loop, shuts the worker pool down,
 * clears the run flag, stops the HTTP server and then joins every service
 * thread that is joinable.
 *
 * @return Always true.
 */
bool QMCY_APP::Stop()
{
	std::cout<<__func__<<std::endl;

	// Wake the task loop with an explicit exit message.
	INNER_MSG inner_msg;
	inner_msg.msg_id = MSG_EXIT_LED;
	m_inner_queue->push_data(inner_msg);

	m_thread_pool->shutdown();

	// Ends the timer loop on its next iteration.
	m_run_flag = false;

	// Makes listen() in the HTTP thread return.
	m_http.stop();

	// Joining a thread that was never started throws std::system_error, so
	// every thread gets the joinable() check the timer thread already had.
	if(m_task_thread.joinable())
	{
		m_task_thread.join();
	}
	if(m_http_thread.joinable())
	{
		m_http_thread.join();
	}
	if(m_timer_thread.joinable())
	{
		m_timer_thread.join();
	}

	return true;
}



/**
 * @brief Writes the version banner to the log and the build stamp to stdout.
 */
void PrintLEDVersion()
{
	// Build stamp assembled at compile time.
	static const char build_stamp[] = "QMCY VIDEO FILTER (build in: " __DATE__ " " __TIME__ ")";
	static const char rule[] = "=====================================================================================\n";

	zlog_info(g_zlog,"%s",rule);
	zlog_info(g_zlog,"                     QMCY VIDEO FILTER system Version[%s], Date[%s]                  \n",DEVICE_VERSION, build_stamp);
	zlog_info(g_zlog,"%s",rule);

	printf("程序版本[%s]\n",build_stamp);
}




/**
 * @brief Program entry point.
 *
 * Initialises zlog, creates the application singleton, starts it and then
 * blocks until a shutdown is requested (SIGINT, the "/Stop" HTTP endpoint or
 * a failed HTTP listen), after which the application is stopped.
 *
 * @param env Environment block, forwarded to QMCY_APP::Init.
 * @return 0 on a clean shutdown, -1 when initialisation fails.
 */
int main(int argc,char *argv[], char *env[])
{


	auto rc = zlog_init("./zlog.conf");
	if (rc) {
		printf("init failed\n");
		// Was `return false`, i.e. exit status 0, which reported success.
		return -1;
	}

	g_zlog = zlog_get_category("qmcy");
	if (!g_zlog) {
		printf("get cat fail\n");
		zlog_fini();
		return -1;
	}


	std::string config = "config.json";


	std::shared_ptr<QMCY_APP> app = HANA<QMCY_APP>::GetHANA();


	if(app->Init(config,env) == false)
	{
		std::cout<<"APP init failed"<<std::endl;
		return -1;
	}


	std::cout<<"APP init success"<<std::endl;
	app->Start();


	// Install the exit-signal handler.
	// NOTE(review): std::cout and, in the build without HAVE_SEM, the
	// mutex-based sem.post() are not async-signal-safe; consider sigwait()
	// or a self-pipe if this handler ever misbehaves.
	signal(SIGINT, [](int) {
		std::cout << "GET QMCY exit signal SIGINT:exit";
		signal(SIGINT, SIG_IGN);// ignore further SIGINTs while shutting down
		sem.post();
	});

	// Block until someone requests shutdown.
	sem.wait();

	app->Stop();

	std::cout<<"Exit QMCY APP system"<<std::endl;

	zlog_info(g_zlog,"Exit QMCY Video filter system ");


	return 0;
}


HTTP 服务使用 cpp-httplib（即代码中包含的 httplib.h）。

项目中还用到了线程池、等待队列等组件。

相关阅读

热门文章

    手机版|MSIPO技术圈 皖ICP备19022944号-2

    Copyright © 2024, msipo.com

    返回顶部