0. 들어가며...
자율주행 알고리즘을 개발할 때, C++을 사용하는 큰 이유중 하나는 thread를 사용하는 병렬 컴퓨팅에 있다고 생각한다.
Thread 시스템의 능력을 최대한으로 활용하기 위해서는 mutex와 같은 상호 간섭 배제 수단을 최대한 필요한 곳에만 적용하여
모든 thread가 ‘병렬’적으로 동작하도록 해야한다.
개별 thread가 각각 완전히 독립적인 작업만 하는 경우에는 크게 신경써야하는 것이 없다. (modern c++에서는 smart pointer 등 포인터 관리에 있어서도 개발자의 골치를 아프게 하는 문제가 많이 줄었다.)
하지만, 자율주행 알고리즘에서 병렬 컴퓨팅의 핵심은 각자 계산한 결과를 서로서로 참조하는 것에 있다.
예를 들어, 전역 경로 생성을 하면서 장애물 회피 경로를 계산해야한다. 회피 경로는 전역 경로를 참고해야하므로, 전역 경로 계산 thread와 엮여있다. 그러면서도 전역 경로가 계속 추가되는 경우 (Fleet Management System 등과 연동하는 경우) 전역 경로를 계산하는 것은 회피 경로와 별개로 동작해야한다. 제어에 있어서도 회피 경로를 추종할 때, 현재 주어진 회피 경로를 제어가 추종하는 동안, 새로운 회피 경로는 계산 되어야 하며, 계산이 완료 되면 제어 thread에서는 이를 반영해서 다시 명령을 내려야한다.
단순하게 안전성만 신경쓴다면
mutex 범벅과 함께
전역경로 생성 중에는 회피 경로 생성을 멈추고, 회피 경로 생성 중에는 제어를 멈추면 된다.
Single thread와 비슷해질 수록 안전해진다. 그럼 c++을 굳이...?
하지만 효율과 연산속도를 위해서는 개별 연산은 병렬적으로 하고, 결과만 공유해야한다.
이 때, 많이 사용하는 것이 Get / Set 구조이다. 간단한 구조는 아래와 같다.
1. 멤버 변수 공유 방법
GET / SET 함수
기본적으로 많이 사용하는 get / set 함수는 아래와 같다.
std::shared_mutex mutex_; // 읽기(shared)와 쓰기(unique)를 구분해서 지원한다. 메모리는 동시에 읽을 수는 있지만, 쓰는 행위에 대해서는 배타적이여야 한다.
int data_;
int get()
{
std::shared_lock<std::shared_lock> rlock(mutex_); // shared_lock은 다른 thread에서의 shared_lock 권한요청도 승인한다. unique_lock에 대해서는 배타적이다.
return data_;
}
void set(int new_data) // 기본 데이터 타입은 복사가 빠르지만, 그 외에는 const T&로 넘기는게 훨씬 빠름에 유의.
{
std::unique_lock<std::shared_lock> wlock(mutex_); // unique_lock이 걸린 동안에는 다른 unique_lock이나 shared_lock은 권한을 획득할 수 없다.
data_ = new_data;
}
자율주행 알고리즘을 예로 들면, 멤버변수로 전역 경로, 회피 경로 등을 선언하고, get / set을 통해서 서로 주고받을 수 있다.
#include <memory.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <random>
#include <shared_mutex>
#include <thread>
using namespace std::chrono_literals;
std::atomic<bool> running_{true};
std::shared_mutex status_mutex_;
float number_ = 0;
enum Status {
IDLE = 0,
RUN = 1,
STOP = 2,
};
Status status_{IDLE};
Status get_status() {
std::shared_lock<std::shared_mutex> rlock(status_mutex_);
return status_;
}
void set_status(const Status& status) {
std::unique_lock<std::shared_mutex> wlock(status_mutex_);
status_ = status;
}
void thread_A() {
while (running_.load() == true) {
Status status = get_status();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
switch (status) {
case Status::IDLE:
std::cout << "thread A: current status: IDLE\n";
break;
case Status::RUN:
std::cout << "thread A: current status: RUN\n";
break;
case Status::STOP:
std::cout << "thread A: current status: STOP\n";
break;
default:
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void thread_B() {
while (running_.load() == true) {
Status status = get_status();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
switch (status) {
case Status::IDLE:
std::cout << "thread B: current status: IDLE\n";
break;
case Status::RUN:
std::cout << "thread B: current status: RUN\n";
break;
case Status::STOP:
std::cout << "thread B: current status: STOP\n";
break;
default:
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void thread_C() {
while (running_.load() == true) {
Status status = get_status();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
switch (status) {
case Status::IDLE:
std::cout << "thread C: current status: IDLE\n";
break;
case Status::RUN:
std::cout << "thread C: current status: RUN\n";
break;
case Status::STOP:
std::cout << "thread C: current status: STOP\n";
break;
default:
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void thread_D() {
while (running_.load() == true) {
Status status = get_status();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
switch (status) {
case Status::IDLE:
std::cout << "thread D: current status: IDLE\n";
break;
case Status::RUN:
std::cout << "thread D: current status: RUN\n";
break;
case Status::STOP:
std::cout << "thread D: current status: STOP\n";
break;
default:
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void status_manager() {
while (running_.load() == true) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distrib(0, 2);
int num = distrib(gen);
switch (num) {
case 0:
std::cout << "Manager: Set status to IDLE\n";
set_status(IDLE);
break;
case 1:
std::cout << "Manager: Set status to RUN\n";
set_status(RUN);
break;
case 2:
std::cout << "Manager: Set status to STOP\n";
set_status(STOP);
break;
case -1:
std::cout << "\n\n\n Manager: [END] \n\n\n";
running_.store(false);
break;
default:
std::cout << "Manager: Please enter either 0, 1, 2 or -1.\n";
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
int main() {
std::thread A(thread_A);
std::thread B(thread_B);
std::thread C(thread_C);
std::thread D(thread_D);
std::thread manager(status_manager);
if (A.joinable()) A.join();
if (B.joinable()) B.join();
if (C.joinable()) C.join();
if (D.joinable()) D.join();
}
2. 문제 상황
이렇게 multi thread에서 변수 공유는 해결했는데... 자율주행 등과 같은 복잡한 문제는 각 thread는 현재 상태에 맞추어 다르게 동작해야한다. (상태라고 하는 것은 대기, 제자리 회전, 일반 주행, 일시 정지 등이 있을 수 있겠다.)
당연하지만 get / set으로는 서로 동작 주기가 각기 다른 여러 thread의 상태(주행중, 일시정지, 회전 등)를 동기화 시킬 수 없다. A thread에서 get을 하는 타이밍과 B thread에서 get을 하는 타이밍이 다르고, set 이 일어난다면, 두 thread는 서로 다른 status를 위한 동작을 하고 있다. (제자리 회전하는 명령과 일반 주행하는 회피경로 생성이 동시에 수행된다는 얘기다.)
이런 문제는 항상 발생하는 것이 아니기 때문에, Figure. 1의 결과처럼 잘 동작하는 경우도 있지만, 드물지않게 status가 이미 바뀌었음에도 불구하고, 동작은 이전 status를 기준으로 실행되는 현상이 발생한다. (Figure. 2).
3. 문제 정의
단순히 변수를 공유하는 것이 아니라, 모든 thread가 동일한 변수 값을 인지하고 움직여야하는 상황이다.
문제를 해결하기 위한 두 가지 요소는 다음과 같이 정의된다.
- status가 변경되는 순간 모든 thread에서 해당 변수의 값은 동일하게 관측되어야 한다.
- 모든 thread가 status가 변경되기 전에는 개별 연산이 이루어지지 않은 상태로 대기해야 한다.
처음에는 std::conditional_variable
(== cv)를 활용하려고 했으나, 대기를 위해 lock과 unlock을 반복하면서(.wait()
) thread를 잡고 있는 것은 불필요한 연산처럼 보였다 (내가 제안하는 방식은 thread의 기본 작동 주기마다 검사를 하기 때문에 cv에 비해 조금 더 연산 횟수가 적다.). 또한, cv는 notify하는 시점에 연결된 cv들의 lock를 풀어서 thread를 진행시켜주는 것인데, 이 문제의 핵심은 어떻게 다시 시작하느냐가 아니라, 언제 notify를 해야하는가 이다. (단순히 값을 넣고 notify를 하는게 아니라, 다른 thread 들이 변경된 값을 ‘인지’한 이후에 다시 재개 중요하다.)
4. 해결방법
기본 아이디어는 thread의 상태를 관리해주는 manager thread를 설정해서 Finite State Machine(FSM)을 관리하도록 하였다.
Status가 변경될 때마다 status 상태 변수(atomoic<bool>
)를 변경한다. 각 thread는 해당 변수의 상태를 읽어서 변경이 되었다면, 본인 thread는 이를 확인한 상태임을 표시한다. (atomic<int>
)
모든 thread가 상태의 변경을 확인한 경우, status 상태 변수를 false로 변경하여, 다시 thread가 동작할 수 있도록 한다.
아래는 아이디어를 구현한 코드이다.
thread A,B,C,D는 각기 다른 일을 하는 것을 표현했다. 구조의 핵심은 atomic으로 선언한 status_changed_
변수를 통해 status가 변경되는 것을 원자적으로 관리하는 것과 변경된 이후 각 thread가 이를 인지한 경우 인지 했음을 알리는 것이다 (through *_sync_
).
모든 thread가 status의 변경을 인지했다면 manager thread에서 status_changed_
를 false
로 변경하고, 각 sync 변수도 다시 false로 변경해준다. (변경을 모두 인지했다면, 어떤 thread가 먼저 시작하는지는 신경쓰지 않아도 된다.)
#include <memory.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <mutex>
#include <random>
#include <shared_mutex>
#include <thread>
using namespace std::chrono_literals;
std::atomic<bool> running_{true};
std::shared_mutex status_mutex_;
std::atomic<bool> status_changed_{false};
std::atomic<bool> a_sync_{false};
std::atomic<bool> b_sync_{false};
std::atomic<bool> c_sync_{false};
std::atomic<bool> d_sync_{false};
float number_ = 0;
enum class Status {
IDLE = 0,
RUN = 1,
STOP = 2,
};
Status status_{Status::IDLE};
Status get_status() {
std::shared_lock<std::shared_mutex> rlock(status_mutex_);
return status_;
}
void set_status(const Status& status) {
std::unique_lock<std::shared_mutex> wlock(status_mutex_);
if (status_ != status) {
status_ = status;
status_changed_.store(true);
}
}
void thread_A() {
while (running_.load() == true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (status_changed_.load() == true) {
a_sync_.store(true);
continue;
}
Status status = get_status();
switch (status) {
case Status::IDLE:
std::cout << "thread A: current status: IDLE\n";
break;
case Status::RUN:
std::cout << "thread A: current status: RUN\n";
break;
case Status::STOP:
std::cout << "thread A: current status: STOP\n";
break;
default:
break;
}
}
}
void thread_B() {
while (running_.load() == true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (status_changed_.load() == true) {
b_sync_.store(true);
continue;
}
Status status = get_status();
switch (status) {
case Status::IDLE:
std::cout << "thread B: current status: IDLE\n";
break;
case Status::RUN:
std::cout << "thread B: current status: RUN\n";
break;
case Status::STOP:
std::cout << "thread B: current status: STOP\n";
break;
default:
break;
}
}
}
void thread_C() {
while (running_.load() == true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (status_changed_.load() == true) {
c_sync_.store(true);
continue;
}
Status status = get_status();
switch (status) {
case Status::IDLE:
std::cout << "thread C: current status: IDLE\n";
break;
case Status::RUN:
std::cout << "thread C: current status: RUN\n";
break;
case Status::STOP:
std::cout << "thread C: current status: STOP\n";
break;
default:
break;
}
}
}
void thread_D() {
while (running_.load() == true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (status_changed_.load() == true) {
d_sync_.store(true);
continue;
}
Status status = get_status();
switch (status) {
case Status::IDLE:
std::cout << "thread D: current status: IDLE\n";
break;
case Status::RUN:
std::cout << "thread D: current status: RUN\n";
break;
case Status::STOP:
std::cout << "thread D: current status: STOP\n";
break;
default:
break;
}
}
}
void status_manager() {
while (running_.load() == true) {
std::this_thread::sleep_for(std::chrono::milliseconds(300));
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distrib(0, 2);
int num = distrib(gen);
if (status_changed_.load() == true) {
if (a_sync_.load() == true && b_sync_.load() == true && c_sync_.load() == true && d_sync_.load() == true) {
status_changed_.store(false);
a_sync_.store(false);
b_sync_.store(false);
c_sync_.store(false);
d_sync_.store(false);
}
continue;
}
switch (num) {
case 0:
std::cout << "Manager: Set status to IDLE\n";
set_status(Status::IDLE);
break;
case 1:
std::cout << "Manager: Set status to RUN\n";
set_status(Status::RUN);
break;
case 2:
std::cout << "Manager: Set status to STOP\n";
set_status(Status::STOP);
break;
case -1:
std::cout << "\n\n\n Manager: [END] \n\n\n";
running_.store(false);
break;
default:
std::cout << "Manager: Please enter either 0, 1, 2 or -1.\n";
break;
}
}
}
int main() {
std::thread A(thread_A);
std::thread B(thread_B);
std::thread C(thread_C);
std::thread D(thread_D);
std::thread manager(status_manager);
if (A.joinable()) A.join();
if (B.joinable()) B.join();
if (C.joinable()) C.join();
if (D.joinable()) D.join();
if (manager.joinable()) manager.join();
}
5. 결과
아래는 결과이다. 이전과 다르게 status가 바뀐 것을 모두 인지하고 동작하는 것을 알 수 있다.
'Side Project > C++ 관련' 카테고리의 다른 글
[C++] std::vector의 reserve와 resize (0) | 2024.08.02 |
---|