В предыдущем посте были рассмотрены концепции и структуру распределенной файловой системы HDFS. Ниже поговорим об архитектуре фреймворка распределенных вычислений Hadoop MapReduce, программной модели map/reduce и концепциях, лежащих в ее основе.
Программная модель map/reduce
Выполнение распределенных задач на платформе Hadoop происходит в рамках парадигмы
map/reduce*.
map/reduce – это парадигма (программная модель) выполнения распределенных
вычислений для больших объемов данных.
В общем случае, для map/reduce выделяют 2 фазы:
- map(ƒ, c)
Принимает функцию ƒ и список c. Возвращает выходной список, являющийся результатом применения функции ƒ к каждому элементу входного списка c. - reduce(ƒ, c)
Принимает функцию ƒ и список c. Возвращает объект, образованный через свертку коллекции c через функцию ƒ.
Программная модель map/reduce была позаимствована из функционального программирования,
хотя в реализации Hadoop и имеет некоторые семантические отличия от прототипа в
функциональных языках.
Как и в функциональных языках, при использовании программной
модели map/reduce:
- входные данные не изменяются;
- разработчик кодирует, что нужно сделать, а не как нужно сделать.
На январь 2012 года широко известны следующие программные реализации модели map/reduce:
- Google MapReduce – закрытая реализация от Google на C++;
- CouchDB и MongoDB – реализации для NoSQL баз данных;
- Hadoop MapReduce – открытая реализация на Java для Apache Hadoop.
Полный список проектов, использующих программные реализации программной модели map/reduce
можно найти в источнике [20].
Обзор Hadoop MapReduce
Hadoop MapReduce – программная модель (framework) выполнения распределенных
вычислений для больших объемов данных в рамках парадигмы map/reduce, представляющая
собой набор Java-классов и исполняемых утилит для создания и обработки заданий на
параллельную обработку.
Основные концепции Hadoop MapReduce можно сформулировать как:
- обработка/вычисление больших объемов данных;
- масштабируемость;
- автоматическое распараллеливание заданий;
- работа на ненадежном оборудовании;
- автоматическая обработка отказов выполнения заданий.
Работу Hadoop MapReduce можно условно поделить на следующие этапы:
- Input read
Входные данные делятся на блоки данных предопределенного размера (от 16 Мб до 128 Мб) – сплиты (от англ. split). MapReduce Framework закрепляет за каждой функцией Map определенный сплит. - Map
Каждая функция Map получает на вход список пар «ключ/значение» <k,v>, обрабатывает их и на выходе получает ноль или более пар <k',v'>, являющихся промежуточным результатом.
map(k, v) -> [(k', v')]
где k' - в общем случае, произвольный ключ, не совпадающий с k.
Все операции map() выполняются параллельно и не зависят от результатов работы друг друга. Каждая функция map() получает на вход свой уникальный набор данных, не повторяющийся ни для какой другой функции map(). - Partition / Combine
Целью этапа partition (разделение) является распределение промежуточных результатов, полученных на этапе map, по reduce-заданиям.
(k', reducers_count) -> reducer_id
где reducers_count - количество узлов, на которых запускается операция свертки;
reducer_id - идентификатор целевого узла.
В простейшем случае,reducer_id = hash(k') mod reducers_count
Основная цель этапа partition – это балансировка нагрузки. Некорректно реализованная функция partition может привести к неравномерному распределению данных между reduce-узлами.
Функция combine запускается после map-фазы. В ней происходит промежуточная свертка, локальных по отношению к функции map, значений.[(k', v')] -> (k', [v'])
Основное значение функции combine – комбинирование промежуточных данных, что в свою очередь ведет, к уменьшению объема передаваемой между узлами информации. - Copy / Сompare / Merge
На этом этапе происходит:- Copy: копирование результатов, полученных в результате работы функций map и combine (если такая была определена), с map-узлов на reduce-узлы.
- Сompare (или Sort): сортировка, группировка по ключу k полученных в результате операции
copy промежуточных значений на reduce-узле.
compare(k'n, k'n+1) -> {-1, 0, +1}
- Merge: «слияние» данных, полученных от разных узлов, для операции свёртки.
- Reduce
Framework вызывает функцию reduce для каждого уникального ключа k' в отсортированном списке значений.
reduce(k', [v']) -> [v'']
Все операции reduce() выполняются параллельно и не зависят от результатов работы друг друга. Таким образом, результаты работы каждой функции reduce() пишутся в отдельный выходной поток. - Output write
Результаты, полученные на этапе reduce, записываются в выходной поток (в общем случае, файловые блоки в HDFS). Каждый reduce-узел пишет в собственный выходной поток.
Разработчику приложения для Hadoop MapReduce необходимо реализовать базовый обработчик,
который на каждом вычислительном узле кластера обеспечит преобразование исходных
пар «ключ/значение» в промежуточный набор пар «ключ/значение» (класс, реализующий
интерфейс Mapper), и обработчик, сводящий промежуточный набор пар в окончательный,
сокращённый набор (класс, реализующий интерфейс Reducer) [20].
Все остальные фазы выполняются программной моделью MapReduce без дополнительного
кодирования со стороны разработчика. Кроме того, среда выполнения Hadoop MapReduce выполняет следующие функции:
- планирование заданий;
- распараллеливание заданий;
- перенос заданий к данным;
- синхронизация выполнения заданий;
- перехват «проваленных» заданий;
- обработка отказов выполнения заданий и перезапуск проваленных заданий;
- оптимизация сетевых взаимодействий.
Архитектура Hadoop MapReduce
Hadoop MapReduce использует архитектуру «master-worker», где master –
единственный
экземпляр управляющего процесса (JobTracker), как правило, запущенный на отдельной
машине (вычислительном узле). Worker-процессы – это произвольное множество процессов
TaskTracker, исполняющихся на DataNode.
JobTracker и TaskTracker «лежат» над уровнем хранения HDFS, и запускаются/исполняются
в соответствии со следующими правилами:
- экземпляр JobTracker исполняется на NameNode-узле HDFS;
- экземпляры TaskTracker исполняются на DataNode-узле;
- TaskTracker исполняются в соответствии с принципом «данные близко», т.е. процесс TaskTracker располагается топологически максимально близко с узлом DataNode, данные которого обрабатываются.
Вышеописанные принципы расположения JobTracker- и TaskTracker-процессов позволяют
существенно сократить объемы передаваемых по сети данных и сетевые задержки, связанные
с передачей этих данных – основные «узкие места» производительности в современных
распределенных системах.
JobTracker является единственным узлом, на котором выполняется приложение MapReduce,
вызываемое программным клиентом. JobTracker выполняет следующие функции:
- планирование индивидуальных (по отношению к DataNode) заданий map и reduce, промежуточных свёрток;
- координация заданий;
- мониторинг выполнения заданий;
- переназначение завершившихся неудачей заданий другим узлам TaskTracker.
В свою очередь, TaskTracker выполняет следующие функции:
- исполнение map- и reduce-заданий;
- управление исполнением заданий;
- отправка сообщений о статусе задачи и завершении работы узлу JobTracker;
- отправка диагностических heartbeat-сообщений узлу JobTracker.
Взаимодействие TaskTracker-узлов с узлом JobTracker идет посредством RPC-вызовов,
причем вызовы идут только от TaskTracker. Аналогичный принцип взаимодействия реализован
в HDFS – между узлами DataNode и NameNode-узлом. Такое решение уменьшает зависимость
управляющего процесса JobTracker от процессов TaskTracker.
Взаимодействие JobTracker-узла с клиентом (программным) проходит по следующей схеме:
JobTracker принимает задание (Job) от клиента и разбивает задание на множество M
map-задач и множество R reduce-задач. Узел JobTracker использует информацию о файловых
блоках (количество блоков и их месторасположение), расположенную в узле NamеNode,
находящемуся локально, чтобы решить, сколько подчиненных задач необходимо создать
на узлах типа TaskTracker.
TaskTracker получает от JobTracker список задач (тасков), загружает код и выполняет его. Периодично TaskTracker отсылает JobTracker статус выполнения задачи.
Взаимодействия TaskTracker-узлов с программным клиентом отсутствуют.
По аналогии с архитектурой HDFS, где NameNode является единичной точкой отказа (Single
point of failure), JobTracker также является таковой. Принцип восстановления в узлах
JobTracker и TaskTracker описан ниже.
При сбое TaskTracker-узла JobTracker-узел переназначает задания неисправного узла
другому узлу TaskTracker. В случае неисправности JobTracker-узла, для продолжения
исполнения MapReduce-приложения, необходим перезапуск JobTracker-узла. При перезапуске
узел JobTracker читает из специального журнала данные, о последней успешной контрольной
точке (checkpoint), восстанавливает свое состояние на момент записи checkpoint и
продолжает работу с места последней контрольной точки.
Преимущества и недостатки Hadoop MapReduce
При создании архитектуры/разработке программных систем с использованием Hadoop MapReduce
следует учитывать следующие аспекты использования MapReduce:
Преимущества:
- Эффективная работа с большим (от 100 Гб) объемом данных;
- Масштабируемость;
- Отказоустойчивость;
- Унифицированность подхода;
- Предоставление разработчику сравнительно «чистой» абстракции;
- Снижение требований к квалификации разработчика, в том числе его знаний и опыта по написанию многопоточного кода;
- Дешевизна лицензирования (Open Source).
Ограничения:
- Смешение ответственности для Reducer (сортировка и агрегация данных). Таким образом, Reducer – это все, что «не map»;
- Отсутствие контроля над потоком данных у разработчика (поток данных управляется фреймворком Hadoop MapReduce автоматически);
- Как следствие предыдущего пункта, невозможность простыми средствами организовать взаимодействие между параллельно выполняющимися потоками.
Недостатки:
- Применение MapReduce по производительности менее эффективно, чем специализированные решения;
- Эффективность применение MapReduce снижается при малом количество машин в кластере (высоки издержки на взаимодействие, а степень распараллеливания невелика);
- Невозможно предсказать окончание стадии map;
- Этап свертки не начинается до окончания стадии map;
- Как следствие предыдущего пункта, задержки в исполнении любого запущенного map-задания ведут к задержке выполнения задачи целиком;
- Низкая утилизация ресурсов вследствие жесткого деления ресурсов кластера на map- и reduce-слоты.
- Сбой узла JobTracker приводит к простою всего кластера.
В следующей заключительной статье подведем итоги цикла «Платформа Hadoop»: поговорим о том, насколько платформа Hadoop является «серебряной пулей» в мире распределенных вычислений; о спектре решаемых ею задач и ограничениях платформы и о критике и перспективах платформы.
Как всегда будут рад Вашим комментариям и с удовольствием отвечу на появившиеся вопросы.
* Тут и далее я умышленно применяю написание «map/reduce» вместо более
привычного «MapReduce», чтобы отличать парадигму (программную модель) map/reduce
от реализаций этой программной модели – Hadoop MapReduce и Google MapReduce.