#+SETUPFILE: https://fniessen.github.io/org-html-themes/org/theme-readtheorg.setup #+INCLUDE: "../common/org-header.org" #+TITLE: Семинар 8: Процессы, взаимодействие процессов через общую память, конвейеры, многопоточность, модели памяти Сегодня мы познакомимся с многопоточностью, общей памятью и с тем, какие эффекты вносит слабая модель памяти. Процессы изолируют программы друг от друга; каждая из них получает своё виртуальное адресное пространство и ничего не знает про другие программы. Однако процессам иногда необходимо общаться. Набор средств для общения между процессами достаточно широк: - Общие файлы - Общая память - Конвейеры (pipes), именованные и неименованные - Очереди сообщений - Сокеты - Сигналы * Создание новых процессов, fork Прежде всего необходимо понять механизм создания новых процессов. Для этого служит системный вызов =fork=, создающий точную копию процесса. Затем вместо копии мы можем заменить образ приложения с помощью вызова =execve=. Если же нам нужно просто сделать копию родительского процесса, которая ведёт себя немного по-другому, =execve= даже не нужен. *Вопрос* разберите этот исходный файл. #+BEGIN_SRC c /* fork-example.c */ #include #include int main() { // pid_t это, как и int, численный тип для идентификаторов процессов pid_t pid = fork(); // в зависимости от возвращаемого значения, которое попадёт в pid, // мы поймём, находимся ли мы в родительском процессе или в дочернем. if (pid == 0) { printf("I am a child, pid was %d\n", pid); printf("Child's pid is %d\n", getpid()); } else { printf("I am a parent, pid was %d\n", pid); printf("Parent's pid is %d\n", getpid()); } } #+END_SRC *Вопрос* что именно возвращает =fork()= в родительском процессе? А что в дочернем? См. =man fork=. * Создание разделённой памяти Общую память можно использовать через =mmap=, с которым мы познакомились на семинаре 4. Рассмотрим пример создания области памяти, которая разделена между процессами. *Вопрос* изучите данную программу. Скомпилируйте её, запустите. Проверьте правильность PID дочернего процесса с помощью =pstree= или иными способами. Изучите содержимое =/proc/PID/maps= для родительского и дочернего процессов, найдите общую область памяти. #+BEGIN_SRC c /* fork-example-1.c */ #include #include #include #include #include void* create_shared_memory(size_t size) { return mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); } int main() { void* shmem = create_shared_memory(128); printf("Shared memory at: %p\n" , shmem); int pid = fork(); if (pid == 0) { while (1); } else { printf("Child's pid is: %d\n", pid); while(1); } } #+END_SRC Наконец, попробуем что-то записать в эту память! Для этого нам, вообще говоря, необходимо синхронизировать родительский и дочерний процесс с помощью мьютекса. В целях простоты мы пока сделаем код, который *В ПРИНЦИПЕ НЕПРАВИЛЬНЫЙ*, так как не имеет синхронизации. Однако он продемонстрирует обмен информацией между процессами через память. *Задание 1* модифицируйте программу выше следующим образом: - Необходимо выделить память для 10 чисел типа =int= и сделать её общей для родительского и дочернего процессов. Внесите туда числа от 1 до 10. - Родительский процесс будет ждать завершения дочернего процесса с помощью =wait(NULL)=. После этого он должен вывести все 10 чисел на экран. - Дочерний процесс считывает два числа с помощью =scanf=: индекс в массиве и новое значение. Он меняет соответствующее число в массиве и завершает свою работу. * Конвейеры (pipes) Другим способом обмена данными между потоками являются конвейеры. Конвейер создается с помощью функции =pipe=. Конвейер --- это пара дескрипторов файлов, один из них предназначен только для записи данных (номер 1), второй только для чтения (номер 0). Все данные, записанные в дескриптор =1= буферизируются и становятся доступны для чтения по дескриптору =2=. #+BEGIN_SRC c int pipe_fd[2]; pipe(pipe_fd); #+END_SRC Все необходимые конвейеры должны быть созданы до вызова =fork=, тогда они будут доступны как в родительском, так и в дочернем процессе. Если мы хотим отправлять данные в обе стороны (от родителя ребенку и обратно), то следет создать пару конвейеров. После вызова =fork= и родительский и дочерний процессы наследуют все дескпиторы конвейеров, поэтому нужно закрыть ненужные дескрипторы, используя функцию =close=. Далее мы можем передевать данные посредством конвейера между процессами: писать в один конец конвейера, используя функцию =write=, и читать и другого конца, используя функцию =read=. #+BEGIN_SRC c /* pipe-example.c */ #include #include #include int main() { // Создадим два конвейера int pipes[2][2]; pipe(pipes[0]); pipe(pipes[1]); // Создадим дочерний процесс pid_t pid = fork(); if (pid == 0) { // Сохраним нужные дескпиторы конвейеров int to_parent_pipe = pipes[1][1]; int from_parent_pipe = pipes[0][0]; // И закроем ненужные close(pipes[1][0]); close(pipes[0][1]); // Отпавим один байт родителю char c = 'A'; write(to_parent_pipe, &c, 1); // И будем ждать, пока родитель не пришлет 1 байт в ответ while (read(from_parent_pipe, &c, 1) == 0) ; printf("Recieved from parent: %c\n", c); // Закроем дескпиторы close(to_parent_pipe); close(from_parent_pipe); return 0; } else { // Сохраним нужные дескпиторы конвейеров int from_child_pipe = pipes[1][0]; int to_child_pipe = pipes[0][1]; // И закроем ненужные close(pipes[1][1]); close(pipes[0][0]); // Будем ждать, пока ребенок не пришлет 1 байт char c; while (read(from_child_pipe, &c, 1) == 0) ; printf("Recieved from child: %c\n", c); // И отправим ответ c++; write(to_child_pipe, &c, 1); // Дождемся завершения ребенка waitpid(pid, NULL, 0); // Закроем дескпиторы close(from_child_pipe); close(to_child_pipe); return 0; } } #+END_SRC Можно регламентировать обмен данными через конвейеры, используя фиксированный формат сообщения, состоящий из заголовка и данных произвольного размера: #+BEGIN_SRC c enum { MESSAGE_MAGIC = 0xAFAF, // magic signature value MAX_MESSAGE_LEN = 4096 // maximum message length }; struct __attribute__((packed)) message_header { uint16_t magic; // magic signature uint16_t type; // type of the message uint16_t data_len; // length of data }; enum { // maximum data length MAX_MESSAGE_DATA_LEN = MAX_MESSAGE_LEN - sizeof(struct message_header) }; struct __attribute__((packed)) message { struct message_header header; // payload uint8_t data[MAX_MESSAGE_DATA_LEN]; }; #+END_SRC Тогда, отправка и получение сообщения в конвейер будут выглядеть следующим образом: #+BEGIN_SRC c int send(int fd, const struct message *msg) { int msg_size; /* Check if the input data is not empty */ if (fd < 0 || msg == NULL) return -1; /* Calculate the message size to send */ msg_size = sizeof(struct message_header) + msg->header.data_len; /* Check if message payload size is valid */ if (msg->header.data_len > MAX_MESSAGE_DATA_LEN) return -1; /* Write data to the output pipe (we assume it is ready) */ if (write(fd, msg, msg_size) != msg_size) return -2; return 0; } int receive(int fd, struct message *msg) { int msg_size; /* Check if the input data is not empty */ if (fd < 0 || msg == NULL) return -1; /* Try to read header */ msg_size = read(fd, &msg->header, sizeof(struct message_header)); if (msg_size == 0) return 0; /* Check header magic */ if (msg->header.magic != MESSAGE_MAGIC) return -2; /* Check if message has payload */ if (msg->header.data_len > MAX_MESSAGE_DATA_LEN) return -2; if (msg->header.data_len > 0) msg_size += read(fd, &msg->data, msg->header.data_len); /* Return number of bytes read */ return msg_size; } #+END_SRC *Вопрос* Рассмотрите [[./pipe-example-1.c][пример программы]], которая в дочернем потоке читает строки со входа и отправляет их родительскому процессу через конвейер. Нужно ли было в данном примере использовать специальное сообщение о завершении ввода из дочернего процесса? *Задание 2* Попробуйте синхронизировать процессы из *Задания 1*, используя конвейеры. Теперь дочерний процесс будет сообщать родительскому об изменении данных, отправляя сообщение по конвейеру. При получении сообщения родительский процесс должен вывести массив на экран. Дочерний процесс будт продолжать ожидать ввод от пользователя и менять эллементы массива пока не получит на вход отрицательный индекс - тогда он завершается. * Многопоточные приложения Вспомним базовые сведения о многопоточных приложениях. ** Создание новых потоков Для создания новых потоков мы будем использовать POSIX Threads (Pthreads). Для этого необходимо использовать дополнииельно ключ =-pthread= при выозве =gcc=. Новый поток создается функцией =pthread_create=, аргументы, передаваемые функции создания потока (4 аргумент) передаются вызываемой в новом потоке функции напрямую: #+begin_src c /* thread-example.c */ void* my_thread(void* arg) { for(int i = 0; i < 10; i++ ) { printf("%s\n", arg); sleep(1); } return NULL; } int main() { pthread_t t1, t2; pthread_create(&t1, NULL, my_thread, "Hello"); pthread_create(&t2, NULL, my_thread, "world"); pthread_exit( NULL ); return 0; } #+end_src Используемая в примере функция =pthread_exit= ожидает завершения всех запущенных потоков и завершает текущий поток. Поэтому код, идущий после неё никогда не будет выполнен (если бы он там был). Для ожидания завершения рыботы некоторого потока без завершения текущего следует использовать функцию =pthread_join=. ** Мьютексы Мьютекс (mutex - mutual exclutsion) --- один из вариантов синхронизации потоков, позволяющий, например, исключить одновременное использование одного ресурса различными потоками. Мьютекс может быть в одном из двух состояний --- закрытом (locked) или открытом (unlocked). Мьютекс создается, используя функцию =pthread_mutex_init=: #+begin_src c pthread_mutex_t m; pthread_mutex_init(&m, NULL); #+end_src Далее мьютекс можно закрыть, используя =pthread_mutex_lock=, и открыть, используя =pthread_mutex_unlock=: #+begin_src c pthread_mutex_lock(&m); // Тут можно спокойно использовать общий ресурс pthread_mutex_unlock(&m); #+end_src Для освобождения (удаления) мьютекса следует использовать функцию =pthread_mutex_destroy=. *Вопрос* Рассмотрите [[./mutex-example-1.c][пример работы с мьютексом]] (файл =mutex-example-1.c=). Что будет если [[./mutex-example.c][убрать использование мьютекса]] (файл =mutex-example.c=)? Объясните поведение. *Вопрос* Подробнее про мьютексы читайте на стр. 374--377 учебника. ** Семафоры Семафор --- это целочисленная переменная (типа =sem_t=), с которой можно совершать ряд действий: - Инициализировать её некоторым значением, функция =sem_init=. Семафор может использоваться в том числе для синхронизации процессов, для этого его следует инициализировать до вызова =fork=, используя общую память. - Войти (wait, enter) --- если значение переменной не 0, то выполнить декремент (уменьшить на 1) и продолжить выполнение. В противном случае ожидать увеличения значения переменной, после чего выполнить декремент и продолжить выполнение. Релизуется функцией =sem_wait=. - Выйти (post, leave) --- выполнить инкремент. Релизуется функцией =sem_post=. Семафоры позволяют организовать синхронизацию между параллельнымы потоками различными способами, напрмер: - Запретить более чем заданному количеству потоков выполнять фрагмент кода параллельно. - Заставить один поток ожидать выполнения действия другого потока. *Вопрос* Подробнее про семафоры читайте на стр. 382--384 учебника. *Вопрос* Чем семафоры отличаются от мьютексов? Чем семафор с двумя состояниями отличается от мьютекса? *Задание 3* Используте семафор для синхронизации между потоками в *Задании 2*. При выполнении задания 3 для синхронизации паралельных потоков и корректного завершения работы программы вам может понадобиться использовать несколько семафоров. Также может пригодиться функция =sem_trywait=, которая не блокирует вычисления в случае, если войти не удалось, а возврящает код ошибки (подробнее см. =man sem_trywait=). * Что такое модель памяти Для начала подумаем про компилятор языка. Описание алгоритма в программе включает в себя операции чтения и записи в память. Программист использует эти операции в том порядке, в котором ему удобно, но этот порядок может быть не самым производительным. Например, такие операции: #+begin_src c int x = 10; int y = 993; int z = y + x; print(y); #+end_src можно перемешать многими способами, например: #+begin_src c int y = 993; int x = 10; print(y); int z = y + x; #+end_src или: #+begin_src c int y = 993; print(y); int x = 10; int z = y + x; #+end_src Чтобы повысить производительность кода компилятор переставляет операции, оптимизируя программу. Такие перестановки называются /реордеринги/. При реордерингах результат выполнения программы (например, ввод-вывод) должен остаться неизменным, иначе такую перестановку было бы делать ошибочно. /Модель памяти/ определяет, какие перестановки чтений и записей допустимы. Можно классифицировать их с помощью спектра: 1) Очень слабые --- любые перестановки разрешены, пока они не меняют поведение однопоточной программы. 2) Слабые с зависимостями по данным --- нет перестановок, которые вмешиваются в косвенную адресацию. Если операции описываются как =a <- load x, z <- load a=, то между ними не будут вставляться другие инструкции. В первой категории даже это свойство не гарантируется. 3) Скорее сильные --- порядок записей сохраняется, чтения могут менять своё место. 4) Sequentially consistent --- вообще никакие операции чтения-записи не переставляются. Слабые разрешают больше перестановок, сильные --- меньше, и стремятся к полюсу sequentially consistent. Зачем эта градация? Реордеринги не видны когда код выполняется в одном потоке, но если есть несколько потоков и они взаимодействуют, то реордеринги начинают играть роль. Например, в такой ситуации: #+begin_src c int x = 0; int y = 0; void thread1() { x = 1; print(y); } void thread2() { y = 1; print(x); } #+end_src *Вопрос* в чём проблема в этом коде? Какие пары чисел могут быть выведены в зависимости от того, в каком порядке будут выполнены инструкции разных потоков? Какое влияние оказывают перестановки компилятором? Модель памяти может строиться для любого языка программирования или вообще вычислительной системы, где есть память. Мы взглянем на две модели памяти: для абстрактного вычислителя C и для Intel 64. Реордеринги в C можно увидеть через действия компилятора, который меняет порядок инструкций. В C достаточно слабая модель памяти. Это роднит её с, например, моделью памяти процессоров ARM7. В аппаратуре реордеринги случаются, например, при трансляции ассемблерных инструкций в микрокод, при которой порядок чтений и записей может измениться. В конце мы увидим пример того, как их увидеть. *Вопрос* Пусть произошли некоторые опасные реордеринги при компиляции. Будет ли из-за этого результат работы программы различным при каждом запуске? ** Барьеры памяти /Барьер памяти/ это место в программе, в котором заданы ограничения на реордеринги. Например, барьер по чтению может означать, что все операции чтения, которые были по тексту программы до барьера, должны произойти до него. Реордеринги не смогут перекинуть никакое чтение до барьера в текст ассемблерной программы после него. Барьеры бывают разные: - Чтение - Запись - Полный (и то, и другое) - Acquire (как полный, но операции после барьера не могут просочиться сквозь него) - Release (как полный, но операции до барьера не могут просочиться сквозь него) GCC использует конструкцию =asm volatile(""::: "memory")= для общего барьера (реордеринги при компиляции). *Вопрос* если мы используем ассемблерные вставки в программы, как это связано с необходимостью добавлять в неё барьеры памяти на уровне компиляции? Intel64 использует инструкции: - =lfence= для барьера по чтению - =sfence= для барьера по записи - =mfence= для полного барьера Соответственно, самый полный барьер будет выглядеть как инструкция =mfence= внутри ассемблерной вставки, для которой также указан атрибут =memory=, запрещающий компилятору реордеринги: #+begin_src c asm volatile("mfence"::: "memory") #+end_src *Задание 4* разберите пример =cpu-reordering.c=; он дублирует пример с чтениями и записью в =x= и =y=, но прогоняет его множество раз. При этом семафоры используются чтобы программа запускала вычисления в каждом потоке и по завершению итерации ждала, пока второй поток завершит свою итерацию. Объясните, как в этой программе используются семафоры и как происходит детектирование реордерингов. Затем попробуйте вставить полные барьеры времени компиляции туда, где это необходимо. Попробуйте также вариант с полным барьером для компиляции и с соответствующей инструкцией процессора для полного барьера. Объясните результаты. На низком уровне мьютексы и другие примитивы синхронизации реализованы с помощью барьеров. Действия, которые в коде происходят до работы с мьютексом или семафором, не должны оказаться из-за реордерингов после них. Не пользуйтесь =volatile= для синхронизации между потоками! Полное объяснение на стр.362--363 учебника.