POSIX Threads/Synchronizacja między wątkami/Mutexy

Wstęp

edytuj

Mutex (MUTual EXclusion, wzajemne wykluczanie) jest blokadą, którą może uzyskać tylko jeden wątek. Mutexy służą głównie do realizacji sekcji krytycznych, czyli bezpiecznego w sensie wielowątkowym dostępu do zasobów współdzielonych.

Schemat działania na mutexach jest następujący:

  1. pozyskanie blokady
  2. modyfikacja lub odczyt współdzielonego obiektu
  3. zwolnienie blokady

Mutex w pthreads jest opisywany przez strukturę typu pthread_mutex_t, zaś jego atrybuty pthread_mutexattr_t.

Inicjalizacja i zwalnianie mutexu

edytuj

Zmienna typu pthread_mutex_t może zostać zainicjowana na dwa sposoby:

  • poprzez przypisanie symbolu PTHREAD_MUTEX_INITIALIZER;
  • przez wywołanie funkcji pthread_mutex_init, która umożliwia również podanie atrybutów blokady.

Każdy mutex, bez względu na sposób inicjalizacji, musi zostać zwolniony funkcją pthread_mutex_destroy. Implementacja biblioteki może bowiem PTHREAD_MUTEX_INITIALIZER realizować poprzez wywołanie jakiejś funkcji, która np. alokuje pamięć i nie zwolnienie mutexu doprowadzi do wycieku pamięci.

  • pthread_mutex_t
mutex

Funkcje

edytuj
  • int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)  (doc)
inicjalizacja mutexa, wskaźnik na atrybuty attr może być pusty
  • int pthread_mutex_destroy(pthread_mutex_t *mutex)  (doc)
zwolnienie mutexu

Przykład

edytuj
pthread_mutex_t	mutex = PTHREAD_MUTEX_INITIALIZER;

/* lub */

pthread_mutex_t mutex;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);		/* inicjalizacja atrybutów mutexa */
pthread_mutex_init(&mutex, &attr);	/* inicjalizacja mutexa */
...
pthread_mutex_destroy(&mutex);		/* zwolnienie zasobów związanych z mutexem */

Pozyskiwanie i zwolnienie blokady

edytuj

Pozyskanie blokady umożliwiają trzy funkcje:

  1. pthread_mutex_lock,
  2. pthread_mutex_trylock,
  3. pthread_mutex_timedlock.

Jeśli żaden inny wątek nie posiada blokady, działają identycznie - tzn. blokada jest przyznawana wywołującemu wątkowi. Różnią się zachowaniem w przypadku niemożności uzyskania blokady:

  1. pthread_mutex_lock - oczekiwanie w nieskończoność, aż blokada zostanie zwolniona przez inny wątek;
  2. pthread_mutex_trylock - sterowanie wraca natychmiast, zwracając kod EBUSY;
  3. pthread_mutex_timedlock - oczekiwanie ograniczone czasowo, jeśli czas minie, zwraca kod ETIMEDOUT.

Wątek musi zwolnić blokadę funkcją pthread_mutex_unlock.

Funkcja pthread_mutex_timedlock jest dostępna, gdy system implementuje rozszerzenie TMO. W odróżnieniu od innych funkcji operujących na czasach oczekiwania (np. select dla plików), w których podaje się ile czasu ma upłynąć od chwili wywołania funkcji, w pthreads podawany jest czas bezwzględny.

Funkcje

edytuj
  • int pthread_mutex_lock(pthread_mutex_t *mutex)  (doc)
  • int pthread_mutex_trylock(pthread_mutex_t *mutex)  (doc)
  • int pthread_mutex_timedlock(pthread_mutex_t *mutex, const struct timespec *timeout)  (doc)

Szkic użycia

edytuj
#include <pthread.h>
#include <errno.h>
#include <time.h>

pthread_mutex_t	mutex = PTHREAD_MUTEX_INITIALIZER;

/* ... */
pthread_mutex_lock(&mutex);
	// działania na obiekcie współdzielonym
pthread_mutex_unlock(&mutex);

/* ... */
switch (pthread_mutex_trylock(&mutex)) {
	case 0:
		// działania na obiekcie współdzielonym
		pthread_mutex_unlock(&mutex);
		break;

	case EBUSY:
		puts("Blokadę posiada inny wątek");
		break;
	
	default:
		puts("Inny błąd");
		break
}

