Принцип работы Flute

Материал из Course Orchestra
Перейти к: навигация, поиск

Настройка источников задач

Celesta-процедуры, запускаемые Flute

Каждая Celesta-процедура, помимо стандартного для Celesta параметра context, должна принимать объект-параметр flute, содержащий следующие атрибуты:

  • id — идентификатор задачи (только для источника типа dbtable),
  • sourceId — идентификатор источника задачи (GUID, генерируемый системой Flute для каждого источника при инициализации),
  • params — строка с дополнительными параметрами задачи,
  • resultstream — поток вывода, в который можно записывать данные результата (только для источника типа dbtable),
  • message — переменная, в которую можно записывать сообщение (только для источника типа dbtable).

Например:

def foo(context, flute):
    #Вывод в stdout параметров скрипта
    print flute.params

Также скрипту будут доступны: все API, предоставляемые Celesta, все Python-классы, находящиеся в папке pylib, и все Java-классы, находящиеся в папке lib установочной директории Flute. По умолчанию при установке Flute в эти папки записывается стандартная библиотека Python и Java-модули для вывода отчётов в формате электронных таблиц, соответственно, но пользователь может установить туда и собственные библиотеки.

Настойка источников задач

Таблица задач

Пример конфигурации:

        <dbtable>
		<tablename>flute.tasks</tablename>
		<pollingperiod>6000</pollingperiod>
		<terminationtimeout>10000</terminationtimeout>
		<maxthreads>4</maxthreads>
        </dbtable>

Находящаяся в базе данных таблица задач, указываемая в параметре dbtable, является ключевым компонентом для этого способа работы. В этой таблице организуется очередь задач на выполнение. Добавление задачи в очередь может осуществляться вставкой новой записи в эту таблицу.

В дистрибутиве системы Flute находится Celesta-гранула flute, содержащая таблицу flute.tasks. Тем не менее, не исключаются сценарии использования Flute с любыми базами данных, в которых содержится таблица задач с ожидаемым набором полей. Выбрать произвольную таблицу базы данных можно при настройке системы в файле flute.properties.

Формат таблицы задач следующий:

Имя столбца Тип столбца Описание
id INT NOT NULL IDENTITY PRIMARY KEY Номер задания. Задания выполняются по очереди, определяемой их номерами. Доступно Celesta-процедуре через params.id.
script VARCHAR(250) Название Celesta-процедуры в трёхкомпонентной форме (<имя гранулы>.<имя модуля>.<имя процедуры>).
parameters TEXT строка или XML-документ с параметрами, передаваемыми Celesta-процедуре через params.params.
status INT NOT NULL DEFAULT 0 Статус задания:
0 – не обрабатывалось
1 – в обработке
2 – обработано успешно
3 – обработано с ошибкой.

Для работы системы требуется установка ограничений NOT NULL и DEFAULT 0 на данное поле. Корректная работа системы flute со значениями в данном столбце, отличными от 0..3, включая NULL, не гарантируется. Ограничение DEFAULT 0 необходимо для того, чтобы вставляемые в таблицу новые записи сразу же получали правильный статус.

result BLOB Файл с результатом выполнения. Процедура может записать сюда данные, используя params.resultstream.
errortext TEXT Если процедура завершилась с ошибкой, сюда записывается текст ошибки. Процедура может записать сюда и сообщение с дополнительной информацией в случае успешного выполнения, используя params.message.

DDL-скрипт для создания таблицы задач:

create table flutetasks(
    id int not null identity primary key,
    script nvarchar(250) not null,
    parameters nvarchar(250),
    status int not null default 0,
    result image,
    errortext nvarchar(max));
  1. Система запускает дополнительный поток параллельного выполнения, готовый к запуску Python-скрипта. При этом, если число уже параллельно выполняющихся на текущий момент потоков достигло максимума, указанного в настройке maxthreads, система ожидает до тех пор, пока один из потоков не закончит свою работу.
  2. Из таблицы задач, заданной в настройках приложения, извлекается первая по номеру строка с ещё необработанным заданием (в статусе «не обрабатывалось») и ей устанавливается статус «в обработке». Если ни одной не обрабатывавшейся строки не найдено, система «засыпает» на время pollingperiod, и затем вновь просматривает содержимое таблицы задач.
  3. Строка с заданием содержит указание на имя Celesta-процедуры в виде трёхкомпонентной ссылки <имя гранулы>.<имя модуля>.<имя процедуры> и строку с передаваемыми скрипту параметрами.
  4. Информация, записанная в результате выполнения скрипта в переменную resultstream сохраняется в BLOB-поле result таблицы заданий. Если скрипт установит текстовое значение в переменную message, оно также будет записано в поле errortext таблицы заданий. Строка в таблице заданий получает статус «обработано успешно».
  5. В случае, если обработка потоком выполнения какого-либо задания завершилась неудачно из-за ошибки в Python-скрипте или по внешним причинам, система выставляет строке задания статус «выполнено с ошибкой» и в поле errortext выводит текстовое описание ошибки (которое также лог-файл c указанием времени возникновения).

Redis-очереди

Пример конфигурации:

	<redisqueue>
                <maxthreads>2</maxthreads>
		<queuename>q1</queuename>
	</redisqueue>

Система извлекает из очереди, указанной в параметре queuename, сообщения, которые должны иметь следующий JSON-формат:

{script:foo.hello.run, 
 params:'hello from redis'}

Как и в случае с источником dbtable, система запускает обработку параллельно, используя до maxthreads потоков.

CRON-расписания и бесконечные циклы

Для данных источников задач параметры запуска прописываются непосредственно в конфигурации, пример настройки:

	<scheduledtask>
		<schedule>5 * * * *</schedule>
		<script>foo.module.script</script>
		<params>234</params>
	</scheduledtask>

	<looptask>
		<script>foo.hello.run</script>
		<waitonsuccess>1000</waitonsuccess>
		<waitonfailure>30000</waitonfailure>
	</looptask>
  • schedule — расписание в формате CRON. Справку по этому формату можно найти во многих местах в Интернете.
  • waitonsuccess — пауза между выполнениями скриптов в бесконечном цикле, если в скрипте не произошло необработанного исключения.
  • waitonfailure — пауза между выполнениями скриптов в бесконечном цикле, если в скрипте произошло необработанное исключение.