Синхронизация потоков в OC Linux
Если вы хотя бы раз разрабатывали многопоточное приложение, не важно под какую ос - думаю вы сталкивались с понятием синхронизации. Т.е. я не собираюсь досконально разбирать что это такое и для чего нужно - есть масса руководств по этой теме :). Здесь будет рассматриваться один конкретный случай синхронизации потоков применительно к ОС Linux. И так приступим.
Для создания потока используется библиотека pthread и вызов pthread_create, для синхронизации в этой же библиотеке описаны специальные объекты - мутексы. Мутекс - это объект который может принадлежать в некий момент времени только одному потоку и имеющий два состояния - занят и свободен. Поток пытающийся получить доступ к мутексу в случае если последний занят будет остановлен системой до освобождения объекта. На этом собственно и основана синхронизация - перед использованием общего ресурса потоки сначала обращаются к мутексу и в конечном счете выстраиваются в очередь.
Все отлично работает - но рассмотрим ситуацию когда есть много потоков выполняющих только чтение некоего ресурса(они не нуждаются в синхронизации между собой) и один или несколько потоков выполняющих изменение этого ресурса. Т.е. синхронизировать нужно этот пишущий поток со всеми читателями. Если просто использовать мутекс для синхронизации доступа к ресурсу очевидно что и потоки чтения будут получать доступ туда последовательно, что в свою очередь замедлит приложение, фактически превратив его в однопоточное :). В этой ситуации одним из вариантов решения будет применение семафора. Семафор - это специальный объект ядра предназначенный для взаимодействия процессов в системе. Не буду утомлять описанием этого объекта, это прекрасно сделали до меня, например тут Linux Interprocess Communications. Упомяну только что в системе создается именованное множество семафоров(содержащее минимум 1 семафор), каждый семафор содержит некое количество ресурсов выраженное целым числом. Поток может запрашивать ресурсы у семафора и естественно должен отдавать их обратно когда они более не нужны. В случае если запрошенное количество ресурсов недоступно, но меньше максимального имеющегося количества - поток ожидает освобождения(в случае если не установлена опция - без ожидания)- иначе возвращается ошибка. Таким образом семафор более гибкий механизм синхронизации(однако неприятность в том что это объект ядра). Попробуем применить его к озвученной выше задаче.
Для облегчения работы рассмотрим объектный интерфейс реализующий множество семафоров состоящее из одного семафора: В примере 2 файла - можно безболезненно слить их в один, например sem.cpp :)
/*sem.h*/ #include <sys/types.h> #include <unistd.h> #include <errno.h> // semafore #include <ctype.h> #include <stdlib.h> #include <sys/ipc.h> #include <sys/sem.h> /* В моей системе данное объединение не объявлено в подключенных файлах - если компилятор напишет что оно ранее объявлено - просто сотрите :) */ union semun { int val; /* Value for SETVAL */ struct semid_ds *buf; /* Buffer for IPC_STAT, IPC_SET */ unsigned short *array; /* Array for GETALL, SETALL */ struct seminfo *__buf; /* Buffer for IPC_INFO (Linux specific) */ }; class sem { private: int sid; // идентификатор семафора key_t key; // ключ по которому получаем идентификатор int res_count; // количество ресурсов у семафора public: /* кол-во ресурсов некое случайное число путь в системе - обязательно должен существовать ! */ sem(int max_res, int id, const char* identify); /* деструктор - тут удаляем семафор, иначе он останется до следующей перезагрузки системы или пока его кто-то явно не удалит */ ~sem(); /* обертки для занятия/освобождения ресурсов */ bool lock(int res); bool unlock(int res); }; typedef sem* psem;;
Cледующий файл:
/*sem.cpp*/ #include "sem.h" sem::sem(int max_res, int id, const char* identify) { sid = -1; res_count = 0; /* получаем ключ для семафора */ if ((key = ftok(identify,id)) == -1) { return; } /* 0666 - rw для всех, чтобы потом можно было обратиться к семафору от любого пользователя системы, в общем то личное дело каждого сначала пытаемся открыть имеющийся семафор с таким ключом - и удалить его, старый нам ни к чему */ if ((sid = semget(key, 0, 0666)) != -1) { if (semctl(sid, 0, IPC_RMID, 0) == -1) { sid = 0; } } if (sid != 0) // проверяем что семафор был найден и удален или не существовал { /* создаем с флагом IPC_EXCL - означает что в случае если семафор уже имеется - вызов будет провален c значением EEXIST, без возврата значения отрытого уже существующего семафора */ if ((sid = semget( key, 1, IPC_CREAT | IPC_EXCL | 0666 )) != -1) // { union semun semopts; semopts.val = max_res; semctl(sid, 0, SETVAL, semopts); res_count = max_res; } } else sid = -1; } sem::~sem() { if (sid != -1) { semctl(sid, 0, IPC_RMID, 0); } } bool sem::lock(int res) { /* отсекаем неверные запросы сразу, не используя обращения к структурам семафора */ if ((res > res_count) || (sid == -1)) { return false; } /* параметры в структуре 0 - номер семафора количество ресурсов - если захватываем, должно быть отрицательным 0 - ждать если на данный момент нет достаточного количества ресурсов или IPC_NOWAIT - возвращать ошибку */ struct sembuf sem_lock={0,(-1)*res,0}; /* параметры запроса - идентификатор семафора - структура которую заполняли выше - ее адрес - сколько раз выполнить операцию */ if ((semop(sid, &sem_lock, 1)) == -1) { return false; } return true; } bool sem::unlock(int res) { /* аналогично функции lock */ if ((res > res_count) || (sid == -1)) { return false; } struct sembuf sem_unlock= { 0, res, IPC_NOWAIT}; if ((semop(sid, &sem_unlock, 1)) == -1) { return false; } return true; }
Создадим семафор с 10 ресурсами например, читающие потоки будут запрашивать по 1 ресурсу за раз, пишущий сразу 10. В итоге читающие потоки будут блокировать друг друга только в случае если их запущенное число превысит 10(нужно самостоятельно подбирать нужное значение). Поток записи же при старте будет гарантированно ждать освобождения ресурса всеми потоками чтения и заставлять ждать позднее стартующие потоки окончания своей работы. Хочу обратить внимание на один нюанс - перед запросом получения ресурса потоком чтения следует ставить запрос на разрешение обратиться за получением этого самого ресурса. Т.е. поток чтения в случае если не установлен некий флаг разрешения зацикливается и ждет пока его установят, только после этого приступая к запросу. Это нужно для того чтобы потоки чтения не оттесняли поток записи в конец цикла ожидания - например освобождено уже 9 ресурсов из 10, поток записи ждет, но запускается поток чтения, запрашивает ресурс и естественное его получает - в итоге 8 свободных ресурсов :), запись оттесняется. Кстати очень удобно если требуется выполнение какой-либо работы во время простоя системы :).
Рассмотрим пример программы:
/* stest.cpp */ #include <sys/types.h> #include <unistd.h> #include <errno.h> // threads #include <pthread.h> // semaphore #include "sem.h" #define max_rs 10 using namespace std; /* Флаг остановки доступа к ресурсам */ bool stop_access = false; // процедура потока чтения void* read_proc(void* arg) { if (arg == NULL) { cerr << "Argument empty \n"; return NULL; } int pid = getpid(); // ждем разрешения while(stop_access) {} // получаем ресурс if (((psem)(arg))->lock(1)) { // типа что-то делаем :) cout << "reader lock " << pid << endl; sleep(1); cout << " unlock " << pid << endl; if (((psem)(arg))->unlock(1)) { } } return NULL; } // процедура потока записи void* write_proc(void* arg) { if (arg == NULL) { cerr << "Argument empty \n"; return NULL; } int pid = getpid(); // запрещаем обращение к ресурсам stop_access = true; if (((psem)(arg))->lock(10)) { stop_access = false; cout << "writer lock " << pid << endl; sleep(4); cout << " unlock " << pid << endl; if (((psem)(arg))->unlock(10)) { } } return; } main(int argc,char* argv[]) { sem* sm = new sem(10,getpid(),"."); // инициализируем потоки pthread_attr_t attr; pthread_attr_init(&attr); // отсоединенный поток - не ждем его возврата pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); int ret; pthread_setcancelstate(PTHREAD_CANCEL_ENABLE , &ret); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &ret); int i = 0; pthread_t thread_handle; // последовательный запуск 8-ми потоков for (i = 0; i < 8; i++) { if (pthread_create(&thread_handle,NULL,&read_proc,sm) != 0) { cout << strerror(errno) << endl; } // запускаем поток записи на 3-е итерации if (i == 2) { if (pthread_create(&thread_handle,NULL,&write_proc,sm) != 0) { cout << strerror(errno) << endl; } } } // задержка выхода - чтобы увидеть результат sleep(10); sm->~sem(); return 0; }
Собирается эта программа очень просто: g++ -g stest.cpp sem.cpp -lpthread -o stest
Вывод примерно такой:
reader lock 31718 reader lock 31719 reader lock 31720 unlock 31718 unlock 31719 unlock 31720 writer lock 31721 unlock 31721 reader lock 31726 reader lock 31725 reader lock 31723 reader lock 31724 reader lock 31722 unlock unlock 31726 unlock 31723 unlock 31724 unlock 31722 31725
Заметно что запустившийся поток записи заблокировал ресурс, отработал и только после этого дал зеленый свет остальным потокам. Потоки чтения не блокируют друг друга - об этом можно судить по порядку вывода записей освобождения ресурсов - происходит не в порядке захвата.
Собственно на этом заканчивается мое повествование, для более подробной информации по семафорам можно пройти по ссылке которая упоминалась выше. Надеюсь этот материал поможет вам в решении ваших задач.