Принцип работы Flute
Содержание
Настройка источников задач
Celesta-процедуры, запускаемые Flute
Каждая Celesta-процедура, помимо стандартного для Celesta параметра context, должна принимать объект-параметр flute, содержащий следующие атрибуты:
- id — идентификатор задачи (только для источника типа dbtable),
- sourceId — идентификатор источника задачи (GUID, генерируемый системой Flute для каждого источника при инициализации),
- params — строка с дополнительными параметрами задачи,
- resultstream — поток вывода, в который можно записывать данные результата (только для источника типа dbtable),
- message — переменная, в которую можно записывать сообщение (только для источника типа dbtable).
Например:
def foo(context, flute):
#Вывод в stdout параметров скрипта
print flute.params
Также скрипту будут доступны: все API, предоставляемые Celesta, все Python-классы, находящиеся в папке pylib, и все Java-классы, находящиеся в папке lib установочной директории Flute. По умолчанию при установке Flute в эти папки записывается стандартная библиотека Python и Java-модули для вывода отчётов в формате электронных таблиц, соответственно, но пользователь может установить туда и собственные библиотеки.
Настойка источников задач
Таблица задач
Пример конфигурации:
<dbtable>
<tablename>flute.tasks</tablename>
<pollingperiod>6000</pollingperiod>
<terminationtimeout>10000</terminationtimeout>
<maxthreads>4</maxthreads>
</dbtable>
Находящаяся в базе данных таблица задач, указываемая в параметре dbtable, является ключевым компонентом для этого способа работы. В этой таблице организуется очередь задач на выполнение. Добавление задачи в очередь может осуществляться вставкой новой записи в эту таблицу.
В дистрибутиве системы Flute находится Celesta-гранула flute, содержащая таблицу flute.tasks. Тем не менее, не исключаются сценарии использования Flute с любыми базами данных, в которых содержится таблица задач с ожидаемым набором полей. Выбрать произвольную таблицу базы данных можно при настройке системы в файле flute.properties.
Формат таблицы задач следующий:
Имя столбца | Тип столбца | Описание |
---|---|---|
id | INT NOT NULL IDENTITY PRIMARY KEY | Номер задания. Задания выполняются по очереди, определяемой их номерами. Доступно Celesta-процедуре через params.id. |
script | VARCHAR(250) | Название Celesta-процедуры в трёхкомпонентной форме (<имя гранулы>.<имя модуля>.<имя процедуры>). |
parameters | TEXT | строка или XML-документ с параметрами, передаваемыми Celesta-процедуре через params.params. |
status | INT NOT NULL DEFAULT 0 | Статус задания:
Для работы системы требуется установка ограничений NOT NULL и DEFAULT 0 на данное поле. Корректная работа системы flute со значениями в данном столбце, отличными от 0..3, включая NULL, не гарантируется. Ограничение DEFAULT 0 необходимо для того, чтобы вставляемые в таблицу новые записи сразу же получали правильный статус. |
result | BLOB | Файл с результатом выполнения. Процедура может записать сюда данные, используя params.resultstream. |
errortext | TEXT | Если процедура завершилась с ошибкой, сюда записывается текст ошибки. Процедура может записать сюда и сообщение с дополнительной информацией в случае успешного выполнения, используя params.message. |
DDL-скрипт для создания таблицы задач:
create table flutetasks(
id int not null identity primary key,
script nvarchar(250) not null,
parameters nvarchar(250),
status int not null default 0,
result image,
errortext nvarchar(max));
- Система запускает дополнительный поток параллельного выполнения, готовый к запуску Python-скрипта. При этом, если число уже параллельно выполняющихся на текущий момент потоков достигло максимума, указанного в настройке maxthreads, система ожидает до тех пор, пока один из потоков не закончит свою работу.
- Из таблицы задач, заданной в настройках приложения, извлекается первая по номеру строка с ещё необработанным заданием (в статусе «не обрабатывалось») и ей устанавливается статус «в обработке». Если ни одной не обрабатывавшейся строки не найдено, система «засыпает» на время pollingperiod, и затем вновь просматривает содержимое таблицы задач.
- Строка с заданием содержит указание на имя Celesta-процедуры в виде трёхкомпонентной ссылки <имя гранулы>.<имя модуля>.<имя процедуры> и строку с передаваемыми скрипту параметрами.
- Информация, записанная в результате выполнения скрипта в переменную resultstream сохраняется в BLOB-поле result таблицы заданий. Если скрипт установит текстовое значение в переменную message, оно также будет записано в поле errortext таблицы заданий. Строка в таблице заданий получает статус «обработано успешно».
- В случае, если обработка потоком выполнения какого-либо задания завершилась неудачно из-за ошибки в Python-скрипте или по внешним причинам, система выставляет строке задания статус «выполнено с ошибкой» и в поле errortext выводит текстовое описание ошибки (которое также лог-файл c указанием времени возникновения).
Redis-очереди
Пример конфигурации:
<redisqueue>
<maxthreads>2</maxthreads>
<queuename>q1</queuename>
</redisqueue>
Система извлекает из очереди, указанной в параметре queuename, сообщения, которые должны иметь следующий JSON-формат:
{script:foo.hello.run,
params:'hello from redis'}
Как и в случае с источником dbtable, система запускает обработку параллельно, используя до maxthreads потоков.
CRON-расписания и бесконечные циклы
Для данных источников задач параметры запуска прописываются непосредственно в конфигурации, пример настройки:
<scheduledtask>
<schedule>5 * * * *</schedule>
<script>foo.module.script</script>
<params>234</params>
</scheduledtask>
<looptask>
<script>foo.hello.run</script>
<waitonsuccess>1000</waitonsuccess>
<waitonfailure>30000</waitonfailure>
</looptask>
- schedule — расписание в формате CRON. Справку по этому формату можно найти во многих местах в Интернете.
- waitonsuccess — пауза между выполнениями скриптов в бесконечном цикле, если в скрипте не произошло необработанного исключения.
- waitonfailure — пауза между выполнениями скриптов в бесконечном цикле, если в скрипте произошло необработанное исключение.