Последнее обновление: 31.10.2015

Нередко в потоках используются некоторые разделяемые ресурсы, общие для всей программы. Это могут быть общие переменные, файлы, другие ресурсы. Например:

Class Program { static int x=0; static void Main(string args) { for (int i = 0; i < 5; i++) { Thread myThread = new Thread(Count); myThread.Name = "Поток " + i.ToString(); myThread.Start(); } Console.ReadLine(); } public static void Count() { x = 1; for (int i = 1; i < 9; i++) { Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x); x++; Thread.Sleep(100); } } }

Здесь у нас запускаются пять потоков, которые работают с общей переменной x. И мы предполагаем, что метод выведет все значения x от 1 до 8. И так для каждого потока. Однако в реальности в процессе работы будет происходить переключение между потоками, и значение переменной x становится непредсказуемым.

Решение проблемы состоит в том, чтобы синхронизировать потоки и ограничить доступ к разделяемым ресурсам на время их использования каким-нибудь потоком. Для этого используется ключевое слово lock . Оператор lock определяет блок кода, внутри которого весь код блокируется и становится недоступным для других потоков до завершения работы текущего потока. И мы можем переделать предыдущий пример следующим образом:

Class Program { static int x=0; static object locker = new object(); static void Main(string args) { for (int i = 0; i < 5; i++) { Thread myThread = new Thread(Count); myThread.Name = "Поток " + i.ToString(); myThread.Start(); } Console.ReadLine(); } public static void Count() { lock (locker) { x = 1; for (int i = 1; i < 9; i++) { Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x); x++; Thread.Sleep(100); } } } }

Для блокировки с ключевым словом lock используется объект-заглушка, в данном случае это переменная locker . Когда выполнение доходит до оператора lock, объект locker блокируется, и на время его блокировки монопольный доступ к блоку кода имеет только один поток. После окончания работы блока кода, объект locker освобождается и становится доступным для других потоков.

Процессом (process) называется экземпляр программы, загруженной в память. Этот экземпляр может создавать нити (thread), которые представляют собой последовательность инструкций на выполнение. Важно понимать, что выполняются не процессы, а именно нити.

Причем любой процесс имеет хотя бы одну нить. Эта нить называется главной (основной) нитью приложения.

Так как практически всегда нитей гораздо больше, чем физических процессоров для их выполнения, то нити на самом деле выполняются не одновременно, а по очереди (распределение процессорного времени происходит именно между нитями). Но переключение между ними происходит так часто, что кажется, будто они выполняются параллельно.

В зависимости от ситуации нити могут находиться в трех состояниях. Во-первых, нить может выполняться, когда ей выделено процессорное время, т.е. она может находиться в состоянии активности. Во-вторых, она может быть неактивной и ожидать выделения процессора, т.е. быть в состоянии готовности. И есть еще третье, тоже очень важное состояние - состояние блокировки. Когда нить заблокирована, ей вообще не выделяется время. Обычно блокировка ставится на время ожидания какого-либо события. При возникновении этого события нить автоматически переводится из состояния блокировки в состояние готовности. Например, если одна нить выполняет вычисления, а другая должна ждать результатов, чтобы сохранить их на диск. Вторая могла бы использовать цикл типа "while(!isCalcFinished) continue;", но легко убедиться на практике, что во время выполнения этого цикла процессор занят на 100 % (это называется активным ожиданием). Таких вот циклов следует по возможности избегать, в чем оказывает неоценимую помощь механизм блокировки. Вторая нить может заблокировать себя до тех пор, пока первая не установит событие, сигнализирующее о том, что чтение окончено.

Синхронизация нитей в ОС Windows

В Windows реализована вытесняющая многозадачность - это значит, что в любой момент система может прервать выполнение одной нити и передать управление другой. Ранее, в Windows 3.1, использовался способ организации, называемый кооперативной многозадачностью: система ждала, пока нить сама не передаст ей управление и именно поэтому в случае зависания одного приложения приходилось перезагружать компьютер.

Все нити, принадлежащие одному процессу, разделяют некоторые общие ресурсы - такие, как адресное пространство оперативной памяти или открытые файлы. Эти ресурсы принадлежат всему процессу, а значит, и каждой его нити. Следовательно, каждая нить может работать с этими ресурсами без каких-либо ограничений. Но... Если одна нить еще не закончила работать с каким-либо общим ресурсом, а система переключилась на другую нить, использующую этот же ресурс, то результат работы этих нитей может чрезвычайно сильно отличаться от задуманного. Такие конфликты могут возникнуть и между нитями, принадлежащими различным процессам. Всегда, когда две или более нитей используют какой-либо общий ресурс, возникает эта проблема.

Пример. Несинхронизированная работа нитей: если временно приостановить выполнение нити вывода на экран (пауза), фоновая нить заполнения массива будет продолжать работать.

#include #include int a; HANDLE hThr; unsigned long uThrID; void Thread(void* pParams) { int i, num = 0; while (1) { for (i=0; i<5; i++) a[i] = num; num++; } } int main(void) { hThr=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Thread,NULL,0,&uThrID); while(1) printf("%d %d %d %d %d\n", a, a, a, a, a); return 0; }

Именно поэтому необходим механизм, позволяющий потокам согласовывать свою работу с общими ресурсами. Этот механизм получил название механизма синхронизации нитей (thread synchronization).

Этот механизм представляет собой набор объектов операционной системы, которые создаются и управляются программно, являются общими для всех нитей в системе (некоторые - для нитей, принадлежащих одному процессу) и используются для координирования доступа к ресурсам. В качестве ресурсов может выступать все, что может быть общим для двух и более нитей - файл на диске, порт, запись в базе данных, объект GDI, и даже глобальная переменная программы (которая может быть доступна из нитей, принадлежащих одному процессу).

Объектов синхронизации существует несколько, самые важные из них - это взаимоисключение (mutex), критическая секция (critical section), событие (event) и семафор (semaphore). Каждый из этих объектов реализует свой способ синхронизации. Также в качестве объектов синхронизации могут использоваться сами процессы и нити (когда одна нить ждет завершения другой нити или процесса); а также файлы, коммуникационные устройства, консольный ввод и уведомления об изменении.