/* ... */
struct timespec timeout;
clock_gettime(CLOCK_REALTIME, &timeout);	// pobranie bieżącego czasu
timeout.tv_sec += 2;				// zwiększenie liczby sekund o 2

switch (pthread_mutex_timedlock(&mutex, &timeout)) {
	case 0:
		puts("Blokada pozyskana przed upływem 2 sekund");
			// działania na obiekcie współdzielonym
		pthread_mutex_unlock(&mutex);
		break;

	case ETIMEDOUT:
		puts("Upłynęły 2 sekundy");
		break;

	default:
		puts("Inny błąd");
		break;
}

Przykład

edytuj

Program demonstruje sekcję krytyczną z użyciem mutexów. Jeśli przy kompilacji zdefiniowane zostanie BLOKADA, wówczas mutex blokuje dostęp do zmiennej, która jest inkrementowana określoną liczbę razy przez każdy z wątków. W przeciwnym razie wątki zmieniają ją bez żadnej synchronizacji, co może prowadzić do błędu - w tym przypadku do niepoprawnego zliczenia.

#define _POSIX_C_SOURCE	200809L
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <errno.h>

#define test_errno(msg) do{if (errno) {perror(msg); exit(EXIT_FAILURE);}} while(0)


#define	N 10	/* liczba wątków */
#define K 1000	/* liczba iteracji (z tą wartością należy eksperymentować) */

pthread_mutex_t blokada;
int licznik = 0;		// globalny licznik, powinien być chroniony blokadą

void ms_sleep(unsigned ms) {
	struct timespec req;
	req.tv_sec  = (ms / 1000);
	req.tv_nsec = (ms % 1000 * 1000000);
	nanosleep(&req, NULL);
}
//------------------------------------------------------------------------

void* watek(void* numer) {
	int i;
	for (i=0; i < K; i++) {
#ifdef BLOKADA
		errno = pthread_mutex_lock(&blokada);
		test_errno("pthread_mutex_lock");
#endif
		licznik = licznik + 1;
		ms_sleep(1);
#ifdef BLOKADA
		errno = pthread_mutex_unlock(&blokada);
		test_errno("pthread_mutex_unlock");
#endif
	}

	return NULL;
}
//------------------------------------------------------------------------

int main() {
	pthread_t id[N];
	int i;

	printf("licznik = %d\n", licznik);

	errno = pthread_mutex_init(&blokada, NULL);
	test_errno("pthread_mutex_init");

	/* utworzenie wątku */
	for (i=0; i < N; i++) {
		errno = pthread_create(&id[i], NULL, watek, (void*)i);
		test_errno("pthread_create");
	}

	/* oczekiwanie na jego zakończenie */
	for (i=0; i < N; i++) {
		errno = pthread_join(id[i], NULL);
		test_errno("pthread_join");
	}

	printf("licznik = %d, spodziewana wartość = %d %s\n",
		licznik,
		N*K,
		(licznik != N*K ? "BŁĄD!!!" : "")
	);

	return EXIT_SUCCESS;
}
//------------------------------------------------------------------------

Przykładowe wyjście (z błędem):

$ ./przyklad
licznik = 0
licznik = 9968, spodziewana wartość = 10000 BŁĄD!!!

Typy mutexów

edytuj

Opcja XSI. Jednym z atrybutów mutexu jest jego typ, czy też rodzaj:

  1. zwykły (normal),
  2. rekursywny (recursive),
  3. bezpieczny (error check).

Na poziomie współpracy między wątkami rodzaj mutexu nie ma znaczenia, objawia się dopiero w obrębie jednego wątku w dwóch sytuacjach:

  • ponowna próba pozyskania blokady,
  • próba zwolnienia już zwolnionej blokady.

Ponowne blokowanie

edytuj

Można wyobrazić sobie sytuację (raczej prawdopodobną), gdy w programie istnieje funkcja pomocnicza, wykorzystywana przez wątki, która zakłada blokadę na pewne dane. Problem pojawia się w chwili, gdy wątek już pozyskał blokadę i wywołuje taką funkcję. Wówczas z punktu widzenia blokady wątek próbuje wykonać następującą sekwencję:

