Настройка источников задач — различия между версиями

Материал из Course Orchestra
Перейти к: навигация, поиск
 
Строка 22: Строка 22:
 
* '''pollingperiod''' — интервал опроса таблицы.
 
* '''pollingperiod''' — интервал опроса таблицы.
 
* '''maxthreads''' — максимальный уровень параллелизма.
 
* '''maxthreads''' — максимальный уровень параллелизма.
 +
* '''maxerrorlength''' — максимальная длина сообщения об ошибке (применяется в случае, когда поле errortext ограничено по длине и мы не можем влиять на его длину, например, при интеграции с базой данных внешней системы).
  
 
Находящаяся в базе данных таблица задач, указываемая в параметре '''tablename''', является ключевым компонентом для этого способа работы. В этой таблице организуется очередь задач на выполнение. Добавление задачи в очередь может осуществляться вставкой новой записи в эту таблицу.
 
Находящаяся в базе данных таблица задач, указываемая в параметре '''tablename''', является ключевым компонентом для этого способа работы. В этой таблице организуется очередь задач на выполнение. Добавление задачи в очередь может осуществляться вставкой новой записи в эту таблицу.

Текущая версия на 11:02, 23 октября 2018

Общий параметр

terminationtimeout (необязательный, по умолчанию 4000) — время в миллисекундах на завершение задачи при остановке сервиса, прежде чем она будет завершена принудительно.

Настойка источников задач

Таблица задач

Пример конфигурации:

        <dbtable>
		<tablename>flute.tasks</tablename>
		<pollingperiod>6000</pollingperiod>
		<terminationtimeout>10000</terminationtimeout>
		<maxthreads>4</maxthreads>
        </dbtable>


  • tablename — имя таблицы с задачами.
  • pollingperiod — интервал опроса таблицы.
  • maxthreads — максимальный уровень параллелизма.
  • maxerrorlength — максимальная длина сообщения об ошибке (применяется в случае, когда поле errortext ограничено по длине и мы не можем влиять на его длину, например, при интеграции с базой данных внешней системы).

Находящаяся в базе данных таблица задач, указываемая в параметре tablename, является ключевым компонентом для этого способа работы. В этой таблице организуется очередь задач на выполнение. Добавление задачи в очередь может осуществляться вставкой новой записи в эту таблицу.

В дистрибутиве системы Flute находится Celesta-гранула flute, содержащая таблицу flute.tasks. Тем не менее, не исключаются сценарии использования Flute с любыми базами данных, в которых содержится таблица задач с ожидаемым набором полей. Выбрать произвольную таблицу базы данных можно при настройке системы в файле flute.properties.

Формат таблицы задач следующий:

Имя столбца Тип столбца Описание
id INT NOT NULL IDENTITY PRIMARY KEY Номер задания. Задания выполняются по очереди, определяемой их номерами. Доступно Celesta-процедуре через params.id.
type VARCHAR(6) NOT NULL DEFAULT 'SCRIPT' Интерпертировать поле script как имя Jython-процедуры (SCRIPT, по умолчанию) или Java-процедуры (PROC).
script VARCHAR(250) Название Celesta-процедуры в трёхкомпонентной форме (<имя гранулы>.<имя модуля>.<имя процедуры>).
parameters TEXT строка или XML-документ с параметрами, передаваемыми Celesta-процедуре через params.params.
status INT NOT NULL DEFAULT 0 Статус задания:
0 – не обрабатывалось
1 – в обработке
2 – обработано успешно
3 – обработано с ошибкой.

Для работы системы требуется установка ограничений NOT NULL и DEFAULT 0 на данное поле. Корректная работа системы flute со значениями в данном столбце, отличными от 0..3, включая NULL, не гарантируется. Ограничение DEFAULT 0 необходимо для того, чтобы вставляемые в таблицу новые записи сразу же получали правильный статус.

result BLOB Файл с результатом выполнения. Процедура может записать сюда данные, используя params.resultstream.
errortext TEXT Если процедура завершилась с ошибкой, сюда записывается текст ошибки. Процедура может записать сюда и сообщение с дополнительной информацией в случае успешного выполнения, используя params.message.

DDL-скрипт для создания таблицы задач:

create table flutetasks(
    id int not null identity primary key,
    script nvarchar(250) not null,
    parameters nvarchar(250),
    status int not null default 0,
    result image,
    errortext nvarchar(max));
  1. Система запускает дополнительный поток параллельного выполнения, готовый к запуску Python-скрипта. При этом, если число уже параллельно выполняющихся на текущий момент потоков достигло максимума, указанного в настройке maxthreads, система ожидает до тех пор, пока один из потоков не закончит свою работу.
  2. Из таблицы задач, заданной в настройках приложения, извлекается первая по номеру строка с ещё необработанным заданием (в статусе «не обрабатывалось») и ей устанавливается статус «в обработке». Если ни одной не обрабатывавшейся строки не найдено, система «засыпает» на время pollingperiod, и затем вновь просматривает содержимое таблицы задач.
  3. Строка с заданием содержит указание на имя Celesta-процедуры в виде трёхкомпонентной ссылки <имя гранулы>.<имя модуля>.<имя процедуры> и строку с передаваемыми скрипту параметрами.
  4. Информация, записанная в результате выполнения скрипта в переменную resultstream сохраняется в BLOB-поле result таблицы заданий. Если скрипт установит текстовое значение в переменную message, оно также будет записано в поле errortext таблицы заданий. Строка в таблице заданий получает статус «обработано успешно».
  5. В случае, если обработка потоком выполнения какого-либо задания завершилась неудачно из-за ошибки в Python-скрипте или по внешним причинам, система выставляет строке задания статус «выполнено с ошибкой» и в поле errortext выводит текстовое описание ошибки (которое также лог-файл c указанием времени возникновения).

Redis-очереди

Пример конфигурации:

	<redisqueue>
                <maxthreads>2</maxthreads>
		<queuename>q1</queuename>
	</redisqueue>
  • queuename — имя очереди с задачами.
  • maxthreads — максимальный уровень параллелизма.

Система извлекает из очереди, указанной в параметре queuename, сообщения, которые должны иметь следующий JSON-формат:

{script:foo.hello.run, 
 params:'hello from redis'}

Как и в случае с источником dbtable, система запускает обработку параллельно, используя до maxthreads потоков.

CRON-расписания и бесконечные циклы

Для данных источников задач параметры запуска прописываются непосредственно в конфигурации, пример настройки:

	<scheduledtask>
		<schedule>5 * * * *</schedule>
		<script>foo.module.script</script>
		<params>234</params>
	</scheduledtask>

	<looptask>
		<script>foo.hello.run</script>
		<waitonsuccess>1000</waitonsuccess>
		<waitonfailure>30000</waitonfailure>
                <count>2</count>
                <finalizer>foo.hello.finalizerun</finalizer>
	</looptask>
  • script — скрипт для выполнения
  • params — параметры скрипта для выполнения
  • schedule — расписание в формате CRON. (Справку по этому формату можно найти во многих местах в Интернете.)
  • waitonsuccess — пауза между выполнениями скриптов в бесконечном цикле, если в скрипте не произошло необработанного исключения.
  • waitonfailure — пауза между выполнениями скриптов в бесконечном цикле, если в скрипте произошло необработанное исключение.
  • count — количество экземпляров одинакового looptask, которые будут созданы. По умолчанию равно единице.
  • finalizer — процедура, которая будет запущена в момент остановки сервиса Flute (например, для того чтобы разрегистрировать сервис, организованный при помощи looptask).