Добрый день,
Выкладываю работающий скрипт, который написал ИИ под моим руководством.
-------------------------------
Скрипт позволяет загружать историю с биржи MOEX.
-------------------------
Состоит из двух файлов.
---------------
config.json
data_fetcher.py
Выкладываю работающий скрипт, который написал ИИ под моим руководством.
-------------------------------
Скрипт позволяет загружать историю с биржи MOEX.
-------------------------
Состоит из двух файлов.
---------------
config.json
Код |
---|
{ "instruments": [ { "name": "SBER", "start_date": "2015-01-01", "decimal_places": 2, "intervals": [1, 10, 60, 24], "parameters": ["open", "high", "low", "close", "volume"], "board": "TQBR" }, { "name": "GAZP", "start_date": "2015-01-01", "decimal_places": 2, "intervals": [1, 10, 60, 24], "parameters": ["open", "high", "low", "close", "volume"], "board": "TQBR" } ] } |
Код |
---|
import os import pandas as pd import requests import json from datetime import datetime, timedelta def load_config(config_file="config.json"): """ Загружает конфигурацию из JSON файла Параметры: config_file (str): Путь к файлу конфигурации Возвращает: dict: Конфигурация """ if not os.path.exists(config_file): print(f"Файл конфигурации {config_file} не найден") return None try: with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) return config except Exception as e: print(f"Ошибка при загрузке конфигурации: {e}") return None def fetch_moex_data(ticker, start_date, end_date, interval=60): """ Загружает данные с MOEX API для указанного тикера и временного периода Параметры: ticker (str): Тикер инструмента start_date (datetime): Начальная дата загрузки end_date (datetime): Конечная дата загрузки interval (int): Интервал свечей в минутах (по умолчанию 60 минут) Возвращает: pd.DataFrame: DataFrame с загруженными данными """ data = [] current_start = start_date print(f"Загрузка данных с MOEX для {ticker} (интервал: {interval} мин) с {start_date} по {end_date}...") while True: url = f"http://iss.moex.com/iss/engines/stock/markets/shares/boards/TQBR/securities/{ticker}/candles.json" params = { 'from': current_start.strftime('%Y-%m-%d'), 'till': end_date.strftime('%Y-%m-%d'), 'interval': interval, 'start': 0 } try: response = requests.get(url, params=params) response.raise_for_status() # Проверка на ошибки HTTP json_data = response.json() if 'candles' not in json_data or 'data' not in json_data['candles']: print("Нет данных в ответе API") break candles = pd.DataFrame(json_data['candles']['data'], columns=json_data['candles']['columns']) if len(candles) == 0: break data.append(candles) # Обновляем время для следующего запроса current_start = (pd.to_datetime(candles['end'].iloc[-1]) + timedelta(minutes=interval)) if len(candles) < 500: # Если получено меньше 500 записей, это последняя порция break except requests.exceptions.RequestException as e: print(f"Ошибка при запросе данных: {e}") break print(f"Загружено {len(data)} блоков данных") if not data: return pd.DataFrame() # Объединяем все блоки данных и удаляем дубликаты return pd.concat(data).drop_duplicates() def process_raw_data(raw_data): """ Обрабатывает сырые данные: разделяет begin на date и time, удаляет end Параметры: raw_data (pd.DataFrame): Сырые данные с MOEX Возвращает: pd.DataFrame: Обработанные данные """ # Разделение begin на date и time raw_data['date'] = pd.to_datetime(raw_data['begin']).dt.date # Преобразование времени в формат без двоеточий (HHMMSS) raw_data['time'] = pd.to_datetime(raw_data['begin']).dt.strftime('%H%M%S') # Удаление ненужных столбцов columns_to_keep = ['date', 'time', 'open', 'high', 'low', 'close', 'volume'] processed_data = raw_data[columns_to_keep].copy() # Сортировка по дате и времени processed_data = processed_data.sort_values(['date', 'time']) return processed_data def get_last_available_date(ticker_dir): """ Определяет последнюю дату, за которую есть данные в структуре каталогов Параметры: ticker_dir (str): Путь к каталогу тикера Возвращает: datetime.date: Последняя дата или None, если данных нет """ last_date = None # Проходим по всем подкаталогам (годы) if not os.path.exists(ticker_dir): return None for year_dir in os.listdir(ticker_dir): year_path = os.path.join(ticker_dir, year_dir) if not os.path.isdir(year_path): continue # Проходим по месяцам for month_dir in os.listdir(year_path): month_path = os.path.join(year_path, month_dir) if not os.path.isdir(month_path): continue # Проходим по дням for day_dir in os.listdir(month_path): day_path = os.path.join(month_path, day_dir) if not os.path.isdir(day_path): continue # Проверяем, что имя дня - это число (день месяца) if not day_dir.isdigit(): continue # Формируем дату: год, месяц, день try: year = int(year_dir) month = int(month_dir) day = int(day_dir) current_date = datetime(year, month, day).date() except: continue # Сравниваем с последней датой if last_date is None or current_date > last_date: last_date = current_date return last_date def update_data_file(ticker, interval, output_dir, start_date, end_date): """ Обновляет файлы данных, добавляя новые записи при необходимости Параметры: ticker (str): Тикер инструмента interval (int): Интервал свечей в минутах output_dir (str): Путь к каталогу для сохранения данных start_date (datetime): Начальная дата для загрузки end_date (datetime): Конечная дата для загрузки Возвращает: None """ # Определяем последнюю дату, за которую есть данные last_date = get_last_available_date(output_dir) if last_date is None: # Данных нет, загружаем с начальной даты new_start_date = start_date print("Не найдено существующих данных. Загружаем все данные...") else: # Загружаем с последней даты (включительно) new_start_date = datetime.combine(last_date, datetime.min.time()) print(f"Найдены данные до {last_date}. Загружаем новые данные начиная с этой даты...") # Загружаем данные с MOEX raw_data = fetch_moex_data(ticker, new_start_date, end_date, interval) if raw_data.empty: print("Нет новых данных для загрузки") return # Обрабатываем данные processed_data = process_raw_data(raw_data) # Сохраняем данные, разбивая по датам save_data_by_date(ticker, interval, output_dir, processed_data) print(f"Загружено и сохранено {len(processed_data)} записей") def save_data_by_date(ticker, interval, output_dir, data): """ Сохраняет данные с разбиением по датам в подкаталоги Параметры: ticker (str): Тикер инструмента interval (int): Интервал в минутах output_dir (str): Базовый каталог для сохранения data (pd.DataFrame): Данные для сохранения Возвращает: None """ # Получаем имя интервала interval_name = get_interval_name(interval) # Группируем данные по датам grouped = data.groupby('date') for date, day_data in grouped: # Разбиваем дату на год, месяц, день year = date.year month = date.month day = date.day # Форматируем месяц и день с ведущими нулями month_str = f"{month:02d}" day_str = f"{day:02d}" # Формируем путь к каталогу для этой даты date_dir = os.path.join(output_dir, str(year), month_str, day_str) # Создаем каталог, если он не существует os.makedirs(date_dir, exist_ok=True) # Создаем подкаталог для интервала interval_dir = os.path.join(date_dir, interval_name) os.makedirs(interval_dir, exist_ok=True) # Удаляем столбец date перед сохранением day_data_to_save = day_data.drop(columns=['date']) # Сохраняем каждый параметр в отдельный файл for column in day_data_to_save.columns: file_path = os.path.join(interval_dir, column) # Сохраняем столбец без индекса и без заголовка day_data_to_save[column].to_csv(file_path, index=False, header=False) print(f"Сохранено {len(day_data_to_save[column])} значений параметра '{column}' за {date} в {file_path}") def get_interval_name(interval): """ Преобразует интервал в минутах в текстовое представление Параметры: interval (int): Интервал в минутах Возвращает: str: Текстовое представление интервала """ if interval == 1: return "1min" elif interval == 10: return "10min" elif interval == 60: return "1hour" elif interval == 1440: return "1day" else: return f"{interval}min" if __name__ == "__main__": # Загрузка конфигурации config = load_config() if not config: print("Не удалось загрузить конфигурацию. Выход...") exit(1) # Параметры загрузки end_date = datetime.now() print(f"Начало загрузки данных для тикеров: {', '.join([item['ticker'] for item in config['tickers']])}") print(f"Период: с {min([datetime.strptime(item['start_date'], '%Y-%m-%d') for item in config['tickers']])} по {end_date.strftime('%Y-%m-%d')}") print("-" * 50) # Обработка каждого тикера и интервала for ticker_config in config['tickers']: ticker = ticker_config['ticker'] start_date_str = ticker_config['start_date'] intervals = ticker_config['intervals'] # Преобразование начальной даты из строки в datetime try: start_date = datetime.strptime(start_date_str, '%Y-%m-%d') except ValueError: print(f"Ошибка в формате даты для тикера {ticker}: {start_date_str}") continue print(f"\nОбработка тикера: {ticker} (начальная дата: {start_date_str})") print(f"Интервалы: {', '.join([get_interval_name(i) for i in intervals])}") # Формирование пути к каталогу для сохранения данных base_dir = "moex" ticker_dir = os.path.join(base_dir, ticker) # Обработка каждого интервала для текущего тикера for interval in intervals: print(f"\nОбработка интервала: {get_interval_name(interval)}") # Обновление файла данных update_data_file(ticker, interval, ticker_dir, start_date, end_date) # Проверка сохраненных данных print("\nПроверка сохраненных данных:") interval_name = get_interval_name(interval) # Проверяем наличие подкаталогов с датами if os.path.exists(ticker_dir): # Собираем все даты из структуры каталогов dates_found = [] # Проходим по годам for year_dir in sorted(os.listdir(ticker_dir)): year_path = os.path.join(ticker_dir, year_dir) if not os.path.isdir(year_path): continue # Проходим по месяцам for month_dir in sorted(os.listdir(year_path)): month_path = os.path.join(year_path, month_dir) if not os.path.isdir(month_path): continue # Проходим по дням for day_dir in sorted(os.listdir(month_path)): day_path = os.path.join(month_path, day_dir) if not os.path.isdir(day_path): continue # Проверяем наличие подкаталога с интервалом interval_path = os.path.join(day_path, interval_name) if os.path.isdir(interval_path): # Формируем дату для отображения try: year = int(year_dir) month = int(month_dir) day = int(day_dir) date_str = f"{year:04d}-{month:02d}-{day:02d}" dates_found.append(date_str) except: pass if dates_found: print(f"Найдено {len(dates_found)} дат с данными:") # Показываем последние 5 дат for date_str in sorted(dates_found)[-5:]: # Формируем путь к каталогу интервала interval_path = os.path.join(ticker_dir, date_str.split('-')[0], date_str.split('-')[1], date_str.split('-')[2], interval_name) # if os.path.exists(interval_path): # print(f" {date_str}:") # Проверяем наличие файлов с параметрами # for param in ['time', 'open', 'high', 'low', 'close', 'volume']: # param_file = os.path.join(interval_path, param) # if os.path.exists(param_file): # # Читаем файл # param_data = pd.read_csv(param_file, header=None) # print(f" {param}: {len(param_data)} значений") # Выводим последние 3 значения для проверки # if len(param_data) > 0: # print(f" Последние 3 значения: {', '.join(map(str, param_data.tail(3).values.flatten()))}") # else: # print(f" {date_str}: каталог интервала не найден") else: print("Данные не найдены") else: print(f"Каталог тикера {ticker} не найден") print("-" * 30) # print("-" * 50) print("\nЗагрузка данных завершена для всех инструментов и интервалов") |