From 1a66ef99894cd67792f38c18d941cdf75ee0d83d Mon Sep 17 00:00:00 2001 From: Yury Kurlykov Date: Wed, 24 Jun 2020 03:31:21 +1000 Subject: [PATCH] Add 13th lab --- README.md | 3 +- lab13/Makefile | 13 ++++ lab13/README.md | 34 ++++++++++ lab13/queue.c | 142 +++++++++++++++++++++++++++++++++++++++ lab13/queue_cond.c | 157 ++++++++++++++++++++++++++++++++++++++++++++ lab13/queue_mutex.c | 155 +++++++++++++++++++++++++++++++++++++++++++ lab13/testing.ic | 90 +++++++++++++++++++++++++ lab13/utils.c | 72 ++++++++++++++++++++ lab13/utils.h | 21 ++++++ 9 files changed, 686 insertions(+), 1 deletion(-) create mode 100644 lab13/Makefile create mode 100644 lab13/README.md create mode 100644 lab13/queue.c create mode 100644 lab13/queue_cond.c create mode 100644 lab13/queue_mutex.c create mode 100644 lab13/testing.ic create mode 100644 lab13/utils.c create mode 100644 lab13/utils.h diff --git a/README.md b/README.md index 70a150e..b891172 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,13 @@ - [Лабораторная работа 10](lab10/README.md) - [Лабораторная работа 11](lab11/README.md) - [Лабораторная работа 12](lab12/README.md) +- [Лабораторная работа 13](lab13/README.md) ## Запуск Требуется CMake 3.16+ (более ранние версии не проверялись) и компилятор с поддержкой C11. -Для лабораторной №11 требуются make и gcc. +Для лабораторных №11 и №13 требуются make и gcc. ```bash ./run_lab.sh diff --git a/lab13/Makefile b/lab13/Makefile new file mode 100644 index 0000000..5f9fbcc --- /dev/null +++ b/lab13/Makefile @@ -0,0 +1,13 @@ +queue: queue.c utils.c utils.h + gcc -g -Wall queue.c utils.c -o queue -lpthread + +queue_mutex: queue_mutex.c utils.c utils.h + gcc -g -Wall queue_mutex.c utils.c -o queue_mutex -lpthread + +queue_cond: queue_cond.c utils.c utils.h + gcc -g -Wall queue_cond.c utils.c -o queue_cond -lpthread + +all: queue queue_cond queue_mutex + +clean: + rm -f queue queue_cond queue_mutex *~ \ No newline at end of file diff --git a/lab13/README.md b/lab13/README.md new file mode 100644 index 0000000..82c0c8d --- /dev/null +++ b/lab13/README.md @@ -0,0 +1,34 @@ +# Лабораторная работа №13 + +Для тестирования трёх вариантов реализации очереди были реализованы следующие тесты: + +1. `empty_queue_test()` для теста проблемы producer-consumer. Если программа некорректно +работает при наличии потоков-потребителей, она выдаст ошибку при попытке взять элемент из +пустой очереди. + +2. `thread_safe_test()` для проверки корректности работы многопоточности. Идея заключается +в заполнении очереди ненулевыми значениями из нескольких потоков. Если программа работает некорректно на нескольких +потоках, то программа выдаст ошибку. Ошибка определяется в наличии нулей между ненулевыми значениями, +которые могут оказаться там ввиду гонок данных, например, в `queue_push()`. + +Тесты лежат в отдельном файле [testing.ic](./testing.ic), этот файл инклудится в каждой из программ. + +Результаты предсказуемы: + +- наивная реализация не проходит мультипоточный тест (producer-consumer, +очевидно, тоже); +- реализация на мьютексах корректно работает с несколькими потоками, но проблема producer-consumer +остаётся; +- реализация с использованием условных переменных проходит все тесты. + +В наивной реализации проблемы начинаются в `queue_push()`. +В этом участке кода происходит data race, а так как `next_in` ничем не защищён, следовательно, +двойной инкремент неизбежен при достаточно большом размере очереди. +```c +queue->array[queue->next_in] = item; +queue->next_in = queue_incr(queue, queue->next_in); +``` + +Для решения проблемы producer-consumer была добавлена conditional variable для +проверки на пустоту очереди, тем самым поток-потребитель дождётся, пока в очереди +не появится хотя бы один элемент. \ No newline at end of file diff --git a/lab13/queue.c b/lab13/queue.c new file mode 100644 index 0000000..8f13bd8 --- /dev/null +++ b/lab13/queue.c @@ -0,0 +1,142 @@ +/* Example code for Think OS. + +Copyright 2015 Allen Downey +License: Creative Commons Attribution-ShareAlike 3.0 + +*/ + +#include +#include +#include +#include +#include "utils.h" + +#define NUM_CHILDREN 2 +#define QUEUE_LENGTH 1600000 + +// QUEUE + +typedef struct { + int *array; + int length; + int next_in; + int next_out; +} Queue; + +Queue *make_queue(int length) +{ + Queue *queue = (Queue *) malloc(sizeof(Queue)); + queue->length = length; + queue->array = (int *) malloc(length * sizeof(int)); + queue->next_in = 0; + queue->next_out = 0; + return queue; +} + +int queue_incr(Queue *queue, int i) +{ + return (i+1) % queue->length; +} + +int queue_empty(Queue *queue) +{ + // queue is empty if next_in and next_out are the same + return (queue->next_in == queue->next_out); +} + +int queue_full(Queue *queue) +{ + // queue is full if incrementing next_in lands on next_out + return (queue_incr(queue, queue->next_in) == queue->next_out); +} + +void queue_push(Queue *queue, int item) { + if (queue_full(queue)) { + perror_exit("queue is full"); + } + + queue->array[queue->next_in] = item; + queue->next_in = queue_incr(queue, queue->next_in); +} + +int queue_pop(Queue *queue) { + if (queue_empty(queue)) { + perror_exit("queue is empty"); + } + + int item = queue->array[queue->next_out]; + queue->next_out = queue_incr(queue, queue->next_out); + return item; +} + +// SHARED + +typedef struct { + Queue *queue; +} Shared; + +Shared *make_shared() +{ + Shared *shared = check_malloc(sizeof(Shared)); + shared->queue = make_queue(QUEUE_LENGTH); + return shared; +} + +// THREAD + +pthread_t make_thread(void *(*entry)(void *), Shared *shared) +{ + int ret; + pthread_t thread; + + ret = pthread_create(&thread, NULL, entry, (void *) shared); + if (ret != 0) { + perror_exit("pthread_create failed"); + } + return thread; +} + +void join_thread(pthread_t thread) +{ + int ret = pthread_join(thread, NULL); + if (ret == -1) { + perror_exit("pthread_join failed"); + } +} + +// PRODUCER-CONSUMER + +void *producer_entry(void *arg) +{ + int i; + Shared *shared = (Shared *) arg; + for (i=0; iqueue, i); + } + pthread_exit(NULL); +} + +void *consumer_entry(void *arg) +{ + int i; + int item; + Shared *shared = (Shared *) arg; + + for (i=0; iqueue); + printf("consuming item %d\n", item); + } + pthread_exit(NULL); +} + +// TEST CODE + +#include "testing.ic" + +int main() +{ + thread_safe_test(); + empty_queue_test(); + return 0; +} diff --git a/lab13/queue_cond.c b/lab13/queue_cond.c new file mode 100644 index 0000000..ac7181c --- /dev/null +++ b/lab13/queue_cond.c @@ -0,0 +1,157 @@ +/* Example code for Think OS. + +Copyright 2015 Allen Downey +License: Creative Commons Attribution-ShareAlike 3.0 + +*/ + +#include +#include +#include +#include +#include "utils.h" + +#define NUM_CHILDREN 2 +#define QUEUE_LENGTH 1600000 +#define NUM_ITEMS (QUEUE_LENGTH - 1) + +// QUEUE + +typedef struct { + int *array; + int length; + int next_in; + int next_out; + Mutex *mutex; + Cond *nonempty; + Cond *nonfull; +} Queue; + +Queue *make_queue(int length) +{ + Queue *queue = (Queue *) malloc(sizeof(Queue)); + queue->length = length; + queue->array = (int *) malloc(length * sizeof(int)); + queue->next_in = 0; + queue->next_out = 0; + queue->mutex = make_mutex(); + queue->nonempty = make_cond(); + queue->nonfull = make_cond(); + return queue; +} + +int queue_incr(Queue *queue, int i) +{ + return (i+1) % queue->length; +} + +int queue_empty(Queue *queue) +{ + // queue is empty if next_in and next_out are the same + int res = (queue->next_in == queue->next_out); + return res; +} + +int queue_full(Queue *queue) +{ + // queue is full if incrementing next_in lands on next_out + int res = (queue_incr(queue, queue->next_in) == queue->next_out); + return res; +} + +void queue_push(Queue *queue, int item) { + mutex_lock(queue->mutex); + while (queue_full(queue)) { + cond_wait(queue->nonfull, queue->mutex); + } + + queue->array[queue->next_in] = item; + queue->next_in = queue_incr(queue, queue->next_in); + mutex_unlock(queue->mutex); + cond_signal(queue->nonempty); +} + +int queue_pop(Queue *queue) { + mutex_lock(queue->mutex); + while (queue_empty(queue)) { + cond_wait(queue->nonempty, queue->mutex); + } + + int item = queue->array[queue->next_out]; + queue->next_out = queue_incr(queue, queue->next_out); + mutex_unlock(queue->mutex); + cond_signal(queue->nonfull); + return item; +} + +// SHARED + +typedef struct { + Queue *queue; +} Shared; + +Shared *make_shared() +{ + Shared *shared = check_malloc(sizeof(Shared)); + shared->queue = make_queue(QUEUE_LENGTH); + return shared; +} + +// THREAD + +pthread_t make_thread(void *(*entry)(void *), Shared *shared) +{ + int ret; + pthread_t thread; + + ret = pthread_create(&thread, NULL, entry, (void *) shared); + if (ret != 0) { + perror_exit("pthread_create failed"); + } + return thread; +} + +void join_thread(pthread_t thread) +{ + int ret = pthread_join(thread, NULL); + if (ret == -1) { + perror_exit("pthread_join failed"); + } +} + +// PRODUCER-CONSUMER + +void *producer_entry(void *arg) +{ + int i; + Shared *shared = (Shared *) arg; + for (i=0; iqueue, i); + } + pthread_exit(NULL); +} + +void *consumer_entry(void *arg) +{ + int i; + int item; + Shared *shared = (Shared *) arg; + + for (i=0; iqueue); + printf("consuming item %d\n", item); + } + pthread_exit(NULL); +} + +// TEST CODE + +#include "testing.ic" + +int main() +{ + thread_safe_test(); + empty_queue_test(); + return 0; +} diff --git a/lab13/queue_mutex.c b/lab13/queue_mutex.c new file mode 100644 index 0000000..2887ad7 --- /dev/null +++ b/lab13/queue_mutex.c @@ -0,0 +1,155 @@ +/* Example code for Think OS. + +Copyright 2015 Allen Downey +License: Creative Commons Attribution-ShareAlike 3.0 + +*/ + +#include +#include +#include +#include +#include "utils.h" + +#define NUM_CHILDREN 2 +#define QUEUE_LENGTH 1600000 + +// QUEUE + +typedef struct { + int *array; + int length; + int next_in; + int next_out; + Mutex *mutex; +} Queue; + +Queue *make_queue(int length) +{ + Queue *queue = (Queue *) malloc(sizeof(Queue)); + queue->length = length; + queue->array = (int *) malloc(length * sizeof(int)); + queue->next_in = 0; + queue->next_out = 0; + queue->mutex = make_mutex(); + return queue; +} + +int queue_incr(Queue *queue, int i) +{ + // NOTE: you must hold the mutex to call this function. + return (i+1) % queue->length; +} + +int queue_empty(Queue *queue) +{ + // NOTE: you must hold the mutex to call this function. + // queue is empty if next_in and next_out are the same + int res = (queue->next_in == queue->next_out); + return res; +} + +int queue_full(Queue *queue) +{ + // NOTE: you must hold the mutex to call this function. + // queue is full if incrementing next_in lands on next_out + int res = (queue_incr(queue, queue->next_in) == queue->next_out); + return res; +} + +void queue_push(Queue *queue, int item) { + mutex_lock(queue->mutex); + if (queue_full(queue)) { + mutex_unlock(queue->mutex); + perror_exit("queue is full"); + } + + queue->array[queue->next_in] = item; + queue->next_in = queue_incr(queue, queue->next_in); + mutex_unlock(queue->mutex); +} + +int queue_pop(Queue *queue) { + mutex_lock(queue->mutex); + if (queue_empty(queue)) { + mutex_unlock(queue->mutex); + perror_exit("queue is empty"); + } + + int item = queue->array[queue->next_out]; + queue->next_out = queue_incr(queue, queue->next_out); + mutex_unlock(queue->mutex); + return item; +} + +// SHARED + +typedef struct { + Queue *queue; +} Shared; + +Shared *make_shared() +{ + Shared *shared = check_malloc(sizeof(Shared)); + shared->queue = make_queue(QUEUE_LENGTH); + return shared; +} + +// THREAD + +pthread_t make_thread(void *(*entry)(void *), Shared *shared) +{ + int ret; + pthread_t thread; + + ret = pthread_create(&thread, NULL, entry, (void *) shared); + if (ret != 0) { + perror_exit("pthread_create failed"); + } + return thread; +} + +void join_thread(pthread_t thread) +{ + int ret = pthread_join(thread, NULL); + if (ret == -1) { + perror_exit("pthread_join failed"); + } +} + +// PRODUCER-CONSUMER + +void *producer_entry(void *arg) +{ + int i; + Shared *shared = (Shared *) arg; + for (i=0; iqueue, i); + } + pthread_exit(NULL); +} + +void *consumer_entry(void *arg) +{ + int i; + int item; + Shared *shared = (Shared *) arg; + + for (i=0; iqueue); + printf("consuming item %d\n", item); + } + pthread_exit(NULL); +} + +// TEST CODE + +#include "testing.ic" + +int main() +{ + thread_safe_test(); + empty_queue_test(); + return 0; +} diff --git a/lab13/testing.ic b/lab13/testing.ic new file mode 100644 index 0000000..fc62f4b --- /dev/null +++ b/lab13/testing.ic @@ -0,0 +1,90 @@ +#pragma once +#include + +void queue_test() { + int i; + int item; + int length = 128; + + Queue *queue = make_queue(length); + assert(queue_empty(queue)); + for (i = 0; i < length - 1; i++) { + queue_push(queue, i); + } + assert(queue_full(queue)); + for (i = 0; i < 10; i++) { + item = queue_pop(queue); + assert(i == item); + } + assert(!queue_empty(queue)); + assert(!queue_full(queue)); + for (i = 0; i < 10; i++) { + queue_push(queue, i); + } + assert(queue_full(queue)); + for (i = 0; i < 10; i++) { + item = queue_pop(queue); + } + assert(item == 19); +} + +void empty_queue_test() { + int i; + pthread_t child[NUM_CHILDREN]; + + Shared *shared = make_shared(); + + child[1] = make_thread(consumer_entry, shared); + sleep(1); + child[0] = make_thread(producer_entry, shared); + + for (i = 0; i < NUM_CHILDREN; i++) { + join_thread(child[i]); + } +} + +void *producer_positive(void *arg) +{ + int i; + Shared *shared = (Shared *)arg; + for (i = 0; i < (QUEUE_LENGTH - 1) / 2; i++) { + queue_push(shared->queue, 1); + } + pthread_exit(NULL); +} + +void *producer_negative(void *arg) +{ + int i; + int item; + Shared *shared = (Shared *)arg; + + for (i = 0; i < (QUEUE_LENGTH - 1) - ((QUEUE_LENGTH - 1) / 2); i++) { + queue_push(shared->queue, -1); + } + pthread_exit(NULL); +} + +void thread_safe_test() { + int i; + pthread_t child[NUM_CHILDREN]; + + Shared *shared = make_shared(); + + for (unsigned int k = 0; k < QUEUE_LENGTH; k++) { + shared->queue->array[k] = 0; + } + + child[0] = make_thread(producer_positive, shared); + child[1] = make_thread(producer_negative, shared); + + for (i=0; iqueue->array[k])); + was_zero = was_zero || !shared->queue->array[k]; + } +} \ No newline at end of file diff --git a/lab13/utils.c b/lab13/utils.c new file mode 100644 index 0000000..84aa8e6 --- /dev/null +++ b/lab13/utils.c @@ -0,0 +1,72 @@ +/* Example code for Think OS. + +Copyright 2015 Allen Downey +License: Creative Commons Attribution-ShareAlike 3.0 + +*/ + +#include +#include +#include +#include "utils.h" + +// UTILITY CODE + +void perror_exit(char *s) +{ + perror(s); + exit(-1); +} + +void *check_malloc(int size) +{ + void *p = malloc(size); + if (p == NULL) perror_exit("malloc failed"); + return p; +} + +// MUTEX WRAPPER + +Mutex *make_mutex() +{ + Mutex *mutex = check_malloc(sizeof(Mutex)); + int n = pthread_mutex_init(mutex, NULL); + if (n != 0) perror_exit("make_lock failed"); + return mutex; +} + +void mutex_lock(Mutex *mutex) +{ + int n = pthread_mutex_lock(mutex); + if (n != 0) perror_exit("lock failed"); +} + +void mutex_unlock(Mutex *mutex) +{ + int n = pthread_mutex_unlock(mutex); + if (n != 0) perror_exit("unlock failed"); +} + +// COND WRAPPER + +Cond *make_cond() +{ + Cond *cond = check_malloc(sizeof(Cond)); + int n = pthread_cond_init(cond, NULL); + if (n != 0) perror_exit("make_cond failed"); + + return cond; +} + +void cond_wait(Cond *cond, Mutex *mutex) +{ + int n = pthread_cond_wait(cond, mutex); + if (n != 0) perror_exit("cond_wait failed"); +} + +void cond_signal(Cond *cond) +{ + int n = pthread_cond_signal(cond); + if (n != 0) perror_exit("cond_signal failed"); +} + diff --git a/lab13/utils.h b/lab13/utils.h new file mode 100644 index 0000000..70ab38a --- /dev/null +++ b/lab13/utils.h @@ -0,0 +1,21 @@ +/* Example code for Think OS + +Copyright 2015 Allen Downey +License: Creative Commons Attribution-ShareAlike 3.0 + +*/ + +typedef pthread_mutex_t Mutex; +typedef pthread_cond_t Cond; + +void perror_exit(char *s); +void *check_malloc(int size); + +Mutex *make_mutex(); +void mutex_lock(Mutex *mutex); +void mutex_unlock(Mutex *mutex); + +Cond *make_cond(); +void cond_wait(Cond *cond, Mutex *mutex); +void cond_signal(Cond *cond); +