Синхронизация между потоком main и потоком, вызывающим OnXxx

Страницы: 1
RSS
Синхронизация между потоком main и потоком, вызывающим OnXxx
 
Quik исполняет Lua скрипт на двух потоках. То, что внутри скриптовой функции main, исполняется на выделенном треде. Коллбеки OnXxxx исполняются на основном потоке Quik. Задача - гарантировать правильность последовательности вычислений, которые происходят внутри main и коллбеков. То есть, чтоб когда случился, например OnQuote, была бы гарантия, что исполняющийся параллельно main закончил свою часть вычислений и общие данные были бы в готовом состоянии. Примерный скрипт такой:
Код
data = {};

function update_ui()
   Clear(table_id)
   for i,v = in ipairs(data) do
      InsertRow(table_id, ...)
   end
end

function calculate_data_item(v)
   -- Calculate new value based on v
   -- Long, thread unsafe operation in real code, depends on other data[..] items
   -- Adding 1 here only as example
   return v + 1;
end

function update_all_data_items()
   for i,v = in ipairs(data) do
      data[i] = calculate_data_item(v);
   end
end

function main()
   -- Request level 2 quotes for each security
   -- Init table, columns ... 
   while true do
      -- START atomic operation on data array
      update_all_data_items()
      update_ui()
      -- End atomic operation
      sleep(1000)   
   end
end

function OnQuote(p1, p2)
   -- START atomic operation on data[p2]
   data[p2] = calculate_data_item(...)
   update_ui()
   -- End atomic operation on data[p2]
end

Там где в коментарии START - должна начинаться часть эксклюзивного кода, там где End - заканчиваться. Если организовать ожидание одним потоком пока завершит вычисления другой, то удается добиться полной корректности. Однако, некоторые функции Qlua (по всей видимости связанные с UI) блокируют исполнение, пока им не ответит основной тред Quik. В примере вызывается InsertRow и он внутри реализован так, что будет блокировать исполнение, пока ему не ответит основной тред Quik.

Происходит следующее:
1. main в точке START эксклюзивно запирает исполнение
2. Случается OnQuote и блокирует ожиданием основной поток Quik, пока исполнение заперто
3. В то же время main продолжает испололнение, благополучно завершает вычисление данных, подходит к вызову InsertRow
4. Внутри InsertRow происходит нечто, блокирующе зависящее от основного потока Quik
5. Основной поток Quik не откликается, потому что он в пункте 2 и ждет пока замок будет разлочен в main
6. main не может дойти до точки End, где замок разлочивается, потому что он ждет, пока InsertRow вернет исполнение

Получается, что внутри синхронизированного кода нельзя использовать ряд Qlua функций.
Вопросы:
1. Почему так происходит: InsertRow и подобные используют блокирующую отправку сообщений Windows (типа SendMessage вместо PostMessage) или это из-за логики локинга Quik? Понимание причины помогло бы придумать обходную схему
2. Какие функции поименно обладают таким же поведением как InsertRow? Понимание этого помогло бы писать скрипты так, чтоб хотя бы просто не натыкаться на эту проблему (не использовать внутри синхронизированного кода)
 
Один из вариантов кардинально решить проблему многопоточности при программировании на QLua описан ниже.

В потоке main делаем всю логику торгового робота, а из потока коллбэков только передаём данные в поток main через очередь. Очередь решает задачу синхронизации, коллбэки завершают свою работу максимально быстро, не тормозя UI терминала.

Реализация очереди функций на базе массива позволяет в потоке коллбэков указать, какие данные нужно использовать, и как именно. При получении функции в потоке main остаётся лишь запустить её.
Код
--
-- Реализация очереди из функций и их исполнения.
--

local Executor = {}

--- Конструктор.
-- @param self объект
local function new(self)
    local queue = {
        head = 1,
        tail = 0
    }
    setmetatable(queue, self)
    self.__index = self
    return queue
end

Executor.new = new

--- Поместить функцию в очередь на выполнение.
-- @param self объект
-- @param f функция
local function submit(self, f)
    if type(f) == "function" then
        self[self.tail + 1] = f
        self.tail = self.tail + 1
    end
end

Executor.submit = submit