pthread_mutex_lock(&mutex);	// (1)
pthread_mutex_lock(&mutex);	// (2) - w funkcji pomocniczej
/* ... */
pthread_mutex_unlock(&mutex)	// (3) - w funkcji pomocniczej
pthread_mutex_unlock(&mutex)	// (4)
  • W przypadku mutexu zwykłego wykona się pierwsza funkcja pthread_mutex_lock (1), zaś na drugim jej wywołaniu (2) wątek zatrzyma się, oczekując na zwolnienie blokady - co nigdy nie nastąpi, bowiem sterowanie nie dojdzie do wiersza (3) ani (4). Występuje zakleszczenie (ang. deadlock).
  • W przypadku mutexu rekursywnego wykonają się wszystkie funkcje związane z blokadą. Mutex tego typu posiada dodatkowy licznik zagnieżdżeń, który z każdym wywołaniem funkcji pthrad_mutex_lock jest zwiększany, natomiast wywołanie pthread_mutex_unlock zmniejsza go - gdy osiągnie zero, blokada jest zwalniana.
  • W przypadku mutexu bezpiecznego drugie wywołanie pthread_mutex_lock zwróci kod błędu EDEADLK, oznaczający, że wątek już posiada tę blokadę.

Ponowne odblokowanie

edytuj

Jeśli blokada jest zwolniona ponowne wywołanie pthread_unlock mutexy bezpieczne i rekursywny zwracają błąd. Zachowanie zwykłego mutexu jest nieokreślone!

Przykład

edytuj

Ilustracja różnicy w działaniu mutexów.

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>

#define __USE_UNIX98
#include <pthread.h>

#define test_errno(msg) do{if (errno) {perror(msg); exit(EXIT_FAILURE);}} while(0)

pthread_t id;
pthread_mutex_t mutex;
pthread_mutexattr_t mutexattr;

void* watek(void* _arg) {
	int errno;

	// 1
	puts("przed wykonaniem pthread_mutex_lock (1)");
		errno = pthread_mutex_lock(&mutex);
		test_errno("pthread_mutex_lock (1)");
	puts("... wykonano pthread_mutex_lock (1)");

	// 2
	puts("przed wykonaniem pthread_mutex_lock (2)");
		errno = pthread_mutex_lock(&mutex);
		test_errno("pthread_mutex_lock (2)");
	puts("... wykonano pthread_mutex_lock (2)");

	// 3
	puts("przed wykonaniem pthread_mutex_unlock (2)");
		errno = pthread_mutex_unlock(&mutex);
		test_errno("pthread_mutex_unlock (2)");
	puts("... wykonano pthread_mutex_unlock (2)");

	// 4
	puts("przed wykonaniem pthread_mutex_unlock (1)");
		errno = pthread_mutex_unlock(&mutex);
		test_errno("pthread_mutex_unlock (1)");
	puts("... wykonano pthread_mutex_unlock (1)");

	return NULL;
}
//------------------------------------------------------------------------

int main(int argc, char* argv[]) {
	int errno;

	pthread_mutexattr_init(&mutexattr);

	if (argc > 1) {
		switch (atoi(argv[1])) {
			case 1:
				puts("mutex typu PTHREAD_MUTEX_ERRORCHECK");
				pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_ERRORCHECK);
				break;
			case 2:
				puts("mutex typu PTHREAD_MUTEX_RECURSIVE");
				pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
				break;
			default:
				puts("mutex typu PTHREAD_MUTEX_NORMAL");
				pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_NORMAL);
				break;
		}
	}
	else {
		puts("użycie: program [0|1|2]");
		return EXIT_FAILURE;
	}

	/* inicjalizacja mutexu */
	errno = pthread_mutex_init(&mutex, &mutexattr);
	test_errno("pthread_mutex_init");

	/* utworzenie wątku */
	errno = pthread_create(&id, NULL, watek, NULL);
	test_errno("pthread_create");

	/* oczekiwanie na jego zakończenie */
	pthread_join(id, NULL);
	test_errno("pthread_join");

	puts("program zakończony");
	return EXIT_SUCCESS;
}
//------------------------------------------------------------------------
  • Wynik dla mutexu zwykłego - wystąpiło zakleszczenie, program "zawiesił się" i musiał zostać przerwany ręcznie:
$ ./przyklad 0
mutex typu PTHREAD_MUTEX_NORMAL
przed wykonaniem pthread_mutex_lock (1)
... wykonano pthread_mutex_lock (1)
przed wykonaniem pthread_mutex_lock (2)
^C
  • Wynik dla mutexu sprawdzającego - nie dopuszczono do zakleszczenia:
