Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
14.02.2026 10:16:21
Вопрос к поддержке: В QUIKе много различных коллбеков. Где гарантии, что они не пропускают события, по которым должны выполняться? Пропуск событий в заявленных коллбеках, даже если это происходит нечасто, это ошибка QUIK. В существующей версии QUIK коллбеки выполняются в единственном служебном потоке. Что происходит в QUIKе если возникает очередное новое событие, по которому должен выполниться коллбек и при этом не был обработан предыдущий? Есть служебные очереди необработанных событий? В существующей версии QUIK выполнение коллбека блокируется до тех пор пока в потоке main не выполнится sleep или сишная функция. Выше написанное означает полную зависимость служебного потока от пользовательского. Зачем это сделано в QUIKe? В этой ветке были изложены предложения, которые, по моему мнению, могли бы устранить описанные выше проблемы. Что в этих предложениях не понятно или вызывает сомнения?
Пользователь
Сообщений: Регистрация: 02.01.2026
14.02.2026 13:27:05
Цитата
TGB написал: В существующей версии QUIK выполнение коллбека блокируется до тех пор пока в потоке main не выполнится sleep или сишная функция. Выше написанное означает полную зависимость служебного потока от пользовательского. Зачем это сделано в QUIKe?
Блокируется на время выполнения байт-кода. Не представляю, какой код нужно написать без вызова сишных функций, выполняющийся длительное время, чтобы это можно было заметить.
Пользователь
Сообщений: Регистрация: 30.01.2015
14.02.2026 15:11:44
Функция sleep уступает свободное время потока следующему потоку. Например sleep(1000) в main означает, что поток main будет остановлен системой на 1000 ms. ----------------------- Таким образом, функция sleep выполняется быстро, так как ее задача сообщить ОС чтобы та разбудила поток через заданное время. ОC устанавливает таймер на событие "запустить поток main через 1000 ms. ------------------------ Все эти действия выполняются буквально мкс . Поэтому ничего не блокируется для исполнения sleep.
Пользователь
Сообщений: Регистрация: 30.01.2015
14.02.2026 15:13:41
чтобы остановить выполнение потоко с колбеком надо в функцию колбека поставить sleep. Поток колбека будет остановлен на время указанное в sleep.
Пользователь
Сообщений: Регистрация: 30.01.2015
14.02.2026 15:20:35
и еще... Если значение аргумента sleep равно нулю, поток освобождает оставшуюся часть своего интервала времени для любого потока с таким же приоритетом, готовым к выполнению. Если других готовых к выполнению потоков с таким же приоритетом нет, выполнение текущего потока не приостанавливается.
Пользователь
Сообщений: Регистрация: 12.05.2020
14.02.2026 18:29:42
Цитата
nikolz написал: Поэтому ничего не блокируется для исполнения sleep.
Когда же вы научитесь читать ? Вы читаете тексты перед тем как писать? Ведь написано:
Цитата
TGB написал: блокируется до тех пор пока в потоке main не выполнится sleep или сишная функция
Где вы видите у меня фразу: "для исполнения"? У вас какое то недержание ваших текстов.
написал: Поэтому ничего не блокируется для исполнения sleep.
Когда же вы научитесь читать :: ? Вы читаете тексты перед тем как писать? Ведь написано:
Цитата
написал: блокируется до тех пор пока в потоке main не выполнится sleep или сишная функция
Где вы видите у меня фразу: "для исполнения"? У вас какое то недержание ваших текстов.
Вы тоже не умеете читать. Что именно по-вашему блокируется "пока в потоке main не выполнится sleep" Напишите конкретно от какого до какого момента исполнения кода в функции Main блокируется -------------------- Я написал именно на это ваше высказывание. Ничего не блокируется "до тех пор пока в потоке main не выполнится sleep или сишная функция" =================== Поясняю специально для Вас: Поток может останавливается в многопоточной системе если он обращается к ПАМЯТИ ДАННЫХ , к которой обращается в данный момент другой поток. И то если потоки должны писать в эту память. Если они читатели то никакой блокировки никто не делает. --------------------------------------- Теперь скажите К КАКОЙ ПАМЯТИ ВСЕГДА одновременно обращаются колбек и поток main для записи данных в эту память "до тех пор пока в потоке main не выполнится sleep или сишная функция" --------------------------------------
Пользователь
Сообщений: Регистрация: 30.01.2015
15.02.2026 07:21:05
, Если Вы знаете русский язык, то фраза "до тех пор пока в потоке main не выполнится sleep или сишная функция" означает, что время измеряется от некоторого момента до выполнения sleep до момента окончания выполнения sleep. ----------------- И понять эту фразу очень сложно. Время до оператора sleep - это время обработки очереди в цикле в main (пример есть в документации). Если бы в это время колбеки остановились, то никакой очереди создать было бы невозможно. -------------------- момент времени после исполнения sleep Если в sleep 0, то это практически время начала исполнения sleep B этом случае оно совпадает с началом цикла обработки очереди. ----------------------------- таким образом Вы сказали буквально следующее: Колбеки всегда остановлены если в main нет функции sleep или параметр у нее равен нулю. Но колбеки прекрасно работают и без sleep в main. ---------------------------- Согласитесь, что вы сказали чушь.
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 11:56:46
Цитата
nikolz написал: Что именно по-вашему блокируется "пока в потоке main не выполнится sleep" Напишите конкретно от какого до какого момента исполнения кода в функции Main блокируется
Вы опять не читаете или ничего не соображаете. У меня же написано конкретно то, что давно известно:
Цитата
TGB написал: В существующей версии QUIK выполнение коллбека блокируется до тех пор пока в потоке main не выполнится sleep или сишная функция.
Выполните в main строку: for i = 0, 10000000000 do end и вы заблокируете секунд на 30 не только коллбеки, но и диалог QUIK. -----
Цитата
nikolz написал: Вы сказали буквально следующее: Колбеки всегда остановлены если в main нет функции sleep или параметр у нее равен нулю.
Вы опять меня "передергиваете". Где я это писал буквально? Читайте мою приведенную выше цитату. Вы понимаете что означает слово буквально? Зачем вы меня "передергиваете" и пытаетесь приписать свои фантазии? Вы продолжаете работать прокладкой между комментариями ?
В указанный период на сервере проводились технические работы, которые и стали причиной невозможности снятия заявки через Рабочее место и средствами скриптов. Приносим свои извинения за доставленные неудобства.
Пользователь
Сообщений: Регистрация: 12.05.2020
19.02.2026 09:28:27
Цитата
Oleg Kuzembaev написал: В указанный период на сервере проводились технические работы, которые и стали причиной невозможности снятия заявки
Здравствуйте! Спасибо за ответ. У меня просьба к вам: донести до разработчиков мои комментарии 1, 51.
QUIK clients support
Сообщений: Регистрация: 11.08.2025
19.02.2026 10:00:57
Благодарим вас за обратную связь. Такая информация довольно ценна для улучшения продуктов от нашей компании.
В данном случае, нам видится возможность зарегистрировать пожелание на доработку будущих версий ПО. Однако необходимо достичь понимания, какая именно доработка требуется. Если у вас уже есть такая наготове, то просьба предметно описать ее: что именно стоит добавить? Как это будет выглядеть в вашем представлении? После сбора всей информации, мы с радостью зарегистрируем такое пожелание.
Пользователь
Сообщений: Регистрация: 12.05.2020
19.02.2026 13:06:40
Цитата
Oleg Kuzembaev написал: В данном случае, нам видится возможность зарегистрировать пожелание на доработку будущих версий ПО. Однако необходимо достичь понимания, какая именно доработка требуется.Если у вас уже есть такая наготове, то просьба предметно описать ее: что именно стоит добавить? Как это будет выглядеть в вашем представлении?
1. Я исхожу из того, что среди пользователей QUIK могут быть квалифицированные IT-разработчики, которые могут улучшить мои предложения и поэтому выкладываю их в публичку. 2. Сохранение существующего API QLua, с тем, чтобы обеспечить совместимость с существующими разработками пользователей, является основным ограничением на предлагаемое. ------------------- I. В текущей версии QUIK в одном основном потоке обслуживаются: - запуск всех Lua-скриптов пользователя; - запуск коллбеков всех Lua-скриптов пользователя; - обработка всех коллбеков таблиц QUIK (это не таблицы Lua); - обработка всех индикаторов пользователя. Притом, что в текущий момент у подавляющего количества ПК много ядер ЦП, написанное выше явный перебор. Наверное, нет проблем перечисленное выше обрабатывать в отдельных потоках, так как перечисленные выше функции, в моем представлении, не сильно связаны друг с другом и, на первый взгляд, между ними не требуется синхронизация.
II. Интерфейс взаимодействия QUIK c Lua-скриптом пользователя, реализованный в виде коллбеков, предполагает многопоточный режим использования Lua, порождающий неприятные проблемы параллельного программирования (для решения которых сами же разработчики предлагают использовать потокобезопасную очередь между коллбеками и потоком main). Мне представляется, что имеет смысл вместо коллбеков использовать активную очередь событий, При этом не требуется использовать Lua в многопоточном, редко используемом и не очень стабильном режиме. При этом не будет проблем с подключением новых версий Lua. Более того, скрипты пользователя будут выполняться несколько быстрее из-за отсутствия синхронизации, требуемой в многопоточном варианте использования Lua. Конкретные предложения: 1) Lua подключать в однопоточном режиме, нативном варианте, без внесения изменений в исходники. Все необходимые коды QLua перенести в dll-пакет. 2) Для взаимодействия со скриптами пользователя использовать общие для них служебные циклические очереди (в соответствии с существующими функциями обратного вызова). Длины очередей задавать по умолчанию с возможностью их изменения пользователем. Для каждого запускаемого скрипта пользователя создавать блоки доступа (с соответствующими указателями, обеспечивающими чтение очередей без блокировки) к общим циклически очередям. 3) Функции чтения очередей регистрировать под теми же именами и так же, как это делается для существующих коллбеков. 4) Выполнять функции чтения очередей в модифицированной следующим образом sleep: - sleep выполняется по истечению заданного в ней интервала времени или при получения сигнала поступления данных в одну из циклических очередей (в пропуске сигналов нет проблем, так есть запуск по времени); - выполнение функции начинается с анализа появления новых данных в циклических очередях скрипта (конкретного) и запуска зарегистрированных его функций с теми параметрами из очередей, как это делается в существующей версии QUIK; - в качестве результата в модифицированной sleep выдавать значения: количество считанных записей в очередях, признак отсутствия потерь данных в очередях из-за их переполнения (с указанием очередей, в которых это возникло), и, возможно, еще что-то. Предложенное: 1) устраняет зависимость служебного потока от пользовательского, когда (служебный) в существующей версии QUIK может быть блокирован пользовательским, а пользовательский служебным; 2) устраняет необходимость внесения правок в исходники Lua; при этом скрипты пользователя будут выполняться несколько быстрее из-за отсутствия синхронизации, требуемой в многопоточном варианте использования Lua; 3) обеспечивает контроль потерь данных в очередях, а также возможность исключения этих потерь за счет увеличения длин очередей пользователем или уменьшения времени ожидания в sleep; 4) убирает проблемы синхронизации внутри скрипта пользователя.
III. В QUIK реализована функциональность просмотра графика котировок бумаг QUIK. Но отсутствует возможность просмотра котировок бумаг, сохраненных во внешних файлах. С учетом существующей функциональности QUIK, как мне представляется, реализация этой возможности не потребует больших усилий.
IV. Пожелание: сократить время восстановления взаимодействия QUIK с сервером при перезапуске хотя бы до 10 сек.
Пользователь
Сообщений: Регистрация: 02.01.2026
19.02.2026 14:43:11
TGB, можете написать прсевдокод, как по-вашему, использовать очередь событий?
Пользователь
Сообщений: Регистрация: 12.05.2020
19.02.2026 15:46:21
Цитата
Йцукен написал: можете написать прсевдокод, как по-вашему, использовать очередь событий?
Я так понимаю, в случае реализации предложенного.
У пользователя никаких изменений:
Код
<Определение функций обработки событий ровно так, как это сейчас>
--- Весь код скрипта без изменений, в том числе sleep(<Интервал>) --
-- Отличие только в реализации функции sleep.
-- При ее вызове с заданным интервалом внутри нее выполняется (код на C++):
WaitForSingleObject(<ОбъектОжидания>, <Интервал>); // здесь ожидание истечения <Интервала> или выдачи сигнала QUIK (неблокирующей функцией Pulse в служебном потоке) о записи в очереди новых событий.
-- Когда sleep "срывается" с <ОбъектаОжидания>, то в ней анализируются очереди (это можно сделать эффективно, используя битовую шкалу непустых очередей скрипта) и выполняются соответствующие <Функций обработки событий> с параметрами считанными из очередей.
-- Функционально это не отличается от того, что реализовано сейчас, но выполняется в потоке пользователя main.
-- Тому, кому интересны детали выполнения sleep, может изменить в своем скрипте формат вызова sleep: вместо sleep(<Интервал>) строка
local <Количество обработанных элементов очереди>, <Данные о потерях событий в очередях> = sleep(<Интервал>)
Пользователь
Сообщений: Регистрация: 02.01.2026
19.02.2026 15:52:01
TGB, колбэки внутри sleep должны вызываться или как?
Пользователь
Сообщений: Регистрация: 12.05.2020
19.02.2026 16:00:27
Цитата
Йцукен написал: колбэки внутри sleep должны вызываться или как?
Да, так называемые коллбеки, должны вызываться внутри sleep (выполняемой в потоке main) с данными из очередей.
Пользователь
Сообщений: Регистрация: 02.01.2026
20.02.2026 10:03:39
TGB, а что делать, если между слипами данные поступили по инструменту несколько раз?
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 11:01:47
Цитата
Йцукен написал: а что делать, если между слипами данные поступили по инструменту несколько раз?
При "просыпании" sleep, функции чтения очередей (коллбеки) будут выполняться (последовательно, без пропусков) столько раз сколько необработанных записей в очередях. Если очереди не переполняются, а это можно контролировать и не допускать, то события не теряются, но возможны задержки в их отработке. Как было указано ранее, sleep "просыпается" либо по времени, либо по записи в очереди данных из QUIK. Но при этом в в sleep всегда анализируются и при необходимости обрабатываются непустые служебные очереди скрипта.
Пользователь
Сообщений: Регистрация: 02.01.2026
20.02.2026 11:24:39
А смысл несколько раз подряд обрабатывать, например OnQuote, если каждый раз мы будем обрабатывать одни и те же данные? Или вы предлагаете хранить данные для каждого колбэка?
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 11:49:34
Цитата
Йцукен написал: А смысл несколько раз подряд обрабатывать, например OnQuote, если каждый раз мы будем обрабатывать одни и те же данные?Или вы предлагаете хранить данные для каждого колбэка?
Если вы не хотите повторно обрабатывать данные, то фильтруйте их. Для фильтрации данных можно создать, для соответствующего вида коллбека, таблицу с фильтруемыми полями, обновляемыми при поступлении новых данных и использовать эту таблицу в начале коллбека с тем. чтобы не обрабатывать ненужные вам повторы.
Пользователь
Сообщений: Регистрация: 02.01.2026
20.02.2026 12:09:03
Цитата
TGB написал: Если вы не хотите повторно обрабатывать данные, то фильтруйте их.
А как вы узнаете, что это повторы, пока не обработаете их?
Цитата
TGB написал: Для фильтрации данных можно создать, для соответствующего вида коллбека, таблицу с фильтруемыми полями, обновляемыми при поступлении новых данных и использовать эту таблицу в начале коллбека с тем. чтобы не обрабатывать ненужные вам повторы.
Вам надо будет сначала, как минимум запросить данные, чтобы сравнить с предыдущими.
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 12:35:25
Цитата
Йцукен написал: А как вы узнаете, что это повторы, пока не обработаете их?
Вы сначала создаете таблицу фильтрации со значениями полей фильтрации заведомо не совпадающими с ожидаемыми данными.
Пользователь
Сообщений: Регистрация: 30.01.2015
20.02.2026 12:50:07
Цитата
Йцукен написал: колбэки внутри sleep должны вызываться или как?
Sleep останавливает поток main, а функции колбеков вызываются в другом потоке, на который sleep не действует.
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 13:00:33
Цитата
nikolz написал: Sleep останавливает поток main, а функции колбеков вызываются в другом потоке, на который sleep не действует.
Вы опять только пишите не читая. Мы же с Йцукен обсуждаем не системный Sleep и даже sleep QLua, а предложенный мной запускающий коллбеки. Вам опять надо проложиться между комментариев ?
написал: Sleep останавливает поток main, а функции колбеков вызываются в другом потоке, на который sleep не действует.
Вы опять только пишите не читая. Мы же с Йцукен обсуждаем не системный Sleep и даже sleep QLua, а предложенный мной запускающий коллбеки. Вам опять надо проложиться между комментариев :: ?
Что вы так переживаете? Я написал пояснение о работе sleep в потоках. Так как г-н нефига в этом не понимает, а Вы ему ( я прочитал Ваш ответ)
Да, так называемые коллбеки, должны вызываться внутри sleep (выполняемой в потоке main) с данными из очередей.
Это полная чушь. Ничего внутри sleep не вызывается т к функция sleep выполняется ядром OS. В это время никакой колбек не успеет ничего сделать. -------------------
Пользователь
Сообщений: Регистрация: 30.01.2015
20.02.2026 13:44:29
все что делается внутри sleep - это передается ядру время на которое надо остановить поток. Ядро настраивает таймер на событие -разбудить поток через надцать секунд и передает управление следующему в очереди потоку.
Пользователь
Сообщений: Регистрация: 02.01.2026
20.02.2026 14:18:03
Цитата
В любой теме, где есть обсуждение, Он уже оставил свой след. Не вникая в суть предложения, Он вещает свой «важный» бред.
В любой теме, где есть обсуждение, Он уже оставил свой след. Не вникая в суть предложения, Он вещает свой «важный» бред.
Прекрасно, Наконец-то вы занялись самокритикой. Сами сочинили или опять плагиат?
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 14:21:42
Цитата
nikolz написал: все что делается внутри sleep - это передается ядру время на которое надо остановить поток.
Вы, как дятел , про свое, не относящееся к обсуждаемому. Вы это читали ?:
Цитата
TGB написал: -- Отличие только в реализации функции sleep. -- При ее вызове с заданным интервалом внутри нее выполняется (код на C++): WaitForSingleObject( , ); // здесь ожидание истечения или выдачи сигнала QUIK (неблокирующей функцией Pulse в служебном потоке) о записи в очереди новых событий. -- Когда sleep "срывается" с , то в ней анализируются очереди (это можно сделать эффективно, используя битовую шкалу непустых очередей скрипта) и выполняются соответствующие с параметрами считанными из очередей. -- Функционально это не отличается от того, что реализовано сейчас, но выполняется в потоке пользователя main.
Пользователь
Сообщений: Регистрация: 30.01.2015
20.02.2026 14:23:41
Цитата
Йцукен написал: , можете написать прсевдокод, как по-вашему, использовать очередь событий?
Вы не читаете форум. Я специально выложил для начинающих писателей не псевдокод, а скрипт на Lua с очередью. Да и в документации на QLua есть еще один пример. -------------------- Ах, ошибся, Вы же не писатель, Вы же ПОЭТ.
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 14:48:52
Цитата
nikolz написал: Я специально выложил для начинающих писателей не псевдокод, а скрипт на Lua с очередью. Да и в документации на QLua есть еще один пример.
Какой же вы непонятливый. До сих пор не поняли, что обсуждаются не существующие реализации, а вариант изменения этих реализаций. Все таки, похоже, что вы полуинтеллектуальный робот-спамер устаревшей версии, без способности учета контекста обсуждаемой темы при генерации спама.
TGB написал: без способности учета контекста обсуждаемой темы при генерации спама.
У него памяти не хватает, чтобы загрузить весь контекст в память. Вон весь форум загадил кучей новых тем про нехватку памяти.
Пользователь
Сообщений: Регистрация: 02.01.2026
20.02.2026 15:16:51
Цитата
TGB написал: Вы сначала создаете таблицу фильтрации со значениями полей фильтрации заведомо не совпадающими с ожидаемыми данными.
Ну т.е., фактически запросить данные, обработать их (как минимум сравнить с предыдущими). Логичней было бы, чтобы терминал не дублировал пропущенные колбэки, по которым скрипт не получит новых данных. Хотя, кому-то наоборот нужна вся история изменений (для индикатора какого-нибудь). Не зря же есть CreateDataSource для параметров инструментов. Но тогда QUIK должен хранить всю историю изменений. В общем, этот момент вам надо продумать.
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 17:57:57
Цитата
Йцукен написал: Логичней было бы, чтобы терминал не дублировал пропущенные колбэки, по которым скрипт не получит новых данных.
Это хорошее ваше предложение можно сформулировать следующим образом: функциональность обработки событий QUIK расширить, добавив в API пользователя функцию задания свойств служебных очередей событий: 1) длин очередей; 2) функций фильтрации событий перед их записью в соответствующие очереди; 3) приоритетов обработки очередей; 4) может быть что то еще.. Все свойства имеют значение по умолчанию.
Пользователь
Сообщений: Регистрация: 12.05.2020
21.02.2026 12:49:30
У меня было предложение, которое стоит повторить:
Цитата
TGB написал: На форумах ARQA для комментирующих пользователей ввести месячный лимит трафика, после которого не будет возможность вводить комментарии. Значением этого лимита могло быть: Годовой трафик nikolz / 12 / 10. Но, конечно, насчет значения лимита, решать ARQA. Наличие такого лимита, обеспечило бы: - экономию дискового пространства баз форумов; - автоматическое модерирование форумов за счет принуждения думать о краткости и четкости текстов, пишущих комментаторами; - - удобство для читающих комментарии, в которых будет меньше флуда.
Это может показаться неактуальным из-за снижения активности пишущих, но могло бы улучшить качество форумов в перспективе.
Пользователь
Сообщений: Регистрация: 02.01.2026
21.02.2026 13:11:39
Скрытый текст
Цитата
TGB написал: похоже, что вы полуинтеллектуальный робот-спамер устаревшей версии, без способности учета контекста обсуждаемой темы при генерации спама
Цитата
Чат-боты глупеют в длинных диалогах
Эксперты Microsoft Research и Salesforce изучили более 200 000 диалогов пользователей в популярных чат-ботах (GPT-4.1, Gemini 2.5 Pro, Claude 3.7 Sonnet, o3, DeepSeek R1 и Llama 4), и как оказалось – умные помощники дают 90% верных ответов на отдельные запросы.
Однако если общение превращается в полноценный диалог, то точность ИИ падает уже до 65%. В многоходовых обсуждениях нейросети начинают давать ответы, даже не прочитав полностью вопрос пользователя. К тому же они часто опираются на свой первый ответ, даже если пользователь сказал что он неверен. Кроме того, новые ответы в диалогах с каждым разом становятся все длиннее – ИИ начинают больше предполагать и галлюцинировать.
В итоге исследователи рекомендуют придерживаться старой истины: краткость – сестра таланта. Чем короче беседа с ИИ, тем лучше будет качество ответов.
Пользователь
Сообщений: Регистрация: 12.05.2020
24.02.2026 15:45:40
Цитата
TGB написал: предложение можно сформулировать следующим образом: функциональность обработки событий QUIK расширить, добавив в API пользователя функцию задания свойств служебных очередей событий
Оформить это, наверное, лучше в виде меню задания свойств служебных очередей событий QLua, вызываемое в окне запуска скриптов пользователя, при отсутствии запущенных скриптов. Промежуточный итог обсуждения интерфейса QUIK со скриптами QLua, при реализации предложений этой ветки следующий: пользователям, при этом, вносить изменения в их существующие скрипты не требуется.
Пользователь
Сообщений: Регистрация: 12.05.2020
28.02.2026 10:42:54
Цитата
Oleg Kuzembaev написал: необходимо достичь понимания, какая именно доработка требуется.Если у вас уже есть такая наготове, то просьба предметно описать ее: что именно стоит добавить? Как это будет выглядеть в вашем представлении?
Промт для ИИ Написать программу на C++, в которой: 1) Создаются несколько циклических очередей с разными типами данных. 2) В очереди пишет данные один поток, а читают эти очереди несколько потоков. 3) У читающих потов должны быть свои указатели чтения циклических очередей. 4) При записи в любую очередь пишущий поток активирует все читающие потоки. 5) Каждый читающий поток при чтении получает признак: состояние прочитанных очередей в виде размера непрочитанных данных. -------- Результат от ИИ, полученный в течении 5 сек.(компилируемый, код-основа для разработки очередей событий):
Код
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
template <typename T, size_t Size>
class CircularQueue {
public:
void push(T item) {
buffer[head % Size] = item;
head.fetch_add(1, std::memory_order_release);
}
// Каждый читатель передает свой локальный индекс
bool try_read(size_t& reader_idx, T& out_item, size_t& pending_count) {
size_t current_head = head.load(std::memory_order_acquire);
if (reader_idx < current_head) {
out_item = buffer[reader_idx % Size];
reader_idx++;
pending_count = current_head - reader_idx;
return true;
}
pending_count = 0;
return false;
}
private:
T buffer[Size];
std::atomic<size_t> head{0};
};
// Контейнер для очередей разных типов
struct SharedData {
CircularQueue<int, 100> int_queue;
CircularQueue<double, 100> double_queue;
std::mutex mtx;
std::condition_variable cv;
bool stop = false;
};
void writer(SharedData& data) {
for (int i = 1; i <= 5; ++i) {
{
std::lock_guard<std::mutex> lock(data.mtx);
data.int_queue.push(i * 10);
data.double_queue.push(i * 1.1);
std::cout << "[Writer] Pushed data set " << i << std::endl;
}
data.cv.notify_all(); // Активируем всех читателей
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
data.stop = true;
data.cv.notify_all();
}
void reader(int id, SharedData& data) {
size_t my_int_ptr = 0;
size_t my_dbl_ptr = 0;
while (true) {
std::unique_lock<std::mutex> lock(data.mtx);
data.cv.wait(lock, [&] { return data.stop || true; }); // Упрощенно: просыпаемся по сигналу
int val;
double dval;
size_t pending;
// Читаем из инт-очереди
while (data.int_queue.try_read(my_int_ptr, val, pending)) {
std::cout << "Reader " << id << " got INT: " << val << " (Left: " << pending << ")\n";
}
// Читаем из дабл-очереди
while (data.double_queue.try_read(my_dbl_ptr, dval, pending)) {
std::cout << "Reader " << id << " got DBL: " << dval << " (Left: " << pending << ")\n";
}
if (data.stop) break;
}
}
int main() {
SharedData data;
std::thread w(writer, std::ref(data));
std::thread r1(reader, 1, std::ref(data));
std::thread r2(reader, 2, std::ref(data));
w.join();
r1.join();
r2.join();
return 0;
}
// Основные моменты реализации (от ИИ):
// Индивидуальные указатели: Переменные my_int_ptr и my_dbl_ptr находятся в стеке каждого читающего потока. Они отслеживают прогресс конкретного
// читателя относительно общей «головы» (head) очереди.
// Шаблон очереди: Позволяет создавать экземпляры для int, double или любых других типов в одном контейнере SharedData.
// Синхронизация: std::condition_variable используется для мгновенной активации читателей пишущим потоком.
// Статус очереди: Метод try_read возвращает pending_count, что позволяет читателю знать, сколько данных еще осталось в буфере именно для него.
Пользователь
Сообщений: Регистрация: 12.05.2020
28.02.2026 15:33:43
Мне стало интересно, а более продвинутый вариант циклических очередей ИИ сможет реализовать? Промт для ИИ Написать программу на C++, в которой: 1) Создаются несколько циклических очередей с разными типами данных. 2) В очереди пишет данные один поток, а читают эти очереди несколько потоков. 3) У читающих потов должны быть свои указатели чтения циклических очередей и битовые шкалы непустых очередей, хранящиеся в объекте доступа к их очередям. 4) При записи в любую очередь пишущий поток меняет состояние битовых шкал непустых очередей и активирует все читающие потоки. 5) Каждый читающий поток при чтении очереди, выбираемой на основании своей битовой шкалы непустых очередей, получает признак: состояние прочитанных очередей в виде размера непрочитанных данных. 6) Добавить в код динамическое управление количеством очередей и реализовать полностью Lock-Free механизм уведомления без использования std::mutex.
Смог (все комментарии ИИ):
Код
// Это сложная задача, требующая использования атомарных операций (std::atomic) и барьеров памяти для реализации Lock-Free взаимодействия.
// Для выполнения условия динамического управления очередями и уведомления без мьютексов, мы будем использовать std::atomic_flag или
// std::atomic<bool> для сигнализации (busy-wait или wait/notify в C++20) и атомарные указатели.
// Ниже представлен концептуальный код, реализующий эту логику:
// C++20 ----
#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <memory>
#include <optional>
// Шаблонная циклическая очередь (Lock-Free для одного писателя и нескольких читателей со своими индексами)
template<typename T, size_t Capacity>
class CircularQueue {
T buffer[Capacity];
std::atomic<size_t> head{0}; // Указатель записи
public:
void push(T item) {
size_t h = head.load(std::memory_order_relaxed);
buffer[h % Capacity] = item;
head.store(h + 1, std::memory_order_release);
}
bool try_pop(size_t& reader_idx, T& out_item, size_t& pending_count) {
size_t current_head = head.load(std::memory_order_acquire);
if (reader_idx < current_head) {
out_item = buffer[reader_idx % Capacity];
reader_idx++;
pending_count = current_head - reader_idx;
return true;
}
pending_count = 0;
return false;
}
};
// Объект доступа для каждого читающего потока
struct ReaderContext {
std::atomic<uint64_t> bitmask{0}; // Битовая шкала непустых очередей
std::vector<size_t> read_indices; // Свои указатели чтения
std::atomic<bool> data_ready{false};
void update_mask(size_t queue_idx) {
bitmask.fetch_or(1ULL << queue_idx, std::memory_order_release);
data_ready.store(true, std::memory_order_release);
data_ready.notify_all(); // C++20 Lock-free уведомление
}
};
class QueueManager {
CircularQueue<int, 1024> int_queue;
CircularQueue<float, 1024> float_queue;
std::vector<std::shared_ptr<ReaderContext>> readers;
public:
void add_reader(std::shared_ptr<ReaderContext> ctx) {
ctx->read_indices.resize(2, 0); // 0 для int, 1 для float
readers.push_back(ctx);
}
void write_int(int val) {
int_queue.push(val);
for (auto& r : readers) r->update_mask(0);
}
void write_float(float val) {
float_queue.push(val);
for (auto& r : readers) r->update_mask(1);
}
void reader_thread_func(std::shared_ptr<ReaderContext> ctx, int id) {
while (true) {
// Ожидание данных (Lock-free wait)
ctx->data_ready.wait(false, std::memory_order_acquire);
uint64_t mask = ctx->bitmask.load(std::memory_order_acquire);
for (size_t i = 0; i < 2; ++i) {
if (mask & (1ULL << i)) {
size_t pending = 0;
if (i == 0) {
int val;
while (int_queue.try_pop(ctx->read_indices[0], val, pending)) {
printf("[Reader %d] Int: %d (Pending: %zu)\n", id, val, pending);
}
} else {
float val;
while (float_queue.try_pop(ctx->read_indices[1], val, pending)) {
printf("[Reader %d] Float: %.2f (Pending: %zu)\n", id, val, pending);
}
}
// Сбрасываем бит, если данных больше нет
if (pending == 0) ctx->bitmask.fetch_and(~(1ULL << i));
}
}
ctx->data_ready.store(false, std::memory_order_relaxed);
}
}
};
int main() {
QueueManager manager;
auto r1 = std::make_shared<ReaderContext>();
auto r2 = std::make_shared<ReaderContext>();
manager.add_reader(r1);
manager.add_reader(r2);
std::thread t1(&QueueManager::reader_thread_func, &manager, r1, 1);
std::thread t2(&QueueManager::reader_thread_func, &manager, r2, 2);
// Пишущий поток
std::thread writer([&]() {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
manager.write_int(100 + i);
manager.write_float(1.5f * i);
}
});
writer.join();
t1.detach();
t2.detach();
return 0;
}
// Ключевые моменты реализации :
// Индивидуальные индексы : Каждый читатель хранит свои read_indices, что позволяет им двигаться по одной и
// той же очереди независимо.
// Битовая шкала : Используется std::atomic<uint64_t>, где каждый бит соответствует ID очереди.Это позволяет
// мгновенно проверить, где появились данные.
// Lock - Free Notify : Использован метод atomic::wait / notify_all(доступен с C++20).Он работает эффективнее
// обычного yield, так как позволяет потоку "заснуть" без использования мьютекса в пользовательском
// коде(на уровне ОС это может использовать фьютексы).
// Pending Count : Метод try_pop возвращает разницу между указателем записи и текущим индексом чтения,
// выполняя условие получения "размера непрочитанных данных".
Пользователь
Сообщений: Регистрация: 12.05.2020
01.03.2026 11:28:58
Вариант ТЗ на модификацию интерфейса QLua c QUIK 1. Вместо существующей схемы запуска коллбеков создаются циклические очереди (соответствующие коллбекам). 2. В очереди пишет данные событий служебный поток QUIK, а читают эти очереди и выполняют функции их обработки потоки main скриптов пользователя в теле модифицированной функции sleep. 3. В меню, добавленное в окно запуска пользовательских скриптов, обеспечить возможность изменения пользователем умолчания параметров очередей, а также функций фильтрации данных, записываемых в очереди . 4. У читающих потоков должны быть свои указатели чтения циклических очередей и битовые шкалы непустых очередей, хранящиеся в объекте доступа к их очередям. 5. Читающие потоки циклически, выполнив свои коды переходят в состояние ожидания сигнала появления данных в их очередях или истечения заданного интервала времени. 6. При записи в любую очередь пишущий поток меняет состояние битовых шкал непустых очередей и активирует все читающие потоки. 7. Каждый читающий поток при чтении очереди, выбираемой на основании своей битовой шкалы непустых очередей, получает признак: состояние прочитанных очередей в виде размера непрочитанных данных. 8. Добавить в код динамическое управление количеством очередей и реализовать полностью Lock-Free механизм уведомления без использования std::mutex. 9. Добавить логику удаления очередей «на лету» (dynamic removal) с использованием механизма безопасного освобождения памяти, такого как Hazard Pointers. 10. Реализовать идиому Hazard Pointers с очередью на удаление (retire list). 11. Реализовать пользовательскую функцию запроса свойств очередей. 12. Подключать в QUIKе коды Lua без внесения изменений в его исходники. 13. Перенести все необходимые функции (с внесением изменений, учитывающих п.1 - 12), реализованные в текущей версии QUIK в исходниках Lua, в пакет dll. ------- Что в написанном выше не понятно? В моих сообщениях 91 и 92 приведены "сырые", но работающие прототипы, частично реализующие это ТЗ. На возражения, замечания и предложения по улучшению ТЗ, постараюсь ответить.
Пользователь
Сообщений: Регистрация: 12.05.2020
09.03.2026 21:59:17
В соответствии с ТЗ (п.1,2, 4-8) ниже выложен код на C++ варианта реализации (Lock-free) циклических очередей событий QUIK. При использовании этого варианта в реализации п. 9-11 необходимости нет. 1. Блокировок между пишущим основным потоком QUIK и читающими потоками (main) нет. 2. Результата выполненных тестов эффективности приведены в комментариях кодов. В тестах накладные расходы времени ЦП: 1) на запись в очередь одного сообщения пишущим потоком ~0.007 мкс. 2) на чтение одного сообщения читающим потоком ~0.13 мкс. 3. Порог заполненности очередей обеспечивает выдачу предупреждения о высоком уровне заполненности очередей для читающих потоков. Если для читающего потока в какой то его очереди возникнет переполнение, то об этом выдается сообщение. 4. Независимо от задаваемой паузы ожидания в читающем потоке, он активируется всякий раз, при появлении новых записей в его очередях. Вычисляемая (реальная) пауза читающего потока = 'пауза ожидания' - 'время обработки его цикла (новых коллбеков и кодов пользователя)', но >= 1 млс. В выложенном коде устанавливается разрешение таймера = 1 млс. (вместо 16,...). 5. Все основные характеристики очередей параметризированы. Функция запроса свойств очередей в выложенном коде не реализована из-за ее простоты. 6. Выложенный вариант обеспечивает, при его использовании, сохранения существующего API QLua.
Код
// ============== Вариант реализации очередей событий QUIK (Lock-free, ~300 строк) ============
// Параметр Q_MAX определяет количество очередей в схеме очередей. Схем обработки очередей можно
// создать несколько.
// Данные, передаваемые в информационных очередях, строки.
// 1. Формат записи данных - текст описания таблицы Lua:
// {name = <Имя коллбека>, tbl = {<Таблица для формирования вызова функции пользователя>}}
// После чтения выполнять десереализацию текста таблицы Lua и дальше обрабатывать с учетом значения name.
// name обеспечивает возможность группировка коллбеков по очередям.
// 2. Коллбеки можно, но необязательно, сгруппировать и группы распределить по очередям с учетом того,
// что при чтении обработка начинается с 0-й очереди. Параметры очередей size_max следует задать
// такими, чтобы в них помещались строки их данных.
// Для коллбеков пользовательских таблиц QUIK, наверное, имеет смысл использовать все таки отдельную
// очередь.
// 3. Параметр FILLING_THRESHOLD - порог заполненности очередей для выдачи предупреждения о высоком уровне
// заполненности очередей.
// 4. После первого цикла любой очереди, память ее созданных элементов переиспользуется
// (кроме чтения указателя нет затрат на управление ее памятью).
// 5. Если для читающего потока в какой то его очереди возникнет переполнение, то об этом
// выдается сообщение.
// ----------------------------------------------------------------------
// Результат теста (оценка эффективности реализации очередей)
// Писатель выполняет в цикле запись в 10 очередей. Циклов записи : 1000
// T - время выполнения потоков с учетом их пауз в млс.
//-->Писатель. Пауза: 1. T (млс.): 1977. Обработано записей: 10000. Время ЦП (млс.): 0.0686
//Читатель 3. Пауза: 50. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.3174
//Читатель 1. Пауза: 5. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.1746
//Читатель 4. Пауза: 100. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.2129
//Читатель 2. Пауза: 10. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.2194
// -------------------------------------------------------------------------------
// Краткое ТЗ на разработку очередей
// 1) Создаются несколько циклических информационных очередей с разными типами данных и общим объектом
// управления ими.
// 2) В очереди пишет данные один поток, а читают эти очереди несколько потоков.
// 3) Дополнительно создается специальная служебная циклическая очередь: битовые шкалы непустых информационных
// очередей, записываемых пишущим потоком, после записи в информационные очереди.
// 4) У читающих потоков должны быть свои указатели чтения циклических очередей и их локальные битовые шкалы
// непустых очередей, хранящиеся в их объекте доступа к информационным очередям.
// 5) Читающие потоки циклически, выполнив свои коды переходят в состояние ожидания сигнала появления данных
// в их очередях или истечения заданного интервала времени на их общем объекте ожидания.
// 6) При записи в любую очередь пишущий поток после записи в информационные очереди записывает битовые шкалы
// непустых очередей в служебную очередь шкал непустых очередей и активирует все читающие потоки.
// 7) Каждый читающий поток, при активации, читает все появившиеся записи в служебной очереди шкал непустых
// очередей и формирует свою локальную шкалу непустых очередей и далее, на ее основании, читает без
// синхронизации, непустые очереди с признаком : состояние прочитанных очередей в виде размера непрочитанных
// данных.
// 8) Реализовать эффективное управление памятью при передаче данных в этих очередях.
// C++20 (используется std::countr_zero) ---
#include <shlobj.h>
#include <windows.h>>
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <mutex>
//#include <condition_variable>
//#include <atomic>
//#include <bitset>
//#include <intrin.h>
#pragma comment(lib, "winmm.lib") // Установить точность таймера 1 мс
// Параметры тестирования очередей ---
const int N_MAX = 1000; // Количество циклов записи при тестировании
const INT64 Q_SIZE = 64; // Длина информационной очереди по умолчанию
const INT64 Q_SIZE_SL = 1024; // Длина служебной очереди шкал непустых информационных очередей
const int Q_MAX = 128; // Максимальное количество очередей
const int Количество_очередей = 120; // Количество используемых очередей ( <= Q_MAX)
const int LEN_STR_MAX = 4096; // Размер буфера потоков чтения (максимум size_max_str всех используемых очередей)
const double FILLING_THRESHOLD = 0.4; // Порог заполненности очередей для выдачи предупреждения об уровне заполненности очередей
// Системное время (синхронизированное). Точность 0,1 мкс.---
static INT64 T_OS_high_mls()
{
FILETIME lpSystemTimeAsFileTime;
GetSystemTimePreciseAsFileTime(&lpSystemTimeAsFileTime); // Системное время (синхронизированное)
INT64 tt = lpSystemTimeAsFileTime.dwHighDateTime;
tt = (tt << 32) + lpSystemTimeAsFileTime.dwLowDateTime;
return tt;
}
// Строка-объект информационной очереди (переиспользуемая) --
struct q_str {
INT64 tail = 0; // Указатель записи ---
INT64 size_max = 0; // Максимальный размер памяти строки (с учетом символа конца строки '\0')
INT64 size = 0; // Текущий размер памяти строки (с учетом символа конца строки '\0')
int n_q = 0; // Номер очереди
char* str = NULL; // Память строки (размер len_max) ----
int state = 0; // Состояние (не используется, но на всякий случай): 1 - занята пишущим потоком; 2 - в очереди.
};
// Параметры очереди, задаваемые при ее инициализации --
struct q_parm {
std::string name_Queue = ""; // имя очереди
INT64 q_size = 0; // длина очереди
INT64 size_max_str = 64; // максимальная память строки в информационной очереди
};
// Шаблонная циклическая очередь (Lock-free)
// Используется для информационных очередей и служебной
template<typename T>
class InfoQueue {
public:
std::vector<T> buffer;
INT64 tail = 0; // пишет только один писатель
std::string name_Queue = ""; // имя очереди (можно менять при начальной инициализации)
INT64 q_size = 64; // размер очереди (можно менять при начальной инициализации)
INT64 size_max_str = 64; // максимальная память строки в очереди (можно менять при начальной инициализации)
InfoQueue(INT64 s = Q_SIZE) : buffer(s), q_size(s) {}
// Изменение длины очереди ---
void q_size_set(int s) {
buffer.reserve(s); q_size = s;
}
// запись в очередь --
bool push(T val) {
if (tail < q_size)
buffer[tail % q_size] = val;
++tail;
return true;
}
// Количества доступных элементов для конкретного читателя
// local_head - указатель чтения (читающего потока)
// Если отрицательное значение, то: количество пропущенных при чтении записей очереди --
INT64 get_pending_count(INT64 local_head) const {
INT64 t = tail - local_head;
return (t <= q_size ? t : q_size - t);
}
};
// Копировать строку-объект в буфер читающего потока для последующей обработки ---
void q_str_copy(q_str* s, q_str* s1, InfoQueue<q_str*>* q)
{
//s1->size_max = s->size_max; // ! Задана при создании буфере --
s1->size = s->size;
s1->n_q = s->n_q;
if (s1->str == NULL) throw std::runtime_error("*** Ошибка: нет памяти для копирования");
// s1->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (s1->size - 1)
memcpy(s1->str, s->str, s1->size); s1->tail = s->tail; // ! Обязательно в таком порядке --
s1->state = s->state;
// Дополнительный контроль переполнение очереди (после копирования в буфер потока чтения)
if ((q->tail - s1->tail) > q->q_size) std::cout << " *** Ошибка переполнение очереди (при копировании) " << s1->n_q << "\n";
}
// Информационные очереди событий --
struct DataCluster {
// Деструктор (! выход из блока кода, в котором определен DataCluster
// только при завершения потоков, использующих его) --
~DataCluster() {
for (int i = 0; i < Q_MAX; ++i) {
INT64 size = q[i].buffer.size();
INT64 N = (q[i].tail >= size) ? size : q[i].tail; // Буфер может быть неполным ---
for (int j = 0; j < N; ++j) {
q_str* q_s = q[i].buffer[1];
delete[]q_s->str;
delete q_s;
}
}
}
int count_q = 64; // Количество используемых информационных очередей (<= Q_MAX) --
// При количестве очередей > 64 один бит шкалы непустых очередей соотносится
// к нескольким ((count_q - 1) / 64 + 1) последовательным очередям, в которых,
// возможно, есть записи.
UINT32 Queue_grouping = 1;
InfoQueue<q_str*> q[Q_MAX];
// Задать количество используемых очередей и их параметры
void set_parm_q(int n, q_parm* parm) {
if (n > 0 && n <= Q_MAX)
count_q = n;
else
throw std::runtime_error(" Количество используемых очередей > Q_MAX или <= 0"); /* #### ошибка*/
Queue_grouping = (count_q - 1) / 64 + 1;
if (parm != NULL)
for (int i = 0; i < count_q; ++i) {
q[i].name_Queue = parm->name_Queue;
q[i].q_size = parm->q_size;
q[i].q_size_set(parm->q_size);
q[i].size_max_str = parm->size_max_str;
}
}
// Функция формирования маски группировки очередей (многозначное отображение очередей на шкалу непустых)
uint64_t q_mask(int q_i) { return (uint64_t)1 << (q_i / Queue_grouping); }
// Запрос памяти потоком писателем под запись очереди --
// Запрос памяти системы и формирование строки-объекта -----
q_str* get_q_s(int q_i) {
INT64 size_max_str = q[q_i].size_max_str;
char* str = new char[size_max_str];
str[size_max_str - 1] = '\0';
q_str* q_s = new q_str;
q_s->str = str;
q_s->size_max = size_max_str;
q_s->size = 0;
q_s->n_q = q_i;
return q_s;
}
// Запрос строки-объекта потоком записи для формирования строки
q_str* get_str(int q_i) {
q_str* q_s;
INT64 tail = q[q_i].tail;
INT64 t = tail % q[q_i].q_size;
if (q[q_i].buffer[t] != NULL) { // Получение памяти из очереди --
q_s = q[q_i].buffer[t];
q_s->tail = tail;
}
else { // Запрос памяти у системы --
q_s = get_q_s(q_i);
q_s->tail = tail;
}
return q_s;
}
// Инициализация используемых очередей строками-объектами --
// #### Вряд ли стоит использовать ---
void initialization_q_str() {
if (count_q > 0) {
for (int i = 0; i < count_q; ++i) {
int size = q[i].q_size;
for (int j = 0; j < size; ++j) {
if (q[i].buffer[j] == NULL)
q[i].buffer[j] = get_q_s(i);
}
}
}
}
// Начальная инициализация указателей чтения в потоке-читателе на основе указателей
// записи в очереди (при начальной инициализации потока)
void init_access(INT64 *tail_v) {
if (count_q <= 0) throw std::runtime_error("*** Ошибка: не задано количество используемых очередей");
for (int i = 0; i < count_q; ++i) tail_v[i] = q[i].tail;
}
// запись в очередь --
bool push(int q_n, q_str* val) {
if (q_n >= count_q) throw std::runtime_error("*** Ошибка: нет такой очереди");
if (q[q_n].tail < q[q_n].q_size)
q[q_n].buffer[q[q_n].tail % q[q_n].q_size] = val;
++q[q_n].tail;
return true;
}
};
struct ReaderAccess; // Используется в Manager
// Объект управления потоками чтения
struct Manager {
INT64 signal_counter = 0; // Счетчик сигналов записи
std::mutex mtx; // Для синхронизации при ожидании
std::condition_variable cv; // Общий объект ожидания
InfoQueue<uint64_t> service_q{ Q_SIZE_SL }; // Служебная очередь битовых шкал
void wait(ReaderAccess* access, int tt); // !! Реализация после struct ReaderAccess (иначе недоступны поля)
void pulse(uint64_t mask) { service_q.push(mask); ++signal_counter; cv.notify_all(); }
//// ------- win32 API --
//HANDLE cv = CreateEvent(NULL, FALSE, FALSE, NULL); // Общий объект ожидания
//void pulse() { ++signal_counter; PulseEvent(cv); }
private:
INT64 tt = T_OS_high_mls(); // Для коррекции интервала ожидания --
};
// Объект доступа к очередям для читающих потоков --
struct ReaderAccess {
INT64 h[Q_MAX] = { 0 }; // Локальные указатели чтения
INT64 h_service = 0; // Указатель чтения служебной очереди
uint64_t local_bitmask = 0; // Локальная шкала непустых очередей потока-читателя
INT64 signal_counter = 0; // Локальный счетчик сигналов записи
// -------- Статистика очередей ---------
INT64 NN[Q_MAX] = { 0 }; // Коичество обращений к непустым очередям
double statistics[Q_MAX] = { 0 }; // Сумма относительной заполненности очередей (среднее = statistics [i]/ (NN[i]))
double q_filling_threshold = 0.4; // Порог заполненности очередей для выдачи предупреждения --
int period_agr = 9; // Период скользящей
// ------- Данные снимка состояния очередей потока чтения ---
int q_len[Q_MAX] = { 0 }; // Количество непустых непрочитанных записей в очередях
int q_n[Q_MAX] = { 0 }; // Номера непустых очередей
int q_p = 0; // Длина векторов q_n и q_len
// Снимок состояния очередей потока-читателя ("мгновенный", чтобы меньше было повторных чтений очередей).
void snapshot_queue_state(DataCluster& dc, Manager& mgr) {
q_p = 0; // Сброс длины векторов q_n и q_len
// Формирование шкалы непустых очередей потока-читателя
uint64_t mask_val;
INT64 count = mgr.service_q.get_pending_count(h_service);
if (count > 0) {
// Формируем общую локальную шкалу непустых очередей потока-читателя --
for (int i = 0; i < count; ++i) {
mask_val = mgr.service_q.buffer[h_service % mgr.service_q.q_size];
local_bitmask |= mask_val;
++h_service;
}
// Формирование снимка состояния очередей потока-читателя (на основе его шкалы) --
int pos_old = 0;
int Queue_grouping = dc.Queue_grouping;
while (local_bitmask) { // Эффективная обработка шкалы непустых очередей --
//// ! Способ определения позиции младшего разряда для старых стандартов (MSVC) -----
// unsigned long pos3 = 0;
// _BitScanForward64(&pos3, local_bitmask); // позиция младшего разряда с 1
//// -------------------------------------------------
int Сдвиг = std::countr_zero(local_bitmask); // C++20 (позиция младшего разряда с 1)
int pos = Сдвиг + pos_old;
pos_old = pos + 1;
pos *= Queue_grouping;
for (int i = 0; i < Queue_grouping; ++i) { // обработка групп очередей разряда шкалы --
INT64 q_count = dc.q[pos + i].get_pending_count(h[pos + i]);
if (q_count > 0) {
int p = q_p;
q_n[p] = pos + i;
q_len[p] = q_count;
// Статистика заполненности очереди (относительная, приблизительно средне скользящая)
if (++NN[pos + i] < period_agr) {
statistics[pos + i] += (double)q_count / (double)dc.q[pos + i].q_size;
}
else {
statistics[pos + i] += (double)q_count / (double)dc.q[pos + i].q_size - statistics[pos + i] / period_agr;
}
++q_p;
}
else
if (q_count < 0) std::cout << " *** Ошибка переполнение очереди (при чтении) q = " << pos + i << ": q_count = " << q_count << "\n"; // #### Ошибка переполнение очереди
}
local_bitmask >>= Сдвиг + 1;
}
}
else if (count < 0) std::cout << " *** -- Ошибка переполнение служебной очереди шкал --\n";
//std::cout << ". Шкала : " << std::bitset<64>(local_bitmask) << "\n";
}
// Чтение строки-объекта очереди --
// q_n - номер очереди
q_str* get_q_str(int q_n, DataCluster& dc) {
q_str* val = dc.q[q_n].buffer[h[q_n] % dc.q[q_n].q_size];
++h[q_n];
return val;
}
};
// ! Реализация члена Manager (расположена после struct ReaderAccess чтобы в методе были
// доступны ее поля)
// Пауза (подстраиваемая с учетом длительности цикла обработки)
void Manager::wait(ReaderAccess* access, int dt) {
std::unique_lock<std::mutex> lock(mtx);
if (access->signal_counter == signal_counter) {// не было сигнала записи при отсутствии ожидания в потоке чтения
// Вычисление коррекции паузы с учетом времени обработки цикла --
tt = (T_OS_high_mls() - tt) * 0.0001; // время обработки потока в цикле (млс.) --
tt = (dt - tt);
tt = tt > 0 ? tt : 1;
cv.wait_for(lock, std::chrono::milliseconds(tt)) == std::cv_status::timeout;
//WaitForSingleObject(cv, tt); // вместо предыдущего оператора с комментированием первого оператора метода
}
tt = T_OS_high_mls();
access->signal_counter = signal_counter; // Обрабатываемый сигнал записи в потоке чтения
}
// Тестирование очередей событий ------------------------------------------------
// Функция потока-писателя
// Формат записываемых строк общий для всех очередей: тексты таблиц Lua (сереализация).
// Читающий поток выполняет десереализацию полученных строк и запуск Lua-функций обработки событий
void producer(DataCluster& dc, Manager& mgr) {
std::cout << "Писатель. Запись выполняется в 10 очередей. Циклов записи: " << N_MAX << std::endl;
std::cout << "T - время выполнения потоков с учетом их пауз." << std::endl;
// -------------------------------------
INT64 tt = T_OS_high_mls(); // Для коррекции интервала ожидания --
INT64 TT = tt; // Для замера времени выполнения ---
INT64 TCP = 0; // Для замера времени ЦП ---
INT64 NRQ = 0; // Количество обработанных записей
int dt = 1;
for (int i = 1; i <= N_MAX; ++i) {
tt = T_OS_high_mls() - tt;
TCP += tt;
std::this_thread::sleep_for(std::chrono::milliseconds(dt)); // Пауза потока записи
//Sleep(dt); // И так можно
// %%%%%% Подготовка данных для записи в очереди в виде C-строк
// Запись строк в очереди --
uint64_t mask = 0; // шкала непустых очередей
//// Запись в разные очереди
// !! rec->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (rec->size - 1)
const char* source = "{name = 'OnCleanUp',tbl = { ....] }"; int sz = strlen(source) + 1;
q_str* rec = dc.get_str(0); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(0, rec)) mask |= dc.q_mask(0); ++NRQ;
rec = dc.get_str(1); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(1, rec)) mask |= dc.q_mask(1); ++NRQ;
rec = dc.get_str(2); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(2, rec)) mask |= dc.q_mask(2); ++NRQ;
rec = dc.get_str(3); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(3, rec)) mask |= dc.q_mask(3); ++NRQ;
rec = dc.get_str(40); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(40, rec)) mask |= dc.q_mask(40); ++NRQ;
rec = dc.get_str(41); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(41, rec)) mask |= dc.q_mask(41); ++NRQ;
rec = dc.get_str(40); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(40, rec)) mask |= dc.q_mask(40); ++NRQ;
rec = dc.get_str(41); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(41, rec)) mask |= dc.q_mask(41); ++NRQ;
rec = dc.get_str(42); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(42, rec)) mask |= dc.q_mask(42); ++NRQ;
rec = dc.get_str(100); memcpy(rec->str, "шшшшшш", 7); rec->size = 7;
if (i >= N_MAX) rec->state= -1; // Признак читателям завершить поток ---
if (dc.push(100, rec)) mask |= dc.q_mask(100); ++NRQ; // "шшшшшш"
// std::cout << "[P] Данные записаны, маска: " << std::bitset<64>(mask) << std::endl; // ####
// Запись маски и активация потоков
mgr.pulse(mask);
tt = T_OS_high_mls();
}
std::stringstream out; // можно использовать <<
out << "---> Писатель. Пауза:" << dt
<< ". T (млс.):"
<< (int)((T_OS_high_mls() - TT) * 0.0001) << ". Обработано записей:" << NRQ
<< ". Время ЦП (млс.):" << TCP * 0.0001 << "\n";
std::string result = out.str(); // Получаем std::string
std::cout << result;
}
// Заглушка функции обработки событий.
// Выполняет десереализацию полученных строк и запуск Lua-функций обработки событий
// в зависимости от name (вида очереди)
// 1) !! Память q_s возвращать не надо
// 2) !! q_s->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (q_s->size - 1)
void event_handling(q_str* q_s, std::string name) {
}
// Функция потоков-читателей --
// filling_threshold - порог заполненности очередей (<текущее количество записей очереди> / <размер очереди>)
// для выдачи предупреждений: "Превышен порог заполненности очередей"
// dt - пауза (млс.)
void consumer(int id, DataCluster& dc, Manager& mgr, double filling_threshold, int dt) {
ReaderAccess access;
access.q_filling_threshold = filling_threshold;
dc.init_access(access.h); // Инициализация access.h (указателей) потока чтения с учетом текущего состояния очередей --
// Буфер строки-объекта для дальнейшей обработки - q_str_c --
const char str_copy[LEN_STR_MAX] = {'\0'};
q_str q_str_c; q_str_c.str = (char*)str_copy; q_str_c.size_max = LEN_STR_MAX;
// -------------------------------------
INT64 tt = T_OS_high_mls(); // Для коррекции интервала ожидания --
INT64 TT = tt; // Для замера времени выполнения ---
INT64 TCP = 0; // Для замера времени ЦП ---
INT64 NRQ = 0; // Количество обработанных записей
while (true) {
tt = (T_OS_high_mls() - tt); // время обработки потока в цикле --
TCP += tt;
// Пауза (подстраиваемая с учетом длительности цикла обработки)
// с ожиданием сигнала записи в одну из информационных очередей --
mgr.wait(&access, dt);
tt = T_OS_high_mls();
// ------------------------------------------------------------------
// Получение снимка состояния очередей потока ("мгновенный",
// чтобы меньше было повторных чтений очередей).
access.snapshot_queue_state(dc, mgr);
// Обработка снимка состояния очередей потока:
// 1) access.q_p - вектор номеров непустых очередей
// 2) access.q_len - вектор длин необработанных очередей
// 3) access.q_p - длина векторов 1), 2)
int q_p = access.q_p;
// Просмотр и обработка вектора непустых очередей потока чтения
for (int i = 0; i < q_p; ++i) {
int q_n = access.q_n[i];
int count = access.q_len[i]; // Количество записей в очереди --
// ------- Сбор статистики ----
double statistics_agr = access.statistics[q_n] / (access.NN[q_n] < access.period_agr ? access.NN[q_n] : access.period_agr);
if (statistics_agr > access.q_filling_threshold) // #### Предупреждающее сообщение --
std::cout << "[Поток: " << id << "] Очередь "
<< q_n << ". Заполненность (средняя): " << statistics_agr
<< ". *** Превышен порог заполненности очередей: " << access.q_filling_threshold << "\n";
//// Отладочная печать
//if (count > 0 || count < 0)
// std::cout << "[Поток: " << id << "] Очередь "
// << q_n << ". Заполненность (средняя): " << statistics_agr
// << ". Количество: " << count << ". Ук чтения " << access.h[q_n] << "\n";
// --------------------
INT64 q_size = dc.q[q_n].q_size;
// Чтение и обработка очередей
while (count--) {
q_str* val_q = access.get_q_str(q_n, dc);
// Копировать запись очереди в буфер обработки q_str_c (типа q_str),
// созданный в стеке --
q_str_copy(val_q, &q_str_c, &dc.q[q_n]); val_q = NULL; // ! далее использовать только q_str_c
event_handling(&q_str_c, dc.q[q_n].name_Queue); // Вызов функции обработки записи очереди
++NRQ; // Количество обработанных записей
// Результат теста
// Завершение потока при тестировании --
if (q_str_c.state == -1) { // Признак читателям завершить поток ---
std::stringstream out; // можно использовать <<
out << "Читатель " << id << ". Пауза:" << dt
<< ". T (млс.):"
<< (int)((T_OS_high_mls() - TT) * 0.0001) << ". Обработано записей:" << NRQ
<< ". Время ЦП (млс.):" << TCP * 0.0001 << "\n";
std::string result = out.str(); // Получаем std::string
std::cout << result;
return;
}
// Отладочная печать
//const char* val_str = q_str_c.str;
//if (access.h[q_n] == N_MAX) {
// std::cout << " Поток " << id << ". Очередь " << q_n << " -> Val(str): " << val_str << ". Указатель чтения "
// << access.h[q_n] << ". Интервал (млс.) " << (T_OS_high_mls() - TT) * 0.0001 << "\n";
//}
// ----------------------------------------------------
//// %%%%%%%%%%%% Обработка остальных данных в потоке чтения --
}
}
}
}
// --------------------- Тест очередей -------------------------
int main() {
SetConsoleCP(1251); // Кодировка 1251 (! #include <shlobj.h>)
setlocale(LC_ALL, ""); // #### Русификация вывода ввода (работает) -----
timeBeginPeriod(1); // Установить точность 1 мс
// ==================================
DataCluster dc; // Очереди ---
// 1. Для каждой очереди должен быть задан размер памяти для максимальной строки, передаваемой
// в ней (с учетом признака конца строки '\0').
// 2. В вызове метода set_parm_q второй параметр: вектор типа q_parm * с параметрами очередей
// 3. Порядок элементов в векторе определяет приоритет обработки очередей (0 -> 1 ...)
q_parm* Параметры_очередей = NULL; // Параметры_очередей[Количество_очередей]
dc.set_parm_q(Количество_очередей, Параметры_очередей);
//dc.initialization_q_str(); // #### Вряд ли стоит использовать ---
Manager mgr; // Объект управления потоками чтения
// Читающие потоки --
// Последний параметр интервал ожидания (млс.)
std::thread c1(consumer, 1, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 5);
std::thread c2(consumer, 2, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 10);
std::thread c3(consumer, 3, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 50);
std::thread c4(consumer, 4, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 100);
// Пишущий поток --
std::thread p(producer, std::ref(dc), std::ref(mgr));
p.join(); c1.join(); c2.join(); c3.join(); c4.join();
timeEndPeriod(1); // Освободить точность
char nazv_sh[100];
std::cout << "Введите что-нибудь:" << std::endl;
std::cin >> nazv_sh;
return 0;
}