Любой объект синхронизации может находиться в так называемом сигнальном состоянии. Для каждого типа объектов это состояние имеет различный смысл. Нити могут проверять текущее состояние объекта и/или ждать изменения этого состояния и таким образом согласовывать свои действия. При этом гарантируется, что когда нить работает с объектами синхронизации (создает их, изменяет состояние) система не прервет ее выполнения, пока она не завершит это действие. Таким образом, все конечные операции с объектами синхронизации являются атомарными (неделимыми.

Работа с объектами синхронизации

Чтобы создать тот или иной объект синхронизации, производится вызов специальной функции WinAPI типа Create... (напр. CreateMutex). Этот вызов возвращает дескриптор объекта (HANDLE), который может использоваться всеми нитями, принадлежащими данному процессу. Есть возможность получить доступ к объекту синхронизации из другого процесса - либо унаследовав дескриптор этого объекта, либо, что предпочтительнее, воспользовавшись вызовом функции открытия объекта (Open...). После этого вызова процесс получит дескриптор, который в дальнейшем можно использовать для работы с объектом. Объекту, если только он не предназначен для использования внутри одного процесса, обязательно присваивается имя. Имена всех объектов должны быть различны (даже если они разного типа). Нельзя, например, создать событие и семафор с одним и тем же именем.

По имеющемуся дескриптору объекта можно определить его текущее состояние. Это делается с помощью т.н. ожидающих функций. Чаще всего используется функция WaitForSingleObject. Эта функция принимает два параметра, первый из которых - дескриптор объекта, второй - время ожидания в мсек. Функция возвращает WAIT_OBJECT_0, если объект находится в сигнальном состоянии, WAIT_TIMEOUT - если истекло время ожидания, и WAIT_ABANDONED, если объект-взаимоисключение не был освобожден до того, как владеющая им нить завершилась. Если время ожидания указано равным нулю, функция возвращает результат немедленно, в противном случае она ждет в течение указанного промежутка времени. В случае, если состояние объекта станет сигнальным до истечения этого времени, функция вернет WAIT_OBJECT_0, в противном случае функция вернет WAIT_TIMEOUT. Если в качестве времени указана символическая константа INFINITE, то функция будет ждать неограниченно долго, пока состояние объекта не станет сигнальным.

Очень важен тот факт, что обращение к ожидающей функции блокирует текущую нить, т.е. пока нить находится в состоянии ожидания, ей не выделяется процессорного времени.

Критические секции

Объект-критическая секция помогает программисту выделить участок кода, где нить получает доступ к разделяемому ресурсу, и предотвратить одновременное использование ресурса. Перед использованием ресурса нить входит в критическую секцию (вызывает функцию EnterCriticalSection). Если после этого какая-либо другая нить попытается войти в ту же самую критическую секцию, ее выполнение приостановится, пока первая нить не покинет секцию с помощью вызова LeaveCriticalSection. Используется только для нитей одного процесса. Порядок входа в критическую секцию не определен.

Существует также функция TryEnterCriticalSection, которая проверяет, занята ли критическая секция в данный момент. С ее помощью нить в процессе ожидания доступа к ресурсу может не блокироваться, а выполнять какие-то полезные действия.

Пример. Синхронизация нитей с помощью критических секций.

#include #include CRITICAL_SECTION cs; int a; HANDLE hThr; unsigned long uThrID; void Thread(void* pParams) { int i, num = 0; while (1) { EnterCriticalSection(&cs); for (i=0; i<5; i++) a[i] = num; num++; LeaveCriticalSection(&cs); } } int main(void) { InitializeCriticalSection(&cs); hThr=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Thread,NULL,0,&uThrID); while(1) { EnterCriticalSection(&cs); printf("%d %d %d %d %d\n", a, a, a, a, a); LeaveCriticalSection(&cs); } return 0; }

Взаимоисключения

Объекты-взаимоисключения (мьютексы, mutex - от MUTual EXclusion) позволяют координировать взаимное исключение доступа к разделяемому ресурсу. Сигнальное состояние объекта (т.е. состояние "установлен") соответствует моменту времени, когда объект не принадлежит ни одной нити и его можно "захватить". И наоборот, состояние "сброшен" (не сигнальное) соответствует моменту, когда какая-либо нить уже владеет этим объектом. Доступ к объекту разрешается, когда нить, владеющая объектом, освободит его.

Две (или более) нити могут создать мьютекс с одним и тем же именем, вызвав функцию CreateMutex. Первая нить действительно создает мьютекс, а следующие - получают дескриптор уже существующего объекта. Это дает возможность нескольким нитям получить дескриптор одного и того же мьютекса, освобождая программиста от необходимости заботиться о том, кто в действительности создает мьютекс. Если используется такой подход, желательно установить флаг bInitialOwner в FALSE, иначе возникнут определенные трудности при определении действительного создателя мьютекса.

Несколько нитей могут получить дескриптор одного и того же мьютекса, что делает возможным взаимодействие между процессами. Можно использовать следующие механизмы такого подхода:

  • Дочерний процесс, созданный при помощи функции CreateProcess может наследовать дескриптор мьютекса в случае, если при создании мьютекса функцией CreateMutex был указан параметр lpMutexAttributes.
  • Нить может получить дубликат существующего мьютекса с помощью функции DuplicateHandle.
  • Нить может указать имя существующего мьютекса при вызове функций OpenMutex или CreateMutex.

Для того чтобы объявить взаимоисключение принадлежащим текущей нити, надо вызвать одну из ожидающих функций. Нить, которой принадлежит объект, может его "захватывать" повторно сколько угодно раз (это не приведет к самоблокировке), но столько же раз она должна будет его освобождать с помощью функции ReleaseMutex.

Для синхронизации нитей одного процесса более эффективно использование критических секций.

Пример. Синхронизация нитей с помощью мьютексов.

#include #include HANDLE hMutex; int a; HANDLE hThr; unsigned long uThrID; void Thread(void* pParams) { int i, num = 0; while (1) { WaitForSingleObject(hMutex, INFINITE); for (i=0; i<5; i++) a[i] = num; num++; ReleaseMutex(hMutex); } } int main(void) { hMutex=CreateMutex(NULL, FALSE, NULL); hThr=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Thread,NULL,0,&uThrID); while(1) { WaitForSingleObject(hMutex, INFINITE); printf("%d %d %d %d %d\n", a, a, a, a, a); ReleaseMutex(hMutex); } return 0; }

События

Объекты-события используются для уведомления ожидающих нитей о наступлении какого-либо события. Различают два вида событий - с ручным и автоматическим сбросом. Ручной сброс осуществляется функцией ResetEvent. События с ручным сбросом используются для уведомления сразу нескольких нитей. При использовании события с автосбросом уведомление получит и продолжит свое выполнение только одна ожидающая нить, остальные будут ожидать дальше.

Функция CreateEvent создает объект-событие, SetEvent - устанавливает событие в сигнальное состояние, ResetEvent - сбрасывает событие. Функция PulseEvent устанавливает событие, а после возобновления ожидающих это событие нитей (всех при ручном сбросе и только одной при автоматическом), сбрасывает его. Если ожидающих нитей нет, PulseEvent просто сбрасывает событие.

Пример. Синхронизация нитей с помощью событий.

#include #include HANDLE hEvent1, hEvent2; int a; HANDLE hThr; unsigned long uThrID; void Thread(void* pParams) { int i, num = 0; while (1) { WaitForSingleObject(hEvent2, INFINITE); for (i=0; i<5; i++) a[i] = num; num++; SetEvent(hEvent1); } } int main(void) { hEvent1=CreateEvent(NULL, FALSE, TRUE, NULL); hEvent2=CreateEvent(NULL, FALSE, FALSE, NULL); hThr=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Thread,NULL,0,&uThrID); while(1) { WaitForSingleObject(hEvent1, INFINITE); printf("%d %d %d %d %d\n", a, a, a, a, a); SetEvent(hEvent2); } return 0; }

Семафоры

Объект-семафор - это фактически объект-взаимоисключение со счетчиком. Данный объект позволяет "захватить" себя определенному количеству нитей. После этого "захват" будет невозможен, пока одна из ранее "захвативших" семафор нитей не освободит его. Семафоры применяются для ограничения количества нитей, одновременно работающих с ресурсом. Объекту при инициализации передается максимальное число нитей, после каждого "захвата" счетчик семафора уменьшается. Сигнальному состоянию соответствует значение счетчика больше нуля. Когда счетчик равен нулю, семафор считается не установленным (сброшенным).

Функция CreateSemaphore создает объект-семафор с указанием и максимально возможного начального его значения, OpenSemaphore – возвращает дескриптор существующего семафора, захват семафора производится с помощью ожидающих функций, при этом значение семафора уменьшается на единицу, ReleaseSemaphore - освобождение семафора с увеличением значения семафора на указанное в параметре число.

Пример. Синхронизация нитей с помощью семафоров.

#include #include HANDLE hSem; int a; HANDLE hThr; unsigned long uThrID; void Thread(void* pParams) { int i, num = 0; while (1) { WaitForSingleObject(hSem, INFINITE); for (i=0; i<5; i++) a[i] = num; num++; ReleaseSemaphore(hSem, 1, NULL); } } int main(void) { hSem=CreateSemaphore(NULL, 1, 1, "MySemaphore1"); hThr=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Thread,NULL,0,&uThrID); while(1) { WaitForSingleObject(hSem, INFINITE); printf("%d %d %d %d %d\n", a, a, a, a, a); ReleaseSemaphore(hSem, 1, NULL); } return 0; }

Защищенный доступ к переменным

Существует ряд функций, позволяющих работать с глобальными переменными из всех нитей, не заботясь о синхронизации, т.к. эти функции сами за ней следят – их выполнение атомарно. Это функции InterlockedIncrement, InterlockedDecrement, InterlockedExchange, InterlockedExchangeAdd и InterlockedCompareExchange. Например, функция InterlockedIncrement атомарно увеличивает значение 32-битной переменной на единицу, что удобно использовать для различных счетчиков.

Для получения полной информации о назначении, использовании и синтаксисе всех функций WIN32 API необходимо воспользоваться системой помощи MS SDK, входящей в состав сред программирования Borland Delphi или CBuilder, а также MSDN, поставляемым в составе системы программирования Visual C.

План занятия:

· Семафоры

· Мьютекс

· Правила упрощенного параллелизма

· Рекурсивный мьютекс

· Условные переменные

Механизмами синхронизации являются средства операционной системы, которые помогают решать основную задачу синхронизации - обеспечивать координацию потоков, которые работают с совместно используемыми данными. Такие средства являются минимальными блоками для построения многопоточных программ, их называют синхронизационными механизмами.

Синхронизационные механизмы подразделяют на следующие основные категории:

  • универсальные для низкого уровня - можно использовать различными способами (семафоры);
  • простые для низкого уровня - каждый из которых приспособлен к решению только одной задачи (мьютексы и условные переменные);
  • универсальные для высокого уровня, выраженные через простые - к этой группе относится концепция монитора, которая может быть выражена через мьютексы и условные переменные;
  • простые для высокого уровня - приспособленные к решению конкретной синхронизационной задачи (блокировка чтения-записи и барьеры).

Семафоры

Концепцию семафоров предложил в 1965 году Э. Дейкстра - известный голландский специалист в области компьютерных наук. Семафоры являются старейшими синхронизационными примитивами из числа применяемых на практике.

Семафор - это совместно используемый неотъемлемый целочисленный счетчик, для которого задано начальное значение и определены следующие атомарные операции.

  • Уменьшение семафора (down): если значение семафора больше нуля, его уменьшают на единицу, если же значение равно нулю, этот поток переходит в состояние ожидания до тех пор, пока оно не станет больше нуля (говорят, что поток «ожидает семафор» или «заблокирован на семафоре»). Эту операцию называют также ожиданиям - wait;
  • Увеличение семафора (up): значение семафора увеличивается на единицу; когда при этом есть потоки, которые ожидают на семафоре, один из них выходит из ожидания и выполняет свою операцию уменьшения. Если на семафоре ожидают несколько потоков, то вследствие выполнения операции увеличения, его значение остается нулевым, но один из потоков продолжает выполнение (в большинстве реализаций выбор этого потока будет случайным). Эту операцию также называют сигнализацией - post.

Фактически значение семафора определяет количество потоков, которое может пройти через этот семафор без блокировки. Когда для семафора задано нулевое начальное значение, то он будет блокировать все потоки до тех пор, пока какой-то поток его не "откроет", выполнив операцию up. Операции up и down могут быть выполнены любыми потоками, имеющих доступ к семафора.


Мьютекс называют синхронизацией примитив, что не допускает выполнения некоторого фрагмента кода более чем одним потоком. Фактически мьютекс является реализацией блокировки на уровне ОС.

Мьютекс, как и следует из его названия, реализует взаимное исключение. Его основная задача - блокировать все потоки, которые пытаются получить доступ к коду, когда этот код уже выполняет некоторый поток.

Мьютекс может находиться в двух состояниях: свободном и занятом. Начальным состоянием является «свободный». Над мьютекс возможны две атомарные операции.

  • Занять мьютекс: если мьютекс был свободен, он становится занятым, и поток продолжает свое выполнение (входя в критическую секцию); если мьютекс был занят, поток переходит в состояние ожидания (говорят, что поток «ожидает мьютекс», или «заблокирован на мьютекс»), выполнение продолжает другой поток. Поток, который занял мьютекс, называют владельцем мьютекс;
  • Освободить мьютекс: мьютекс становится свободным; если на нем ожидают несколько потоков, из них выбирают один, он начинает выполняться, занимает мьютекс и входит в критическую секцию. В большинстве реализаций выбор потока будет случайным. Освободить мьютекс может только его владелец.

Правила упрощенного параллелизма

Правила упрощенного параллелизма предназначены для упрощения программирования на базе мьютексов. Они основываются на том очевидном факте, что мьютекс защищает не код критической секции, а совместно используемые данные внутри этой секции. Особенности работы упрощенного параллелизма:

  • Каждая переменная, которую совместно использует более одного поток, должна быть защищена отдельным мьютексом;
  • Перед каждой операцией изменения такой переменной, соответствующий мьютекс должен быть занят, а после изменения освобожден;
  • Если надо работать одновременно с несколькими совместно используемыми переменными, необходимо занять все их мьютексы до начала работы и освободить их только после полного окончания работы.

Рекурсивный мьютекс

Рекурсивный мьютекс - особый вид мьютекса. Он позволяет повторное занятие тем же потоком, а также отслеживает, какой поток пытается его занять.

Когда это не тот поток, который уже занимает, мьютекс ведет себя как обычный, и поток переходит в состояние ожидания. Когда же это тот самый поток, внутренний счетчик блокировок этого мьютекс увеличивают на единицу, и поток продолжает свое выполнение. В случае увольнения мьютекс внутренний счетчик уменьшается на единицу, для других потоков рекурсивный мьютекс будет разблокирована только тогда, когда счетчик дойдет до нуля (то есть когда все его блокировки одним потоком будут сняты).

Рекурсивные мьютексы менее эффективны в реализации, не могут быть использованы вместе с условными переменными (этот вопрос рассмотрим в следующем разделе), поэтому обращаться к ним нужно только тогда, когда без этого нельзя обойтись. Например, библиотека функций может использовать такие мьютексы для того, чтобы избежать взаимных блокировок при повторном вызове таких функций одним потоком.

Условные переменные

Условной переменной называют синхронизационные примитивы, позволяющие организовать ожидания выполнения условия внутри критической секции, заданной мьютексом. Условная переменная всегда связана с конкретным мьютексом и данными, защищенными этим мьютексом. Для условной переменной определены следующие операции:

  1. Ожидания (wait). Дополнительным входным параметром эта операция принимает мьютекс, который должен находиться в закрытом состоянии. Вызов ожидания происходит в ситуации, когда не выполняется некоторое условие и нужны потоки для продолжения работы. Вследствие выполнения ожидания поток прекращается (говорят, что он «ожидает условной переменной»), а мьютекс открывается (эти два действия происходят атомарно). Так другие потоки получают возможность войти в критическую секцию и изменить там данные, которые она защищает, возможно, выполнив условие, необходимое потоку. На этом операция ожидания не заканчивается - ее завершит другой поток, вызвав операцию сигнализации после того, как условие будет выполнено.
  2. Сигнализация (signal). Эту операцию поток должен выполнить после того, как войдет в критическую секцию и завершит работу с данными (выполнив условие, которое ожидал поток, вызвавший операцию wait). Эта операция проверяет, нет ли потоков, ожидающих условной переменной, и если такие потоки есть, переводит один из них в состояние готовности. В результате восстановления поток завершает выполнение операции ожидания и блокирует мьютекс (обновления и блокировки тоже происходят атомарно). Если нет ни одного потока, который ожидает условной переменной, операция сигнализирования не делает ничего, и информацию о ее выполнении в системе не сохраняют.
  3. Широковещательная сигнализация (broadcast) отличается от обычной тем, что перевод в состояние готовности и восстановление выполняют для всех потоков, ожидающих этой условной переменной, а не только для одного из них.

Таким образом, выполнение операции ожидания состоит из следующих этапов: открытие мьютекс, ожидания (пока другой поток не выполнит операцию signal или broadcast), закрытие мьютекс.

Синхронизация потоков

При построении многопоточного приложения необходимо гарантировать, что любая часть разделяемых данных защищена от возможности изменения их значений множеством потоков. Учитывая, что все потоки в AppDomain имеют параллельный доступ к разделяемым данным приложения, представьте, что может случиться, если несколько потоков одновременно обратятся к одному и тому же элементу данных. Поскольку планировщик потоков случайным образом будет приостанавливать их работу, что если поток А будет прерван до того, как завершит свою работу? А вот что: поток В после этого прочтет нестабильные данные.

Чтобы проиллюстрировать проблему, связанную с параллелизмом, давайте рассмотрим следующий пример:

Public class MyTheard { public void ThreadNumbers() { // Информация о потоке Console.WriteLine("{0} поток использует метод ThreadNumbers",Thread.CurrentThread.Name); // Выводим числа Console.Write("Числа: "); for (int i = 0; i

Прежде чем посмотреть на тестовые запуски, давайте еще раз проясним проблему. Первичный поток внутри этого домена приложений начинает свое существование, порождая десять вторичных рабочих потоков. Каждый рабочий поток должен вызвать метод ThreadNumbers() на одном и том же экземпляре MyTheard. Учитывая, что никаких мер для блокировки разделяемых ресурсов этого объекта (консоли) не предпринималось, есть хороший шанс, что текущий поток будет отключен, прежде чем метод ThreadNumbers() сможет напечатать полные результаты. Поскольку в точности не известно, когда это может случиться (и может ли вообще), будут получаться непредвиденные результаты. Например, может появиться следующий вывод:

Ясно, что здесь присутствует определенная проблема. Как только каждый поток требует от MyTheard печати числовых данных, планировщик потоков меняет их местами в фоновом режиме. В результате получается несогласованный вывод. Для решения подобных проблем в C# используется синхронизация .

В основу синхронизации положено понятие блокировки, посредством которой организуется управление доступом к кодовому блоку в объекте. Когда объект заблокирован одним потоком, остальные потоки не могут получить доступ к заблокированному кодовому блоку. Когда же блокировка снимается одним потоком, объект становится доступным для использования в другом потоке.

Средство блокировки встроено в язык C#. Благодаря этому все объекты могут быть синхронизированы. Синхронизация организуется с помощью ключевого слова lock . Она была предусмотрена в C# с самого начала, и поэтому пользоваться ею намного проще, чем кажется на первый взгляд. В действительности синхронизация объектов во многих программах на C# происходит практически незаметно.

Ниже приведена общая форма блокировки:

lock(lockObj) { // синхронизируемые операторы }

где lockObj обозначает ссылку на синхронизируемый объект. Если же требуется синхронизировать только один оператор, то фигурные скобки не нужны. Оператор lock гарантирует, что фрагмент кода, защищенный блокировкой для данного объекта, будет использоваться только в потоке, получающем эту блокировку. А все остальные потоки блокируются до тех пор, пока блокировка не будет снята. Блокировка снимается по завершении защищаемого ею фрагмента кода.

Блокируемым считается такой объект, который представляет синхронизируемый ресурс. В некоторых случаях им оказывается экземпляр самого ресурса или же произвольный экземпляр объекта, используемого для синхронизации. Следует, однако, иметь в виду, что блокируемый объект не должен быть общедоступным, так как в противном случае он может быть заблокирован из другого, неконтролируемого в программе фрагмента кода и в дальнейшем вообще не разблокируется.

В прошлом для блокировки объектов очень часто применялась конструкция lock (this). Но она пригодна только в том случае, если this является ссылкой на закрытый объект. В связи с возможными программными и концептуальными ошибками, к которым может привести конструкция lock (this), применять ее больше не рекомендуется. Вместо нее лучше создать закрытый объект, чтобы затем заблокировать его.

Давайте модифицируем предыдущий пример добавив в него синхронизацию:

Public class MyTheard { private object threadLock = new object(); public void ThreadNumbers() { // Используем маркер блокировки lock (threadLock) { // Информация о потоке Console.WriteLine("{0} поток использует метод ThreadNumbers", Thread.CurrentThread.Name); // Выводим числа Console.Write("Числа: "); for (int i = 0; i

Как только поток войдет в контекст lock, маркер блокировки (в данном случае - текущий объект) станет недоступным другим потокам до тех пор, пока блокировка не будет снята по выходе из контекста lock. Таким образом, если поток А захватит маркер блокировки, другие потоки не смогут войти ни в один из контекстов, использующих тот же маркер, до тех пор, пока поток А не освободит его.

В предыдущих частях статьи я рассказал об общих принципах и конкретных методах построения многопоточных приложений. Различным потокам практически всегда периодически требуется взаимодействие между собой, при этом неизбежно возникает необходимость в синхронизации. Сегодня мы рассмотрим важнейший, самый мощный и универсальный инструмент синхронизации Windows: объекты синхронизации ядра.

WaitForMultipleObjects и другие функции ожидания

Как вы помните, для синхронизации потоков, обычно требуется временно приостановить выполнение одного из потоков. При этом он должен быть переведён средствами операционной системы в состояние ожидания, в котором он не занимает процессорное время. Мы уже знаем две функции, которые могут это делать: SuspendThread и ResumeThread . Но как я рассказывал в предыдущей части статьи, в силу некоторых особенностей эти функции непригодны для синхронизации.

Сегодня мы рассмотрим другую функцию, которая также переводит в поток в состояние ожидания, но в отличие от SuspendThread/ResumeThread , специально предназначена именно для организации синхронизации. Это WaitForMultipleObjects . Поскольку эта функция очень важна, я несколько отступлю от своего правила не вдаваться в детали API и расскажу о ней подробнее, даже приведу ее прототип:

DWORD WaitForMultipleObjects(

DWORD nCount, // число объектов в массиве lpHandles

CONST HANDLE * lpHandles, // указатель на массив описателей объектов ядра

BOOL bWaitAll, // флаг, означающей надо ли дожидаться всех объектов или достаточно одного

DWORD dwMilliseconds // таймаут

Главный параметр этой функции - это указатель на массив хэндлов объектов ядра. О том, что это за объекты, мы поговорим ниже. Пока нам важно знать то, что любой из таких объектов может находиться в одном из двух состояний: нейтральном или «сигнализирующем» (signaled state). Если флаг bWaitAll равен FALSE, функция вернет управление, как только хотя бы один из объектов подаст сигнал. А если флаг равен TRUE, это произойдет только тогда, когда сразу все объекты начнут сигнализировать (как мы увидим, это важнейшее свойство этой функции). В первом случае по возвращаемому значению можно узнать, какой именно из объектов подал сигнал. Надо вычесть из него константу WAIT_OBJECT_0 , и получится индекс в массиве lpHandles. Если время ожидания превысило указанный в последнем параметре таймаут, функция прекратит ожидание и вернет значение WAIT_TIMEOUT . В качестве таймаута можно указать константу INFINITE , и тогда функция будет ждать «до упора», а можно наоборот 0, и тогда поток вообще не будет приостановлен. В последнем случае функция вернет управление немедленно, но по ее результату можно будет узнать состояние объектов. Последний прием используется очень часто. Как видите, эта функция обладает богатыми возможностями. Имеется еще несколько WaitForXXX функций, но все они представляют собой вариации на тему главной. В частности, WaitForSingleObject представляет собой всего лишь ее упрощенный вариант. Остальные имеют каждая свою дополнительную функциональность, но применяются, в общем-то, реже. Например, они дают возможность реагировать не только на сигналы объектов ядра, но и на поступление новых оконных сообщений в очередь потока. Их описание, так же как и детальные сведения о WaitForMultipleObjects , вы, как обычно, найдете в MSDN.

Теперь о том, что же это за таинственные «объекты ядра». Начнем с того, что в их число входят сами потоки и процессы. Они переходят в сигнализирующее состояние сразу по завершении. Это очень важная возможность, поскольку очень часто бывает необходимо отслеживать момент завершения потока или процесса. Пусть, например, наше серверное приложение с набором рабочих потоков должно завершиться. Управляющий поток при этом должен каким-либо способом проинформировать рабочие потоки о том, что пора заканчивать работу (например, установив глобальный флаг), после чего дождаться, пока все потоки не завершатся, сделав все необходимые для корректного завершения действия: освободив ресурсы, информировав клиентов о завершении работы, закрыв сетевые соединения и т.п.

То, что потоки включают сигнал по окончанию работы, позволяет решить задачу синхронизации с завершением потока предельно легко:

// Пусть для простоты у нас будет всего один рабочий поток. Запускаем его:

HANDLE hWorkerThread = :: CreateThread(...);

// Перед окончанием работы надо каким-либо образом сообщаем рабочему потоку, что пора закачивать.

// Ждем завершения потока:

DWORD dwWaitResult = :: WaitForSingleObject( hWorkerThread, INFINITE );

if ( dwWaitResult != WAIT_OBJECT_0 ) { /* обработка ошибки */ }

// "Хэндл" потока можно закрыть:

VERIFY(:: CloseHandle( hWorkerThread );

/* Если CloseHandle завершилась неудачей и вернула FALSE, я не выбрасываю исключение. Во-первых, даже если бы это произошло из-за системной ошибки, это не имело бы прямых последствий для нашей программы, ведь раз мы закрываем хендл, значит никакой работы с ним в дальнейшем не предполагается. Реально же неудача CloseHandle может означать только ошибку в вашей программе. Поэтому вставим здесь макрос VERIFY, чтобы не пропустить её на этапе отладки приложения. */

Аналогично будет выглядеть код, ожидающий завершения процесса.

Если бы такой встроенной в систему возможности не было, рабочий поток должен был бы сам как-то передать информацию о своем завершении главному. Даже если бы он делал бы это в самую последнюю очередь, главный поток не мог бы быть уверенным, что рабочему не осталось выполнить хотя бы пару ассемблерных инструкций. В отдельных ситуациях (например, если код потока находится в DLL, которая должна быть выгружена по его завершении) это может привести к фатальным последствиям.

Хочу напомнить вам, что даже после того, как поток (или процесс) завершился, его описатели все равно остаются в силе до тех пор, пока явно не будут закрыты функцией CloseHandle . (Кстати, не забывайте делать это!) Это сделано как раз для того, чтобы в любой момент можно было бы проверить состояние потока.

Итак, функция WaitForMultipleObjects (и ее аналоги) позволяет синхронизировать выполнение потока с состоянием объектов синхронизации, в частности других потоков и процессов.

Специальные объекты ядра

Перейдем к рассмотрению объектов ядра, которые предназначены специально для синхронизации. Это события, семафоры и мьютексы. Кратко рассмотрим каждый из них:

Событие (event)

Пожалуй, самый простой и фундаментальный синхронизирующий объект. Это всего лишь флаг, которому функциями SetEvent/ResetEvent можно задать состояние: сигнализирующее или нейтральное. Событие - это самый удобный способ передать сигнал ожидающему потоку, что некое событие свершилось (потому оно так и называется), и можно продолжать работу. С помощью события мы легко решим проблему синхронизации при инициализации рабочего потока:

// Пусть для простоты хэндл события будет храниться в глобальной переменной:

HANDLE g_hEventInitComplete = NULL; // никогда не оставляем переменную неинициализированной!

{ // код в главном потоке

// создаем событие

g_hEventInitComplete = :: CreateEvent( NULL,

FALSE, // об этом параметре мы еще поговорим

FALSE, // начальное состояние - нейтральное

if (! g_hEventInitComplete ) { /* не забываем про обработку ошибок */ }

// создаем рабочий поток

DWORD idWorkerThread = 0 ;

HANDLE hWorkerThread = :: CreateThread( NULL, 0 , & WorkerThreadProc, NULL, 0 , & idWorkerThread );

if (! hWorkerThread ) { /* обработка ошибки */ }

// ждем сигнала от рабочего потока

DWORD dwWaitResult = :: WaitForSingleObject( g_hEventInitComplete, INFINITE );

if ( dwWaitResult != WAIT_OBJECT_0 ) { /* ошибка */ }

// вот теперь можно быть уверенным, что рабочий поток завершил инициализацию.

VERIFY(:: CloseHandle( g_hEventInitComplete )); // не забываем закрывать ненужные объекты

g_hEventInitComplete = NULL;

// функция рабочего потока

DWORD WINAPI WorkerThreadProc( LPVOID _parameter )

InitializeWorker(); // инициализация

// сигнализируем, что инициализация завершена

BOOL isOk = :: SetEvent( g_hEventInitComplete );

if (! isOk ) { /* ошибка */ }

Надо заметить, что существуют две заметно отличающиеся разновидности событий. Мы можем выбрать одну из них с помощью второго параметра функции CreateEvent . Если он TRUE, создается событие, состояние которого управляется только вручную, то есть функциями SetEvent/ResetEvent . Если же он FALSE, будет создано событие с автосбросом. Это означает, что как только некий поток, ожидающий данного события, будет освобожден сигналом от этого события, оно автоматически будет сброшено обратно в нейтральное состояние. Наиболее ярко их отличие проявляется в ситуации, когда одного события ожидают сразу несколько потоков. Событие с ручным управлением подобно стартовому пистолету. Как только оно будет установлено в сигнализирующее состояние, будут освобождены сразу все потоки. Событие же с автосбросом похоже на турникет в метро: оно отпустит лишь один поток и вернется в нейтральное состояние.

Мьютекс (mutex)

По сравнению с событием это более специализированный объект. Обычно он используется для решения такой распространенной задачи синхронизации, как доступ к общему для нескольких потоков ресурсу. Во многом он похож на событие с автосбросом. Основное отличие состоит в том, что он имеет специальную привязку к конкретному потоку. Если мьютекс находится в сигнальном состоянии - это означает, что он свободен и не принадлежит ни одному потоку. Как только некий поток дождался этого мьютекса, последний сбрасывается в нейтральное состояние (здесь он как раз похож на событие с автосбросом), а поток становится его хозяином до тех пор, пока явно не освободит мьютекс функцией ReleaseMutex , или же не завершится. Таким образом, чтобы быть уверенным, что с разделяемыми данными одновременно работает только один поток, следует все места, где происходит такая работа, окружить парой: WaitFor - ReleaseMutex :

HANDLE g_hMutex;

// Пусть хэндл мьютекса хранится в глобальной переменной. Его конечно надо создать заранее, до запуска рабочих потоков. Будем считать, что это уже было сделано.

int iWait = :: WaitForSingleObject( g_hMutex, INFINITE );

switch ( iWait ) {

case WAIT_OBJECT_0: // Все нормально

break ;

case WAIT_ABANDONED: /* Какой-то поток завершился, забыв вызвать ReleaseMutex. Скорее всего, это означает ошибку в вашей программе! Поэтому на всякий случай вставим здесь ASSERT, но в окончательно версии (release) будем считать этот код успешным. */

ASSERT( false );

break ;

default :

// Здесь должна быть обработка ошибки.

// Защищенный мьютексом участок кода.

ProcessCommonData();

VERIFY(:: ReleaseMutex( g_hMutex ));

Чем же мьютекс лучше события с автосбросом? В приведенном примере его также можно было бы использовать, только ReleaseMutex надо было бы заменить на SetEvent . Однако может возникнуть следующая сложность. Чаще всего работать с общими данными приходится в нескольких местах. Что будет, если ProcessCommonData в нашем примере вызовет функцию, которая работает с этими же данными и в которой уже есть своя пара WaitFor - ReleaseMutex (на практике это встречается весьма часто)? Если бы мы использовали событие, программа, очевидно, зависла бы, поскольку внутри защищенного блока событие находится в нейтральном состоянии. Мьютекс же устроен более хитро. Для потока-хозяина он всегда остается в сигнализирующем состоянии, несмотря на то, что для всех остальных потоков он при этом находится в нейтральном. Поэтому если поток захватил мьютекс, повторный вызов WaitFor функции не приведет к блокировке. Более того, в мьютекс встроен еще и счетчик, так что ReleaseMutex должна быть вызвана столько же раз, сколько было вызовов WaitFor . Таким образом, мы можем смело защищать каждый участок кода, работающий с общими данными, парой WaitFor - ReleaseMute x, не волнуясь о том, что этот код может быть вызван рекурсивно. Это делает мьютекс очень простым в использовании инструментом.

Семафор (semaphore)

Еще более специфический объект синхронизации. Должен сознаться, что в моей практике еще не было случая, где он пригодился бы. Семафор предназначен для того, чтобы ограничить максимальное число потоков, одновременно работающих с неким ресурсом. По сути, семафор - это событие со счетчиком. Пока этот счетчик больше нуля, семафор находится в сигнализирующем состоянии. Однако каждый вызов WaitFor уменьшает этот счетчик на единицу до тех пор, пока он не станет равным нулю и семафор перейдет в нейтральное состояние. Подобно мьютексу, для семафора есть функция ReleaseSemaphor , увеличивающая счётчик. Однако в отличие от мьютекса семафор не привязан к потоку и повторный вызов WaitFor/ReleaseSemaphor еще раз уменьшит/увеличит счетчик.

Как можно использовать семафор? Например, с его помощью можно искусственно ограничить многопоточность. Как я уже рассказывал, слишком много одновременно активных потоков может заметно ухудшить производительность всей системы из-за частых переключений контекстов. И если уж нам пришлось создать слишком много рабочих потоков, можно ограничить число одновременно активных потоков числом порядка числа процессоров.

Что ещё можно сказать про объекты синхронизации ядра? Очень удобна возможность давать им имена. Соответствующий параметр есть у всех функций, создающих объекты синхронизации: CreateEvent , CreateMutex , CreateSemaphore . Если вы дважды вызовете, к примеру CreateEvent , оба раза указав одно и тоже непустое имя, то второй раз функция вместо того, чтобы создать новый объект, вернет хэндл уже существующего. Это произойдет, даже если второй вызов был сделан из другого процесса. Последнее очень удобно в тех случаях, когда требуется синхронизировать потоки, принадлежащие разным процессам.

Когда объект синхронизации вам больше не требуется, не забывайте вызвать функцию CloseHandle , о которой я уже упоминал, когда рассказывал о потоках. На самом деле, она не обязательно сразу же удалит объект. Дело в том, что хэндлов у объекта может быть создано несколько, тогда он будет удален только когда будет закрыт последний из них.

Хочу напомнить, что лучший способ гарантировать, чтобы CloseHandle или подобная «очищающая» функция была обязательно вызвана, даже в случае нештатной ситуации, - это поместить ее в деструктор. Об этом, кстати, в свое время неплохо и очень подробно рассказывалось в статье Кирилла Плешивцева «Умный деструктор». В приведённых выше примерах я не использовал этот приём исключительно в учебных целях, чтобы работа функций API была более наглядной. В реальном же коде для очистки следует всегда использовать классы-оболочки с умными деструкторами.

Кстати, с функцией ReleaseMutex и подобными постоянно возникает та же проблема, что и с CloseHandle . Она обязана быть вызвана по окончании работы с общими данными, независимо от того, насколько успешно эта работа была завершена (ведь могло быть выброшено исключение). Последствия «забывчивости» здесь более серьёзны. Если не вызванный CloseHandle приведёт лишь у утечке ресурсов (что тоже плохо!), то не освобожденный мьютекс не позволит другим потокам работать с общим ресурсом до самого завершения давшего сбой потока, что скорее всего не позволит приложению нормально функционировать. Чтобы избежать этого, нам опять поможет специально обученный класс с умным деструктором.

Заканчивая обзор объектов синхронизации, хочется упомянуть об объекте, которого нет в Win32 API. Многие мои коллеги высказывают недоумение, почему в Win32 нет специализированного объекта типа «один пишет, многие читают». Этакий «продвинутый мьютекс», следящий за тем, чтобы получить доступ к общим данным на запись мог бы одновременно только один поток, а только на чтение - сразу несколько. Подобный объект можно найти в UNIX"ах. Некоторые библиотеки, например от Borland, предлагают эмулировать его на основе стандартных объектов синхронизации. Впрочем, реальная польза от таких эмуляций весьма сомнительна. Эффективно такой объект может быть реализован только на уровне ядра операционной системы. Но в ядре Windows подобный объект не предусмотрен.

Почему же разработчики ядра Windows NT не позаботились об этом? Чем мы хуже UNIX? На мой взгляд, ответ заключается в том, что реальной потребности в таком объекте для Windows пока просто не возникало. На обычной однопроцессорной машине, где потоки все равно физически не могут работать одновременно, он будет практически эквивалентен мьютексу. На многопроцессорной машине он может дать выигрыш за счёт того, что позволит читающим потокам работать параллельно. Вместе с тем, реально этот выигрыш станет ощутим лишь когда вероятность «столкновения» читающих потоков велика. Несомненно, что к примеру на 1024-процессорной машине подобный объект ядра будет жизненно необходим. Подобные машины существуют, но это - специализированные системы, работающие под специализированными ОС. Зачастую, такие ОС строят на основе UNIX, вероятно оттуда объект типа «один пишет, многие читают» попал и в более общеупотребительные версии этой системы. Но на привычных нам x86-машинах установлен, как правило, всего один и лишь изредка два процессора. И только самые продвинутые модели процессоров типа Intel Xeon поддерживают 4-х и даже более процессорные конфигурации, но такие системы пока остаются экзотикой. Но даже на такой «продвинутой» системе «продвинутый мьютекс» сможет дать заметный выигрыш в производительности лишь в очень специфических ситуациях.

Таким образом, реализация «продвинутого» мьютекса просто не стоит свеч. На «малопроцессорной» машине он может оказаться даже менее эффективным из-за усложнения логики объекта по сравнению со стандартным мьютексом. Учтите, реализация подобного объекта не так проста, как может показаться на первый взгляд. При неудачной реализации, если читающих потоков будет слишком много, пишущему потоку будет просто «не пробиться» к данным. По этим причинам я также не рекомендую вам пытаться эмулировать такой объект. В реальных приложениях на реальных машинах обычный мьютекс или критическая секция (о которой речь пойдет в следующей части статьи) прекрасно справится с задачей синхронизации доступа к общим данным. Хотя, я полагаю, с развитием ОС Windows объект ядра «один пишет многие читают» рано или поздно появится.

Примечание. На самом деле, объект «один пишет - многие читают» в Windows NT всё-таки есть. Просто, когда я писал эту статью, ещё не знал об этом. Этот объект носит название «ресурсы ядра» и не доступен для програм пользовательского режима, вероятно поэтому и не слишком известен. Подобности о нём можно найти в DDK. Спасибо Константину Манурину, что указал мне на это.

Deadlock

А теперь вернемся к функции WaitForMultipleObjects , точнее к ее третьему параметру, bWaitAll. Я обещал рассказать, почему возможность ожидания сразу нескольких объектов так важна.

Почему необходима функция, позволяющая ожидать один из нескольких объектов, понятно. В отсутствие специальной функции это можно было бы сделать, разве что последовательно проверяя состояние объектов в пустом цикле, что, конечно же, недопустимо. А вот необходимость в специальной функции, позволяющей ожидать момента, когда сразу несколько объектов перейдут в сигнальное состояние, не так очевидна. Действительно, представим следующую характерную ситуацию: нашему потоку в определенный момент необходим доступ сразу к двум наборам общих данных, за каждый из которых отвечает свой мьютекс, назовем их А и В. Казалось бы, поток может сначала подождать, пока освободиться мьютекс А, захватить его, затем подождать освобождения мьютекса В... Вроде бы, можно обойтись парой вызовов WaitForSingleObject . Действительно, это будет работать, но только до тех пор, пока все остальные потоки будут захватывать мьютексы в том же порядке: сначала А, потом В. Что произойдет, если некий поток попытается сделать наоборот: захватить сначала В, затем А? Рано или поздно возникнет ситуация, когда один поток захватил мьютекс А, другой В, первый ждет, когда освободится В, второй А. Понятно, что они этого никогда не дождутся и программа зависнет.

Подобная взаимная блокировка (deadlock) является очень характерной ошибкой. Как и все ошибки, связанные с синхронизацией, она проявляется лишь время от времени и способна испортить программисту немало нервов. В то же время взаимоблокировкой чревата практически любая схема, в которую вовлечены несколько объектов синхронизации. Поэтому, этой проблеме следует уделять особое внимание на этапе проектирования такой схемы.

В приведенном простом примере избежать блокировки довольно легко. Надо потребовать, чтобы все потоки захватывали мьютексы в определенном порядке: сначала А, потом В. Однако в сложной программе, где много различным образом связанных друг с другом объектов, добиться этого обычно бывает не так просто. В блокировку могут быть вовлечены не два, а много объектов и потоков. Поэтому, самый надёжный способ избежать взаимной блокировки в ситуации, когда потоку требуется сразу несколько объектов синхронизации - это захватывать их все одним вызовом функции WaitForMultipleObjects с параметром bWaitAll=TRUE. По правде говоря, при этом мы всего лишь перекладываем проблему взаимных блокировок на ядро операционной системы, но главное - это уже будет не наша забота. Впрочем, в сложной программе со множеством объектов, когда не всегда сразу можно сказать, какие именно из них потребуются для выполнения той или иной операции, свести все вызовы WaitFor в одно место и объединить тоже часто оказывается не просто.

Таким образом, есть два пути избежать возникновения deadlock. Надо либо следить, чтобы объекты синхронизации всегда захватывались потоками строго в одном и том же порядке, либо чтобы они захватывались единым вызовом WaitForMultipleObjects . Последний метод более прост и предпочтителен. Однако на практике, с выполнением и того и другого требования постоянно возникают сложности, приходится сочетать оба этих подхода. Проектирование сложных схем синхронизации часто является весьма нетривиальной задачей.

Пример организации синхронизации

В большинстве типичных ситуаций, вроде тех, что я описывал выше, организовать синхронизацию не составляет труда, достаточно события или мьютекса. Но периодически встречаются более сложные случаи, где решение проблемы не так очевидно. Хочу проиллюстрировать это на конкретном примере из своей практики. Как вы увидите, решение оказалось удивительно простым, но прежде чем я нашел его, мне пришлось перепробовать несколько неудачных вариантов.

Итак, задача. Практически во всех современных менеджерах закачек (download managers), или попросту говоря «качалках» есть возможность ограничения трафика, чтобы работающая в фоновом режиме «качалка» не сильно мешала пользователю лазить по Сети. Я разрабатывал похожую программу, и передо мной была поставлена задача реализовать именно такую «фичу». Моя качалка работала по классической схеме многопоточности, когда каждой задачей, в данном случае скачиванием конкретного файла, занимается отдельный поток. Ограничение трафика должно было быть суммарным для всех потоков. То есть, нужно было добиться, чтобы в течение заданного интервала времени все потоки считывали из своих сокетов не более определенного количества байтов. Просто разделить этот лимит поровну между потоками, очевидно, будет неэффективно, поскольку скачивание файлов может идти весьма неравномерно, один будет качаться быстро, другой медленно. Следовательно, нужен общий для всех потоков счетчик, сколько байт считано, и сколько еще можно считать. Здесь-то и не обойтись без синхронизации. Дополнительную сложность задаче придало требование того, чтобы в любой момент любой из рабочих потоков можно было остановить.

Сформулируем задачу более детально. Систему синхронизации я решил заключить в специальный класс. Вот его интерфейс:

class CQuota {

public : // methods

void Set( unsigned int _nQuota );

unsigned int Request( unsigned int _nBytesToRead, HANDLE _hStopEvent );

void Release( unsigned int _nBytesRevert, HANDLE _hStopEvent );

Периодически, скажем раз в секунду, управляющий поток вызывает метод Set, устанавливая квоту на скачивание. Перед тем, как рабочий поток считает данные, полученные из сети, он вызывает метод Request, который проверяет, что текущая квота не равна нулю, и если да, возвращает не превышающее текущую квоту число байтов, которые можно считать. Квота соответственно уменьшается на это число. Если же при вызове Request квота равна нулю, вызывающий поток должен подождать, пока она не появится. Иногда случается, что реально получено меньше байтов, чем было запрошено, в таком случае поток возвращает часть выделенной ему квоты методом Release. И, как я уже сказал, пользователь в любой момент может дать команду прекратить скачивание. В таком случае ожидание надо прервать независимо от наличия квоты. Для этого используется специальное событие: _hStopEvent. Поскольку задачи можно запускать и останавливать независимо друг от друга, для каждого рабочего потока используется свое событие остановки. Его описатель передается методам Request и Release.

В одном из неудачных вариантов я попробовал использовать сочетание мьютекса, синхронизирующего доступ к классу CQuota и события, сигнализирующего наличие квоты. Однако в эту схему никак не вписывается событие остановки. Если поток желает получить квоту, то его состояние ожидания должно управляться сложным логическим выражением: ((мьютекс И событие наличия квоты) ИЛИ событие остановки). Но WaitForMultipleObjects такого не позволяет, можно объединить несколько объектов ядра либо операцией И, либо ИЛИ, но не вперемешку. Попытка разделить ожидание двумя последовательными вызовами WaitForMultipleObjects неизбежно приводит к deadlock. В общем, этот путь оказался тупиковым.

Не буду больше напускать тумана и расскажу решение. Как я уже говорил, мьютекс очень похож на событие с автосбросом. И здесь мы имеем как раз тот редкий случай, когда удобнее использовать именно его, но не одно а сразу два:

class CQuota {

private: // data

unsigned int m_nQuota;

CEvent m_eventHasQuota;

CEvent m_eventNoQuota;

Одновременно может быть установлено только одно из этих событий. Любой поток, произведший манипуляции с квотой должен установить первое событие, если оставшаяся квота не равна нулю, и второе, если квота исчерпана. Поток, желающий получить квоту, должен подождать первого события. Потоку же, увеличивающего квоту, достаточно дождаться любого из этих событий, поскольку если оба они находятся в сброшенном состоянии, это означает, что с квотой в данный момент работает другой поток. Таким образом, два события выполняют сразу две функции: синхронизации доступа к данным и ожидание. Наконец, поскольку поток ожидает одного из двух событий, сюда легко включается и событие, дающее сигнал на остановку.

Приведу для примера реализацию метода Request. Остальные реализуются аналогично. Я слегка упростил код, использовавшийся в реальном проекте:

unsigned int CQuota:: Request( unsigned int _nRequest, HANDLE _hStopEvent )

if (! _nRequest ) return 0 ;

unsigned int nProvide = 0 ;

HANDLE hEvents[ 2 ];

hEvents[ 0 ] = _hStopEvent; // Событие остановки имеет больший приоритет. Ставим его первым.

hEvents[ 1 ] = m_eventHasQuota;

int iWaitResult = :: WaitForMultipleObjects( 2 , hEvents, FALSE, INFINITE );

switch ( iWaitResult ) {

case WAIT_FAILED:

// ОШИБКА

throw new CWin32Exception;

case WAIT_OBJECT_0:

// Событие остановки. Я обрабатывал его с помощью специального исключения, но ничто не мешает реализовать это как-то иначе.

throw new CStopException;

case WAIT_OBJECT_0+ 1 :

// Событие "квота доступна"

ASSERT( m_nQuota ); // Если сигнал подало это событие, но квоты на самом деле нет, значит где-то мы ошиблись. Надо искать баг!

if ( _nRequest >= m_nQuota ) {

nProvide = m_nQuota;

m_nQuota = 0 ;

m_eventNoQuota. Set();

else {

nProvide = _nRequest;

m_nQuota -= _nRequest;

m_eventHasQuota. Set();

break ;

return nProvide;

Маленькое замечание. Библиотека MFC в том проекте не использовалась, но, как вы наверно уже догадались, я сделал собственный класс CEvent, оболочку вокруг объекта ядра «событие», подобную MFC"шной. Как я уже говорил, такие простые классы-оболочки очень полезны, когда есть некий ресурс (в данном случае объект ядра), который необходимо не забыть освободить по окончанию работы. В остальном же нет разницы, писать ли SetEvent(m_hEvent) или m_event.Set().

Надеюсь, этот пример поможет вам спроектировать собственную схему синхронизации, если вам встретится нетривиальная ситуация. Главное - максимально тщательно проанализируйте вашу схему. Не может ли возникнуть ситуации, в которой она сработает неправильно, в частности, не может ли возникнуть блокировка? Отлавливать такие ошибки в отладчике - обычно безнадежное дело, здесь помогает только детальный анализ.

Итак, мы рассмотрели важнейшее средство синхронизации потоков: объекты синхронизации ядра. Это мощный и универсальный инструмент. С его помощью можно строить даже очень сложные схемы синхронизации. К счастью, такие нетривиальные ситуации встречаются нечасто. Кроме того, за универсальность всегда приходится расплачиваться производительностью. Поэтому во многих случаях стоит использовать другие средства синхронизации потоков, имеющиеся в Windows, такие как критические секции и атомарные операции. Они не так универсальны, зато просты и эффективны. О них мы и поговорим в следующей части.