$ ./przyklad 1
mutex typu PTHREAD_MUTEX_ERRORCHECK
przed wykonaniem pthread_mutex_lock (1)
... wykonano pthread_mutex_lock (1)
przed wykonaniem pthread_mutex_lock (2)
pthread_mutex_lock (2): Resource deadlock avoided
  • Wynik dla mutexu rekursywnego - blokada jest pozyskiwana wielokrotnie:
$ ./przyklad 2
mutex typu PTHREAD_MUTEX_RECURSIVE
przed wykonaniem pthread_mutex_lock (1)
... wykonano pthread_mutex_lock (1)
przed wykonaniem pthread_mutex_lock (2)
... wykonano pthread_mutex_lock (2)
przed wykonaniem pthread_mutex_unlock (2)
... wykonano pthread_mutex_unlock (2)
przed wykonaniem pthread_mutex_unlock (1)
... wykonano pthread_mutex_unlock (1)
program zakończony

Atrybuty mutexu

edytuj

Inicjalizacja i usuwanie

edytuj
  • pthread_mutexattr_t

Funkcje

edytuj
  • int pthread_mutexattr_destroy(pthread_mutexattr_t *attr)  (doc)
  • int pthread_mutexattr_init(pthread_mutexattr_t *attr)  (doc)

Typ mutexu

edytuj

Opisane wyżej

Funkcje

edytuj
  • int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type)  (doc)
  • int pthread_mutexattr_gettype(const pthread_mutexattr_t *atter, int *type)  (doc)

Współdzielenie mutexu z innymi procesami

edytuj

Patrz rozdział synchronizacja między wątkami różnych procesów.

Funkcje

edytuj
  • int pthread_mutexattr_setpshared(pthread_mutexattr_t *attr, int pshared)  (doc)
  • int pthread_mutexattr_getpshared(const pthread_mutexattr_t *attr, int *pshared)  (doc)

Zmiana priorytetu wątku posiadającego blokadę

edytuj

Dostępne, gdy istnieje rozszerzenie TPP (oraz TPI).

Wartość atrybutu decyduje o strategii wykonywania programu, gdy wiele wątków o różnych priorytetach stara się o uzyskanie blokady. Atrybut może mieć wartości:

  1. PTHREAD_PRIO_NONE,
  2. PTHREAD_PRIO_PROTECT,
  3. PTHREAD_PRIO_INHERIT (opcja TPI).

W przypadku PTHREAD_PRIO_NONE priorytet wątku, który pozyskuje blokadę nie zmienia się.

W dwóch pozostałych przypadkach z mutexem powiązany zostaje pewien priorytet i gdy wątek uzyska blokadę, wówczas jego priorytet jest podbijany do wartość z mutexu (o ile oczywiście był wcześniej niższy). Innymi słowy w obrębie sekcji krytycznej wątek może działać z wyższym priorytetem.

Sposób ustalania priorytetu mutexu zależy od atrybutu:

  • PTHREAD_PRIO_INHERIT - wybierany jest maksymalny priorytet spośród wątków oczekujących na uzyskanie danej blokady;
  • PTHREAD_PRIO_PROTECT - priorytet jest ustalany przez programistę funkcją pthread_mutexattr_setprioceiling lub pthread_mutex_setprioceiling (opisane w następnej sekcji).

Dodatkowo jeśli wybrano wartość PTHREAD_PRIO_PROTECT, wówczas wszelkie próby założenia blokady funkcjami pthread_mutex_XXXlock z poziomu wątków o priorytecie niższym niż ustawiony dla mutexa nie powiodą się - zostanie zwrócona wartość błędu EINVAL.

Funkcje

edytuj
  • int pthread_mutexattr_setprotocol(pthread_mutexattr_t *attr, int protocol)  (doc)
  • int pthread_mutexattr_getprotocol(const pthread_mutexattr_t *attr, int *protocol)  (doc)

Minimalny priorytet wątku zakładające blokadę

edytuj

Dostępne w opcji TPP. Funkcje ustalają/odczytują bieżący priorytet związany z mutexem.

Funkcje działające na atrybutach

edytuj
  • int pthread_mutexattr_setprioceiling(pthread_mutexattr_t *attr, int prioceiling)  (doc)
  • int pthread_mutexattr_getprioceiling(const pthread_mutexattr_t *attr, int *prioceiling)  (doc)

Funkcje działające bezpośrednio na mutexie

edytuj
  • int pthread_mutex_setprioceiling(pthread_mutex_t *attr, int prioceiling)  (doc)
  • int pthread_mutex_getprioceiling(const pthread_mutex_t *attr, int *prioceiling)  (doc)