// Member routine taking no arguments and returning nothing; presumably the
// body executed by the receiver thread (going by its name) — TODO confirm.
104 void RecvThreadFunc();
// Start entry point declared with an int32_t return type.
// NOTE(review): how `is_blocked` is used is not visible in this view; confirm
// against the definition before relying on it.
106 int32_t
Start(
bool is_blocked);
// Atomic run flag, initially false. Elsewhere in this file it is set with
// exchange(true) / exchange(false) and read with load().
109 std::atomic<bool> is_running_ = {
false};
// Atomic flag, initially false. The receive loop clears it at the top of each
// iteration and sets it once the received frames have been handed to the
// protocol manager.
110 std::atomic<bool> is_finish_recv_once_ = {
false};
// Non-owning pointer (never deleted in the visible code) to the message
// manager whose Parse() is called for every received frame.
113 MessageManager<SensorType> *pt_manager_ =
nullptr;
// Logging switch copied from the caller during initialization.
// NOTE(review): the place where it is consulted is not visible in this view.
114 bool enable_log_ =
false;
// Start-up guard: the start routine returns CANBUS_ERROR while this is false.
// NOTE(review): the assignment that sets it to true is not visible here.
115 bool is_init_ =
false;
// Future that the stop routine wait()s on after clearing is_running_.
// NOTE(review): where it is assigned is not visible in this view.
116 std::future<void> async_result_;
// Initialization routine of the SensorType-templated receiver.
// NOTE(review): the signature line is missing from this view; the body reads
// `can_client`, `pt_manager` and `enable_log`, presumably parameters of Init.
// Returns ErrorCode::CANBUS_ERROR if either pointer is null, otherwise
// ErrorCode::OK.
121template <
typename SensorType>
// Store the supplied collaborators before validating them.
125 can_client_ = can_client;
126 pt_manager_ = pt_manager;
127 enable_log_ = enable_log;
// Reject a missing CAN client.
128 if (can_client_ ==
nullptr) {
129 AERROR <<
"Invalid can client.";
130 return ::apollo::common::ErrorCode::CANBUS_ERROR;
// Reject a missing protocol (message) manager.
132 if (pt_manager_ ==
nullptr) {
133 AERROR <<
"Invalid protocol manager.";
134 return ::apollo::common::ErrorCode::CANBUS_ERROR;
// Both pointers are valid.
137 return ::apollo::common::ErrorCode::OK;
// Receive loop of the SensorType-templated receiver.
// NOTE(review): the signature line is missing from this view; presumably this
// is RecvThreadFunc — confirm. While IsRunning() is true it repeatedly asks the
// CAN client for frames, forwards each frame to pt_manager_->Parse(), and
// maintains is_finish_recv_once_ around each iteration.
140template <
typename SensorType>
142 AINFO <<
"Can client receiver thread starts.";
// Both collaborators must have been set before the loop runs.
143 CHECK_NOTNULL(can_client_);
144 CHECK_NOTNULL(pt_manager_);
// Consecutive-failure / consecutive-empty counters; each is reset to 0 after
// the corresponding check passes.
146 int32_t receive_error_count = 0;
147 int32_t receive_none_count = 0;
// Counter threshold used in the LOG_IF_EVERY_N conditions below.
148 const int32_t ERROR_COUNT_MAX = 10;
// Back-off passed to cyber::USleep(); presumably microseconds (10 ms) — TODO
// confirm the unit.
149 auto default_period = 10 * 1000;
151 while (IsRunning()) {
// Mark the current iteration as not yet finished.
152 is_finish_recv_once_.exchange(
false);
153 ADEBUG <<
"is_finish_recv_once_ 1 is " << is_finish_recv_once_.load();
154 std::vector<CanFrame> buf;
// Fetch frames into buf. NOTE(review): the declaration of frame_num and the
// status value compared against are on lines missing from this view.
156 if (can_client_->Receive(&buf, &frame_num) !=
158 LOG_IF_EVERY_N(ERROR, receive_error_count++ > ERROR_COUNT_MAX,
160 <<
"Received " << receive_error_count <<
" error messages.";
// Back off before the next attempt.
161 cyber::USleep(default_period);
// Receive succeeded: clear the failure counter.
164 receive_error_count = 0;
// Report (rate-limited via AERROR_EVERY(100)) when the buffer size disagrees
// with the frame count reported by the client.
166 if (buf.size() !=
static_cast<size_t>(frame_num)) {
167 AERROR_EVERY(100) <<
"Receiver buf size [" << buf.size()
168 <<
"] does not match can_client returned length["
169 << frame_num <<
"].";
// Nothing received: count it, log past the threshold, and back off.
172 if (frame_num == 0) {
173 LOG_IF_EVERY_N(ERROR, receive_none_count++ > ERROR_COUNT_MAX,
175 <<
"Received " << receive_none_count <<
" empty messages.";
176 cyber::USleep(default_period);
// At least one frame arrived: clear the empty-receive counter.
179 receive_none_count = 0;
// Hand every received frame (id, payload pointer, length) to the manager.
181 for (
const auto &frame : buf) {
182 uint8_t len = frame.len;
183 uint32_t uid = frame.id;
184 const uint8_t *data = frame.data;
185 pt_manager_->Parse(uid, data, len);
// NOTE(review): a line is missing just above; this log is presumably guarded
// by enable_log_ — confirm.
187 AINFO <<
"recv_can_frame#" << frame.CanFrameString();
// All frames of this iteration have been parsed.
190 is_finish_recv_once_.exchange(
true);
191 ADEBUG <<
"is_finish_recv_once_ 2 is " << is_finish_recv_once_.load();
// Logged after the receive loop has ended.
194 AINFO <<
"Can client receiver thread stopped.";
// Accessor returning the current value of the atomic is_running_ flag.
// NOTE(review): signature line missing from this view; presumably IsRunning,
// which the receive loop uses as its loop condition — confirm.
197template <
typename SensorType>
199 return is_running_.load();
// Accessor that logs (at debug level) and returns the current value of the
// atomic is_finish_recv_once_ flag.
// NOTE(review): signature line missing from this view; the name
// IsFinishRecvOnce is an assumption — confirm.
202template <
typename SensorType>
204 ADEBUG <<
"is_finish_recv_once_ state is " << is_finish_recv_once_.load();
205 return is_finish_recv_once_.load();
// Start routine: refuses to start unless is_init_ is true, then raises the
// is_running_ flag and reports ErrorCode::OK.
// NOTE(review): the signature line and the lines between setting the flag and
// the final return are missing from this view; presumably the receive loop is
// launched there — confirm.
208template <
typename SensorType>
// Not initialized yet: report an error instead of starting.
210 if (is_init_ ==
false) {
211 return ::apollo::common::ErrorCode::CANBUS_ERROR;
// Raise the flag that the receive loop polls through IsRunning().
213 is_running_.exchange(
true);
216 return ::apollo::common::ErrorCode::OK;
// Stop routine: clears is_running_ and waits on async_result_.
// NOTE(review): the signature line and the surrounding branch lines are
// missing from this view; presumably the first part runs only while the
// receiver is running and the "not running" message belongs to the other
// branch — confirm.
219template <
typename SensorType>
222 AINFO <<
"Stopping can client receiver ...";
// Lower the run flag, then block until the future held in async_result_ is
// ready.
223 is_running_.exchange(
false);
224 async_result_.wait();
226 AINFO <<
"Can client receiver is not running.";
228 AINFO <<
"Can client receiver stopped [ok].";