Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
14.03.2026 08:31:26
Цитата
Oleg Kuzembaev написал: Ваше пожелание зарегистрировано. Мы постараемся рассмотреть его и сообщить Вам результаты анализа. Впоследствии, по результатам анализа, будет приниматься решение о реализации пожелания в будущих версиях ПО.
Здравствуйте. Спасибо за ответ. ---------------- Вариант реализации очередей с добавлениями. 1. //#define win32_API // В реализации очередей используется win32_API 2. const int QUEUE_RECORDING_MODE = 0; // Режим записи в очереди: 0 - без сигналов (более эффективный чем с сигналами); 1 - с сигналами. Режим записи в очереди без сигналов добавлен т.к. выдача сигнала "тяжелая" системная функция, в ~100 раз по времени ЦП затратнее, чем одна запись в очередь. В этом режиме сигналы пишущим потоком не выдаются. В потоках чтения реализованы два цикла: 1) первый внешний цикл: пользовательский интервал ожидания (аналогичный как в существующем sleep); 2) второй внутри первого, опрашивающий и обрабатывающий очереди событий (функционально аналогичный выполнению существующих коллбеков). 3. В тесте очередей учтен случай подключение читающего потока (№ 5) к обработке очередей в произвольный момент времени.
Код
// ============== Вариант реализации очередей событий QUIK (Lock-free, ~300 строк) ============
// Параметр Q_MAX определяет количество очередей в схеме очередей. Схем обработки очередей можно
// создать несколько.
// Данные, передаваемые в информационных очередях, строки.
// 1. Формат записи данных - текст описания таблицы Lua:
// {name = <Имя коллбека>, tbl = {<Таблица для формирования вызова функции пользователя>}}
// После чтения выполнять десереализацию текста таблицы Lua и дальше обрабатывать с учетом значения name.
// name обеспечивает возможность группировка коллбеков по очередям.
// 2. Коллбеки можно, но необязательно, сгруппировать и группы распределить по очередям с учетом того,
// что при чтении обработка начинается с 0-й очереди. Параметры очередей size_max следует задать
// такими, чтобы в них помещались строки их данных.
// Для коллбеков пользовательских таблиц QUIK, наверное, имеет смысл использовать все таки отдельную
// очередь.
// 3. Параметр FILLING_THRESHOLD - порог заполненности очередей для выдачи предупреждения о высоком уровне
// заполненности очередей.
// 4. После первого цикла любой очереди, память ее созданных элементов переиспользуется
// (кроме чтения указателя нет затрат на управление ее памятью).
// 5. Если для читающего потока в какой то его очереди возникнет переполнение, то об этом
// выдается сообщение.
// 6. Реализованы режимы записи в очереди: 0 - без сигналов (более эффективный); 1 - с сигналами.
// ----------------------------------------------------------------------
// Результат теста (оценка эффективности реализации очередей)
// Режим записи в очереди: без сигналов.
// Писатель выполняет в цикле запись в 10 очередей. Циклов записи : 1000
// T - время выполнения потоков с учетом их пауз в млс.
//-->Писатель. Пауза: 1. T (млс.): 1977. Обработано записей: 10000. Время ЦП (млс.): 3.4494
//Читатель 3. Пауза: 50. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.3174
//Читатель 1. Пауза: 5. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.1746
//Читатель 4. Пауза: 100. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.2129
//Читатель 2. Пауза: 10. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.2194
// -------------------------------------------------------------------------------
// Краткое ТЗ на разработку очередей
// 1) Создаются несколько циклических информационных очередей с разными типами данных и общим объектом
// управления ими.
// 2) В очереди пишет данные один поток, а читают эти очереди несколько потоков.
// 3) Дополнительно создается специальная служебная циклическая очередь: битовые шкалы непустых информационных
// очередей, записываемых пишущим потоком, после записи в информационные очереди.
// 4) У читающих потоков должны быть свои указатели чтения циклических очередей и их локальные битовые шкалы
// непустых очередей, хранящиеся в их объекте доступа к информационным очередям.
// 5) Читающие потоки циклически, выполнив свои коды переходят в состояние ожидания сигнала появления данных
// в их очередях или истечения заданного интервала времени на их общем объекте ожидания.
// 6) При записи в любую очередь пишущий поток после записи в информационные очереди записывает битовые шкалы
// непустых очередей в служебную очередь шкал непустых очередей и [активирует в режиме 1] все читающие потоки.
// 7) Каждый читающий поток, при пробуждении, читает все появившиеся записи в служебной очереди шкал непустых
// очередей и формирует свою локальную шкалу непустых очередей и далее, на ее основании, читает без
// синхронизации, непустые очереди с признаком : состояние прочитанных очередей в виде размера непрочитанных
// данных.
// 8) Реализовать эффективное управление памятью при передаче данных в этих очередях.
// C++20 (используется std::countr_zero) ---
#include <shlobj.h>
#include <windows.h>>
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <mutex>
//#include <condition_variable>
//#include <atomic>
//#include <bitset>
//#include <intrin.h>
#pragma comment(lib, "winmm.lib") // Установить точность таймера 1 мс
// Параметры тестирования очередей ---
const int N_MAX = 2000; // Количество циклов записи при тестировании
//#define win32_API // В реализации очередей используется win32_API
const int QUEUE_RECORDING_MODE = 0; // Режим записи в очереди: 0 - без сигналов; 1 - с сигналами
const INT64 Q_SIZE = 64; // Длина информационной очереди по умолчанию
const INT64 Q_SIZE_SL = 256; // Длина служебной очереди шкал непустых информационных очередей
const int Q_MAX = 128; // Максимальное количество очередей
const int Количество_очередей = 120; // Количество используемых очередей ( <= Q_MAX)
const int LEN_STR_MAX = 4096; // Размер буфера потоков чтения (максимум size_max_str всех используемых очередей)
const double FILLING_THRESHOLD = 0.4; // Порог заполненности очередей для выдачи предупреждения об уровне заполненности очередей
// Системное время (синхронизированное). Точность 0,1 мкс.---
static INT64 T_OS_high_mls()
{
FILETIME lpSystemTimeAsFileTime;
GetSystemTimePreciseAsFileTime(&lpSystemTimeAsFileTime); // Системное время (синхронизированное)
INT64 tt = lpSystemTimeAsFileTime.dwHighDateTime;
tt = (tt << 32) + lpSystemTimeAsFileTime.dwLowDateTime;
return tt;
}
// Строка-объект информационной очереди (переиспользуемая) --
struct q_str {
INT64 tail = 0; // Указатель записи ---
INT64 size_max = 0; // Максимальный размер памяти строки (с учетом символа конца строки '\0')
INT64 size = 0; // Текущий размер памяти строки (с учетом символа конца строки '\0')
int n_q = 0; // Номер очереди
char* str = NULL; // Память строки (размер size_max) ----
int state = 0; // Состояние (не используется, но на всякий случай): 1 - занята пишущим потоком; 2 - в очереди.
};
// Параметры очереди, задаваемые при ее инициализации --
struct q_parm {
std::string name_Queue = ""; // имя очереди
INT64 q_size = 64; // длина очереди
INT64 size_max_str = 64; // максимальная память строки в информационной очереди
};
// Шаблонная циклическая очередь (Lock-free)
// Используется для служебной и информационных очередей
template<typename T>
class InfoQueue {
public:
std::vector<T> buffer;
INT64 tail = 0; // пишет только один писатель
std::string name_Queue = ""; // имя очереди (можно менять при начальной инициализации)
INT64 q_size = 64; // размер очереди (можно менять при начальной инициализации)
INT64 size_max_str = 64; // максимальная память строки в очереди (можно менять при начальной инициализации)
InfoQueue(INT64 s = Q_SIZE) : buffer(s), q_size(s) {}
// Изменение длины очереди ---
void q_size_set(int s) {
buffer.reserve(s); q_size = s;
}
// запись в очередь --
bool push(T val) {
buffer[tail++ % q_size] = val;
return true;
}
// Количества доступных элементов очереди для конкретного читателя
// local_head - указатель чтения (читающего потока)
// Если отрицательное значение, то: количество пропущенных при чтении записей очереди --
INT64 get_pending_count(INT64 local_head) const {
INT64 t = tail - local_head;
return (t <= q_size ? t : q_size - t);
}
};
// Копировать строку-объект в буфер читающего потока для последующей обработки ---
void q_str_copy(q_str* s, q_str* s1, InfoQueue<q_str*>* q)
{
//s1->size_max = s->size_max; // ! Задана при создании буфере --
s1->size = s->size;
s1->n_q = s->n_q;
if (s1->str == NULL) throw std::runtime_error("*** Ошибка: нет памяти для копирования");
// s1->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (s1->size - 1)
memcpy(s1->str, s->str, s1->size); s1->tail = s->tail; // ! Обязательно в таком порядке --
s1->state = s->state;
// Дополнительный контроль переполнение очереди (после копирования в буфер потока чтения)
if ((q->tail - s1->tail) > q->q_size) std::cout << " *** Ошибка переполнение очереди (при копировании) " << s1->n_q << "\n";
}
// Объявления (определяются позже)
struct ReaderAccess;
struct Manager;
//-------------------------------
// Информационные очереди событий --
struct DataCluster {
// Деструктор (! выход из блока кода, в котором определен DataCluster
// только при завершения потоков, использующих его) --
~DataCluster() {
for (int i = 0; i < Q_MAX; ++i) {
INT64 size = q[i].buffer.size();
INT64 N = (q[i].tail >= size) ? size : q[i].tail; // Буфер может быть неполным ---
for (int j = 0; j < N; ++j) {
q_str* q_s = q[i].buffer[1];
delete[]q_s->str;
delete q_s;
}
}
}
int count_q = 64; // Количество используемых информационных очередей (<= Q_MAX) --
// При количестве очередей > 64 один бит шкалы непустых очередей соотносится
// к нескольким ((count_q - 1) / 64 + 1) последовательным очередям, в которых,
// возможно, есть записи.
UINT32 Queue_grouping = 1;
InfoQueue<q_str*> q[Q_MAX];
// Задать количество используемых очередей и их параметры
void set_parm_q(int n, q_parm* parm) {
if (n > 0 && n <= Q_MAX)
count_q = n;
else
throw std::runtime_error(" Количество используемых очередей > Q_MAX или <= 0"); /* #### ошибка*/
Queue_grouping = (count_q - 1) / 64 + 1;
if (parm != NULL)
for (int i = 0; i < count_q; ++i) {
q[i].name_Queue = parm->name_Queue;
q[i].q_size = parm->q_size;
q[i].q_size_set(parm->q_size);
q[i].size_max_str = parm->size_max_str;
}
}
// Функция формирования маски группировки очередей (многозначное отображение очередей на шкалу непустых)
uint64_t q_mask(int q_i) { return (uint64_t)1 << (q_i / Queue_grouping); }
// Запрос памяти потоком писателем под запись очереди --
// Запрос памяти системы и формирование строки-объекта -----
q_str* get_q_s(int q_i) {
INT64 size_max_str = q[q_i].size_max_str;
char* str = new char[size_max_str];
str[size_max_str - 1] = '\0';
q_str* q_s = new q_str;
q_s->str = str;
q_s->size_max = size_max_str;
q_s->size = 0;
q_s->n_q = q_i;
return q_s;
}
// Запрос строки-объекта потоком записи для формирования строки
q_str* get_str(int q_i) {
q_str* q_s;
INT64 tail = q[q_i].tail;
INT64 t = tail % q[q_i].q_size;
if (q[q_i].buffer[t] != NULL) { // Получение памяти из очереди --
q_s = q[q_i].buffer[t];
q_s->tail = tail;
}
else { // Запрос памяти у системы --
q_s = get_q_s(q_i);
q_s->tail = tail;
}
return q_s;
}
// Инициализация используемых очередей строками-объектами --
// #### Вряд ли стоит использовать ---
void initialization_q_str() {
if (count_q > 0) {
for (int i = 0; i < count_q; ++i) {
int size = q[i].q_size;
for (int j = 0; j < size; ++j) {
if (q[i].buffer[j] == NULL)
q[i].buffer[j] = get_q_s(i);
}
}
}
}
// Начальная инициализация указателей чтения в потоке-читателе на основе указателей
// записи в очереди (при начальной инициализации потока)
// Объявление.
void init_access(ReaderAccess* access, Manager* mgr, DataCluster* dc);
// запись в очередь --
bool push(int q_n, q_str* val) {
if (q_n < 0 || q_n >= count_q) throw std::runtime_error("*** Ошибка: нет такой очереди");
q[q_n].buffer[q[q_n].tail++ % q[q_n].q_size] = val;
return true;
}
};
//struct ReaderAccess; // Используется в Manager
#ifndef win32_API
// Объект управления потоками чтения
struct Manager {
INT64 signal_counter = 0; // Счетчик сигналов записи
int Выдавать_сигналы = 0; // 0 - без выдачи сигнала; 1 - выдача сигнала
InfoQueue<uint64_t> service_q{ Q_SIZE_SL }; // Служебная очередь битовых шкал
//--------- condition_variable --
std::mutex mtx; // Для синхронизации при ожидании
std::condition_variable cv; // Общий объект ожидания
void pulse(uint64_t mask) {
service_q.push(mask); ++signal_counter;
if (Выдавать_сигналы == 1)
cv.notify_all();
}
void wait(ReaderAccess* access, int tt); // !! Реализация после struct ReaderAccess (иначе недоступны поля)
};
#else // !win32_API
// Вариант 2. #### Этот вариант работает, но предыдущий лучше --
struct Manager {
INT64 signal_counter = 0; // Счетчик сигналов записи
int Выдавать_сигналы = 0; // 0 - без выдачи сигнала; 1 - выдача сигнала
InfoQueue<uint64_t> service_q{ Q_SIZE_SL }; // Служебная очередь битовых шкал
//--------- condition_variable --
CRITICAL_SECTION cs;
CONDITION_VARIABLE cv;
Manager() {
InitializeCriticalSection(&cs);
InitializeConditionVariable(&cv);
}
bool dataReady = false;
void pulse(uint64_t mask) {
service_q.push(mask);
++signal_counter;
EnterCriticalSection(&cs); // #### ?
if (Выдавать_сигналы == 1) {
dataReady = true;
WakeAllConditionVariable(&cv); // Будим тех, кто еще спит
}
LeaveCriticalSection(&cs); // #### ?
}
void wait(ReaderAccess* access, int tt); // !! Реализация после struct ReaderAccess (иначе недоступны поля)
};
#endif // !win32_API
// Объект доступа к очередям для читающих потоков --
struct ReaderAccess {
INT64 h[Q_MAX] = { 0 }; // Локальные указатели чтения очередей
INT64 h_service = 0; // Указатель чтения служебной очереди
uint64_t local_bitmask = 0; // Локальная шкала непустых очередей потока-читателя
INT64 signal_counter = 0; // Локальный счетчик сигналов записи
// -------- Статистика очередей ---------
INT64 NN[Q_MAX] = { 0 }; // Коичество обращений к непустым очередям
double statistics[Q_MAX] = { 0 }; // Сумма относительной заполненности очередей (среднее = statistics [i]/ (NN[i]))
double q_filling_threshold = 0.4; // Порог заполненности очередей для выдачи предупреждения --
int period_agr = 9; // Период скользящей
// ------- Данные снимка состояния очередей потока чтения ---
int q_len[Q_MAX] = { 0 }; // Количество непустых непрочитанных записей в очередях
int q_n[Q_MAX] = { 0 }; // Номера непустых очередей
int q_p = 0; // Длина векторов q_n и q_len
// Снимок состояния очередей потока-читателя ("мгновенный", чтобы меньше было повторных чтений очередей).
void snapshot_queue_state(DataCluster& dc, Manager& mgr) {
q_p = 0; // Сброс длины векторов q_n и q_len
// Формирование шкалы непустых очередей потока-читателя и снимка состояния очередей потока-читателя
INT64 count = mgr.service_q.get_pending_count(h_service);
if (count > 0) {
// Формируем общую локальную шкалу непустых очередей потока-читателя --
local_bitmask = 0;
uint64_t mask_val;
for (int i = 0; i < count; ++i) {
mask_val = mgr.service_q.buffer[h_service % mgr.service_q.q_size];
local_bitmask |= mask_val;
++h_service;
}
// Формирование снимка состояния очередей потока-читателя (на основе его шкалы) --
int pos_old = 0;
int Queue_grouping = dc.Queue_grouping;
while (local_bitmask) { // Эффективная обработка шкалы непустых очередей --
//// ! Способ определения позиции младшего разряда для старых стандартов (MSVC) -----
// unsigned long pos3 = 0;
// _BitScanForward64(&pos3, local_bitmask); // позиция младшего разряда с 1
//// -------------------------------------------------
int Сдвиг = std::countr_zero(local_bitmask); // C++20 (позиция младшего разряда с 1)
int pos = Сдвиг + pos_old;
pos_old = pos + 1;
pos *= Queue_grouping;
for (int i = 0; i < Queue_grouping; ++i) { // обработка групп очередей разряда шкалы --
INT64 q_count = dc.q[pos + i].get_pending_count(h[pos + i]);
if (q_count > 0) {
int p = q_p;
q_n[p] = pos + i;
q_len[p] = q_count;
// Статистика заполненности очереди (относительная, приблизительно средне скользящая)
if (++NN[pos + i] < period_agr) {
statistics[pos + i] += (double)q_count / (double)dc.q[pos + i].q_size;
}
else {
statistics[pos + i] += (double)q_count / (double)dc.q[pos + i].q_size - statistics[pos + i] / period_agr;
}
// --------------------------------------------------------------------------------
++q_p;
}
else
if (q_count < 0) std::cout << " *** Ошибка переполнение очереди (при чтении) q = " << pos + i << ": q_count = " << q_count << "\n"; // #### Ошибка переполнение очереди
}
local_bitmask >>= Сдвиг + 1;
}
}
else if (count < 0) std::cout << " *** -- Ошибка переполнение служебной очереди шкал --\n";
//std::cout << ". Шкала : " << std::bitset<64>(local_bitmask) << "\n";
}
// Чтение строки-объекта очереди --
// q_n - номер очереди
q_str* get_q_str(int q_n, DataCluster& dc) {
q_str* val = dc.q[q_n].buffer[h[q_n] % dc.q[q_n].q_size];
++h[q_n];
return val;
}
};
// ! Реализация члена DataCluster (расположена после struct ReaderAccess, Manager и DataCluster
// чтобы в методе были доступны их поля)
// Начальная инициализация указателей чтения в потоке-читателе на основе указателей
// записи в очереди (при начальной инициализации потока)
void DataCluster::init_access(ReaderAccess* access, Manager* mgr, DataCluster* dc) {
int count_q = dc->count_q;
if (count_q <= 0) throw std::runtime_error("*** Ошибка: не задано количество используемых очередей");
// Получение указателя служебной очереди
access->h_service = mgr->service_q.tail;
// Получение указателей информационных очередей
for (int i = 0; i < count_q; ++i) access->h[i] = q[i].tail;
}
// ! Реализация члена Manager (расположена после struct ReaderAccess чтобы в методе были
// доступны ее поля)
// Пауза
#ifndef win32_API
void Manager::wait(ReaderAccess* access, int dt) {
if (access->signal_counter == signal_counter) {// не было сигнала записи при отсутствии ожидания в потоке чтения
std::unique_lock<std::mutex> lock(mtx);
cv.wait_for(lock, std::chrono::milliseconds(2)) == std::cv_status::timeout;
}
access->signal_counter = signal_counter; // Обрабатываемый сигнал записи в потоке чтения
}
#else // !win32_API
// Вариант 2. #### -------
void Manager::wait(ReaderAccess* access, int dt) {
if (access->signal_counter == signal_counter) {// не было сигнала записи при отсутствии ожидания в потоке чтения
EnterCriticalSection(&cs);
// Пытаемся дождаться условия в течение заданного timeout
if (!dataReady) {
if (!SleepConditionVariableCS(&cv, &cs, 2)) {
if (GetLastError() == ERROR_TIMEOUT) {
//std::cout << "Thread [" << tt << "ms] TIMED OUT!" << std::endl;
}
}
else {
//std::cout << "Thread [" << tt << "ms] GOT SIGNAL!" << std::endl;
}
dataReady = false;
}
LeaveCriticalSection(&cs);
}
access->signal_counter = signal_counter; // Обрабатываемый сигнал записи в потоке чтения
}
#endif // !win32_API
// Тестирование очередей событий ------------------------------------------------
// Формат записываемых строк общий для всех очередей: тексты таблиц Lua (сереализация).
// Читающий поток выполняет десереализацию полученных строк и запуск Lua-функций обработки событий.
// ----------------------------------------------------
// Функция потока-писателя
void producer(DataCluster& dc, Manager& mgr) {
std::cout << "Писатель. Запись выполняется в 10 очередей. Циклов записи: " << N_MAX << std::endl;
std::cout << "T - время выполнения потоков с учетом их пауз." << std::endl;
// -------------------------------------
INT64 tt = T_OS_high_mls(); // Для подсчета времени ЦП --
INT64 TT = tt; // Для замера времени выполнения ---
INT64 TCP = 0; // Время ЦП ---
INT64 NRQ = 0; // Количество обработанных записей
int dt = 1;
int q_n_test = 0;
for (int i = 1; i <= N_MAX; ++i) {
tt = T_OS_high_mls() - tt;
TCP += tt;
//std::this_thread::sleep_for(std::chrono::milliseconds(dt)); // Пауза потока записи
Sleep(dt); // И так можно
tt = T_OS_high_mls();
// %%%%%% Подготовка данных для записи в очереди в виде C-строк
// Запись строк в очереди --
uint64_t mask = 0; // шкала непустых очередей
// Запись в разные очереди
// !! rec->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (rec->size - 1)
const char* source = "{name = 'OnCleanUp',tbl = { ....] }";
int sz = strlen(source) + 1;
q_n_test %= 2;
int q_n_test1 = q_n_test * 4; // Для записи в разные очереди
q_str* rec = dc.get_str(0 + q_n_test1); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(0 + q_n_test1, rec)) mask |= dc.q_mask(0 + q_n_test1); ++NRQ;
rec = dc.get_str(1 + q_n_test1); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(1 + q_n_test1, rec)) mask |= dc.q_mask(1 + q_n_test1); ++NRQ;
rec = dc.get_str(2 + q_n_test1); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(2 + q_n_test1, rec)) mask |= dc.q_mask(2 + q_n_test1); ++NRQ;
rec = dc.get_str(3); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(3 + q_n_test1, rec)) mask |= dc.q_mask(3 + q_n_test1); ++NRQ;
++q_n_test;
rec = dc.get_str(40); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(40, rec)) mask |= dc.q_mask(40); ++NRQ;
rec = dc.get_str(41); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(41, rec)) mask |= dc.q_mask(41); ++NRQ;
rec = dc.get_str(40); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(40, rec)) mask |= dc.q_mask(40); ++NRQ;
rec = dc.get_str(41); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(41, rec)) mask |= dc.q_mask(41); ++NRQ;
rec = dc.get_str(42); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(42, rec)) mask |= dc.q_mask(42); ++NRQ;
rec = dc.get_str(100); memcpy(rec->str, "шшшшшш", 7); rec->size = 7;
if (i >= N_MAX) rec->state= -1; // Признак читателям завершить поток ---
if (dc.push(100, rec)) mask |= dc.q_mask(100); ++NRQ; // "шшшшшш"
// std::cout << "[P] Данные записаны, маска: " << std::bitset<64>(mask) << std::endl; // ####
// Запись маски и активация потоков
mgr.pulse(mask);
}
std::stringstream out; // можно использовать <<
out << "---> Писатель. Пауза:" << dt
<< ". T (млс.):"
<< (int)((T_OS_high_mls() - TT) * 0.0001) << ". Обработано записей:" << NRQ
<< ". Время ЦП (млс.):" << TCP * 0.0001 << "\n";
std::string result = out.str(); // Получаем std::string
std::cout << result;
}
// Заглушка функции обработки событий --
// Выполняет десереализацию полученных строк и запуск Lua-функций обработки событий
// в зависимости от name (вида очереди)
// 1) !! Память q_s возвращать не надо
// 2) !! q_s->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (q_s->size - 1)
void event_handling(q_str* q_s, std::string name) {
}
// Функция потоков-читателей --
// filling_threshold - порог заполненности очередей (<текущее количество записей очереди> / <размер очереди>)
// для выдачи предупреждений: "Превышен порог заполненности очередей"
// dt - пауза (млс.)
void consumer(int id, DataCluster& dc, Manager& mgr, double filling_threshold, int dt) {
ReaderAccess access;
access.q_filling_threshold = filling_threshold;
// Инициализация access.h и access->h_service с учетом текущего состояния очередей
dc.init_access(&access, &mgr, &dc); // "Встраивание" потока чтения в очереди
// Буфер строки-объекта для дальнейшей обработки - q_str_c --
const char str_copy[LEN_STR_MAX] = {'\0'};
q_str q_str_c; q_str_c.str = (char*)str_copy; q_str_c.size_max = LEN_STR_MAX;
// -------------------------------------
INT64 tt = T_OS_high_mls();
INT64 TT = tt; // Для замера времени выполнения ---
dt = dt > 1 ? dt : 1;
INT64 DT = dt * 10000;
INT64 DTT = 0;
INT64 TCP = 0; // Для замера времени ЦП обработки очередей
INT64 NRQ = 0; // Количество обработанных записей
INT64 NACT = 0; // Количество активаций потока чтения
while (true) { // Пользовательский цикл обработки
tt = T_OS_high_mls();
DTT = T_OS_high_mls() + DT;
while ((T_OS_high_mls() - DTT) < 0) { // Цикл обработки очерелей событий
tt = (T_OS_high_mls() - tt); // время обработки потока в цикле --
TCP += tt;
// Пауза
mgr.wait(&access, dt);
tt = T_OS_high_mls();
++NACT;
// ------------------------------------------------------------------
// Получение снимка состояния очередей потока
access.snapshot_queue_state(dc, mgr);
// Обработка снимка состояния очередей потока:
// 1) access.q_p - вектор номеров непустых очередей
// 2) access.q_len - вектор длин необработанных очередей
// 3) access.q_p - длина векторов 1), 2)
int q_p = access.q_p;
// Просмотр и обработка вектора непустых очередей потока чтения
for (int i = 0; i < q_p; ++i) {
int q_n = access.q_n[i];
int count = access.q_len[i]; // Количество записей в очереди --
// ------- Сбор статистики ----
double statistics_agr = access.statistics[q_n] / (access.NN[q_n] < access.period_agr ? access.NN[q_n] : access.period_agr);
if (statistics_agr > access.q_filling_threshold) // #### Предупреждающее сообщение --
std::cout << "[Поток: " << id << "] Очередь "
<< q_n << ". Заполненность (средняя): " << statistics_agr
<< ". *** Превышен порог заполненности очередей: " << access.q_filling_threshold << "\n";
//// Отладочная печать
//if (count > 0 || count < 0)
// std::cout << "[Поток: " << id << "] Очередь "
// << q_n << ". Заполненность (средняя): " << statistics_agr
// << ". Количество: " << count << ". Ук чтения " << access.h[q_n] << "\n";
// --------------------
INT64 q_size = dc.q[q_n].q_size;
// Чтение и обработка очередей
while (count--) {
q_str* val_q = access.get_q_str(q_n, dc);
// Копировать запись очереди в буфер обработки q_str_c (типа q_str),
// созданный в стеке --
q_str_copy(val_q, &q_str_c, &dc.q[q_n]); val_q = NULL; // ! далее использовать только q_str_c
event_handling(&q_str_c, dc.q[q_n].name_Queue); // Вызов функции обработки записи очереди
++NRQ; // Количество обработанных записей
// Результат теста
// Завершение потока при тестировании --
if (q_str_c.state == -1) { // Признак читателям завершить поток ---
std::stringstream out; // можно использовать <<
out << "Читатель " << id << ". Пауза:" << dt
<< ". T (млс.):"
<< (int)((T_OS_high_mls() - TT) * 0.0001) << ". Обработано записей:" << NRQ
<< ". Циклов обработки:" << NACT
<< ". Время ЦП (млс.):" << TCP * 0.0001 << "\n";
std::string result = out.str(); // Получаем std::string
std::cout << result;
return;
}
// Отладочная печать
//const char* val_str = q_str_c.str;
//if (access.h[q_n] == N_MAX) {
// std::cout << " Поток " << id << ". Очередь " << q_n << " -> Val(str): " << val_str << ". Указатель чтения "
// << access.h[q_n] << ". Интервал (млс.) " << (T_OS_high_mls() - TT) * 0.0001 << "\n";
//}
}
}
}
//// %%%%%%%%%%%% Обработка остальных кодов (пользователя) в потоке чтения --
//if (dt > 50)
// std::cout << " Поток " << "\n";
}
}
// --------------------- Тест очередей -------------------------
int main() {
SetConsoleCP(1251); // Кодировка 1251 (! #include <shlobj.h>)
setlocale(LC_ALL, ""); // #### Русификация вывода ввода (работает) -----
timeBeginPeriod(1); // Установить точность 1 мс
// ==================================
#ifdef win32_API
std::cout << "В реализации используется win32_API" << std::endl;
#endif
DataCluster dc; // Очереди ---
// 1. Для каждой очереди должен быть задан размер памяти для максимальной строки, передаваемой
// в ней (с учетом признака конца строки '\0').
// 2. В вызове метода set_parm_q второй параметр: вектор типа q_parm * с параметрами очередей
// 3. Порядок элементов в векторе определяет приоритет обработки очередей (0 -> 1 ...)
q_parm* Параметры_очередей = NULL; // Параметры_очередей[Количество_очередей]
dc.set_parm_q(Количество_очередей, Параметры_очередей);
//dc.initialization_q_str(); // #### Вряд ли стоит использовать ---
Manager mgr; // Объект управления потоками чтения
mgr.Выдавать_сигналы = QUEUE_RECORDING_MODE; // Режим записи в очереди: 0 - без сигналов; 1 - с сигналами
std::cout << "Режим записи в очереди: " << (mgr.Выдавать_сигналы == 0 ? "без сигналов" : "с сигналами") << std::endl;
// Читающие потоки --
// Последний параметр интервал ожидания (млс.)
std::thread c1(consumer, 1, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 5);
std::thread c2(consumer, 2, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 10);
std::thread c3(consumer, 3, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 50);
std::thread c4(consumer, 4, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 100);
// Пишущий поток --
std::thread p(producer, std::ref(dc), std::ref(mgr));
// подключение читающего потока к обработке очередей в произвольный момент времени.
const int Задержка_запуска_читателя = 700;
std::cout << "Задержка_запуска_читателя 5 на " << Задержка_запуска_читателя << " млс." << std::endl;
Sleep(Задержка_запуска_читателя);
std::thread c5(consumer, 5, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 1);
p.join(); c1.join(); c2.join(); c3.join(); c4.join(); c5.join();
timeEndPeriod(1); // Освободить точность 1 млс.
char nazv_sh[100];
std::cout << "Введите что-нибудь:" << std::endl;
std::cin >> nazv_sh;
return 0;
}
Превышено максимальное количество транзакций 50 в интервале 1 сек, Некорректная работа sendTransaction
Пользователь
Сообщений: Регистрация: 12.05.2020
13.03.2026 11:00:57
Цитата
Михаил Понамаренко написал: При ошибке "Превышено максимальное количество транзакций 50 в интервале 1 сек"Не возвращается описание ошибки через sendTransaction.
Я задавал такой же вопрос. Ссылка на диалог:
Гарантируется ли вызов колбэка при получении Квиком новых данных?, Вопросы разработчикам QUIK
Пользователь
Сообщений: Регистрация: 12.05.2020
11.03.2026 18:54:24
Цитата
paluke написал: То есть поставили вы в коллбеке sleep и данные не принимаются, новых событий нет.
И торги на бирже остановлены ?
Цитата
paluke написал: Так коллбеки в том же потоке, что и прием данных
Вы это проверяли?
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
11.03.2026 08:57:19
В классе InfoQueue, в методе
Цитата
TGB написал: // запись в очередь -- bool push(T val) { if (tail < q_size) buffer[tail % q_size] = val; ++tail; return true; }
Ошибка: строка с if лишняя. Правильно:
Код
// запись в очередь --
bool push(T val) {
buffer[tail % q_size] = val;
++tail;
return true;
}
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
09.03.2026 21:59:17
В соответствии с ТЗ (п.1,2, 4-8) ниже выложен код на C++ варианта реализации (Lock-free) циклических очередей событий QUIK. При использовании этого варианта в реализации п. 9-11 необходимости нет. 1. Блокировок между пишущим основным потоком QUIK и читающими потоками (main) нет. 2. Результата выполненных тестов эффективности приведены в комментариях кодов. В тестах накладные расходы времени ЦП: 1) на запись в очередь одного сообщения пишущим потоком ~0.007 мкс. 2) на чтение одного сообщения читающим потоком ~0.13 мкс. 3. Порог заполненности очередей обеспечивает выдачу предупреждения о высоком уровне заполненности очередей для читающих потоков. Если для читающего потока в какой то его очереди возникнет переполнение, то об этом выдается сообщение. 4. Независимо от задаваемой паузы ожидания в читающем потоке, он активируется всякий раз, при появлении новых записей в его очередях. Вычисляемая (реальная) пауза читающего потока = 'пауза ожидания' - 'время обработки его цикла (новых коллбеков и кодов пользователя)', но >= 1 млс. В выложенном коде устанавливается разрешение таймера = 1 млс. (вместо 16,...). 5. Все основные характеристики очередей параметризированы. Функция запроса свойств очередей в выложенном коде не реализована из-за ее простоты. 6. Выложенный вариант обеспечивает, при его использовании, сохранения существующего API QLua.
Код
// ============== Вариант реализации очередей событий QUIK (Lock-free, ~300 строк) ============
// Параметр Q_MAX определяет количество очередей в схеме очередей. Схем обработки очередей можно
// создать несколько.
// Данные, передаваемые в информационных очередях, строки.
// 1. Формат записи данных - текст описания таблицы Lua:
// {name = <Имя коллбека>, tbl = {<Таблица для формирования вызова функции пользователя>}}
// После чтения выполнять десереализацию текста таблицы Lua и дальше обрабатывать с учетом значения name.
// name обеспечивает возможность группировка коллбеков по очередям.
// 2. Коллбеки можно, но необязательно, сгруппировать и группы распределить по очередям с учетом того,
// что при чтении обработка начинается с 0-й очереди. Параметры очередей size_max следует задать
// такими, чтобы в них помещались строки их данных.
// Для коллбеков пользовательских таблиц QUIK, наверное, имеет смысл использовать все таки отдельную
// очередь.
// 3. Параметр FILLING_THRESHOLD - порог заполненности очередей для выдачи предупреждения о высоком уровне
// заполненности очередей.
// 4. После первого цикла любой очереди, память ее созданных элементов переиспользуется
// (кроме чтения указателя нет затрат на управление ее памятью).
// 5. Если для читающего потока в какой то его очереди возникнет переполнение, то об этом
// выдается сообщение.
// ----------------------------------------------------------------------
// Результат теста (оценка эффективности реализации очередей)
// Писатель выполняет в цикле запись в 10 очередей. Циклов записи : 1000
// T - время выполнения потоков с учетом их пауз в млс.
//-->Писатель. Пауза: 1. T (млс.): 1977. Обработано записей: 10000. Время ЦП (млс.): 0.0686
//Читатель 3. Пауза: 50. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.3174
//Читатель 1. Пауза: 5. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.1746
//Читатель 4. Пауза: 100. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.2129
//Читатель 2. Пауза: 10. T (млс.): 2023. Обработано записей: 10000. Время ЦП (млс.): 1.2194
// -------------------------------------------------------------------------------
// Краткое ТЗ на разработку очередей
// 1) Создаются несколько циклических информационных очередей с разными типами данных и общим объектом
// управления ими.
// 2) В очереди пишет данные один поток, а читают эти очереди несколько потоков.
// 3) Дополнительно создается специальная служебная циклическая очередь: битовые шкалы непустых информационных
// очередей, записываемых пишущим потоком, после записи в информационные очереди.
// 4) У читающих потоков должны быть свои указатели чтения циклических очередей и их локальные битовые шкалы
// непустых очередей, хранящиеся в их объекте доступа к информационным очередям.
// 5) Читающие потоки циклически, выполнив свои коды переходят в состояние ожидания сигнала появления данных
// в их очередях или истечения заданного интервала времени на их общем объекте ожидания.
// 6) При записи в любую очередь пишущий поток после записи в информационные очереди записывает битовые шкалы
// непустых очередей в служебную очередь шкал непустых очередей и активирует все читающие потоки.
// 7) Каждый читающий поток, при активации, читает все появившиеся записи в служебной очереди шкал непустых
// очередей и формирует свою локальную шкалу непустых очередей и далее, на ее основании, читает без
// синхронизации, непустые очереди с признаком : состояние прочитанных очередей в виде размера непрочитанных
// данных.
// 8) Реализовать эффективное управление памятью при передаче данных в этих очередях.
// C++20 (используется std::countr_zero) ---
#include <shlobj.h>
#include <windows.h>>
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <mutex>
//#include <condition_variable>
//#include <atomic>
//#include <bitset>
//#include <intrin.h>
#pragma comment(lib, "winmm.lib") // Установить точность таймера 1 мс
// Параметры тестирования очередей ---
const int N_MAX = 1000; // Количество циклов записи при тестировании
const INT64 Q_SIZE = 64; // Длина информационной очереди по умолчанию
const INT64 Q_SIZE_SL = 1024; // Длина служебной очереди шкал непустых информационных очередей
const int Q_MAX = 128; // Максимальное количество очередей
const int Количество_очередей = 120; // Количество используемых очередей ( <= Q_MAX)
const int LEN_STR_MAX = 4096; // Размер буфера потоков чтения (максимум size_max_str всех используемых очередей)
const double FILLING_THRESHOLD = 0.4; // Порог заполненности очередей для выдачи предупреждения об уровне заполненности очередей
// Системное время (синхронизированное). Точность 0,1 мкс.---
static INT64 T_OS_high_mls()
{
FILETIME lpSystemTimeAsFileTime;
GetSystemTimePreciseAsFileTime(&lpSystemTimeAsFileTime); // Системное время (синхронизированное)
INT64 tt = lpSystemTimeAsFileTime.dwHighDateTime;
tt = (tt << 32) + lpSystemTimeAsFileTime.dwLowDateTime;
return tt;
}
// Строка-объект информационной очереди (переиспользуемая) --
struct q_str {
INT64 tail = 0; // Указатель записи ---
INT64 size_max = 0; // Максимальный размер памяти строки (с учетом символа конца строки '\0')
INT64 size = 0; // Текущий размер памяти строки (с учетом символа конца строки '\0')
int n_q = 0; // Номер очереди
char* str = NULL; // Память строки (размер len_max) ----
int state = 0; // Состояние (не используется, но на всякий случай): 1 - занята пишущим потоком; 2 - в очереди.
};
// Параметры очереди, задаваемые при ее инициализации --
struct q_parm {
std::string name_Queue = ""; // имя очереди
INT64 q_size = 0; // длина очереди
INT64 size_max_str = 64; // максимальная память строки в информационной очереди
};
// Шаблонная циклическая очередь (Lock-free)
// Используется для информационных очередей и служебной
template<typename T>
class InfoQueue {
public:
std::vector<T> buffer;
INT64 tail = 0; // пишет только один писатель
std::string name_Queue = ""; // имя очереди (можно менять при начальной инициализации)
INT64 q_size = 64; // размер очереди (можно менять при начальной инициализации)
INT64 size_max_str = 64; // максимальная память строки в очереди (можно менять при начальной инициализации)
InfoQueue(INT64 s = Q_SIZE) : buffer(s), q_size(s) {}
// Изменение длины очереди ---
void q_size_set(int s) {
buffer.reserve(s); q_size = s;
}
// запись в очередь --
bool push(T val) {
if (tail < q_size)
buffer[tail % q_size] = val;
++tail;
return true;
}
// Количества доступных элементов для конкретного читателя
// local_head - указатель чтения (читающего потока)
// Если отрицательное значение, то: количество пропущенных при чтении записей очереди --
INT64 get_pending_count(INT64 local_head) const {
INT64 t = tail - local_head;
return (t <= q_size ? t : q_size - t);
}
};
// Копировать строку-объект в буфер читающего потока для последующей обработки ---
void q_str_copy(q_str* s, q_str* s1, InfoQueue<q_str*>* q)
{
//s1->size_max = s->size_max; // ! Задана при создании буфере --
s1->size = s->size;
s1->n_q = s->n_q;
if (s1->str == NULL) throw std::runtime_error("*** Ошибка: нет памяти для копирования");
// s1->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (s1->size - 1)
memcpy(s1->str, s->str, s1->size); s1->tail = s->tail; // ! Обязательно в таком порядке --
s1->state = s->state;
// Дополнительный контроль переполнение очереди (после копирования в буфер потока чтения)
if ((q->tail - s1->tail) > q->q_size) std::cout << " *** Ошибка переполнение очереди (при копировании) " << s1->n_q << "\n";
}
// Информационные очереди событий --
struct DataCluster {
// Деструктор (! выход из блока кода, в котором определен DataCluster
// только при завершения потоков, использующих его) --
~DataCluster() {
for (int i = 0; i < Q_MAX; ++i) {
INT64 size = q[i].buffer.size();
INT64 N = (q[i].tail >= size) ? size : q[i].tail; // Буфер может быть неполным ---
for (int j = 0; j < N; ++j) {
q_str* q_s = q[i].buffer[1];
delete[]q_s->str;
delete q_s;
}
}
}
int count_q = 64; // Количество используемых информационных очередей (<= Q_MAX) --
// При количестве очередей > 64 один бит шкалы непустых очередей соотносится
// к нескольким ((count_q - 1) / 64 + 1) последовательным очередям, в которых,
// возможно, есть записи.
UINT32 Queue_grouping = 1;
InfoQueue<q_str*> q[Q_MAX];
// Задать количество используемых очередей и их параметры
void set_parm_q(int n, q_parm* parm) {
if (n > 0 && n <= Q_MAX)
count_q = n;
else
throw std::runtime_error(" Количество используемых очередей > Q_MAX или <= 0"); /* #### ошибка*/
Queue_grouping = (count_q - 1) / 64 + 1;
if (parm != NULL)
for (int i = 0; i < count_q; ++i) {
q[i].name_Queue = parm->name_Queue;
q[i].q_size = parm->q_size;
q[i].q_size_set(parm->q_size);
q[i].size_max_str = parm->size_max_str;
}
}
// Функция формирования маски группировки очередей (многозначное отображение очередей на шкалу непустых)
uint64_t q_mask(int q_i) { return (uint64_t)1 << (q_i / Queue_grouping); }
// Запрос памяти потоком писателем под запись очереди --
// Запрос памяти системы и формирование строки-объекта -----
q_str* get_q_s(int q_i) {
INT64 size_max_str = q[q_i].size_max_str;
char* str = new char[size_max_str];
str[size_max_str - 1] = '\0';
q_str* q_s = new q_str;
q_s->str = str;
q_s->size_max = size_max_str;
q_s->size = 0;
q_s->n_q = q_i;
return q_s;
}
// Запрос строки-объекта потоком записи для формирования строки
q_str* get_str(int q_i) {
q_str* q_s;
INT64 tail = q[q_i].tail;
INT64 t = tail % q[q_i].q_size;
if (q[q_i].buffer[t] != NULL) { // Получение памяти из очереди --
q_s = q[q_i].buffer[t];
q_s->tail = tail;
}
else { // Запрос памяти у системы --
q_s = get_q_s(q_i);
q_s->tail = tail;
}
return q_s;
}
// Инициализация используемых очередей строками-объектами --
// #### Вряд ли стоит использовать ---
void initialization_q_str() {
if (count_q > 0) {
for (int i = 0; i < count_q; ++i) {
int size = q[i].q_size;
for (int j = 0; j < size; ++j) {
if (q[i].buffer[j] == NULL)
q[i].buffer[j] = get_q_s(i);
}
}
}
}
// Начальная инициализация указателей чтения в потоке-читателе на основе указателей
// записи в очереди (при начальной инициализации потока)
void init_access(INT64 *tail_v) {
if (count_q <= 0) throw std::runtime_error("*** Ошибка: не задано количество используемых очередей");
for (int i = 0; i < count_q; ++i) tail_v[i] = q[i].tail;
}
// запись в очередь --
bool push(int q_n, q_str* val) {
if (q_n >= count_q) throw std::runtime_error("*** Ошибка: нет такой очереди");
if (q[q_n].tail < q[q_n].q_size)
q[q_n].buffer[q[q_n].tail % q[q_n].q_size] = val;
++q[q_n].tail;
return true;
}
};
struct ReaderAccess; // Используется в Manager
// Объект управления потоками чтения
struct Manager {
INT64 signal_counter = 0; // Счетчик сигналов записи
std::mutex mtx; // Для синхронизации при ожидании
std::condition_variable cv; // Общий объект ожидания
InfoQueue<uint64_t> service_q{ Q_SIZE_SL }; // Служебная очередь битовых шкал
void wait(ReaderAccess* access, int tt); // !! Реализация после struct ReaderAccess (иначе недоступны поля)
void pulse(uint64_t mask) { service_q.push(mask); ++signal_counter; cv.notify_all(); }
//// ------- win32 API --
//HANDLE cv = CreateEvent(NULL, FALSE, FALSE, NULL); // Общий объект ожидания
//void pulse() { ++signal_counter; PulseEvent(cv); }
private:
INT64 tt = T_OS_high_mls(); // Для коррекции интервала ожидания --
};
// Объект доступа к очередям для читающих потоков --
struct ReaderAccess {
INT64 h[Q_MAX] = { 0 }; // Локальные указатели чтения
INT64 h_service = 0; // Указатель чтения служебной очереди
uint64_t local_bitmask = 0; // Локальная шкала непустых очередей потока-читателя
INT64 signal_counter = 0; // Локальный счетчик сигналов записи
// -------- Статистика очередей ---------
INT64 NN[Q_MAX] = { 0 }; // Коичество обращений к непустым очередям
double statistics[Q_MAX] = { 0 }; // Сумма относительной заполненности очередей (среднее = statistics [i]/ (NN[i]))
double q_filling_threshold = 0.4; // Порог заполненности очередей для выдачи предупреждения --
int period_agr = 9; // Период скользящей
// ------- Данные снимка состояния очередей потока чтения ---
int q_len[Q_MAX] = { 0 }; // Количество непустых непрочитанных записей в очередях
int q_n[Q_MAX] = { 0 }; // Номера непустых очередей
int q_p = 0; // Длина векторов q_n и q_len
// Снимок состояния очередей потока-читателя ("мгновенный", чтобы меньше было повторных чтений очередей).
void snapshot_queue_state(DataCluster& dc, Manager& mgr) {
q_p = 0; // Сброс длины векторов q_n и q_len
// Формирование шкалы непустых очередей потока-читателя
uint64_t mask_val;
INT64 count = mgr.service_q.get_pending_count(h_service);
if (count > 0) {
// Формируем общую локальную шкалу непустых очередей потока-читателя --
for (int i = 0; i < count; ++i) {
mask_val = mgr.service_q.buffer[h_service % mgr.service_q.q_size];
local_bitmask |= mask_val;
++h_service;
}
// Формирование снимка состояния очередей потока-читателя (на основе его шкалы) --
int pos_old = 0;
int Queue_grouping = dc.Queue_grouping;
while (local_bitmask) { // Эффективная обработка шкалы непустых очередей --
//// ! Способ определения позиции младшего разряда для старых стандартов (MSVC) -----
// unsigned long pos3 = 0;
// _BitScanForward64(&pos3, local_bitmask); // позиция младшего разряда с 1
//// -------------------------------------------------
int Сдвиг = std::countr_zero(local_bitmask); // C++20 (позиция младшего разряда с 1)
int pos = Сдвиг + pos_old;
pos_old = pos + 1;
pos *= Queue_grouping;
for (int i = 0; i < Queue_grouping; ++i) { // обработка групп очередей разряда шкалы --
INT64 q_count = dc.q[pos + i].get_pending_count(h[pos + i]);
if (q_count > 0) {
int p = q_p;
q_n[p] = pos + i;
q_len[p] = q_count;
// Статистика заполненности очереди (относительная, приблизительно средне скользящая)
if (++NN[pos + i] < period_agr) {
statistics[pos + i] += (double)q_count / (double)dc.q[pos + i].q_size;
}
else {
statistics[pos + i] += (double)q_count / (double)dc.q[pos + i].q_size - statistics[pos + i] / period_agr;
}
++q_p;
}
else
if (q_count < 0) std::cout << " *** Ошибка переполнение очереди (при чтении) q = " << pos + i << ": q_count = " << q_count << "\n"; // #### Ошибка переполнение очереди
}
local_bitmask >>= Сдвиг + 1;
}
}
else if (count < 0) std::cout << " *** -- Ошибка переполнение служебной очереди шкал --\n";
//std::cout << ". Шкала : " << std::bitset<64>(local_bitmask) << "\n";
}
// Чтение строки-объекта очереди --
// q_n - номер очереди
q_str* get_q_str(int q_n, DataCluster& dc) {
q_str* val = dc.q[q_n].buffer[h[q_n] % dc.q[q_n].q_size];
++h[q_n];
return val;
}
};
// ! Реализация члена Manager (расположена после struct ReaderAccess чтобы в методе были
// доступны ее поля)
// Пауза (подстраиваемая с учетом длительности цикла обработки)
void Manager::wait(ReaderAccess* access, int dt) {
std::unique_lock<std::mutex> lock(mtx);
if (access->signal_counter == signal_counter) {// не было сигнала записи при отсутствии ожидания в потоке чтения
// Вычисление коррекции паузы с учетом времени обработки цикла --
tt = (T_OS_high_mls() - tt) * 0.0001; // время обработки потока в цикле (млс.) --
tt = (dt - tt);
tt = tt > 0 ? tt : 1;
cv.wait_for(lock, std::chrono::milliseconds(tt)) == std::cv_status::timeout;
//WaitForSingleObject(cv, tt); // вместо предыдущего оператора с комментированием первого оператора метода
}
tt = T_OS_high_mls();
access->signal_counter = signal_counter; // Обрабатываемый сигнал записи в потоке чтения
}
// Тестирование очередей событий ------------------------------------------------
// Функция потока-писателя
// Формат записываемых строк общий для всех очередей: тексты таблиц Lua (сереализация).
// Читающий поток выполняет десереализацию полученных строк и запуск Lua-функций обработки событий
void producer(DataCluster& dc, Manager& mgr) {
std::cout << "Писатель. Запись выполняется в 10 очередей. Циклов записи: " << N_MAX << std::endl;
std::cout << "T - время выполнения потоков с учетом их пауз." << std::endl;
// -------------------------------------
INT64 tt = T_OS_high_mls(); // Для коррекции интервала ожидания --
INT64 TT = tt; // Для замера времени выполнения ---
INT64 TCP = 0; // Для замера времени ЦП ---
INT64 NRQ = 0; // Количество обработанных записей
int dt = 1;
for (int i = 1; i <= N_MAX; ++i) {
tt = T_OS_high_mls() - tt;
TCP += tt;
std::this_thread::sleep_for(std::chrono::milliseconds(dt)); // Пауза потока записи
//Sleep(dt); // И так можно
// %%%%%% Подготовка данных для записи в очереди в виде C-строк
// Запись строк в очереди --
uint64_t mask = 0; // шкала непустых очередей
//// Запись в разные очереди
// !! rec->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (rec->size - 1)
const char* source = "{name = 'OnCleanUp',tbl = { ....] }"; int sz = strlen(source) + 1;
q_str* rec = dc.get_str(0); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(0, rec)) mask |= dc.q_mask(0); ++NRQ;
rec = dc.get_str(1); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(1, rec)) mask |= dc.q_mask(1); ++NRQ;
rec = dc.get_str(2); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(2, rec)) mask |= dc.q_mask(2); ++NRQ;
rec = dc.get_str(3); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(3, rec)) mask |= dc.q_mask(3); ++NRQ;
rec = dc.get_str(40); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(40, rec)) mask |= dc.q_mask(40); ++NRQ;
rec = dc.get_str(41); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(41, rec)) mask |= dc.q_mask(41); ++NRQ;
rec = dc.get_str(40); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(40, rec)) mask |= dc.q_mask(40); ++NRQ;
rec = dc.get_str(41); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(41, rec)) mask |= dc.q_mask(41); ++NRQ;
rec = dc.get_str(42); memcpy(rec->str, source, sz); rec->size = sz;
if (dc.push(42, rec)) mask |= dc.q_mask(42); ++NRQ;
rec = dc.get_str(100); memcpy(rec->str, "шшшшшш", 7); rec->size = 7;
if (i >= N_MAX) rec->state= -1; // Признак читателям завершить поток ---
if (dc.push(100, rec)) mask |= dc.q_mask(100); ++NRQ; // "шшшшшш"
// std::cout << "[P] Данные записаны, маска: " << std::bitset<64>(mask) << std::endl; // ####
// Запись маски и активация потоков
mgr.pulse(mask);
tt = T_OS_high_mls();
}
std::stringstream out; // можно использовать <<
out << "---> Писатель. Пауза:" << dt
<< ". T (млс.):"
<< (int)((T_OS_high_mls() - TT) * 0.0001) << ". Обработано записей:" << NRQ
<< ". Время ЦП (млс.):" << TCP * 0.0001 << "\n";
std::string result = out.str(); // Получаем std::string
std::cout << result;
}
// Заглушка функции обработки событий.
// Выполняет десереализацию полученных строк и запуск Lua-функций обработки событий
// в зависимости от name (вида очереди)
// 1) !! Память q_s возвращать не надо
// 2) !! q_s->size - размер памяти строки (с учетом символа конца строки '\0')
// Длина строки: (q_s->size - 1)
void event_handling(q_str* q_s, std::string name) {
}
// Функция потоков-читателей --
// filling_threshold - порог заполненности очередей (<текущее количество записей очереди> / <размер очереди>)
// для выдачи предупреждений: "Превышен порог заполненности очередей"
// dt - пауза (млс.)
void consumer(int id, DataCluster& dc, Manager& mgr, double filling_threshold, int dt) {
ReaderAccess access;
access.q_filling_threshold = filling_threshold;
dc.init_access(access.h); // Инициализация access.h (указателей) потока чтения с учетом текущего состояния очередей --
// Буфер строки-объекта для дальнейшей обработки - q_str_c --
const char str_copy[LEN_STR_MAX] = {'\0'};
q_str q_str_c; q_str_c.str = (char*)str_copy; q_str_c.size_max = LEN_STR_MAX;
// -------------------------------------
INT64 tt = T_OS_high_mls(); // Для коррекции интервала ожидания --
INT64 TT = tt; // Для замера времени выполнения ---
INT64 TCP = 0; // Для замера времени ЦП ---
INT64 NRQ = 0; // Количество обработанных записей
while (true) {
tt = (T_OS_high_mls() - tt); // время обработки потока в цикле --
TCP += tt;
// Пауза (подстраиваемая с учетом длительности цикла обработки)
// с ожиданием сигнала записи в одну из информационных очередей --
mgr.wait(&access, dt);
tt = T_OS_high_mls();
// ------------------------------------------------------------------
// Получение снимка состояния очередей потока ("мгновенный",
// чтобы меньше было повторных чтений очередей).
access.snapshot_queue_state(dc, mgr);
// Обработка снимка состояния очередей потока:
// 1) access.q_p - вектор номеров непустых очередей
// 2) access.q_len - вектор длин необработанных очередей
// 3) access.q_p - длина векторов 1), 2)
int q_p = access.q_p;
// Просмотр и обработка вектора непустых очередей потока чтения
for (int i = 0; i < q_p; ++i) {
int q_n = access.q_n[i];
int count = access.q_len[i]; // Количество записей в очереди --
// ------- Сбор статистики ----
double statistics_agr = access.statistics[q_n] / (access.NN[q_n] < access.period_agr ? access.NN[q_n] : access.period_agr);
if (statistics_agr > access.q_filling_threshold) // #### Предупреждающее сообщение --
std::cout << "[Поток: " << id << "] Очередь "
<< q_n << ". Заполненность (средняя): " << statistics_agr
<< ". *** Превышен порог заполненности очередей: " << access.q_filling_threshold << "\n";
//// Отладочная печать
//if (count > 0 || count < 0)
// std::cout << "[Поток: " << id << "] Очередь "
// << q_n << ". Заполненность (средняя): " << statistics_agr
// << ". Количество: " << count << ". Ук чтения " << access.h[q_n] << "\n";
// --------------------
INT64 q_size = dc.q[q_n].q_size;
// Чтение и обработка очередей
while (count--) {
q_str* val_q = access.get_q_str(q_n, dc);
// Копировать запись очереди в буфер обработки q_str_c (типа q_str),
// созданный в стеке --
q_str_copy(val_q, &q_str_c, &dc.q[q_n]); val_q = NULL; // ! далее использовать только q_str_c
event_handling(&q_str_c, dc.q[q_n].name_Queue); // Вызов функции обработки записи очереди
++NRQ; // Количество обработанных записей
// Результат теста
// Завершение потока при тестировании --
if (q_str_c.state == -1) { // Признак читателям завершить поток ---
std::stringstream out; // можно использовать <<
out << "Читатель " << id << ". Пауза:" << dt
<< ". T (млс.):"
<< (int)((T_OS_high_mls() - TT) * 0.0001) << ". Обработано записей:" << NRQ
<< ". Время ЦП (млс.):" << TCP * 0.0001 << "\n";
std::string result = out.str(); // Получаем std::string
std::cout << result;
return;
}
// Отладочная печать
//const char* val_str = q_str_c.str;
//if (access.h[q_n] == N_MAX) {
// std::cout << " Поток " << id << ". Очередь " << q_n << " -> Val(str): " << val_str << ". Указатель чтения "
// << access.h[q_n] << ". Интервал (млс.) " << (T_OS_high_mls() - TT) * 0.0001 << "\n";
//}
// ----------------------------------------------------
//// %%%%%%%%%%%% Обработка остальных данных в потоке чтения --
}
}
}
}
// --------------------- Тест очередей -------------------------
int main() {
SetConsoleCP(1251); // Кодировка 1251 (! #include <shlobj.h>)
setlocale(LC_ALL, ""); // #### Русификация вывода ввода (работает) -----
timeBeginPeriod(1); // Установить точность 1 мс
// ==================================
DataCluster dc; // Очереди ---
// 1. Для каждой очереди должен быть задан размер памяти для максимальной строки, передаваемой
// в ней (с учетом признака конца строки '\0').
// 2. В вызове метода set_parm_q второй параметр: вектор типа q_parm * с параметрами очередей
// 3. Порядок элементов в векторе определяет приоритет обработки очередей (0 -> 1 ...)
q_parm* Параметры_очередей = NULL; // Параметры_очередей[Количество_очередей]
dc.set_parm_q(Количество_очередей, Параметры_очередей);
//dc.initialization_q_str(); // #### Вряд ли стоит использовать ---
Manager mgr; // Объект управления потоками чтения
// Читающие потоки --
// Последний параметр интервал ожидания (млс.)
std::thread c1(consumer, 1, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 5);
std::thread c2(consumer, 2, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 10);
std::thread c3(consumer, 3, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 50);
std::thread c4(consumer, 4, std::ref(dc), std::ref(mgr), FILLING_THRESHOLD, 100);
// Пишущий поток --
std::thread p(producer, std::ref(dc), std::ref(mgr));
p.join(); c1.join(); c2.join(); c3.join(); c4.join();
timeEndPeriod(1); // Освободить точность
char nazv_sh[100];
std::cout << "Введите что-нибудь:" << std::endl;
std::cin >> nazv_sh;
return 0;
}
Гарантируется ли вызов колбэка при получении Квиком новых данных?, Вопросы разработчикам QUIK
Пользователь
Сообщений: Регистрация: 12.05.2020
06.03.2026 11:01:04
Цитата
Anton Belonogov написал: Подтверждаем: при получении терминалом новой записи или обновления существующей гарантируется вызов соответствующей функции обратного вызова.
Здравствуйте. Есть скорость v1 (количество/сек.) появления событий, по которым должны запускаться коллбеки. И есть скорость выполнения коллбеков v2 (количество/сек.) в скриптах пользователя. Пока v1 <= v2 можно как то, наверное, гарантировать. Но скоростью v2 QUIK не управляет. Например, пользователь в тексте коллбека, по недоразумению, написал строку: sleep(3000) и на 3 сек. все коллбеки, во всех выполняемых скриптах перестали обрабатываться. Или выполняет долгую обработку в коллбеке. Единственное, что можно бы было гарантировать в этих условиях - это выдачу сообщения о ситуации, когда v1 > v2. Но я таких сообщений никогда не видел (даже в экспериментах). Или я что то не понимаю?
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Согласен. В вашем конкретном случае достаточно первого варианта.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
01.03.2026 11:28:58
Вариант ТЗ на модификацию интерфейса QLua c QUIK 1. Вместо существующей схемы запуска коллбеков создаются циклические очереди (соответствующие коллбекам). 2. В очереди пишет данные событий служебный поток QUIK, а читают эти очереди и выполняют функции их обработки потоки main скриптов пользователя в теле модифицированной функции sleep. 3. В меню, добавленное в окно запуска пользовательских скриптов, обеспечить возможность изменения пользователем умолчания параметров очередей, а также функций фильтрации данных, записываемых в очереди . 4. У читающих потоков должны быть свои указатели чтения циклических очередей и битовые шкалы непустых очередей, хранящиеся в объекте доступа к их очередям. 5. Читающие потоки циклически, выполнив свои коды переходят в состояние ожидания сигнала появления данных в их очередях или истечения заданного интервала времени. 6. При записи в любую очередь пишущий поток меняет состояние битовых шкал непустых очередей и активирует все читающие потоки. 7. Каждый читающий поток при чтении очереди, выбираемой на основании своей битовой шкалы непустых очередей, получает признак: состояние прочитанных очередей в виде размера непрочитанных данных. 8. Добавить в код динамическое управление количеством очередей и реализовать полностью Lock-Free механизм уведомления без использования std::mutex. 9. Добавить логику удаления очередей «на лету» (dynamic removal) с использованием механизма безопасного освобождения памяти, такого как Hazard Pointers. 10. Реализовать идиому Hazard Pointers с очередью на удаление (retire list). 11. Реализовать пользовательскую функцию запроса свойств очередей. 12. Подключать в QUIKе коды Lua без внесения изменений в его исходники. 13. Перенести все необходимые функции (с внесением изменений, учитывающих п.1 - 12), реализованные в текущей версии QUIK в исходниках Lua, в пакет dll. ------- Что в написанном выше не понятно? В моих сообщениях 91 и 92 приведены "сырые", но работающие прототипы, частично реализующие это ТЗ. На возражения, замечания и предложения по улучшению ТЗ, постараюсь ответить.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
28.02.2026 15:33:43
Мне стало интересно, а более продвинутый вариант циклических очередей ИИ сможет реализовать? Промт для ИИ Написать программу на C++, в которой: 1) Создаются несколько циклических очередей с разными типами данных. 2) В очереди пишет данные один поток, а читают эти очереди несколько потоков. 3) У читающих потов должны быть свои указатели чтения циклических очередей и битовые шкалы непустых очередей, хранящиеся в объекте доступа к их очередям. 4) При записи в любую очередь пишущий поток меняет состояние битовых шкал непустых очередей и активирует все читающие потоки. 5) Каждый читающий поток при чтении очереди, выбираемой на основании своей битовой шкалы непустых очередей, получает признак: состояние прочитанных очередей в виде размера непрочитанных данных. 6) Добавить в код динамическое управление количеством очередей и реализовать полностью Lock-Free механизм уведомления без использования std::mutex.
Смог (все комментарии ИИ):
Код
// Это сложная задача, требующая использования атомарных операций (std::atomic) и барьеров памяти для реализации Lock-Free взаимодействия.
// Для выполнения условия динамического управления очередями и уведомления без мьютексов, мы будем использовать std::atomic_flag или
// std::atomic<bool> для сигнализации (busy-wait или wait/notify в C++20) и атомарные указатели.
// Ниже представлен концептуальный код, реализующий эту логику:
// C++20 ----
#include <iostream>
#include <vector>
#include <atomic>
#include <thread>
#include <memory>
#include <optional>
// Шаблонная циклическая очередь (Lock-Free для одного писателя и нескольких читателей со своими индексами)
template<typename T, size_t Capacity>
class CircularQueue {
T buffer[Capacity];
std::atomic<size_t> head{0}; // Указатель записи
public:
void push(T item) {
size_t h = head.load(std::memory_order_relaxed);
buffer[h % Capacity] = item;
head.store(h + 1, std::memory_order_release);
}
bool try_pop(size_t& reader_idx, T& out_item, size_t& pending_count) {
size_t current_head = head.load(std::memory_order_acquire);
if (reader_idx < current_head) {
out_item = buffer[reader_idx % Capacity];
reader_idx++;
pending_count = current_head - reader_idx;
return true;
}
pending_count = 0;
return false;
}
};
// Объект доступа для каждого читающего потока
struct ReaderContext {
std::atomic<uint64_t> bitmask{0}; // Битовая шкала непустых очередей
std::vector<size_t> read_indices; // Свои указатели чтения
std::atomic<bool> data_ready{false};
void update_mask(size_t queue_idx) {
bitmask.fetch_or(1ULL << queue_idx, std::memory_order_release);
data_ready.store(true, std::memory_order_release);
data_ready.notify_all(); // C++20 Lock-free уведомление
}
};
class QueueManager {
CircularQueue<int, 1024> int_queue;
CircularQueue<float, 1024> float_queue;
std::vector<std::shared_ptr<ReaderContext>> readers;
public:
void add_reader(std::shared_ptr<ReaderContext> ctx) {
ctx->read_indices.resize(2, 0); // 0 для int, 1 для float
readers.push_back(ctx);
}
void write_int(int val) {
int_queue.push(val);
for (auto& r : readers) r->update_mask(0);
}
void write_float(float val) {
float_queue.push(val);
for (auto& r : readers) r->update_mask(1);
}
void reader_thread_func(std::shared_ptr<ReaderContext> ctx, int id) {
while (true) {
// Ожидание данных (Lock-free wait)
ctx->data_ready.wait(false, std::memory_order_acquire);
uint64_t mask = ctx->bitmask.load(std::memory_order_acquire);
for (size_t i = 0; i < 2; ++i) {
if (mask & (1ULL << i)) {
size_t pending = 0;
if (i == 0) {
int val;
while (int_queue.try_pop(ctx->read_indices[0], val, pending)) {
printf("[Reader %d] Int: %d (Pending: %zu)\n", id, val, pending);
}
} else {
float val;
while (float_queue.try_pop(ctx->read_indices[1], val, pending)) {
printf("[Reader %d] Float: %.2f (Pending: %zu)\n", id, val, pending);
}
}
// Сбрасываем бит, если данных больше нет
if (pending == 0) ctx->bitmask.fetch_and(~(1ULL << i));
}
}
ctx->data_ready.store(false, std::memory_order_relaxed);
}
}
};
int main() {
QueueManager manager;
auto r1 = std::make_shared<ReaderContext>();
auto r2 = std::make_shared<ReaderContext>();
manager.add_reader(r1);
manager.add_reader(r2);
std::thread t1(&QueueManager::reader_thread_func, &manager, r1, 1);
std::thread t2(&QueueManager::reader_thread_func, &manager, r2, 2);
// Пишущий поток
std::thread writer([&]() {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
manager.write_int(100 + i);
manager.write_float(1.5f * i);
}
});
writer.join();
t1.detach();
t2.detach();
return 0;
}
// Ключевые моменты реализации :
// Индивидуальные индексы : Каждый читатель хранит свои read_indices, что позволяет им двигаться по одной и
// той же очереди независимо.
// Битовая шкала : Используется std::atomic<uint64_t>, где каждый бит соответствует ID очереди.Это позволяет
// мгновенно проверить, где появились данные.
// Lock - Free Notify : Использован метод atomic::wait / notify_all(доступен с C++20).Он работает эффективнее
// обычного yield, так как позволяет потоку "заснуть" без использования мьютекса в пользовательском
// коде(на уровне ОС это может использовать фьютексы).
// Pending Count : Метод try_pop возвращает разницу между указателем записи и текущим индексом чтения,
// выполняя условие получения "размера непрочитанных данных".
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
28.02.2026 10:42:54
Цитата
Oleg Kuzembaev написал: необходимо достичь понимания, какая именно доработка требуется.Если у вас уже есть такая наготове, то просьба предметно описать ее: что именно стоит добавить? Как это будет выглядеть в вашем представлении?
Промт для ИИ Написать программу на C++, в которой: 1) Создаются несколько циклических очередей с разными типами данных. 2) В очереди пишет данные один поток, а читают эти очереди несколько потоков. 3) У читающих потов должны быть свои указатели чтения циклических очередей. 4) При записи в любую очередь пишущий поток активирует все читающие потоки. 5) Каждый читающий поток при чтении получает признак: состояние прочитанных очередей в виде размера непрочитанных данных. -------- Результат от ИИ, полученный в течении 5 сек.(компилируемый, код-основа для разработки очередей событий):
Код
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
template <typename T, size_t Size>
class CircularQueue {
public:
void push(T item) {
buffer[head % Size] = item;
head.fetch_add(1, std::memory_order_release);
}
// Каждый читатель передает свой локальный индекс
bool try_read(size_t& reader_idx, T& out_item, size_t& pending_count) {
size_t current_head = head.load(std::memory_order_acquire);
if (reader_idx < current_head) {
out_item = buffer[reader_idx % Size];
reader_idx++;
pending_count = current_head - reader_idx;
return true;
}
pending_count = 0;
return false;
}
private:
T buffer[Size];
std::atomic<size_t> head{0};
};
// Контейнер для очередей разных типов
struct SharedData {
CircularQueue<int, 100> int_queue;
CircularQueue<double, 100> double_queue;
std::mutex mtx;
std::condition_variable cv;
bool stop = false;
};
void writer(SharedData& data) {
for (int i = 1; i <= 5; ++i) {
{
std::lock_guard<std::mutex> lock(data.mtx);
data.int_queue.push(i * 10);
data.double_queue.push(i * 1.1);
std::cout << "[Writer] Pushed data set " << i << std::endl;
}
data.cv.notify_all(); // Активируем всех читателей
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
data.stop = true;
data.cv.notify_all();
}
void reader(int id, SharedData& data) {
size_t my_int_ptr = 0;
size_t my_dbl_ptr = 0;
while (true) {
std::unique_lock<std::mutex> lock(data.mtx);
data.cv.wait(lock, [&] { return data.stop || true; }); // Упрощенно: просыпаемся по сигналу
int val;
double dval;
size_t pending;
// Читаем из инт-очереди
while (data.int_queue.try_read(my_int_ptr, val, pending)) {
std::cout << "Reader " << id << " got INT: " << val << " (Left: " << pending << ")\n";
}
// Читаем из дабл-очереди
while (data.double_queue.try_read(my_dbl_ptr, dval, pending)) {
std::cout << "Reader " << id << " got DBL: " << dval << " (Left: " << pending << ")\n";
}
if (data.stop) break;
}
}
int main() {
SharedData data;
std::thread w(writer, std::ref(data));
std::thread r1(reader, 1, std::ref(data));
std::thread r2(reader, 2, std::ref(data));
w.join();
r1.join();
r2.join();
return 0;
}
// Основные моменты реализации (от ИИ):
// Индивидуальные указатели: Переменные my_int_ptr и my_dbl_ptr находятся в стеке каждого читающего потока. Они отслеживают прогресс конкретного
// читателя относительно общей «головы» (head) очереди.
// Шаблон очереди: Позволяет создавать экземпляры для int, double или любых других типов в одном контейнере SharedData.
// Синхронизация: std::condition_variable используется для мгновенной активации читателей пишущим потоком.
// Статус очереди: Метод try_read возвращает pending_count, что позволяет читателю знать, сколько данных еще осталось в буфере именно для него.
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Пользователь
Сообщений: Регистрация: 12.05.2020
27.02.2026 13:34:14
Цитата
VPM написал: Да, спасибо за вариант, про собственный флаг, даже не задумывался, взял на вооружение!
Этот вариант наглядный, но не самый эффективный и гибкий. Сам я использую для управления, всего двумя нужными мне коллбеками: OnCleanUp и OnTransReply их переопределение, то есть присвоение переменным коллбеков тех функций, которые мне нужны для обработки соответствующих событий. Например, если я не хочу (по как то причине) обрабатывать события OnTransReply, то выполняется OnTransRe ply = function() end Для продолжения обработки значение переменной OnTransReply заменяется на нужную мне функцию обработки (запись в очередь параметров событий).
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Пользователь
Сообщений: Регистрация: 12.05.2020
27.02.2026 11:25:03
Цитата
Йцукен написал: Это теоритическое предположение или есть исследования на эту тему?
Это не предположение. Вы это можете проверить эмпирически в своих скриптах. Это будет тем заметнее, чем больше скриптов работает одновременно и чем большая работа выполняется при инициализации скриптов.
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Пользователь
Сообщений: Регистрация: 12.05.2020
27.02.2026 11:20:56
Цитата
VPM написал: То есть это полноценная модульная система. Так?
Да.
И еще существенное замечание:
Цитата
Nikolay написал: Приход колбека должен быть не прерыванием, а добавлением события в очередь. Когда завершите всю работу с инициализацией, то разберете колбеки что пришли, если пришли. А порядок инициализации не испортится, если это сделать не в OnInit.
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Выполняйте инициализацию в main (при отключенных коллбеках) и используйте, как написал Nikolay, вместо dofile, require.
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Пользователь
Сообщений: Регистрация: 12.05.2020
27.02.2026 10:49:22
1.
Цитата
VPM написал: Ну хорошо, давайте по порядку. Вот вариант от TGB , Так?
Не так. Прочитайте мое сообщение #737.
2.
Цитата
VPM написал: Именно здесь, ни кто не по беспокоит, ни какие колбеки не прилетят, пока зависимости в модулях не установлены.
Тогда почему бы вам не сделать управление коллбеками, хотя бы так?:
Код
-- В самом начале вашего текста скрипта ---
is_Callback = false --- Коллбеки вас не беспокоят --
--- Во всех ваших коллбеках первая строка:
if not is_Callback then return end
--- В любом месте вашего скрипта.
-- Включить обработку коллбеков:
is_Callback = true
-- Выключить обработку коллбеков:
is_Callback = false
И пока вы в своем скрипте не выполните; is_Callback = true коллбеки вас не будут беспокоить. Если вам в скрипте, по какой то надобности, потребуется отключить обработку коллбеков, то: is_Callback = false.
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Пользователь
Сообщений: Регистрация: 12.05.2020
27.02.2026 09:36:37
Цитата
VPM написал: TGB , Ну так огромная! А именно в локализации глобальных переменных для данного проекта. Да медленнее но Гарантированно!
А так быстро и гарантированно (с локализацией глобальных переменных для данного проекта):
Код
function OnInit_VPM()
-- Логирование (ваш код)
Logger.init(log_file)
Logger.log("INIT", "", "Starting robot")
---- Весь остальной ваш код ---
end
function main()
--- Инициализация (в начале main)--
OnInit_VPM()
--- Дальше ваш код ---
end
Зачем вам "медленно но Гарантированно!", когда можно быстро и гарантированно? Что из написанного мной вам непонятно? Более того, как я отмечал:
Цитата
TGB написал: в рассмотренном варианте при запуске нескольких скриптов сразу они начинают выполняться быстрее, чем с кодами в OnInit и body.
Это означает, что перезапуск QUIK, при инициализации в main, выполняется быстрее по сравнению с инициализацией в OnInit. Зачем вам долгий перезапуск? ------ Вы какой то Сусанин . Из возможных решений все время выбираете и рекламируете самые сложные и неэффективные. Вы хотите пользователей завести в болото
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Пользователь
Сообщений: Регистрация: 12.05.2020
26.02.2026 21:12:39
Цитата
VPM написал: Ваши заключения абсолютно верны, и подход с использованием main для инициализации и работы в многозадачной среде действительно более эффективен в случае с QUIK, где важна производительность и отсутствие блокировок. Ну все этим пользуются, и я не исключение.
и А тогда зачем вы хотите использовать OnInit, выполняемый в служебном потоке? Где вы видите разницу между функцией OnInit, выполняемой перед запуском main и точно такой же функцией, но с другим именем, выполняемой сразу вначале main (кроме того что в первом случае загружается служебный поток)?
Функция getDepoEx может приводить к зависаниям терминала
Пользователь
Сообщений: Регистрация: 12.05.2020
26.02.2026 16:16:01
Цитата
Йцукен написал: потому что getDepoEx, getSecurityInfo, в случае ошибки не возвращают ничего (процедуры, другими словами).
В синтаксисе Lua есть понятия: результат функции (отсутствие/одно или несколько значений) и выражение. Функция может не возвращать результата, но есть синтаксические конструкции, которые рассматриваются как выражения (это можно посмотреть в описании языка). Например, присвоение: local v = f (). В этом случае вызов функции справа от равенства считается выражением. В выражении функция, не возвращающая результат, заменяется на nil. Результатом выражения может быть только одно значение. Явным способом привести к выражению можно синтаксическую конструкцию, заключив ее в круглые скобки. Например: message(type((getSecurityInfo("", "")))), результат печати: nil
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Пользователь
Сообщений: Регистрация: 12.05.2020
26.02.2026 14:27:05
Цитата
TGB написал: можно разместить в начале main в виде функции
Уточнение: если очень хотите, определение этой функции в переменной с именем отличным от OnInit можно разместить в области body, но вызвать только в начале main.
Система принятия решений и/или Нечеткая логика(FuzzyLogic), Нечеткая логика или Система принятия решений в трейдинге
Пользователь
Сообщений: Регистрация: 12.05.2020
26.02.2026 14:01:52
Цитата
VPM написал: OnInit, или просто прозрение, которым хочется поделиться.
Я бы не стал это комментировать, но ведь кто то может это воспринять как открытие и руководство к действию. Попробую объяснить ваше заблуждение (очередное). Я в своих скриптах не использую ни OnInit ни кода в body по следующим причинам: 1) Код OnInit и код body будут исполняться в основном служебном единственном потоке, который обслуживает: - запуск всех Lua-скриптов пользователя; - запуск коллбеков всех Lua-скриптов пользователя; - обработка всех коллбеков таблиц QUIK (это не таблицы Lua); - обработка всех индикаторов пользователя. Это значит, что пока он будет "возиться" с кодами OnInit и body другого он ничего делать не будет. На всякий случай попробуйте в начале OnInit написать строку: sleep(30000) и посмотрите что получится . 2) Всю необходимую инициализацию (включая пакеты) можно разместить в начале main в виде функции, вызываемой сразу после ее определения и это будет выполняться в отдельном созданном потоке main, а основной служебный поток продолжит параллельно (если у ПК несколько ядер) выполнять свою перечисленную ранее работу. 3) в рассмотренном варианте при запуске нескольких скриптов сразу они начинают выполняться быстрее, чем с кодами в OnInit и body.
Функция getDepoEx может приводить к зависаниям терминала
Пользователь
Сообщений: Регистрация: 12.05.2020
26.02.2026 07:32:55
1.
Цитата
Йцукен написал: Предлагаю заинтересованным подтвердить или опровергнуть это.
Подтверждаю. В выложенном экземпляре QUIK у меня ошибка проявляется.
2.
Цитата
Nikolay написал: Да, любое зависание основного окна приложения - это ошибка, баг.
Согласен. Локализована типичная ошибка синхронизации. В моей песочнице эта ошибка не проявляется.
3.
Цитата
Йцукен написал: Но когда посреди торгов, у вас неожиданно всё зависнет, - прибежите на форум.
Согласен. Такие нетривиальные ошибки надо выкладывать и исправлять (разработчику), а пользователей, сумевших их локализовать, поощрять .
4. Вообще то, многие ошибки, обнаруженные в QUIK, по моему мнению, во многом, следствие существующей дефектной (сложной) его архитектуры , о чем я написал в своей ветке "Что стоило бы изменить в QUIK по-крупному" два года назад.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
24.02.2026 15:45:40
Цитата
TGB написал: предложение можно сформулировать следующим образом: функциональность обработки событий QUIK расширить, добавив в API пользователя функцию задания свойств служебных очередей событий
Оформить это, наверное, лучше в виде меню задания свойств служебных очередей событий QLua, вызываемое в окне запуска скриптов пользователя, при отсутствии запущенных скриптов. Промежуточный итог обсуждения интерфейса QUIK со скриптами QLua, при реализации предложений этой ветки следующий: пользователям, при этом, вносить изменения в их существующие скрипты не требуется.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
21.02.2026 12:49:30
У меня было предложение, которое стоит повторить:
Цитата
TGB написал: На форумах ARQA для комментирующих пользователей ввести месячный лимит трафика, после которого не будет возможность вводить комментарии. Значением этого лимита могло быть: Годовой трафик nikolz / 12 / 10. Но, конечно, насчет значения лимита, решать ARQA. Наличие такого лимита, обеспечило бы: - экономию дискового пространства баз форумов; - автоматическое модерирование форумов за счет принуждения думать о краткости и четкости текстов, пишущих комментаторами; - - удобство для читающих комментарии, в которых будет меньше флуда.
Это может показаться неактуальным из-за снижения активности пишущих, но могло бы улучшить качество форумов в перспективе.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 17:57:57
Цитата
Йцукен написал: Логичней было бы, чтобы терминал не дублировал пропущенные колбэки, по которым скрипт не получит новых данных.
Это хорошее ваше предложение можно сформулировать следующим образом: функциональность обработки событий QUIK расширить, добавив в API пользователя функцию задания свойств служебных очередей событий: 1) длин очередей; 2) функций фильтрации событий перед их записью в соответствующие очереди; 3) приоритетов обработки очередей; 4) может быть что то еще.. Все свойства имеют значение по умолчанию.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 14:48:52
Цитата
nikolz написал: Я специально выложил для начинающих писателей не псевдокод, а скрипт на Lua с очередью. Да и в документации на QLua есть еще один пример.
Какой же вы непонятливый. До сих пор не поняли, что обсуждаются не существующие реализации, а вариант изменения этих реализаций. Все таки, похоже, что вы полуинтеллектуальный робот-спамер устаревшей версии, без способности учета контекста обсуждаемой темы при генерации спама.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 14:21:42
Цитата
nikolz написал: все что делается внутри sleep - это передается ядру время на которое надо остановить поток.
Вы, как дятел , про свое, не относящееся к обсуждаемому. Вы это читали ?:
Цитата
TGB написал: -- Отличие только в реализации функции sleep. -- При ее вызове с заданным интервалом внутри нее выполняется (код на C++): WaitForSingleObject( , ); // здесь ожидание истечения или выдачи сигнала QUIK (неблокирующей функцией Pulse в служебном потоке) о записи в очереди новых событий. -- Когда sleep "срывается" с , то в ней анализируются очереди (это можно сделать эффективно, используя битовую шкалу непустых очередей скрипта) и выполняются соответствующие с параметрами считанными из очередей. -- Функционально это не отличается от того, что реализовано сейчас, но выполняется в потоке пользователя main.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 13:00:33
Цитата
nikolz написал: Sleep останавливает поток main, а функции колбеков вызываются в другом потоке, на который sleep не действует.
Вы опять только пишите не читая. Мы же с Йцукен обсуждаем не системный Sleep и даже sleep QLua, а предложенный мной запускающий коллбеки. Вам опять надо проложиться между комментариев ?
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 12:35:25
Цитата
Йцукен написал: А как вы узнаете, что это повторы, пока не обработаете их?
Вы сначала создаете таблицу фильтрации со значениями полей фильтрации заведомо не совпадающими с ожидаемыми данными.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 11:49:34
Цитата
Йцукен написал: А смысл несколько раз подряд обрабатывать, например OnQuote, если каждый раз мы будем обрабатывать одни и те же данные?Или вы предлагаете хранить данные для каждого колбэка?
Если вы не хотите повторно обрабатывать данные, то фильтруйте их. Для фильтрации данных можно создать, для соответствующего вида коллбека, таблицу с фильтруемыми полями, обновляемыми при поступлении новых данных и использовать эту таблицу в начале коллбека с тем. чтобы не обрабатывать ненужные вам повторы.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
20.02.2026 11:01:47
Цитата
Йцукен написал: а что делать, если между слипами данные поступили по инструменту несколько раз?
При "просыпании" sleep, функции чтения очередей (коллбеки) будут выполняться (последовательно, без пропусков) столько раз сколько необработанных записей в очередях. Если очереди не переполняются, а это можно контролировать и не допускать, то события не теряются, но возможны задержки в их отработке. Как было указано ранее, sleep "просыпается" либо по времени, либо по записи в очереди данных из QUIK. Но при этом в в sleep всегда анализируются и при необходимости обрабатываются непустые служебные очереди скрипта.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
19.02.2026 16:00:27
Цитата
Йцукен написал: колбэки внутри sleep должны вызываться или как?
Да, так называемые коллбеки, должны вызываться внутри sleep (выполняемой в потоке main) с данными из очередей.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
19.02.2026 15:46:21
Цитата
Йцукен написал: можете написать прсевдокод, как по-вашему, использовать очередь событий?
Я так понимаю, в случае реализации предложенного.
У пользователя никаких изменений:
Код
<Определение функций обработки событий ровно так, как это сейчас>
--- Весь код скрипта без изменений, в том числе sleep(<Интервал>) --
-- Отличие только в реализации функции sleep.
-- При ее вызове с заданным интервалом внутри нее выполняется (код на C++):
WaitForSingleObject(<ОбъектОжидания>, <Интервал>); // здесь ожидание истечения <Интервала> или выдачи сигнала QUIK (неблокирующей функцией Pulse в служебном потоке) о записи в очереди новых событий.
-- Когда sleep "срывается" с <ОбъектаОжидания>, то в ней анализируются очереди (это можно сделать эффективно, используя битовую шкалу непустых очередей скрипта) и выполняются соответствующие <Функций обработки событий> с параметрами считанными из очередей.
-- Функционально это не отличается от того, что реализовано сейчас, но выполняется в потоке пользователя main.
-- Тому, кому интересны детали выполнения sleep, может изменить в своем скрипте формат вызова sleep: вместо sleep(<Интервал>) строка
local <Количество обработанных элементов очереди>, <Данные о потерях событий в очередях> = sleep(<Интервал>)
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
19.02.2026 13:06:40
Цитата
Oleg Kuzembaev написал: В данном случае, нам видится возможность зарегистрировать пожелание на доработку будущих версий ПО. Однако необходимо достичь понимания, какая именно доработка требуется.Если у вас уже есть такая наготове, то просьба предметно описать ее: что именно стоит добавить? Как это будет выглядеть в вашем представлении?
1. Я исхожу из того, что среди пользователей QUIK могут быть квалифицированные IT-разработчики, которые могут улучшить мои предложения и поэтому выкладываю их в публичку. 2. Сохранение существующего API QLua, с тем, чтобы обеспечить совместимость с существующими разработками пользователей, является основным ограничением на предлагаемое. ------------------- I. В текущей версии QUIK в одном основном потоке обслуживаются: - запуск всех Lua-скриптов пользователя; - запуск коллбеков всех Lua-скриптов пользователя; - обработка всех коллбеков таблиц QUIK (это не таблицы Lua); - обработка всех индикаторов пользователя. Притом, что в текущий момент у подавляющего количества ПК много ядер ЦП, написанное выше явный перебор. Наверное, нет проблем перечисленное выше обрабатывать в отдельных потоках, так как перечисленные выше функции, в моем представлении, не сильно связаны друг с другом и, на первый взгляд, между ними не требуется синхронизация.
II. Интерфейс взаимодействия QUIK c Lua-скриптом пользователя, реализованный в виде коллбеков, предполагает многопоточный режим использования Lua, порождающий неприятные проблемы параллельного программирования (для решения которых сами же разработчики предлагают использовать потокобезопасную очередь между коллбеками и потоком main). Мне представляется, что имеет смысл вместо коллбеков использовать активную очередь событий, При этом не требуется использовать Lua в многопоточном, редко используемом и не очень стабильном режиме. При этом не будет проблем с подключением новых версий Lua. Более того, скрипты пользователя будут выполняться несколько быстрее из-за отсутствия синхронизации, требуемой в многопоточном варианте использования Lua. Конкретные предложения: 1) Lua подключать в однопоточном режиме, нативном варианте, без внесения изменений в исходники. Все необходимые коды QLua перенести в dll-пакет. 2) Для взаимодействия со скриптами пользователя использовать общие для них служебные циклические очереди (в соответствии с существующими функциями обратного вызова). Длины очередей задавать по умолчанию с возможностью их изменения пользователем. Для каждого запускаемого скрипта пользователя создавать блоки доступа (с соответствующими указателями, обеспечивающими чтение очередей без блокировки) к общим циклически очередям. 3) Функции чтения очередей регистрировать под теми же именами и так же, как это делается для существующих коллбеков. 4) Выполнять функции чтения очередей в модифицированной следующим образом sleep: - sleep выполняется по истечению заданного в ней интервала времени или при получения сигнала поступления данных в одну из циклических очередей (в пропуске сигналов нет проблем, так есть запуск по времени); - выполнение функции начинается с анализа появления новых данных в циклических очередях скрипта (конкретного) и запуска зарегистрированных его функций с теми параметрами из очередей, как это делается в существующей версии QUIK; - в качестве результата в модифицированной sleep выдавать значения: количество считанных записей в очередях, признак отсутствия потерь данных в очередях из-за их переполнения (с указанием очередей, в которых это возникло), и, возможно, еще что-то. Предложенное: 1) устраняет зависимость служебного потока от пользовательского, когда (служебный) в существующей версии QUIK может быть блокирован пользовательским, а пользовательский служебным; 2) устраняет необходимость внесения правок в исходники Lua; при этом скрипты пользователя будут выполняться несколько быстрее из-за отсутствия синхронизации, требуемой в многопоточном варианте использования Lua; 3) обеспечивает контроль потерь данных в очередях, а также возможность исключения этих потерь за счет увеличения длин очередей пользователем или уменьшения времени ожидания в sleep; 4) убирает проблемы синхронизации внутри скрипта пользователя.
III. В QUIK реализована функциональность просмотра графика котировок бумаг QUIK. Но отсутствует возможность просмотра котировок бумаг, сохраненных во внешних файлах. С учетом существующей функциональности QUIK, как мне представляется, реализация этой возможности не потребует больших усилий.
IV. Пожелание: сократить время восстановления взаимодействия QUIK с сервером при перезапуске хотя бы до 10 сек.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
19.02.2026 09:28:27
Цитата
Oleg Kuzembaev написал: В указанный период на сервере проводились технические работы, которые и стали причиной невозможности снятия заявки
Здравствуйте! Спасибо за ответ. У меня просьба к вам: донести до разработчиков мои комментарии 1, 51.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
VPM написал: За ссылку спасибо, но там столько не понятных для меня слов, что не хватает моей компетенции разобраться.
Если вы не понимаете то, что написано вами, то бесполезно вам что то объяснять. Все равно не поймете.
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
16.02.2026 09:52:26
Цитата
VPM написал: А что тут комментировать, ЧУШЬ полная!
Это ваше добровольное признание :
Цитата
VPM написал: Не ну ладно, я "несу всякую ахинею", разбирая свои идеи по винтикам и полочкам на примерах.
На всякий случай ссылка:
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 16:43:45
Цитата
VPM написал: архитектура с чёткими инвариантами и детерминированными компонентами позволяет предсказать поведение системы в любых ситуациях, включая сбои, и гарантировать, что она не совершит опасных действий (например, не превысит лимиты, не создаст дублирующий ордер).
Это заклинания . Вы думаете, что они работают?
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 16:41:17
У вас много текста но нет простых вещей:
Цитата
TGB написал: где доказательство безопасности счета?
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 16:24:52
VPM Забыл добавить: где доказательство безопасности счета?
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 16:19:51
VPM Понятно, вас с "толку" не сбить . Но интересно, что вы напишите на следующее. Обрабатывая коллбеки и восстанавливая их результат, вы, по сути, повторяете (но в условиях неопределенности) работу которая выполняется в QUIK при формировании его таблиц. 1. Зачем повторять функциональность которую реализовали разработчики и которая отлажена, скорее всего лучше чем "призрачные" коллбеки (зависящие от кода main), так как это было давно (до появления QLua) и проще контролируется? 2. Полагаете, что вы квалифицированнее разработчиков? 3. Вы же вроде согласны с принципом "меньше дергаешься - реже падаешь". Вам нечем заняться и хочется поразвлечься с коллбеками?
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 15:50:40
Цитата
VPM написал: безопасность счёта можно доказать математически
Определите что такое безопасность счета и приведите математическое доказательство (можете посоветоваться с ИИ).
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 15:47:01
Цитата
VPM написал: как она обрабатывает неопределённость!
Мне, кажется, что обычно всем желательна определенность, особенно в сфере финансов, а вы жаждете приключений .Такое тоже бывает.
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 15:34:43
Цитата
VPM написал: которая должна быть математически корректной, отказоустойчивой и масштабируемой, без событийной модели и формальных инвариантов не обойтись.Потому что её поведение можно доказать на реальной торговле на любых объёмах и с любыми стратегиями, а не просто протестировать!
Зачем вы используете, не первый раз громкие понятия. от которых, похоже, очень далеки? Вы сделали в математике открытие: "доказать на реальной торговле" . Вам это надо опубликовать в научном журнале.
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 15:18:45
Цитата
Йцукен написал: Это ваш выбор. Но я написал о другом.Ваш подход также не исключает получения "сырых", как вы выразились, колбэков по заявкам / сделкам.Просто вы получаете их гораздо реже в силу того, что, как правило, колбэки с заполненными параметрами идут сразу следом за частично заполненными.
1. Читайте:
Цитата
TGB написал: В моих роботах единственный обрабатываемый коллбек это OnTransReply
2. Вы проверяли задержку между коллбэком с заполненными параметрами и появлением записи в таблице? Если нет, то проверьте.
Цитата
Йцукен написал: Колбэки OnParam и OnQuote сообщают, когда произошло изменение. Как вы определяете эти изменения?
1. Я OnQuote не пользуюсь. QUIK не предназначен для высокочастотной торговли. Из стакана вам приходит история с задержкой как минимум 2-3 сек. и ваша реакция, скорее всего, тоже > 2 сек. Хотелось увидеть реальные доходные стратегии в таких условиях. 2. Если мне нужны какие то данные из текущей таблицы торгов, то я их читаю в цикле, с нужным мне периодом (у меня есть возможность создавать такие циклы, кроме основного) и фильтрую, чтобы не было повторов, по параметру VOLTODAY. Получается не хуже по результату и эффективности использования ПК, чем при использовании OnParam.
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 14:45:20
1.
Цитата
VPM написал: Ну или просто банально OnTransReply может потеряться. Вы ждёте 120 секунд — это огромный интервал.
Кто вас заставляет ждать 120 сек.? Ждите 1 млс. И, вообще, возитесь с коллбеками сколько хотите . Для вас чем сложнее, тем лучше.
Я знаю как делать сложные системы не теоретически. И один из основных принципов их создания это не делать лишних движений. Несделанное, во-первых, не ломается, а во вторых, супер эффективно, так как не требует ресурсов. С тем подходом к созданию роботов, который использует Nikolay и я не видно проблем создания сложных роботов. Кстати, хотя я стараюсь находить и использовать простые решения, робот у меня не сильно простой. Во всяком случае он полностью автономный. Я, конечно, смотрю что он делает, но уже давно он работает месяцами без моего вмешательства.
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 14:15:06
Цитата
Nikolay написал: Правда такой подход уже сложнее реализовывать, если данные гоняются через socket в алгоритмы, написанные не на lua.
У меня есть простая функциональность создания собственных событий в виде таблиц событий, в которые записываются функции с их параметрами при подписке на событие. Если надо, то при наступлении в скрипте события, например, появления каких то данных выполняется запуск всех функций таблицы соответствующей такому событию.
Не приходит полная версия OnTrade
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 13:23:37
Цитата
Йцукен написал: Но вы так-то тоже обрабатываете "сырые" колбэки и получаете соответствующие проблемы, которые потом закрываете "костылями".
В моих роботах единственный обрабатываемый коллбек это OnTransReply по той причине, что в нем есть информация о возможных причинах отказа в выставлении заявки. При этом я предполагаю, что он может теряться и отслеживаю в таймере выставление заявки по времени, проверяя таблицу заявок (orders соответственно stop_orders). Если заявка не появилась в соответствующей таблице в течении 120 секунд, то это ошибка в выставлении и есть ветка отработки этой ошибки. Все остальное, включая случаи необходимости получения текущих данных таблиц текущих торгов и тд. я беру из таблиц QUIK, кешируя для быстрого доступа индексы используемых таблиц. Признаком изменения состояния таблиц (исключая orders, stop_orders) является изменение длины, которую можно получать функцией getNumberOf. Признаком возможных изменений в существующих записях orders и stop_orders является изменение длины таблицы сделок. Вообще, для торговли надо знать состояние своего счета и какую то картину рынка. Все это можно получить из таблиц QUIK и я не вижу смысла возни с коллбеками.
Если бы я был архитектором QUIK, Что стоило бы изменить в QUIK по-крупному
Пользователь
Сообщений: Регистрация: 12.05.2020
15.02.2026 11:56:46
Цитата
nikolz написал: Что именно по-вашему блокируется "пока в потоке main не выполнится sleep" Напишите конкретно от какого до какого момента исполнения кода в функции Main блокируется
Вы опять не читаете или ничего не соображаете. У меня же написано конкретно то, что давно известно:
Цитата
TGB написал: В существующей версии QUIK выполнение коллбека блокируется до тех пор пока в потоке main не выполнится sleep или сишная функция.
Выполните в main строку: for i = 0, 10000000000 do end и вы заблокируете секунд на 30 не только коллбеки, но и диалог QUIK. -----
Цитата
nikolz написал: Вы сказали буквально следующее: Колбеки всегда остановлены если в main нет функции sleep или параметр у нее равен нулю.
Вы опять меня "передергиваете". Где я это писал буквально? Читайте мою приведенную выше цитату. Вы понимаете что означает слово буквально? Зачем вы меня "передергиваете" и пытаетесь приписать свои фантазии? Вы продолжаете работать прокладкой между комментариями ?