local _is_running = 1
function OnStop ( )
_is_running = 0
return 10000
end
function isOnline ( )
return isConnected ( ) + _is_running == 2
end
local function dumpbatch ( batch, timeout, batchsize )
local batchsize = batchsize or 8
local timeout = timeout or 2
local basestr = getScriptPath ( ) .. "/dump/"
local filestr = basestr .. "%s %s %s%s.csv"
local liststr = basestr .. "_list.txt"
local stallstr = basestr .. "_stall.txt"
local intervals = {
['TICK'] = INTERVAL_TICK,
['1'] = INTERVAL_M1, ['2'] = INTERVAL_M2, ['3'] = INTERVAL_M3, ['4'] = INTERVAL_M4, ['5'] = INTERVAL_M5, ['6'] = INTERVAL_M6, ['10'] = INTERVAL_M10, ['15'] = INTERVAL_M15, ['20'] = INTERVAL_M20, ['30'] = INTERVAL_M30,
['1H'] = INTERVAL_H1, ['2H'] = INTERVAL_H2, ['4H'] = INTERVAL_H4, ['D'] = INTERVAL_D1, ['W'] = INTERVAL_W1, ['M'] = INTERVAL_MN1 }
do -- sanitization
assert ( type ( batch ) == 'table', "Invalid argument: 'batch' is not 'table'." )
assert ( type ( batchsize ) == 'number', "Invalid argument: 'batchsize' is not 'number'." )
assert ( type ( timeout ) == 'number', "Invalid argument: 'timeout' is not 'number'." )
for i = 1, #batch do
assert ( type ( batch[ i ] ) == 'table', "Invalid batch item: not 'table'.")
assert ( type ( batch[ i ].class ) == 'string', "Invalid batch item: 'class' is not 'string'." )
assert ( type ( batch[ i ].sec ) == 'string', "Invalid batch item: 'sec' is not 'string'." )
assert ( type ( batch[ i ].int ) == 'string', "Invalid batch item: 'int' is not 'string'." )
assert ( type ( batch[ i ].param ) == 'string', "Invalid batch item: 'param' is not 'string'." )
assert ( intervals[ batch[ i ].int ], "Invalid batch item: 'int' is not a valid time interval ( TICK, 1, 2, 3, 4, 5, 6, 10, 15, 20, 30, 1H, 2H, 4H, D, W, M )." )
end
end
local runbatch, stalled = { }, { }
while #batch + #runbatch > 0 do
if #runbatch < batchsize and #batch > 0 then -- process batch item
local entry = table.remove ( batch )
local fname = string.format ( filestr, entry.class, entry.sec, entry.int, entry.param )
local file = io.open ( fname, "r" )
if file then file:close ( ); goto runbatch end -- skip existing dumps
for i = 1, #stalled do -- skip stalled securities
if entry.class == stalled[ i ].class and entry.sec == stalled[ i ].sec then
table.insert ( stalled, entry )
goto runbatch
end
end
local source, error
if entry.param ~= "" then
source, error = CreateDataSource ( entry.class, entry.sec, intervals[ entry.int ], entry.param )
else
source, error = CreateDataSource ( entry.class, entry.sec, intervals[ entry.int ] )
end
if source then
source:SetEmptyCallback ( )
table.insert ( runbatch, { entry = entry, source = source, fname = fname, time = 0 } )
else
message ( string.format ( "Failed to create data source: %s\n%s %s %s %s" , error, entry.class, entry.sec, entry.int, entry.param ) )
end
end
::runbatch::
for e = #runbatch, 1, -1 do -- run batch
local entry = runbatch[ e ]
if entry.source:Size ( ) == 0 then
entry.time = entry.time + 0.100
if entry.time >= timeout then
table.insert ( stalled, entry.entry )
table.remove ( runbatch, e )
end
else
local file = io.open ( entry.fname, "w" )
local source = entry.source
for i = 1, source:Size ( ) do
file:write ( string.format ( "%04d%02d%02d,%02d%02d,%f,%f,%f,%f,%f\n",
source:T ( i ).year, source:T ( i ).month, source:T ( i ).day, source:T ( i ).hour, source:T ( i ).min,
source:O ( i ), source:H ( i ), source:L ( i ), source:C ( i ), source:V( i ) ) )
end
file:close ( )
source:Close ( )
file = io.open ( liststr, "a" )
file:write ( string.format ( "%s\n", entry.fname ) )
file:close ( )
table.remove ( runbatch, e )
end
end
if #runbatch > 0 then sleep ( 100 ) end
if not isOnline ( ) then return end
end
local file = io.open ( stallstr, "w" )
for i = 1, #stalled do
file:write ( string.format ( "%s %s %s %s\n", stalled[ i ].class, stalled[ i ].sec, stalled[ i ].int, stalled[ i ].param ) )
end
file:close ( )
return stalled
end |