目录 DataWriter分析DataWriter 类分析DataWriter 类是 Fast DDS 库中的一个重要类,它用于实现 DDS(Data Distribution Service)发布-订阅通信模型中的数据写入功能。 用途: 成员变量:
成员函数:
大致的实现方法: 同时,DataWriter 类还提供了一些其他功能,如通过 loan_sample 和 discard_loan 函数直接在内部池中借用和归还数据样本的功能,以及获取数据写入的状态信息等。 下面是详细的方法描述 因为这个类本质上只是一个包装类,所以重点函数基本上都是DataWriterImpl 类实现的 DataWriterImpl 类分析类图 DataWriterImpl类是Fast DDS库中的一个关键类,用于实现数据写入的功能。它包含了数据写入相关的操作和状态,以及与底层通信层(RTPS)的交互。下面是对该类及其函数的作用和大致实现方式的介绍:
关键函数分析write ReturnCode_t DataWriterImpl::write(void* data) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } // 检查数据是否为空 if (data == nullptr) { return ReturnCode_t::RETCODE_BAD_PARAMETER; } // 检查数据共享是否兼容 if (is_data_sharing_compatible_) { // 使用负载池分配Payload SerializedPayload_t payload; if (!get_payload_pool()->get_payload(payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } // 序列化数据到负载中 if (!type_->serialize(data, &payload)) { get_payload_pool()->release_payload(payload); return ReturnCode_t::RETCODE_ERROR; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(ALIVE, data, wparams); if (ret != ReturnCode_t::RETCODE_OK) { get_payload_pool()->release_payload(payload); return ret; } // 设置负载并发布更改 CacheChange_t* change = history_.get_last_added_change(); change->serializedPayload = payload; writer_->add_change(change); } else { // 使用内部负载分配器分配Payload std::unique_ptr<SerializedPayload_t> payload(new SerializedPayload_t()); if (!payload || !get_payload_pool()->get_payload(*payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } // 序列化数据到负载中 if (!type_->serialize(data, payload.get())) { get_payload_pool()->release_payload(*payload); return ReturnCode_t::RETCODE_ERROR; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(ALIVE, data, wparams); if (ret != ReturnCode_t::RETCODE_OK) { get_payload_pool()->release_payload(*payload); return ret; } // 设置负载并发布更改 CacheChange_t* change = history_.get_last_added_change(); change->serializedPayload = *payload; writer_->add_change(change); } return ReturnCode_t::RETCODE_OK; } write函数的主要步骤如下:
write_w_timestamp ReturnCode_t DataWriterImpl::write_w_timestamp(void* data, const fastrtps::Time_t& timestamp) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } // 检查数据是否为空 if (data == nullptr) { return ReturnCode_t::RETCODE_BAD_PARAMETER; } // 检查数据共享是否兼容 if (is_data_sharing_compatible_) { // 使用负载池分配Payload SerializedPayload_t payload; if (!get_payload_pool()->get_payload(payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } // 序列化数据到负载中 if (!type_->serialize(data, &payload)) { get_payload_pool()->release_payload(payload); return ReturnCode_t::RETCODE_ERROR; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(ALIVE, data, wparams); if (ret != ReturnCode_t::RETCODE_OK) { get_payload_pool()->release_payload(payload); return ret; } // 设置时间戳并发布更改 CacheChange_t* change = history_.get_last_added_change(); change->serializedPayload = payload; change->sourceTimestamp = timestamp; writer_->add_change(change); } else { // 使用内部负载分配器分配Payload std::unique_ptr<SerializedPayload_t> payload(new SerializedPayload_t()); if (!payload || !get_payload_pool()->get_payload(*payload)) { return ReturnCode_t::RETCODE_OUT_OF_RESOURCES; } // 序列化数据到负载中 if (!type_->serialize(data, payload.get())) { get_payload_pool()->release_payload(*payload); return ReturnCode_t::RETCODE_ERROR; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(ALIVE, data, wparams); if (ret != ReturnCode_t::RETCODE_OK) { get_payload_pool()->release_payload(*payload); return ret; } // 设置时间戳并发布更改 CacheChange_t* change = history_.get_last_added_change(); change->serializedPayload = *payload; change->sourceTimestamp = timestamp; writer_->add_change(change); } return ReturnCode_t::RETCODE_OK; } write_w_timestamp函数与write函数的实现非常相似。不同之处在于它接受一个额外的参数timestamp,用于指定数据的时间戳。 write_w_timestamp函数的主要步骤如下:
register_instance ReturnCode_t DataWriterImpl::register_instance(void* key) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } // 检查键值是否为空 if (key == nullptr) { return ReturnCode_t::RETCODE_BAD_PARAMETER; } // 检查键是否已注册 if (history_.key_exists(key)) { return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; } // 创建新的更改并添加到历史记录中 WriteParams wparams; ReturnCode_t ret = create_new_change_with_params(NOT_ALIVE_UNREGISTERED, key, wparams); if (ret != ReturnCode_t::RETCODE_OK) { return ret; } // 将实例注册到历史记录中 CacheChange_t* change = history_.get_last_added_change(); history_.register_instance(change, key); return ReturnCode_t::RETCODE_OK; } register_instance函数的主要步骤如下:
history_是DataWriterImpl类中的成员变量,它是用来管理DataWriter的历史更改记录的对象。 history_的作用是存储DataWriter发送的所有更改。每当DataWriter要发送新的数据时,它会创建一个新的更改(CacheChange_t对象),并将其添加到history_中。历史记录中的更改可以按照序列号进行排序,以便按照正确的顺序发送和传输数据。 通过管理历史记录,DataWriter可以实现一些重要的功能,例如支持可靠性、历史访问、数据回溯和重传等。历史记录还可用于处理订阅者的需求,如按需读取、历史数据查询和数据过滤等。 在DataWriterImpl中,history_是一个WriterHistory类型的对象,它实现了管理和维护历史更改记录的功能。WriterHistory是Fast DDS库提供的一个用于管理历史更改的实现。 通过使用history_,DataWriter可以跟踪和管理发送的数据,保证数据可靠性和一致性,并提供灵活的历史访问和数据处理能力。 get_publication_matched_status ReturnCode_t DataWriterImpl::get_publication_matched_status(PublicationMatchedStatus& status) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } { std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex()); // 获取PublicationMatchedStatus并复制给status参数 status = publication_matched_status_; // 重置current_count_change和total_count_change publication_matched_status_.current_count_change = 0; publication_matched_status_.total_count_change = 0; } // 设置相应的状态条件 user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::publication_matched(), false); return ReturnCode_t::RETCODE_OK; } get_publication_matched_status函数的主要步骤如下:
DataWriter的当前PublicationMatchedStatus表示与该DataWriter相关的订阅者(DataReader)的匹配状态。PublicationMatchedStatus记录了与DataWriter相关的订阅者数量的变化情况。 PublicationMatchedStatus结构包含以下字段:
通过监视PublicationMatchedStatus的变化,可以了解DataWriter与订阅者之间的匹配情况的变化。例如,当新的订阅者与DataWriter匹配时,current_count和total_count会增加,并且可以通过检查current_count_change和total_count_change字段来获知匹配状态的变化。 使用PublicationMatchedStatus可以实现一些功能,例如:
get_offered_deadline_missed_status ReturnCode_t DataWriterImpl::get_offered_deadline_missed_status(OfferedDeadlineMissedStatus& status) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } { std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex()); // 获取OfferedDeadlineMissedStatus并复制给status参数 status = offered_deadline_missed_status_; // 重置total_count和total_count_change offered_deadline_missed_status_.total_count_change = 0; } // 设置相应的状态条件 user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_deadline_missed(), false); return ReturnCode_t::RETCODE_OK; } get_offered_deadline_missed_status函数的主要步骤如下:
OfferedDeadlineMissedStatus表示DataWriter的最后期限未达到的情况。该状态记录了DataWriter未能按照其所定义的最后期限要求及时发送数据的统计信息。 OfferedDeadlineMissedStatus结构包含以下字段:
通过监视OfferedDeadlineMissedStatus的变化,可以了解DataWriter未按照最后期限要求发送数据的情况。当DataWriter未能及时发送数据时,total_count和total_count_change字段将增加,可以通过检查这些字段的值来获知最后期限未达到的次数和变化情况。 使用OfferedDeadlineMissedStatus可以实现一些功能,例如:
ReturnCode_t DataWriterImpl::get_offered_incompatible_qos_status(OfferedIncompatibleQosStatus& status) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } { std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex()); // 获取OfferedIncompatibleQosStatus并复制给status参数 status = offered_incompatible_qos_status_; // 重置total_count和total_count_change offered_incompatible_qos_status_.total_count_change = 0; } // 设置相应的状态条件 user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_incompatible_qos(), false); return ReturnCode_t::RETCODE_OK; } get_offered_incompatible_qos_status函数的主要步骤如下:
OfferedIncompatibleQosStatus表示DataWriter提供的QoS(Quality of Service)与订阅者(DataReader)的QoS不兼容的情况。该状态记录了DataWriter与订阅者之间QoS不兼容的统计信息。 OfferedIncompatibleQosStatus结构包含以下字段:
通过监视OfferedIncompatibleQosStatus的变化,可以了解DataWriter与订阅者之间QoS不兼容的情况。当DataWriter与订阅者发现不兼容的QoS时,total_count和total_count_change字段将增加,可以通过检查这些字段的值来获知不兼容QoS的次数和变化情况。 使用OfferedIncompatibleQosStatus可以实现一些功能,例如:
get_liveliness_lost_status ReturnCode_t DataWriterImpl::get_liveliness_lost_status(LivelinessLostStatus& status) { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } { std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex()); // 获取LivelinessLostStatus并复制给status参数 status = liveliness_lost_status_; // 重置total_count和total_count_change liveliness_lost_status_.total_count_change = 0; } // 设置相应的状态条件 user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::liveliness_lost(), false); return ReturnCode_t::RETCODE_OK; } get_liveliness_lost_status函数的主要步骤如下:
在数据通信中,活跃性(Liveliness)是指一个实体(如DataWriter)继续存在和活跃的状态。在发布-订阅模型中,DataWriter通过发送活跃性消息或定期发送心跳消息来证明自己的活跃状态。这样,订阅者(DataReader)能够确认DataWriter是否仍然处于活跃状态,以便维持通信链接。 活跃性的目的是确保数据通信中的实体保持活跃,以便保持通信链路的有效性和一致性。通过活跃性机制,可以检测到DataWriter是否不再活跃,即停止发送活跃性消息或心跳消息,或者由于某些原因无法满足活跃性要求。当DataWriter丢失活跃性时,订阅者可以做出相应的处理,如更新订阅者的状态、重新分配资源或采取其他容错措施。 get_sending_locators ReturnCode_t DataWriterImpl::get_sending_locators(rtps::LocatorList& locators) const { // 检查DataWriter是否已启用 if (writer_ == nullptr) { return ReturnCode_t::RETCODE_NOT_ENABLED; } // 获取DataWriter正在使用的发送定位器列表 rtps::LocatorList sending_locators = writer_->getRTPSParticipant()->getSendingLocators(); // 将发送定位器列表复制给输出参数locators locators = sending_locators; return ReturnCode_t::RETCODE_OK; } get_sending_locators函数的主要步骤如下:
通过调用get_sending_locators函数,可以获取DataWriter正在使用的发送定位器列表。发送定位器列表包含了DataWriter用于发送数据的网络定位器信息,可用于了解DataWriter当前的通信配置和连接信息。这对于监视和调试数据传输以及网络配置非常有用。 |
原文地址:https://blog.csdn.net/qq_32378713/article/details/131699450
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:https://www.msipo.com/article-1213.html 如若内容造成侵权/违法违规/事实不符,请联系MSIPO邮箱:3448751423@qq.com进行投诉反馈,一经查实,立即删除!
Copyright © 2023, msipo.com