--- Получить очередную функцию из очереди.
-- @param self объект
-- @return функция
local function get(self)
    if self.head > self.tail then
        return nil
    else
        local f = self[self.head]
        self[self.head] = nil
        self.head = self.head + 1
        return f
    end
end

Executor.get = get

--- Выполнять функции из очереди либо пока они там есть, либо пока не будет выполнено указанное количество функций.
-- @param self объект
-- @param max максимальное количество исполняемых за один раз функций
local function execute(self, max)
    max = max or 1000000
    while max > 0 do
        local f = self:get()
        if f == nil then
            return
        else
            f()
            max = max - 1
        end
    end
end

Executor.execute = execute

return Executor

Используется это примерно так.
Код
local tradeCounters = {} -- таблица[secCode], содержащая количество обезличенных сделок по каждому инструменту

В потоке коллбэков пишем, например:
Код
function OnAllTrade(t)
    executor:submit(function()
            tradeCounters[t.sec_code] = (tradeCounters[t.sec_code] or 0) + 1
    end)
end

В потоке main пишем цикл, достаточно часто вызывающий функцию execute, т.е. что-то типа:
Код
while not interrupted do
    ....
    executor:execute()
    ....
end
В результате в потоке main выполнится функция, которую мы задали в потоке коллбэков с данными, которые в тот момент были доступны.

Удачи!
 
Здравствуйте, спасибо за ответ.

