19 августа 2012

Hadoop MapReduce. Основные концепции и архитектура (Платформа Hadoop. Часть 3)

В предыдущем посте были рассмотрены концепции и структуру распределенной файловой системы HDFS. Ниже поговорим об архитектуре фреймворка распределенных вычислений Hadoop MapReduce, программной модели map/reduce и концепциях, лежащих в ее основе.

Программная модель map/reduce

Выполнение распределенных задач на платформе Hadoop происходит в рамках парадигмы map/reduce*.
map/reduce – это парадигма (программная модель) выполнения распределенных вычислений для больших объемов данных.
В общем случае, для map/reduce выделяют 2 фазы:
  • map(ƒ, c)
    Принимает функцию ƒ и список c. Возвращает выходной список, являющийся результатом применения функции ƒ к каждому элементу входного списка c.
    map(f, c)
  • reduce(ƒ, c)
    Принимает функцию ƒ и список c. Возвращает объект, образованный через свертку коллекции c через функцию ƒ.
    reduce(f, c)
Программная модель map/reduce была позаимствована из функционального программирования, хотя в реализации Hadoop и имеет некоторые семантические отличия от прототипа в функциональных языках.
Как и в функциональных языках, при использовании программной модели map/reduce:
  • входные данные не изменяются;
  • разработчик кодирует, что нужно сделать, а не как нужно сделать.
На январь 2012 года широко известны следующие программные реализации модели map/reduce:
  • Google MapReduce – закрытая реализация от Google на C++;
  • CouchDB и MongoDB – реализации для NoSQL баз данных;
  • Hadoop MapReduce – открытая реализация на Java для Apache Hadoop.
Полный список проектов, использующих программные реализации программной модели map/reduce можно найти в источнике [20].

Обзор Hadoop MapReduce

Hadoop MapReduce – программная модель (framework) выполнения распределенных вычислений для больших объемов данных в рамках парадигмы map/reduce, представляющая собой набор Java-классов и исполняемых утилит для создания и обработки заданий на параллельную обработку.
Основные концепции Hadoop MapReduce можно сформулировать как:
  • обработка/вычисление больших объемов данных;
  • масштабируемость;
  • автоматическое распараллеливание заданий;
  • работа на ненадежном оборудовании;
  • автоматическая обработка отказов выполнения заданий.
Работу Hadoop MapReduce можно условно поделить на следующие этапы:
  1. Input read
    Входные данные делятся на блоки данных предопределенного размера (от 16 Мб до 128 Мб) – сплиты (от англ. split). MapReduce Framework закрепляет за каждой функцией Map определенный сплит.
  2. Map
    Каждая функция Map получает на вход список пар «ключ/значение» <k,v>, обрабатывает их и на выходе получает ноль или более пар <k',v'>, являющихся промежуточным результатом.
    map(k, v) -> [(k', v')] где k' - в общем случае, произвольный ключ, не совпадающий с k.
    Все операции map() выполняются параллельно и не зависят от результатов работы друг друга. Каждая функция map() получает на вход свой уникальный набор данных, не повторяющийся ни для какой другой функции map().
  3. Partition / Combine
    Целью этапа partition (разделение) является распределение промежуточных результатов, полученных на этапе map, по reduce-заданиям.
    (k', reducers_count) -> reducer_id где reducers_count - количество узлов, на которых запускается операция свертки;
    reducer_id - идентификатор целевого узла.
    В простейшем случае, reducer_id = hash(k') mod reducers_count Основная цель этапа partition – это балансировка нагрузки. Некорректно реализованная функция partition может привести к неравномерному распределению данных между reduce-узлами.
    Функция combine запускается после map-фазы. В ней происходит промежуточная свертка, локальных по отношению к функции map, значений. [(k', v')] -> (k', [v']) Основное значение функции combine – комбинирование промежуточных данных, что в свою очередь ведет, к уменьшению объема передаваемой между узлами информации.
    MapReduce Combine()
  4. Copy / Сompare / Merge
    На этом этапе происходит:
    • Copy: копирование результатов, полученных в результате работы функций map и combine (если такая была определена), с map-узлов на reduce-узлы.
    • Сompare (или Sort): сортировка, группировка по ключу k полученных в результате операции copy промежуточных значений на reduce-узле. compare(k'n, k'n+1) -> {-1, 0, +1}
    • Merge: «слияние» данных, полученных от разных узлов, для операции свёртки.
  5. Reduce
    Framework вызывает функцию reduce для каждого уникального ключа k' в отсортированном списке значений.
    reduce(k', [v']) -> [v''] Все операции reduce() выполняются параллельно и не зависят от результатов работы друг друга. Таким образом, результаты работы каждой функции reduce() пишутся в отдельный выходной поток.
  6. Output write
    Результаты, полученные на этапе reduce, записываются в выходной поток (в общем случае, файловые блоки в HDFS). Каждый reduce-узел пишет в собственный выходной поток.
MapReduce flow
Разработчику приложения для Hadoop MapReduce необходимо реализовать базовый обработчик, который на каждом вычислительном узле кластера обеспечит преобразование исходных пар «ключ/значение» в промежуточный набор пар «ключ/значение» (класс, реализующий интерфейс Mapper), и обработчик, сводящий промежуточный набор пар в окончательный, сокращённый набор (класс, реализующий интерфейс Reducer) [20].
Все остальные фазы выполняются программной моделью MapReduce без дополнительного кодирования со стороны разработчика. Кроме того, среда выполнения Hadoop MapReduce выполняет следующие функции:
  • планирование заданий;
  • распараллеливание заданий;
  • перенос заданий к данным;
  • синхронизация выполнения заданий;
  • перехват «проваленных» заданий;
  • обработка отказов выполнения заданий и перезапуск проваленных заданий;
  • оптимизация сетевых взаимодействий.

