Добрый день,
Выкладываю работающий скрипт, который написал ИИ под моим руководством.
-------------------------------
Скрипт позволяет загружать историю с биржи MOEX.
-------------------------
Состоит из двух файлов.
---------------
config.json
data_fetcher.py
Выкладываю работающий скрипт, который написал ИИ под моим руководством.
-------------------------------
Скрипт позволяет загружать историю с биржи MOEX.
-------------------------
Состоит из двух файлов.
---------------
config.json
| Код |
|---|
{
"instruments": [
{
"name": "SBER",
"start_date": "2015-01-01",
"decimal_places": 2,
"intervals": [1, 10, 60, 24],
"parameters": ["open", "high", "low", "close", "volume"],
"board": "TQBR"
},
{
"name": "GAZP",
"start_date": "2015-01-01",
"decimal_places": 2,
"intervals": [1, 10, 60, 24],
"parameters": ["open", "high", "low", "close", "volume"],
"board": "TQBR"
}
]
}
|
| Код |
|---|
import os
import pandas as pd
import requests
import json
from datetime import datetime, timedelta
def load_config(config_file="config.json"):
"""
Загружает конфигурацию из JSON файла
Параметры:
config_file (str): Путь к файлу конфигурации
Возвращает:
dict: Конфигурация
"""
if not os.path.exists(config_file):
print(f"Файл конфигурации {config_file} не найден")
return None
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except Exception as e:
print(f"Ошибка при загрузке конфигурации: {e}")
return None
def fetch_moex_data(ticker, start_date, end_date, interval=60):
"""
Загружает данные с MOEX API для указанного тикера и временного периода
Параметры:
ticker (str): Тикер инструмента
start_date (datetime): Начальная дата загрузки
end_date (datetime): Конечная дата загрузки
interval (int): Интервал свечей в минутах (по умолчанию 60 минут)
Возвращает:
pd.DataFrame: DataFrame с загруженными данными
"""
data = []
current_start = start_date
print(f"Загрузка данных с MOEX для {ticker} (интервал: {interval} мин) с {start_date} по {end_date}...")
while True:
url = f"http://iss.moex.com/iss/engines/stock/markets/shares/boards/TQBR/securities/{ticker}/candles.json"
params = {
'from': current_start.strftime('%Y-%m-%d'),
'till': end_date.strftime('%Y-%m-%d'),
'interval': interval,
'start': 0
}
try:
response = requests.get(url, params=params)
response.raise_for_status() # Проверка на ошибки HTTP
json_data = response.json()
if 'candles' not in json_data or 'data' not in json_data['candles']:
print("Нет данных в ответе API")
break
candles = pd.DataFrame(json_data['candles']['data'],
columns=json_data['candles']['columns'])
if len(candles) == 0:
break
data.append(candles)
# Обновляем время для следующего запроса
current_start = (pd.to_datetime(candles['end'].iloc[-1]) + timedelta(minutes=interval))
if len(candles) < 500: # Если получено меньше 500 записей, это последняя порция
break
except requests.exceptions.RequestException as e:
print(f"Ошибка при запросе данных: {e}")
break
print(f"Загружено {len(data)} блоков данных")
if not data:
return pd.DataFrame()
# Объединяем все блоки данных и удаляем дубликаты
return pd.concat(data).drop_duplicates()
def process_raw_data(raw_data):
"""
Обрабатывает сырые данные: разделяет begin на date и time, удаляет end
Параметры:
raw_data (pd.DataFrame): Сырые данные с MOEX
Возвращает:
pd.DataFrame: Обработанные данные
"""
# Разделение begin на date и time
raw_data['date'] = pd.to_datetime(raw_data['begin']).dt.date
# Преобразование времени в формат без двоеточий (HHMMSS)
raw_data['time'] = pd.to_datetime(raw_data['begin']).dt.strftime('%H%M%S')
# Удаление ненужных столбцов
columns_to_keep = ['date', 'time', 'open', 'high', 'low', 'close', 'volume']
processed_data = raw_data[columns_to_keep].copy()
# Сортировка по дате и времени
processed_data = processed_data.sort_values(['date', 'time'])
return processed_data
def get_last_available_date(ticker_dir):
"""
Определяет последнюю дату, за которую есть данные в структуре каталогов
Параметры:
ticker_dir (str): Путь к каталогу тикера
Возвращает:
datetime.date: Последняя дата или None, если данных нет
"""
last_date = None
# Проходим по всем подкаталогам (годы)
if not os.path.exists(ticker_dir):
return None
for year_dir in os.listdir(ticker_dir):
year_path = os.path.join(ticker_dir, year_dir)
if not os.path.isdir(year_path):
continue
# Проходим по месяцам
for month_dir in os.listdir(year_path):
month_path = os.path.join(year_path, month_dir)
if not os.path.isdir(month_path):
continue
# Проходим по дням
for day_dir in os.listdir(month_path):
day_path = os.path.join(month_path, day_dir)
if not os.path.isdir(day_path):
continue
# Проверяем, что имя дня - это число (день месяца)
if not day_dir.isdigit():
continue
# Формируем дату: год, месяц, день
try:
year = int(year_dir)
month = int(month_dir)
day = int(day_dir)
current_date = datetime(year, month, day).date()
except:
continue
# Сравниваем с последней датой
if last_date is None or current_date > last_date:
last_date = current_date
return last_date
def update_data_file(ticker, interval, output_dir, start_date, end_date):
"""
Обновляет файлы данных, добавляя новые записи при необходимости
Параметры:
ticker (str): Тикер инструмента
interval (int): Интервал свечей в минутах
output_dir (str): Путь к каталогу для сохранения данных
start_date (datetime): Начальная дата для загрузки
end_date (datetime): Конечная дата для загрузки
Возвращает:
None
"""
# Определяем последнюю дату, за которую есть данные
last_date = get_last_available_date(output_dir)
if last_date is None:
# Данных нет, загружаем с начальной даты
new_start_date = start_date
print("Не найдено существующих данных. Загружаем все данные...")
else:
# Загружаем с последней даты (включительно)
new_start_date = datetime.combine(last_date, datetime.min.time())
print(f"Найдены данные до {last_date}. Загружаем новые данные начиная с этой даты...")
# Загружаем данные с MOEX
raw_data = fetch_moex_data(ticker, new_start_date, end_date, interval)
if raw_data.empty:
print("Нет новых данных для загрузки")
return
# Обрабатываем данные
processed_data = process_raw_data(raw_data)
# Сохраняем данные, разбивая по датам
save_data_by_date(ticker, interval, output_dir, processed_data)
print(f"Загружено и сохранено {len(processed_data)} записей")
def save_data_by_date(ticker, interval, output_dir, data):
"""
Сохраняет данные с разбиением по датам в подкаталоги
Параметры:
ticker (str): Тикер инструмента
interval (int): Интервал в минутах
output_dir (str): Базовый каталог для сохранения
data (pd.DataFrame): Данные для сохранения
Возвращает:
None
"""
# Получаем имя интервала
interval_name = get_interval_name(interval)
# Группируем данные по датам
grouped = data.groupby('date')
for date, day_data in grouped:
# Разбиваем дату на год, месяц, день
year = date.year
month = date.month
day = date.day
# Форматируем месяц и день с ведущими нулями
month_str = f"{month:02d}"
day_str = f"{day:02d}"
# Формируем путь к каталогу для этой даты
date_dir = os.path.join(output_dir, str(year), month_str, day_str)
# Создаем каталог, если он не существует
os.makedirs(date_dir, exist_ok=True)
# Создаем подкаталог для интервала
interval_dir = os.path.join(date_dir, interval_name)
os.makedirs(interval_dir, exist_ok=True)
# Удаляем столбец date перед сохранением
day_data_to_save = day_data.drop(columns=['date'])
# Сохраняем каждый параметр в отдельный файл
for column in day_data_to_save.columns:
file_path = os.path.join(interval_dir, column)
# Сохраняем столбец без индекса и без заголовка
day_data_to_save[column].to_csv(file_path, index=False, header=False)
print(f"Сохранено {len(day_data_to_save[column])} значений параметра '{column}' за {date} в {file_path}")
def get_interval_name(interval):
"""
Преобразует интервал в минутах в текстовое представление
Параметры:
interval (int): Интервал в минутах
Возвращает:
str: Текстовое представление интервала
"""
if interval == 1:
return "1min"
elif interval == 10:
return "10min"
elif interval == 60:
return "1hour"
elif interval == 1440:
return "1day"
else:
return f"{interval}min"
if __name__ == "__main__":
# Загрузка конфигурации
config = load_config()
if not config:
print("Не удалось загрузить конфигурацию. Выход...")
exit(1)
# Параметры загрузки
end_date = datetime.now()
print(f"Начало загрузки данных для тикеров: {', '.join([item['ticker'] for item in config['tickers']])}")
print(f"Период: с {min([datetime.strptime(item['start_date'], '%Y-%m-%d') for item in config['tickers']])} по {end_date.strftime('%Y-%m-%d')}")
print("-" * 50)
# Обработка каждого тикера и интервала
for ticker_config in config['tickers']:
ticker = ticker_config['ticker']
start_date_str = ticker_config['start_date']
intervals = ticker_config['intervals']
# Преобразование начальной даты из строки в datetime
try:
start_date = datetime.strptime(start_date_str, '%Y-%m-%d')
except ValueError:
print(f"Ошибка в формате даты для тикера {ticker}: {start_date_str}")
continue
print(f"\nОбработка тикера: {ticker} (начальная дата: {start_date_str})")
print(f"Интервалы: {', '.join([get_interval_name(i) for i in intervals])}")
# Формирование пути к каталогу для сохранения данных
base_dir = "moex"
ticker_dir = os.path.join(base_dir, ticker)
# Обработка каждого интервала для текущего тикера
for interval in intervals:
print(f"\nОбработка интервала: {get_interval_name(interval)}")
# Обновление файла данных
update_data_file(ticker, interval, ticker_dir, start_date, end_date)
# Проверка сохраненных данных
print("\nПроверка сохраненных данных:")
interval_name = get_interval_name(interval)
# Проверяем наличие подкаталогов с датами
if os.path.exists(ticker_dir):
# Собираем все даты из структуры каталогов
dates_found = []
# Проходим по годам
for year_dir in sorted(os.listdir(ticker_dir)):
year_path = os.path.join(ticker_dir, year_dir)
if not os.path.isdir(year_path):
continue
# Проходим по месяцам
for month_dir in sorted(os.listdir(year_path)):
month_path = os.path.join(year_path, month_dir)
if not os.path.isdir(month_path):
continue
# Проходим по дням
for day_dir in sorted(os.listdir(month_path)):
day_path = os.path.join(month_path, day_dir)
if not os.path.isdir(day_path):
continue
# Проверяем наличие подкаталога с интервалом
interval_path = os.path.join(day_path, interval_name)
if os.path.isdir(interval_path):
# Формируем дату для отображения
try:
year = int(year_dir)
month = int(month_dir)
day = int(day_dir)
date_str = f"{year:04d}-{month:02d}-{day:02d}"
dates_found.append(date_str)
except:
pass
if dates_found:
print(f"Найдено {len(dates_found)} дат с данными:")
# Показываем последние 5 дат
for date_str in sorted(dates_found)[-5:]:
# Формируем путь к каталогу интервала
interval_path = os.path.join(ticker_dir, date_str.split('-')[0],
date_str.split('-')[1], date_str.split('-')[2],
interval_name)
# if os.path.exists(interval_path):
# print(f" {date_str}:")
# Проверяем наличие файлов с параметрами
# for param in ['time', 'open', 'high', 'low', 'close', 'volume']:
# param_file = os.path.join(interval_path, param)
# if os.path.exists(param_file):
# # Читаем файл
# param_data = pd.read_csv(param_file, header=None)
# print(f" {param}: {len(param_data)} значений")
# Выводим последние 3 значения для проверки
# if len(param_data) > 0:
# print(f" Последние 3 значения: {', '.join(map(str, param_data.tail(3).values.flatten()))}")
# else:
# print(f" {date_str}: каталог интервала не найден")
else:
print("Данные не найдены")
else:
print(f"Каталог тикера {ticker} не найден")
print("-" * 30)
# print("-" * 50)
print("\nЗагрузка данных завершена для всех инструментов и интервалов")
|