В коде выше чтение и запись в submit/get не синхронизированы. Что случится, если вызов будет произведен одновременно на двух родных потоках (мейн и колбек)?
Например, колбек вызовет submit, который уже модифицирует очередь при присвеоении, но еще не завершит операцию, а get уже начнет из нее читать?
В Lua скрипте такую схему можно было бы корректно реализовать при помощи функций, описанных в разделе "Потокобезопасные функции для работы с таблицами Lua" в qlua.chm.
На C++ подобную схему я делаю проще, с заведомо потокобезопасной структурой данных для очереди (как например тут https://github.com/elelel/qluacpp-tutorial/tree/master/log_all_trades ).
Все же, хотелось бы избежать копирования всего и вся из коллбеков в очередь. Если обработку производить в main, нужно будет саспендить этот поток (единственный
не созданный нами поток, который мы имеем право саспендить) и пробуждать по мере надобности. Но для этого надо знать условия, когда его можно саспендить, чтоб не
происходило блокировки обоих ниток по сценарию как оригинальном посте.
 
Код написан с учетом Ваших опасений
Цитата
В коде выше чтение и запись в submit/get не синхронизированы. Что  случится, если вызов будет произведен одновременно на двух родных  потоках (мейн и колбек)?
Например, колбек вызовет submit, который уже модифицирует очередь  при присвеоении, но еще не завершит операцию, а get уже начнет из нее  читать?

submit меняет только tail, get меняет только head. И указатели двигаются только в те моменты, когда структура готова. За видимость переменных из разных потоков разработчики QLua уже подумали.
Цитата
Все же, хотелось бы избежать копирования всего и вся из коллбеков в очередь.

Передавайте только те данные, которые нужны. Полагаю, что если использовать что-то блокирующее, у Вас производительность понизится.

Как эвристику можно использовать следующее наблюдение: если при чтении из очереди она оказалась пуста, можно поставить sleep с аргументом побольше, если же непуста, то вообще не вызывать sleep.

Если очередь разгребается быстрее, чем пополняется, то в памяти растёт только массив, на базе которого реализована очередь. Этот  массив занимает в памяти относительно небольшой размер, а перезапуск скрипта, который происходит раз в несколько дней, приводит к старту с малого размера массива.
 
Цитата
submit меняет только tail, get меняет только head. И указатели двигаются только в те моменты, когда структура готова. За видимость переменных из разных потоков разработчики QLua уже подумали

Операция присвоения не атомарна. В документации среди потокобезопасных функций она или аналоги не перечислена. То есть, по документации, можно быть уверенным что в середине
sinsert(...) не случится параллельного вызова, который обращается к той же структуре данных. А вот в середине self[self.tail + 1] = f он вполне можент случиться. По крайней мере я нигде
не нашел признаков того, что они на каждое присвоение производят синхронизацию между тредами. Это было бы убийственно для производительности, представляете, везде где встречается = нужно было
бы брать лок, делать операцию, снимать лок. По этой же причине, кстати, insert не сделан потокобезопасным, иначе бы на каждый его вызов приходилось бы все это дело блокировать/разблокировать, вместо
этого сделали безопасный sinsert, который бы пользователь вызывал только когда он действительно нужен.
Цитата
Передавайте только те данные, которые нужны. Полагаю, что если использовать что-то блокирующее, у Вас производительность понизится.
Производительность понижается в результате отсутствия блокировки. Потому что тогда приходится делать поллинг общей переменной - флага или очереди, как у вас. То есть, ее нужно в цикле внутри main опрашивать.
Если она опрашивается в цикле, то там будет блокировка, та же самая, только по таймеру (через функцию sleep). Моя блокировка отличается от такой блокировке только тем, что разблокировка происходит
по уведомлению из другого треда напрямую непосредственно в момент, когда нужно произвести действия.
 
Когда ещё был старый форум, там это обсуждалось. Пусть кто-нибудь из представителей ARQA дальше по теме видимости изменения переменных отвечает (даёт ссылку на соответствующее обсуждение или заново тут ответит).

Приведённый мною код работает с конца 2013 года без каких-либо проблем.
Цитата
Моя блокировка отличается от такой блокировке только тем, что разблокировка происходит
по уведомлению из другого треда напрямую непосредственно в момент, когда нужно произвести действия.

Конечно, так и надо делать. Только в стандартном Lua для этого нет средств, а в QLua, наверное, через sinsert как-то можно извратиться и реализовать Lock. На практике же повелось так, что в main-потоке большинство пишет цикл, совершающий нужные проверки/действия и засыпающий, если это надо/есть возможность.
 
Цитата
Когда ещё был старый форум, там это обсуждалось. Пусть кто-нибудь из представителей ARQA дальше по теме видимости изменения переменных отвечает (даёт ссылку на соответствующее обсуждение или заново тут ответит).

Приведённый мною код работает с конца 2013 года без каких-либо проблем.
Если то, что я предполагаю, верно, то даже в этом случае встретить ошибку шанс минимален. По сути, это классическая ситуация с многопоточностью в нативном приложении, когда есть два потока и одна переменная, допустим int, и даже если известно, что один поток только пишет, а второй только читает, можно обойтись без локинга, если есть гарантия, что операции чтения и записи - атомарны. То, есть на C++ формально нельзя делать так, хотя в реальной жизни наткнуться на последствия проблемы очень маловероятно:
Цитата
int x;

thread1_proc() {
 while (...) {
    if (x == 1) {
      ...
    }
 }
}

thread2_proc() {
 while (...) {
   if (...) {
     x = 1;
   } else {
     x = ...
   }
 }
}
Но можно делать, если x будет объявлен как std::atomic<int> x; потому что иначе простая строчка типа x = 1 или x == 1 компилируются в много инструкций, которые могут быть перемешаны, например, с частью инструкций от следующей строки С++ кода.
Поэтому действительно хочется услышать кого-нибудь из разработчиков, действительно они каким-то образом гарантируют атомарность присвоения и чтения всех переменных Lua. Задача эта, как мне кажется, нетривиальная.

Цитата
Конечно, так и надо делать. Только в стандартном Lua для этого нет средств, а в QLua, наверное, через sinsert как-то можно извратиться и реализовать Lock. На практике же повелось так, что в main-потоке большинство пишет цикл, совершающий нужные проверки/действия и засыпающий, если это надо/есть возможность.
Все упирается в то, что в Lua в принципе нет исполнения одного контекста на разных реальных (таких, как тут) потоках. Не то что для этого нет средств, она в принципе не предназначена для того, чтоб с ней делали то, что сделано в Quik. Но нам даже из Lua можно сделать ровно то, что сделали разработчики Quik - использовать родные вызовы (например к WinApi) для работы с настоящими потоками. С их помощью можно сделать паузу в main (на что мы имеем право по документации) и уведомление о продолжении из колбеков. Единственное препятствие для такой схемы - то что некоторые вызовы, типа работы над табличным юзеринтерфейсом, блокирующие, в зависимости от ответа второго треда. Насчет этого требуется пояснения разработчиков, пока что приходится опытным путем устанавливать, какая у них логика локинга.
Страницы: 1
Читают тему
Наверх