ReportTransporter 是诊断报告系统中负责将 JSON 序列化后的报告数据异步发送至集度服务端的核心传输引擎。它采用经典的生产者-消费者模式,通过独立工作线程解耦报告的生成与上传,并在传输失败时自动执行重入队重试。本文档将系统性地剖析其线程模型、队列同步策略、策略模式扩展点以及与 ReportRecordCollector 的事件驱动集成方式。
架构概览
ReportTransporter 由三层组件协同构成:线程安全队列(ReportQueue)作为缓冲中介,单例调度器(ReportTransporter)负责生命周期与线程管理,传输策略接口(ReportTransportAction)通过策略模式注入具体的网络发送逻辑。三者的协作关系如下:
graph LR
subgraph Producer ["生产者侧:数据采集"]
RRC[ReportRecordCollector]
end
subgraph TransportLayer ["ReportTransporter 异步传输层"]
direction TB
RQ[ReportQueue<br/>std::queue<std::string*><br/>+ std::mutex]
RT[ReportTransporter Singleton<br/>原子标志 mIsRunning<br/>detach 工作线程]
RTA[ReportTransportAction<br/>策略接口 doTransport]
end
subgraph Consumer ["消费者侧:网络发送"]
CS["callServer<br/>直接调用"]
end
RRC -->|"EventHandler<br/>onBlockCompleted<br/>序列化后 enque"| RQ
RT -->|"threadWork 循环<br/>deque → doTransport"| RTA
RTA -->|"成功: delete<br/>失败: re-enque"| RQ
RTA -->|"callServer(2) / callServer(31)"| CS数据流向从 ReportRecordCollector 触发事件开始:每个 Block 完成时,onBlockCompleted 将 StationDataReportRecord 序列化为 JSON 字符串并以 std::string*(堆指针)形式入队。工作线程在循环中出队后调用 ReportTransportAction::doTransport() 进行实际发送。发送成功则释放指针,发送失败则将同一指针重新入队,等待下一次重试。
Sources: ReportTransporter.hpp | ReportTransporter.cpp | ReportQueue.hpp
ReportQueue:互斥锁保护的 FIFO 指针队列
设计动机
报告数据是深度嵌套的 JSON 对象(如 StationDataReportRecord 包含多层 Category → Block → Item → ECU 的树形结构),序列化后的字符串体量较大。为减少拷贝开销,队列直接存储 std::string* 指针而非 std::string 值对象。这一设计将入队/出队的复制成本从 O(n)(n 为字符串长度)降至 O(1),代价是引入明确的内存所有权约定。
所有权与生命周期约定
ReportQueue 定义了严格的两阶段所有权规则:
| 操作 | 所有权变更 | 释放责任 |
|---|---|---|
enque(item) | 指针所有权从调用方转移至队列 | 队列负责释放(仅限仍在其内部的元素) |
deque() | 指针所有权从队列转移至调用方 | 调用方使用完毕后必须自行 delete |
clear() / 析构 | 销毁队列内所有剩余元素 | 队列内部逐一 delete |
当一个元素被 deque() 取出后,它在队列中不再有任何痕迹——队列无法跟踪它。因此"出队后由使用方释放"是一条不可违背的契约。ReportTransporter::threadWork 中在 doTransport 返回 0(成功)时立即 delete reportText,正是履行这一契约的体现。
Sources: ReportQueue.hpp | ReportQueue.cpp
线程安全实现
所有公开方法均使用 std::lock_guard<std::mutex> 在进入时锁定内部的 mQueueLock,确保即使在 std::thread::detach() 模式下,生产者(主线程中的事件回调)与消费者(工作线程)对底层 std::queue 的并发访问都是串行化的。size() 方法同样加锁,为外部查询提供了瞬时的、一致性的队列长度快照。
void ReportQueue::enque(std::string* item) {
std::lock_guard<std::mutex> lock(this->mQueueLock);
this->mQueue.push(item);
}
std::string* ReportQueue::deque() {
std::lock_guard<std::mutex> lock(this->mQueueLock);
std::string* item = this->mQueue.front();
this->mQueue.pop();
return item;
}值得注意的是,deque() 并未对"空队列时调用"做防护——它直接调用 front() + pop(),这依赖于调用方 threadWork 在调用前通过 size() > 0 进行了预判。
Sources: ReportQueue.cpp
ReportTransporter:单例调度器与 detach 线程模型
单例模式
ReportTransporter 通过 getDefault() 静态方法实现线程局部的惰性单例。首次调用时在堆上构造唯一实例并赋给静态指针 mDEFAULT。这个指针在程序生命周期内不释放(无 delete),适合贯穿整个工位测试会话的持久化场景。
ReportTransporter* ReportTransporter::mDEFAULT = nullptr;
ReportTransporter* ReportTransporter::getDefault() {
if (mDEFAULT == nullptr) {
mDEFAULT = new ReportTransporter();
}
return mDEFAULT;
}Sources: ReportTransporter.cpp
线程生命周期管理
start() / stop() 方法以 std::atomic_bool mIsRunning 作为状态开关,使用 memory_order_relaxed 保证原子性但不施加额外的顺序约束——这在此场景下是合理的,因为启动/停止本身即为序列化的用户操作:
| 方法 | 行为 |
|---|---|
start() | 若未运行,将 mIsRunning 置 true,创建 std::thread 并立即 detach() |
stop() | 将 mIsRunning 置 false,等待工作线程 join() 后销毁线程对象 |
| 析构函数 | 调用 stop() 确保线程退出,再调用 mQueue.clear() 清理残留数据 |
选择 detach() 而非 join() 管理线程意味着线程在启动后即与主线程解耦,无需显式等待。当 stop() 被调用时,mIsRunning 被设为 false,工作线程在下一轮循环体顶部检测到后自然退出;stop() 内部通过 joinable() + join() 安全地回收线程资源。
Sources: ReportTransporter.cpp
工作线程主循环
threadWork 是一个静态方法,接收 ReportTransporter* 上下文指针。其循环逻辑如下:
flowchart TD
START(["threadWork 开始"]) --> CHECK_RUN{"mIsRunning?"}
CHECK_RUN -->|false| END(["退出,打印 'thread end'"])
CHECK_RUN -->|true| CHECK_SIZE{"queue.size() > 0?"}
CHECK_SIZE -->|false| SLEEP1["sleep 10ms"]
SLEEP1 --> CHECK_RUN
CHECK_SIZE -->|true| CHECK_ACTION{"mTransportAction != nullptr?"}
CHECK_ACTION -->|false| SLEEP2["sleep 10ms(无 Action 时额外等待)"]
SLEEP2 --> CHECK_RUN
CHECK_ACTION -->|true| DEQUE["deque() 取出 reportText"]
DEQUE --> TRANSPORT["action->doTransport(reportText)"]
TRANSPORT --> CHECK_RESULT{"返回值 == 0?"}
CHECK_RESULT -->|否(失败)| REENQUE["enque 重新塞回队列<br/>printf 失败日志"]
REENQUE --> CHECK_RUN
CHECK_RESULT -->|是(成功)| DELETE["delete reportText<br/>printf 剩余队列项数"]
DELETE --> CHECK_RUN两次 sleep_for(10ms) 的区分值得关注:当队列为空时休眠是常规的避免忙等的策略;但当队列有数据而 mTransportAction 尚未设置时同样休眠,意味着 setAction() 必须在 start() 之前或之后的合理窗口内调用——否则数据将在队列中积压。
Sources: ReportTransporter.cpp
重试机制
当 doTransport 返回非零值时,工作线程执行 re-enqueue 操作:将同一个 std::string* 指针重新推入队列,不释放。这实现了无限重试语义——在网络抖动或服务端短暂不可用的场景下,失败的报文会在下一轮循环中被再次处理。当前实现没有最大重试次数或退避策略,重试间隔取决于队列深度和 sleep(10ms) 的空闲等待:如果队列中仅有失败报文,重试间隔约为 10ms。
ReportTransportAction:策略模式扩展点
ReportTransportAction 是一个抽象接口,仅暴露一个纯虚方法:
virtual int doTransport(std::string *reportText) = 0;
// 返回值: 0 表示成功, 非0 表示错误码通过 setAction() 注入具体实现,ReportTransporter 本身不关心"如何传输"——它只负责"何时传输"。这种设计使得传输逻辑可以独立于队列调度进行替换和测试。
实际实现:ReportTransportActionImpl
在 jidu_report.cpp 中的具体实现展示了完整的传输流程:
- 时间戳格式化:将 JSON 中的
testTimeUtc和executionTimeUtc从"YYYY-MM-DD HH:MM:SS.000"格式转换为 ISO 8601 风格"YYYY-MM-DDTHH:MM:SSZ"(空格→T,毫秒→Z) - 类型路由:
resultType == "03"→ 调用callServer(31, ...)—— 打印结果报告- 其他类型("01" Block 数据、"02" 工位结果)→ 调用
callServer(2, ...)
- 内存管理:解析后的
cJSON对象在发送完成后通过cJSON_Delete释放,序列化中间字符串通过cJSON_free释放
Sources: jidu_report.cpp
| resultType | 含义 | callServer 类型 | 产生时机 |
|---|---|---|---|
| "01" | 单个 Block 测试数据 | callServer(2) | 每个 blockEnd() 时通过 onBlockCompleted 触发 |
| "02" | 工位整体结果汇总 | callServer(2) | collectEnd() 时通过 generate02() 生成 |
| "03" | 打印结果报告 | callServer(31) | collectEnd() 时通过 generate03() 生成 |
与 ReportRecordCollector 的事件驱动集成
事件处理器注册
ReportRecordCollector 通过观察者模式与 ReportTransporter 解耦。在 jd_report_station_begin() 中,外部代码完成三个关键步骤:
transporter->setAction(action); // 注入传输策略
transporter->start(); // 启动工作线程
collector.addEventHandler(&handler); // 注册事件监听器ReportCollectorEventHandlerImpl 的每个回调方法都遵循同一模式:将传入的报告记录对象序列化为 JSON 字符串 → 在堆上分配新的 std::string → 通过 getDefault() 获取单例 → 调用 getQueue()->enque() 入队。
Sources: jidu_report.cpp
三种报告类型的触发时机
sequenceDiagram
participant OTX as OTX 脚本
participant RRC as ReportRecordCollector
participant Handler as EventHandlerImpl
participant RT as ReportTransporter
participant Queue as ReportQueue
participant Thread as 工作线程
participant Server as 集度服务器
OTX->>RRC: blockBegin(category, block)
OTX->>RRC: collectItemStepOK / collectItemStepNOK (多次)
OTX->>RRC: blockEnd(category, block)
RRC->>RRC: 创建 StationDataReportRecord (resultType=01)
RRC->>Handler: onBlockCompleted(newBlock)
Handler->>Handler: toJsonString()
Handler->>Queue: enque(new std::string(json))
Thread->>Queue: deque()
Thread->>Server: callServer(2, json)
OTX->>RRC: collectEnd()
RRC->>RRC: generate02() → StationResultReportRecord (resultType=02)
RRC->>Handler: onStationResultCompleted(newResult)
Handler->>Queue: enque(...)
RRC->>RRC: generate03() → PrintResultReportRecord (resultType=03)
RRC->>Handler: onPrintResultCompleted(newResult)
Handler->>Queue: enque(...)
Thread->>Server: callServer(2, ...) / callServer(31, ...)每个 Block 的测试数据(resultType "01")在 blockEnd 时立即异步上报,不等待工位结束。工位结束时额外生成 "02"(汇总结果)和 "03"(打印结果)两份报告。这种"流式上报 + 最终汇总"的策略确保了即使工位异常中断,已完成 Block 的数据也不会丢失。
Sources: ReportRecordCollector.cpp | ReportRecordCollector.cpp
外部 API 接口
jidu.h 中暴露的 C 语言 API 封装了 ReportTransporter 的完整生命周期:
| API 函数 | 作用 |
|---|---|
jd_report_station_begin() | 设置 Action、启动 Transporter、注册 EventHandler、开启采集 |
jd_report_station_end() | 采集补充信息(TPMS、电量等)、结束采集、休眠 500ms 等待队列排空、注销 EventHandler |
jd_report_getQueueSize() | 查询当前队列中待传输的报告项数(线程安全) |
jd_report_getResult() | 获取打印报告,在有 NG 项时会自旋等待队列排空后从服务端拉取 mask 数据 |
jd_report_station_end() 中的 sleep_for(500ms) 是一个经验性的等待,目的是在结束采集后给予工作线程足够的窗口将队列中剩余的数据发送完毕——这是一种简易的"排空等待"策略,而非精确的同步机制。
jd_report_getResult() 则展示了更复杂的同步模式:当存在 NG 项时,它自旋等待队列清空(while (queue->size() == 0) {}),然后调用 callServer(32) 获取服务端生成的打印 mask,将其与本地报告合并。这确保了打印结果展示的是服务端最终确认的数据视图。
Sources: jidu.h | jidu_report.cpp
设计考量与边界条件
优势
- 解耦采集与传输:报告数据的产生(工位操作)和发送(网络 I/O)在物理线程上完全隔离,采集侧不会因网络延迟而阻塞
- 自动重试:传输失败无需外部干预,报文自动重新入队
- 扩展开放:
ReportTransportAction接口使得传输目标可以替换(例如从 HTTP 切换为 MQTT)而无需修改队列调度逻辑 - 内存效率:指针传递避免了大 JSON 字符串的重复拷贝
需关注的边界
- 无界队列:
std::queue没有容量上限,在极端网络故障下可能无限堆积数据导致内存持续增长 - 无退避重试:失败后立即重新入队,网络恢复前将形成紧循环(10ms 间隔),CPU 占用恒定
- detach 线程:若
stop()未被调用而程序异常退出,detach 线程可能被强制终止,导致队列中未处理的报文永久丢失 - 非精确排空:
jd_report_station_end()中的 500ms 等待和jd_report_getResult()中的自旋等待均为启发式策略,在高负载场景下可能不足
相关文档
- ReportRecordCollector:工位数据分块采集与结果汇总 — 上游数据生产者,了解 Block/Item/ECU 的分层采集模型
- 报告数据模型:StationData、PrintResult 与 JSON 序列化 — 深入理解 ReportQueue 中传输的 JSON 结构
- callServer 接口设计:请求类型路由、预处理/后处理与脱敏机制 — ReportTransportAction 实际调用的服务端通信接口