Архитектура Hadoop MapReduce

Hadoop MapReduce использует архитектуру «master-worker», где master – единственный экземпляр управляющего процесса (JobTracker), как правило, запущенный на отдельной машине (вычислительном узле). Worker-процессы – это произвольное множество процессов TaskTracker, исполняющихся на DataNode.
JobTracker и TaskTracker «лежат» над уровнем хранения HDFS, и запускаются/исполняются в соответствии со следующими правилами:
  • экземпляр JobTracker исполняется на NameNode-узле HDFS;
  • экземпляры TaskTracker исполняются на DataNode-узле;
  • TaskTracker исполняются в соответствии с принципом «данные близко», т.е. процесс TaskTracker располагается топологически максимально близко с узлом DataNode, данные которого обрабатываются.
Вышеописанные принципы расположения JobTracker- и TaskTracker-процессов позволяют существенно сократить объемы передаваемых по сети данных и сетевые задержки, связанные с передачей этих данных – основные «узкие места» производительности в современных распределенных системах.
JobTracker является единственным узлом, на котором выполняется приложение MapReduce, вызываемое программным клиентом. JobTracker выполняет следующие функции:
  • планирование индивидуальных (по отношению к DataNode) заданий map и reduce, промежуточных свёрток;
  • координация заданий;
  • мониторинг выполнения заданий;
  • переназначение завершившихся неудачей заданий другим узлам TaskTracker.
В свою очередь, TaskTracker выполняет следующие функции:
  • исполнение map- и reduce-заданий;
  • управление исполнением заданий;
  • отправка сообщений о статусе задачи и завершении работы узлу JobTracker;
  • отправка диагностических heartbeat-сообщений узлу JobTracker.
Взаимодействие TaskTracker-узлов с узлом JobTracker идет посредством RPC-вызовов, причем вызовы идут только от TaskTracker. Аналогичный принцип взаимодействия реализован в HDFS – между узлами DataNode и NameNode-узлом. Такое решение уменьшает зависимость управляющего процесса JobTracker от процессов TaskTracker.
Взаимодействие JobTracker-узла с клиентом (программным) проходит по следующей схеме: JobTracker принимает задание (Job) от клиента и разбивает задание на множество M map-задач и множество R reduce-задач. Узел JobTracker использует информацию о файловых блоках (количество блоков и их месторасположение), расположенную в узле NamеNode, находящемуся локально, чтобы решить, сколько подчиненных задач необходимо создать на узлах типа TaskTracker. TaskTracker получает от JobTracker список задач (тасков), загружает код и выполняет его. Периодично TaskTracker отсылает JobTracker статус выполнения задачи.
Взаимодействия TaskTracker-узлов с программным клиентом отсутствуют.
По аналогии с архитектурой HDFS, где NameNode является единичной точкой отказа (Single point of failure), JobTracker также является таковой. Принцип восстановления в узлах JobTracker и TaskTracker описан ниже.
При сбое TaskTracker-узла JobTracker-узел переназначает задания неисправного узла другому узлу TaskTracker. В случае неисправности JobTracker-узла, для продолжения исполнения MapReduce-приложения, необходим перезапуск JobTracker-узла. При перезапуске узел JobTracker читает из специального журнала данные, о последней успешной контрольной точке (checkpoint), восстанавливает свое состояние на момент записи checkpoint и продолжает работу с места последней контрольной точки.

Преимущества и недостатки Hadoop MapReduce

При создании архитектуры/разработке программных систем с использованием Hadoop MapReduce следует учитывать следующие аспекты использования MapReduce:
Преимущества:
  • Эффективная работа с большим (от 100 Гб) объемом данных;
  • Масштабируемость;
  • Отказоустойчивость;
  • Унифицированность подхода;
  • Предоставление разработчику сравнительно «чистой» абстракции;
  • Снижение требований к квалификации разработчика, в том числе его знаний и опыта по написанию многопоточного кода;
  • Дешевизна лицензирования (Open Source).
Ограничения:
  • Смешение ответственности для Reducer (сортировка и агрегация данных). Таким образом, Reducer – это все, что «не map»;
  • Отсутствие контроля над потоком данных у разработчика (поток данных управляется фреймворком Hadoop MapReduce автоматически);
  • Как следствие предыдущего пункта, невозможность простыми средствами организовать взаимодействие между параллельно выполняющимися потоками.
Недостатки:
  • Применение MapReduce по производительности менее эффективно, чем специализированные решения;
  • Эффективность применение MapReduce снижается при малом количество машин в кластере (высоки издержки на взаимодействие, а степень распараллеливания невелика);
  • Невозможно предсказать окончание стадии map;
  • Этап свертки не начинается до окончания стадии map;
  • Как следствие предыдущего пункта, задержки в исполнении любого запущенного map-задания ведут к задержке выполнения задачи целиком;
  • Низкая утилизация ресурсов вследствие жесткого деления ресурсов кластера на map- и reduce-слоты.
  • Сбой узла JobTracker приводит к простою всего кластера.
В следующей заключительной статье подведем итоги цикла «Платформа Hadoop»: поговорим о том, насколько платформа Hadoop является «серебряной пулей» в мире распределенных вычислений; о спектре решаемых ею задач и ограничениях платформы и о критике и перспективах платформы.
Как всегда будут рад Вашим комментариям и с удовольствием отвечу на появившиеся вопросы.
* Тут и далее я умышленно применяю написание «map/reduce» вместо более привычного «MapReduce», чтобы отличать парадигму (программную модель) map/reduce от реализаций этой программной модели – Hadoop MapReduce и Google MapReduce.

Автор статьи

,
Machine Learning Preacher, Microsoft AI MVP && Coffee Addicted