Google Photon. Обработка данных со скоростью света*
Статья из цикла «Google Platform»
Photon – масштабируемая, отказоустойчивая и географически распределенная система обработки потоковых данных в режиме реального времени. Система является внутренним продуктом Google и используется в Google Advertising System. Research paper [5], описывающие базовые принципы и архитектуру Photon, был представлен на научной конференции ACM SIGMOD в 2013 году.
В research paper [5] заявлено, что пиковая нагрузка на систему может составлять миллионы событий в минуту со средней end-to-end задержкой менее 10 секунд.
Photon решает вполне конкретную задачу: необходимо соединить (выполнить операцию join) два непрерывных потока данных в режиме реального времени. Так в упоминаемой уже Google Advertising System один из этих потоков – поток поисковых запросов, другой – поток переходов по рекламным объявлениям.
Photon является географически распределенной системой и автоматически способен обрабатывать случаи деградации инфраструктуры, в т.ч. отказа дата-центра. В геораспределенных системах крайне непросто гарантировать время доставки сообщений (в первую очередь, из-за сетевых задержек), поэтому Photon допускает, что обрабатываемые потоковые данные могут быть не упорядочены по времени.
Используемые сервисы: GFS, PaxosDB, TrueTime.
Базовые принципы
В [5] объяснение принципов работы Photon идет в следующем контексте: пользователь ввел поисковый запрос (query) в момент времени t1 и перешел по рекламному объявлению (click) в момент времени t2. В этом же контексте, если не задано иного, в этой статье будут объяснены принципы работы Photon.
Принцип объединения потоков (join) взят из мира РСУБД: поток query имеет уникальный идентификатор query_id (условно Primary Key), поток click имеет уникальный идентификатор click_id и включает в себя некоторый query_id (условно Foreign Key). Объединение потоков происходит по query_id.
Следующий важный момент: ситуация, когда один click event посчитан дважды, либо, наоборот, не посчитан, будет вести, соответственно, либо к судебным искам со стороны рекламодателей, либо к упущенным выгодам со стороны Google. Отсюда, крайне важно обеспечить at-most-once семантику обработки событий.
Другое требование – обеспечить near-exact семантику, т.е. чтобы большая часть событий была посчитана в режиме близкому real-time. События, не посчитанные в real-time, все равно должны быть посчитаны - exactly-once семантика.
Кроме того, для экземпляров Photon, работающих в разных дата-центрах, необходимо синхронизированное состояние (точнее только critical state, так как между ДЦ весь state слишком «дорого» реплицировать). Таким синхронизируемым critical state выбрали event_id (по сути, click_id). Critical state храниться в структуре IdRegistry – in-memory key-value хранилище, построенное на основе PaxosDB.
Последний – PaxosDB – реализует алгоритм Paxos для поддержки отказоустойчивости и согласованности данных.
Взаимодействие с клиентами
Worker-узлы взаимодействуют с IdRegistry по клиент-серверной модели. Архитектурно взаимодействие Worker-узлов с IdRegistry – это сетевое взаимодействие с очередью асинхронных сообщений.
Так клиенты – Worker-узлы - отправляют к IdRegistry только 1) запрос на поиск event_id (если event_id найден, значит он уже был обработан) и 2) запрос на вставку event_id (для случая, если на шаге 1 event_id не был найден). На стороне сервера запросы принимают RPC-обработчики, целью которых поставить запрос в очередь. Из очереди запросы забирает специальный процесс Registry Thread (синглтон), который и выполнит запись в PaxosDB и инициализирует обратный вызов (callback) клиенту.
Масштабируемость
Т.к. реплика IdRegistry происходит между географическим регионами, сетевые задержки между которыми могут достигать 100 мс [5], то это автоматически ограничивает пропускную способность IdRegistry до десяти последовательных транзакций (event_id commits) в секунду, в то время как требование к IdRegistry было равно 10K транзакций в секунду. Но и отказаться от геораспределенности и/или от синхронной репликации critical state с поддержкой решений конфликтов в кворуме также нельзя.
Тогда инженеры Google внедрили еще 2 практики, знакомые многим из мира СУБД:
- пакетная отправка запросов (batching) – «полезная» информация по event_id занимает менее 100 байт; запросы отправляются пакетами на IdRegistry Client. Там они попадают в очередь, которую разбирает процесс Registry Thread, в обязанности которого входит решение конфликтов, связанные с тем, что в очереди может быть более одного элемента с одинаковым event_id.
- sharding (+ динамический resharding) – все event_id делятся по диапазонам; транзакции по каждому из диапазонов отправляются на определенный IdRegistry.
Пакетная отправка запросов имеет и обратную сторону: кроме смешения семантики (Photon обрабатывает данные real-time, а некоторые его части работают в batching-режиме), batching-сценарий не подойдет для систем c небольшим количеством событий – время сбора полного пакета может занимать существенный интервал времени.
Компоненты
В рамках одного ДЦ выделают следующие компоненты:
- EventStore – обеспечивает эффективный поиск по queries (поток поисковых запросов в поисковой системе);
- Dispatcher – чтение потока кликов по рекламным объявлениям (clicks) и передача (feed) прочитанного Joiner;
- Joiner – stateless RPC-сервер, принимающий запросы от Dispatcher, обрабатывающий их и соединяющий (join) потоки queries и clicks.
Алгоритм добавления записи представлен ниже:
Взаимодействие между ДЦ:
Алгоритм добавления записи в Joined Click Logs опустим, отметив, что в работы систем с частым сетевым взаимодействием применение retry-политик и асинхронных вызовов является крайне эффективным способом увеличения надежности и масштабируемости системы, соответственно, без усложнения общего алгоритма работы.
Этими же приемами – retry-политик и асинхронных вызов – и воспользовались создатели Photon.
Логика повтора запросов (щелкните, чтобы открыть)
Как уже ранее упоминалось, ситуация, когда click_id поступил на обработку, а ассоциированный с ним query_id нет – не исключение. Все из-за того, что не обязательно поток поисковых запросов обработается к тому моменту, кода начнет обрабатываться поток кликов по контекстной рекламе.
Для надежного обеспечения at-least-once семантики обработки всех click_id была введена логика, по которой для случая, описанного выше, применяется логика повторения. Для избегания троллинга (throttling) системы самой собой время между неудачными запросами увеличивается по экспоненте – exponential backoff algorithm. После некоторого количества неудачных запросов или по прошествии определенного времени click помечается как «unjoinable».
Dispatcher
Dispatcher – процесс, ответственный за чтение логов кликов - clicks. Эти логи растут во времени непрерывно.
Для того, чтобы эффективно их читать, Dispatcher периодически сканирует директорию с логами и идентифицирует новые файлы и/или измененные, сохраняет состояние каждого файла в локальной GFS ячейке. Это состояние содержит список файлов и сдвиг от начала файла для данных, которые уже были обработаны. Таким образом при изменении файла, последний вычитывается не с начала, а с того момента, на котором обработка закончилась в прошлое чтение.
Обработка новых данных осуществляется параллельно несколькими процессами, каждый из которых расшаривает свое состояние, что позволяет различным процессам бесконфликтно работать на разными частями одного и того же файла.
Joiner
Joiner – реализация stateless RPC-сервера, принимающего запросы от Dispatcher. Приняв запрос от Dispatcher, Joiner извлекает из него click_id и query_id. После чего по query_id пытается получить информацию из EventStore.
В случае успеха, EventStore возвращает поисковый запрос соответствующий обрабатываемому click.
Далее Joiner удаляет дубликаты (с помощью IdRegistry) и генерирует выходной лог, содержащий объединенные (joined) значения – Joined Click Logs.
Если Dispatcher для обработки отказов использовал retry-логику, то в Joiner инженеры Google добавили еще один прием. Прием работает в случаях, когда Joiner отправил запрос к IdRegistry; последний успешно зарегистрировал click_id, но из-за сетевых проблем, либо по таймауту Joiner так и не получил ответ об успехе от IdRegistry.
Для этого с каждым «commit click_id»-запросом, который Joiner отправляет на IdRegistry, ассоциируется специальный токен. Токен сохраняется в IdRegistry. В случае, если ответ от IdRegistry не был получен, Joiner повторяет запрос с тем же токеном, что и в прошлом запросе, и IdRegistry без труда «понимает», что пришедший запрос уже обрабатывался.
Генерация уникальных токенов / Event_Id (щелкните, чтобы открыть)
Еще одним интересным приемом, который следует отметить, является способом генерации уникальных event_id.
Ясно, что гарантированная уникальность для event_id крайне важное требование для работы Photon. В то же время, алгоритм генерации уникального в рамках нескольких ДЦ значения может занять крайне значительное время и количество CPU-ресурсов.
Инженеры Google нашли элегантное решение: event_id можно уникально идентифицировать используя IP узла (ServerIP), Id процесса (ProcessId) и временную метку (Timestamp) узла, на котором данное событие было сгенерировано.
Как и в случае со Spanner, для минимизации несогласованности временных меток на различных узлах, используется TrueTime API.
EventStore
EventStore – это сервис, принимающий на вход query_id и возвращающий соответствующий query (информацию о поисковом запросе).
В Photon для EventStore имеются 2 реализации:
- CacheEventStore – распределенное [sharding по hash(query_id)] in-memory хранилище, к котором хранится полная информация по query. Таком образом, для ответа на запрос не требуется чтение с диска.
- LogsEventStore - key-value хранилище, где key – query_id, а value – имя log-файла, в котором хранится информацию по соответствующему query, и смещение (byte offset) в этом файле.
Так как Photon работает в режиме близком к реальному времени, то можно с уверенностью гарантировать, что вероятность нахождения query в CacheEventStore (при условии, что в query в него попадают с минимальной задержкой) будет очень высокой, а сам CacheEventStore может хранить события за относительно небольшой промежуток времени.
В researching paper [5] приводится статистика, что только 10% запросов «проходят мимо» in-memory кэша и, соответственно, обрабатываются LogsEventStore.
Результаты
Конфигурация
На момент публикации [5], т.е. в 2013 году, реплики IdRegistry развернуты в 5-ти датацентрах в 3-ех географических регионах, причем сетевые задержки между регионами превышают 100 мс. Другие компоненты Photon – Dispatchers, Joiners, etc. – развернуты в 2-ух географических регионах на западном и восточном побережье США.
В каждом из ДЦ количество IdRegistry-шардов превышает сотню, а количество экземпляров процессов Dispatcher и Joiner превышает тысячи.
Производительность
Photon обрабатывает миллиарды joined событий в день, в том числе, в периоды пиковых нагрузок миллионы событий в минуту. Объем clicks-логов, обрабатываемых за 24 часа, превышает терабайт, а объем суточных query-логов исчисляется десятками терабайт.
90% всех событий обрабатываются (join'ятся в один stream) в первые 7 секунд, после их появления.
В заключении
В заключении research paper [5], инженеры Google поделились хорошими практиками и своим планами на будущее.
Принципы не новы, но для полноты и законченности статьи, я их перечислю:
- Используйте RPC-коммуникации вместо записи на диск. Запросы, выходящие за физические границы узла, должны выполняться асинхронно, а клиент всегда должен рассчитывать, что не получит ответ по таймауту или из-за сетевых проблем.
- Минимизируйте критическое состояние (critical state) системы, т.к. его, в общем случае, приходится синхронно реплицировать. В идеале в критическое critical state системы должен включать в себя только метаданные системы.
- Sharding – друг масштабируемости :) Но и эту идею инженеры Google улучшили, сделав динамический resharding.
В планах создателей Photon захватить мир уменьшить end-to-end задержки за счет того, что сервера, которые
генерируют clicks и queries будут напрямую отправлять RPC-запросы к Joiner'ам (сейчас Dispatcher «ждет» этих событий).
Также планируется Photon «научить» объединять несколько потоков данных (в текущей реализации Photon умеет объединять только 2 потока).
Пожелаем создателям Photon удачи в реализации их планов! И ждем новых research paper!
Список источников*
- [5] Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, Ashish Gupta, Haifeng Jiang, Tianhao Qiu, et al. Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams, 2013.
* Полный список источников, используемый для подготовки цикла.
Комментариев нет:
Отправить комментарий