-
Notifications
You must be signed in to change notification settings - Fork 316
Expand file tree
/
Copy pathkhFrame.py
More file actions
3158 lines (2718 loc) · 153 KB
/
khFrame.py
File metadata and controls
3158 lines (2718 loc) · 153 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# coding: utf-8
import time
import datetime
import traceback
import importlib.util
from typing import Dict, List, Optional, Union, Any
import logging
import sys
import shutil
from types import SimpleNamespace
import threading
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant
from khTrade import KhTradeManager
from khRisk import KhRiskManager
from khQTTools import KhQuTools, determine_pool_type, format_price, round_price, get_price_decimals, check_t0_support, get_t0_details
from khConfig import KhConfig
import numpy as np
from PyQt5.QtCore import Qt, QMetaObject, Q_ARG, QObject
from PyQt5.QtWidgets import QMessageBox
import pandas as pd
import os
import holidays
# 简单的GUI类,用于处理日志记录
class DummySignal:
"""虚拟信号类,用于非GUI模式下替代PyQt5信号"""
def emit(self, *args, **kwargs):
"""空实现,忽略信号发射"""
pass
class SimpleGUI:
"""简单的GUI类,用于处理日志记录,兼容纯代码运行模式"""
def __init__(self):
# 创建虚拟信号对象,避免属性访问错误
self.progress_signal = DummySignal()
def log_message(self, message, level="INFO"):
"""记录日志消息"""
print(f"[{level}] {datetime.datetime.now()} - {message}")
def on_strategy_finished(self):
"""策略完成回调"""
print(f"[INFO] {datetime.datetime.now()} - 策略执行完成")
# 触发器基类
class TriggerBase:
"""触发器基类,定义触发机制的通用接口"""
def __init__(self, framework):
"""初始化触发器
Args:
framework: KhQuantFramework实例
"""
self.framework = framework
def initialize(self):
"""初始化触发器"""
pass
def should_trigger(self, timestamp, data):
"""判断是否应该触发策略
Args:
timestamp: 当前时间戳
data: 当前市场数据
Returns:
bool: 是否触发策略
"""
return False
def get_data_period(self):
"""获取数据周期,用于数据加载
Returns:
str: 数据周期,如"tick", "1m", "5m"等
"""
return "tick"
# Tick触发器
class TickTrigger(TriggerBase):
"""Tick触发器,每个Tick都触发策略"""
def should_trigger(self, timestamp, data):
"""判断是否应该触发策略
Args:
timestamp: 当前时间戳
data: 当前市场数据
Returns:
bool: 是否触发策略
"""
# Tick触发方式下,每个Tick都触发
return True
def get_data_period(self):
"""获取数据周期
Returns:
str: 数据周期
"""
return "tick"
# K线触发器
class KLineTrigger(TriggerBase):
"""K线触发器,在K线形成时触发策略"""
def __init__(self, framework, period):
"""初始化K线触发器
Args:
framework: KhQuantFramework实例
period: K线周期,如"1m", "5m", "1d"等
"""
super().__init__(framework)
self.period = period # "1m", "5m" 或 "1d"
self.last_trigger_time = {} # 记录每个股票上次触发时间
self.last_trigger_date = None # 记录上次触发的日期(用于日K线)
def should_trigger(self, timestamp, data):
"""判断是否应该触发策略
Args:
timestamp: 当前时间戳
data: 当前市场数据
Returns:
bool: 是否触发策略
"""
# 获取当前时间
if isinstance(timestamp, str):
try:
current_time = datetime.datetime.strptime(timestamp, "%Y%m%d%H%M%S")
except:
current_time = datetime.datetime.now()
else:
try:
timestamp = float(timestamp)
if timestamp > 1e10: # 如果是毫秒级时间戳
timestamp = timestamp / 1000
current_time = datetime.datetime.fromtimestamp(timestamp)
except:
current_time = datetime.datetime.now()
# 对于1分钟K线,在每分钟的开始触发
if self.period == "1m":
return current_time.second == 0
# 对于5分钟K线,在每5分钟的开始触发
elif self.period == "5m":
return current_time.minute % 5 == 0 and current_time.second == 0
# 对于日K线,每个交易日触发一次
elif self.period == "1d":
current_date = current_time.date()
# 检查是否是新的一天(日K线只需要基于日期判断,无需考虑具体时间)
if self.last_trigger_date != current_date:
self.last_trigger_date = current_date
return True
return False
return False
def get_data_period(self):
"""获取数据周期
Returns:
str: 数据周期
"""
return self.period
# 自定义定时触发器
class CustomTimeTrigger(TriggerBase):
"""自定义定时触发器,在指定的时间点触发策略"""
def __init__(self, framework, custom_times):
"""初始化自定义定时触发器
Args:
framework: KhQuantFramework实例
custom_times: 自定义触发时间点列表,格式为["09:30:00", "09:45:00", ...]
"""
super().__init__(framework)
# 解析时间字符串为秒数(从午夜开始)
self.trigger_seconds = []
for time_str in custom_times:
h, m, s = map(int, time_str.split(':'))
seconds = h * 3600 + m * 60 + s
self.trigger_seconds.append(seconds)
self.trigger_seconds.sort()
def should_trigger(self, timestamp, data):
"""判断是否应该触发策略
Args:
timestamp: 当前时间戳
data: 当前市场数据
Returns:
bool: 是否触发策略
"""
# 获取当前时间
if isinstance(timestamp, str):
try:
current_time = datetime.datetime.strptime(timestamp, "%Y%m%d%H%M%S")
except:
current_time = datetime.datetime.now()
else:
try:
timestamp = float(timestamp)
if timestamp > 1e10: # 如果是毫秒级时间戳
timestamp = timestamp / 1000
current_time = datetime.datetime.fromtimestamp(timestamp)
except:
current_time = datetime.datetime.now()
# 计算当前时间的秒数(从午夜开始)
current_seconds = current_time.hour * 3600 + current_time.minute * 60 + current_time.second
# 检查是否接近任一触发时间点(允许5秒误差)
for trigger_second in self.trigger_seconds:
if abs(current_seconds - trigger_second) < 5:
return True
return False
def get_data_period(self):
"""获取数据周期
Returns:
str: 数据周期
"""
# 自定义触发使用1秒级数据
return "1s"
# 触发器工厂
class TriggerFactory:
"""触发器工厂,用于创建不同类型的触发器"""
@staticmethod
def create_trigger(framework, config):
"""创建触发器
Args:
framework: KhQuantFramework实例
config: 配置字典
Returns:
TriggerBase: 触发器实例
"""
trigger_type = config.get("backtest", {}).get("trigger", {}).get("type", "tick")
if trigger_type == "tick":
return TickTrigger(framework)
elif trigger_type == "1m":
return KLineTrigger(framework, "1m")
elif trigger_type == "5m":
return KLineTrigger(framework, "5m")
elif trigger_type == "1d":
return KLineTrigger(framework, "1d")
elif trigger_type == "custom":
custom_times = config.get("backtest", {}).get("trigger", {}).get("custom_times", [])
return CustomTimeTrigger(framework, custom_times)
else:
# 默认使用Tick触发
return TickTrigger(framework)
class MyTraderCallback(XtQuantTraderCallback):
def __init__(self, gui):
super().__init__()
self.gui = gui
self.price_decimals = 2 # 默认价格精度,会在回测开始时根据股票池类型更新
self.gui.log_message("交易回调已初始化", "INFO")
def set_price_decimals(self, decimals: int):
"""设置价格精度"""
self.price_decimals = decimals
def on_stock_order(self, order):
"""委托回报推送"""
try:
direction_map = {
xtconstant.STOCK_BUY: '买入',
xtconstant.STOCK_SELL: '卖出'
}
status_map = {
0: '已提交',
1: '已接受',
2: '已拒绝',
3: '已撤销',
4: '已成交',
5: '部分成交'
}
# 格式化时间戳 (从order对象获取, 通常是Unix时间戳)
formatted_time = "未知"
order_time_val = getattr(order, 'order_time', None)
if order_time_val:
try:
timestamp = float(order_time_val)
# 检查并转换毫秒级时间戳
if timestamp > 1e10:
timestamp = timestamp / 1000
formatted_time = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果不是数字时间戳,尝试解析字符串
try:
formatted_time = datetime.datetime.strptime(str(order_time_val), '%Y%m%d%H%M%S').strftime('%Y-%m-%d %H:%M:%S')
except:
formatted_time = str(order_time_val) # 解析失败则直接显示原始值
except Exception as e:
formatted_time = f"时间转换错误: {e}"
decimals = self.price_decimals
order_msg = (
f"委托信息 - "
f"时间: {formatted_time} | "
f"股票代码: {order.stock_code} | "
f"方向: {direction_map.get(order.order_type, '未知')} | "
f"委托价格: {order.price:.{decimals}f} | "
f"数量: {order.order_volume} | "
f"委托编号: {order.order_id} | "
f"原因: {order.status_msg or '策略交易'}"
)
self.gui.log_message(order_msg, "TRADE")
print(datetime.datetime.now(), '委托回调', order.order_remark)
except Exception as e:
self.gui.log_message(f"处理委托回报时出错: {str(e)}", "ERROR")
def on_stock_trade(self, trade):
"""成交回报推送"""
try:
direction_map = {
xtconstant.STOCK_BUY: '买入',
xtconstant.STOCK_SELL: '卖出'
}
# 获取实际成交价格
actual_price = getattr(trade, 'actual_price', trade.traded_price)
# 格式化时间戳 (从trade对象获取, 通常是Unix时间戳)
formatted_time = "未知"
traded_time_val = getattr(trade, 'traded_time', None)
if traded_time_val:
try:
timestamp = float(traded_time_val)
# 检查并转换毫秒级时间戳
if timestamp > 1e10:
timestamp = timestamp / 1000
formatted_time = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
except ValueError:
# 如果不是数字时间戳,尝试解析字符串
try:
formatted_time = datetime.datetime.strptime(str(traded_time_val), '%Y%m%d%H%M%S').strftime('%Y-%m-%d %H:%M:%S')
except:
formatted_time = str(traded_time_val) # 解析失败则直接显示原始值
except Exception as e:
formatted_time = f"时间转换错误: {e}"
decimals = self.price_decimals
trade_msg = (
f"成交信息 - "
f"时间: {formatted_time} | "
f"股票代码: {trade.stock_code} | "
f"方向: {direction_map.get(trade.order_type, '未知')} | "
f"实际成交价: {actual_price:.{decimals}f} | "
f"成交数量: {trade.traded_volume} | "
f"成交金额: {trade.traded_amount:.{decimals}f} | "
f"成交编号: {trade.traded_id} | "
f"原因: {trade.order_remark or '策略交易'}"
)
self.gui.log_message(trade_msg, "TRADE")
print(datetime.datetime.now(), '成交回调', trade.order_remark)
except Exception as e:
self.gui.log_message(f"处理成交回报时出错: {str(e)}", "ERROR")
def on_order_error(self, order_error):
"""委托错误回报推送"""
try:
error_msg = (
f"委托错误 - "
f"股票代码: {order_error.stock_code} | "
f"错误代码: {order_error.error_id} | "
f"错误信息: {order_error.error_msg} | "
f"备注: {order_error.order_remark}"
)
self.gui.log_message(error_msg, "ERROR")
print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")
except Exception as e:
self.gui.log_message(f"处理委托错误时出错: {str(e)}", "ERROR")
def on_cancel_error(self, cancel_error):
"""撤单错误回报推送"""
try:
error_msg = (
f"撤单错误 - "
f"委托编号: {cancel_error.order_id} | "
f"错误代码: {cancel_error.error_id} | "
f"错误信息: {cancel_error.error_msg}"
)
self.gui.log_message(error_msg, "ERROR")
print(datetime.datetime.now(), sys._getframe().f_code.co_name)
except Exception as e:
self.gui.log_message(f"处理撤单错误时出错: {str(e)}", "ERROR")
def on_disconnected(self):
"""连接断开"""
self.gui.log_message("交易连接已断开", "WARNING")
print(datetime.datetime.now(),'连接断开回调')
def on_order_stock_async_response(self, response):
"""异步下单回报推送"""
try:
msg = f"异步委托回调 - 备注: {response.order_remark}"
self.gui.log_message(msg, "TRADE")
print(f"异步委托回调 {response.order_remark}")
except Exception as e:
self.gui.log_message(f"处理异步下单回报时出错: {str(e)}", "ERROR")
def on_cancel_order_stock_async_response(self, response):
"""撤单异步回报推送"""
try:
msg = f"撤单异步回报 - 委托编号: {response.order_id}"
self.gui.log_message(msg, "TRADE")
print(datetime.datetime.now(), sys._getframe().f_code.co_name)
except Exception as e:
self.gui.log_message(f"处理撤单异步回报时出错: {str(e)}", "ERROR")
def on_account_status(self, status):
"""账户状态变动推送"""
try:
msg = f"账户状态变动 - 账户: {status.account_id} | 状态: {status.status}"
self.gui.log_message(msg, "INFO")
print(datetime.datetime.now(), sys._getframe().f_code.co_name)
except Exception as e:
self.gui.log_message(f"处理账户状态变动时出错: {str(e)}", "ERROR")
def on_stock_position(self, position):
"""持仓变动推送"""
try:
# 只记录重要的持仓变动
decimals = self.price_decimals
msg = (
f"持仓变动 - "
f"股票代码: {position.stock_code} | "
f"持仓数量: {position.volume} | "
f"最新价格: {getattr(position, 'current_price', 0):.{decimals}f} | "
f"持仓市值: {getattr(position, 'market_value', 0):.{decimals}f} | "
f"持仓盈亏: {getattr(position, 'profit', 0):.{decimals}f}"
)
self.gui.log_message(msg, "INFO")
except Exception as e:
self.gui.log_message(f"处理持仓变动时出错: {str(e)}", "ERROR")
def on_connected(self):
"""连接成功推送"""
self.gui.log_message("交易连接成功", "INFO")
def on_stock_asset(self, asset):
"""资金变动推送"""
'''
try:
decimals = self.price_decimals
msg = (
f"资金变动 - "
f"账户: {asset.account_id} | "
f"可用资金: {asset.cash:.{decimals}f} | "
f"总资产: {asset.total_asset:.{decimals}f}"
)
self.gui.log_message(msg, "INFO")
print("资金变动推送on asset callback")
print(asset.account_id, asset.cash, asset.total_asset)
except Exception as e:
self.gui.log_message(f"处理资金变动时出错: {str(e)}", "ERROR")
'''
class KhQuantFramework:
"""量化交易框架主类"""
def __init__(self, config_path: str, strategy_file: str, trader_callback=None):
"""初始化框架
Args:
config_path: 配置文件路径
strategy_file: 策略文件路径
trader_callback: 交易回调函数
"""
print(f"[DEBUG] KhQuantFramework.__init__ 开始")
print(f"[DEBUG] config_path: {config_path}")
print(f"[DEBUG] strategy_file: {strategy_file}")
self.config_path = config_path
self.config = KhConfig(config_path)
self.is_running = False # 运行状态标识
self.qmt_path = self.config.config_dict.get("qmt", {}).get("path", "") # QMT客户端路径
self.account = None # 账户对象
self.trader = None # 交易API实例
self.strategy_module = None # 策略模块
self.trade_mgr = KhTradeManager(self.config) # 交易管理器
self.risk_mgr = KhRiskManager(self.config) # 风险管理器
self.tools = KhQuTools() # 工具类
self.backtest_records = {} # 回测记录
self.daily_price_cache = {} # 日线价格缓存,用于存储所有股票的日线数据
self._cached_benchmark_close = {} # 基准指数收盘价缓存
# T+0交易模式标识(默认关闭,在run()中根据股票池判断)
self.t0_mode = False
# 添加运行时间记录变量
self.start_time = None # 策略开始运行时间
self.end_time = None # 策略结束运行时间
self.total_runtime = 0 # 总运行时间(秒)
# 添加简单的GUI属性用于日志记录
self.gui = SimpleGUI()
# 加载策略模块
print(f"[DEBUG] 准备加载策略模块: {strategy_file}")
try:
self.strategy_module = self.load_strategy(strategy_file)
print(f"[DEBUG] 策略模块加载成功")
except Exception as e:
print(f"[DEBUG] 策略模块加载失败: {str(e)}")
import traceback
traceback.print_exc()
raise
# 当前运行模式
self.run_mode = self.config.run_mode
self.trader_callback = trader_callback # 保存交易回调函数
# 创建触发器
self.trigger = TriggerFactory.create_trigger(self, self.config.config_dict)
# 初始化各个模块
self.trade_mgr = KhTradeManager(self.config)
self.risk_mgr = KhRiskManager(self.config)
self.tools = KhQuTools()
# 初始化QMT客户端路径,优先使用system.userdata_path
self.qmt_path = self.config.config_dict.get("system", {}).get("userdata_path", "")
if not self.qmt_path:
self.qmt_path = self.config.config_dict.get("qmt", {}).get("path", "")
# 交易账户
self.account = None
# 交易API
self.trader = None
# 交易回调
self.callback = None
# 初始化交易管理器
self.trade_mgr = KhTradeManager(self.config, self)
# 清除可能存在的历史数据缓存,确保每次运行都是干净的状态
if hasattr(self, 'historical_data_ref'):
delattr(self, 'historical_data_ref')
if hasattr(self, 'time_field_cache'):
delattr(self, 'time_field_cache')
if hasattr(self, 'time_idx_cache'):
delattr(self, 'time_idx_cache')
# 初始化风控管理器
self.risk_mgr = KhRiskManager(self.config)
def _log(self, message, level="INFO"):
"""根据是否存在回调函数选择日志记录方式"""
if self.trader_callback and hasattr(self.trader_callback, 'gui'):
self.trader_callback.gui.log_message(message, level)
else:
# 在调试模式下,trader_callback可能不存在,使用print输出
print(f"[{level}] {datetime.datetime.now()} - {message}")
def _should_log(self):
"""检查是否应该输出日志(用于性能优化)
始终返回True,保持兼容性
"""
return True
def _cache_should_log(self):
"""在回测开始时缓存日志开关状态(保持兼容性)"""
pass
def load_strategy(self, strategy_file: str):
"""动态加载策略模块
Args:
strategy_file: 策略文件路径
Returns:
module: 策略模块
"""
print(f"[DEBUG] load_strategy 被调用,参数: {strategy_file}")
import importlib.util
import sys
import os
# 获取策略文件的绝对路径
strategy_file = os.path.abspath(strategy_file)
print(f"[DEBUG] 策略文件绝对路径: {strategy_file}")
# 使用策略文件的实际文件名作为模块名(不含.py扩展名)
# 这样debugpy可以正确识别模块
module_name = os.path.splitext(os.path.basename(strategy_file))[0]
print(f"[DEBUG] 模块名: {module_name}")
# 创建模块规范
spec = importlib.util.spec_from_file_location(module_name, strategy_file)
print(f"[DEBUG] spec 创建成功")
# 创建模块对象
strategy_module = importlib.util.module_from_spec(spec)
print(f"[DEBUG] 模块对象创建成功")
# 将模块添加到sys.modules,这样debugpy可以找到它
# 这是让VSCode断点生效的关键!
sys.modules[module_name] = strategy_module
print(f"[DEBUG] 模块已添加到sys.modules: {module_name}")
# 确保模块的__file__属性指向正确的源文件
strategy_module.__file__ = strategy_file
print(f"[DEBUG] 模块__file__属性: {strategy_module.__file__}")
# 执行模块代码
print(f"[DEBUG] 准备执行模块代码")
spec.loader.exec_module(strategy_module)
print(f"[DEBUG] 模块代码执行完成")
return strategy_module
def init_trader_and_account(self):
"""初始化交易接口和账户"""
# 固定为回测模式,只进行虚拟账户初始化
self._init_virtual_account()
# 在回测模式下也设置回调
if self.trader_callback:
self.trade_mgr.callback = self.trader_callback
def _init_virtual_account(self):
"""初始化虚拟账户"""
# 创建虚拟账户对象
self.account = StockAccount(
self.config.account_id,
self.config.account_type
)
# 获取基准合约并转换格式
original_benchmark = self.config.config_dict["backtest"].get("benchmark", "sh.000300")
if original_benchmark == "sh.000300":
self.benchmark = "000300.SH"
else:
self.benchmark = original_benchmark
# 更新配置字典中的基准指数代码
self.config.config_dict["backtest"]["benchmark"] = self.benchmark
# 从回测配置中获取初始资金
init_capital = self.config.config_dict["backtest"]["init_capital"]
# 初始化资产字典
self.trade_mgr.assets = {
"account_type": xtconstant.SECURITY_ACCOUNT,
"account_id": self.config.account_id,
"cash": init_capital,
"frozen_cash": 0.0,
"market_value": 0.0,
"total_asset": init_capital,
"benchmark": self.benchmark
}
# 初始化持仓字典
self.trade_mgr.positions = {} # 初始持仓为空
# 初始化委托字典
self.trade_mgr.orders = {} # 初始委托为空
# 初始化成交字典
self.trade_mgr.trades = {} # 初始成交为空
print(f"虚拟账户初始化完成: {self.config.account_id}")
print(f"初始资产: {self.trade_mgr.assets}")
print(f"基准合约: {self.benchmark}")
def create_callback(self) -> XtQuantTraderCallback:
"""创建交易回调对象"""
return MyTraderCallback(self)
def init_data(self):
"""初始化行情数据"""
# 固定为回测模式,批量下载历史数据(增量下载)
download_complete = False
def download_progress(progress):
nonlocal download_complete
print(f"下载进度: {progress}")
if progress['finished'] >= progress['total']:
download_complete = True
# 获取股票列表
stock_codes = self.get_stock_list()
if not stock_codes:
if self.trader_callback:
self.trader_callback.gui.log_message("警告: 股票池为空,无法下载历史数据", "WARNING")
return
if self.trader_callback:
self.trader_callback.gui.log_message(f"开始下载{len(stock_codes)}只股票的历史数据...", "INFO")
xtdata.download_history_data2(
stock_codes,
period=self.config.kline_period,
start_time=self.config.backtest_start,
end_time=self.config.backtest_end, # 添加结束时间参数
incrementally=True,
callback=download_progress
)
# 等待下载完成
while not download_complete:
time.sleep(1)
def on_quote_callback(self, data: Dict):
"""行情数据回调处理"""
try:
# 提取时间信息
timestamp = data.get("timestamp", int(time.time()))
# 如果时间戳是字符串,尝试转换为整数
if isinstance(timestamp, str):
try:
timestamp = int(timestamp)
except:
timestamp = int(time.time())
# 创建时间信息
if timestamp > 1e10: # 毫秒级时间戳
dt = datetime.datetime.fromtimestamp(timestamp / 1000)
else: # 秒级时间戳
dt = datetime.datetime.fromtimestamp(timestamp)
# 构建时间信息字典
time_info = {
"timestamp": timestamp,
"datetime": dt.strftime("%Y-%m-%d %H:%M:%S"),
"date": dt.strftime("%Y-%m-%d"),
"time": dt.strftime("%H:%M:%S")
}
# 检查是否是交易日
if not self.tools.is_trade_day(time_info["date"]):
# 如果不是交易日,则跳过策略调用
if self.trader_callback:
self.trader_callback.gui.log_message(f"日期 {time_info['date']} 不是交易日,跳过策略执行", "INFO")
return
# 创建新的数据字典,包含时间信息
data_with_time = {"__current_time__": time_info}
# 将其他行情数据添加到data_with_time
for key, value in data.items():
if key != "__current_time__":
data_with_time[key] = value
# 使用触发器判断是否应该触发策略
if not self.trigger.should_trigger(timestamp, data_with_time):
# 对于K线周期触发,需要特殊处理
trigger_type = self.config.config_dict.get("backtest", {}).get("trigger", {}).get("type", "tick")
if trigger_type == "1m" or trigger_type == "5m":
# 当前时间
current_time_str = time_info["time"]
dt_time = datetime.datetime.strptime(current_time_str, "%H:%M:%S")
# 对于1分钟K线,检查是否接近每分钟的结束(57秒以后)
if trigger_type == "1m" and dt_time.second >= 57:
# 允许触发
pass
# 对于5分钟K线,检查是否接近每5分钟的结束(当前分钟为4、9、14...且秒数>=57)
elif trigger_type == "5m" and dt_time.minute % 5 == 4 and dt_time.second >= 57:
# 允许触发
pass
else:
# 不是K线周期结束,不触发
return
elif trigger_type == "1d":
# 日K线触发已经在DailyTrigger中处理了逻辑
# 如果触发器返回False,说明不应该触发
return
else:
# 触发器返回False,不触发策略
return
# 风控检查
if not self.risk_mgr.check_risk(data_with_time):
return
# 添加当前时间信息到数据字典
current_time = datetime.datetime.fromtimestamp(timestamp)
time_data = {
"__current_time__": {
"timestamp": timestamp,
"datetime": current_time.strftime("%Y-%m-%d %H:%M:%S"),
"date": current_time.strftime("%Y-%m-%d"),
"time": current_time.strftime("%H:%M:%S")
}
}
# 将时间信息合并到数据字典中
data_with_time = {**data_with_time, **time_data}
# 添加账户和持仓信息到数据字典
if hasattr(self, 'trade_mgr') and self.trade_mgr:
# 添加账户资产信息
account_data = {
"__account__": self.trade_mgr.assets
}
# 添加持仓信息
positions_data = {
"__positions__": self.trade_mgr.positions
}
# 添加股票池信息
stock_list_data = {
"__stock_list__": self.get_stock_list()
}
# 合并所有信息
data_with_time.update(account_data)
data_with_time.update(positions_data)
data_with_time.update(stock_list_data)
# 添加框架实例到数据字典
data_with_time["__framework__"] = self
# 检查股票数据是否为空
stock_data_empty = True
empty_stocks = []
for key, value in data_with_time.items():
# 跳过框架内部字段
if key.startswith("__"):
continue
# 检查股票数据是否为空
if isinstance(value, pd.Series) and not value.empty:
stock_data_empty = False
elif isinstance(value, pd.Series) and value.empty:
empty_stocks.append(key)
elif not value: # 处理其他空值情况
empty_stocks.append(key)
# 如果所有股票数据都为空,记录错误并跳过策略调用
if stock_data_empty:
current_time_str = data_with_time.get("__current_time__", {}).get("datetime", "未知时间")
if self.trader_callback:
self.trader_callback.gui.log_message(
f"警告: 时间点 {current_time_str} 的所有股票数据为空,跳过策略调用",
"WARNING"
)
if empty_stocks:
self.trader_callback.gui.log_message(
f"空数据股票列表: {', '.join(empty_stocks[:10])}" +
(f" 等{len(empty_stocks)}只股票" if len(empty_stocks) > 10 else ""),
"WARNING"
)
return
# 如果有部分股票数据为空,记录警告但继续执行
if empty_stocks:
current_time_str = data_with_time.get("__current_time__", {}).get("datetime", "未知时间")
if self.trader_callback:
self.trader_callback.gui.log_message(
f"警告: 时间点 {current_time_str} 有 {len(empty_stocks)} 只股票数据为空: {', '.join(empty_stocks[:5])}" +
(f" 等" if len(empty_stocks) > 5 else ""),
"WARNING"
)
# 调用策略处理
signals = self.strategy_module.khHandlebar(data_with_time)
# 处理信号中的价格精度
if signals:
for signal in signals:
if 'price' in signal:
# 使用动态精度
signal['price'] = round(float(signal['price']), self.price_decimals)
# 发送交易指令
if signals:
self.trade_mgr.process_signals(signals)
except Exception as e:
self.log_error(f"行情处理异常: {str(e)}")
traceback.print_exc()
def run(self):
"""启动框架"""
# 记录策略开始运行时间
self.start_time = time.time()
start_datetime = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if self.trader_callback:
self.trader_callback.gui.log_message(f"策略开始运行时间: {start_datetime}", "INFO")
self.trader_callback.gui.log_message("开始初始化交易接口和数据...", "INFO")
try:
# 初始化
init_start = time.time()
self.init_trader_and_account() # 初始化交易接口和账户
init_time = time.time() - init_start
if self.trader_callback:
self.trader_callback.gui.log_message(f"交易接口初始化耗时: {init_time:.2f}秒", "INFO")
# 初始化缓存
self.daily_price_cache = {}
self._cached_benchmark_close = {}
# 直接从设置界面读取是否初始化数据的配置
from PyQt5.QtCore import QSettings
settings = QSettings('KHQuant', 'StockAnalyzer')
init_data_enabled = settings.value('init_data_enabled', True, type=bool)
if self.trader_callback:
self.trader_callback.gui.log_message(f"数据初始化设置: {'启用' if init_data_enabled else '禁用'}", "INFO")
if init_data_enabled:
data_init_start = time.time()
if self.trader_callback:
self.trader_callback.gui.log_message("开始初始化行情数据...", "INFO")
self.init_data() # 初始化行情数据
data_init_time = time.time() - data_init_start
if self.trader_callback:
self.trader_callback.gui.log_message(f"数据初始化耗时: {data_init_time:.2f}秒", "INFO")
else:
if self.trader_callback:
self.trader_callback.gui.log_message("跳过数据初始化(根据设置禁用)", "INFO")
# 读取股票列表
stock_list_start = time.time()
stock_codes = self.get_stock_list()
stock_list_time = time.time() - stock_list_start
if self.trader_callback:
self.trader_callback.gui.log_message(f"股票列表加载耗时: {stock_list_time:.2f}秒", "INFO")
# 判断股票池类型并设置价格精度
self.pool_type, self.price_decimals = determine_pool_type(stock_codes)
# 将精度设置传递给交易管理器
self.trade_mgr.set_price_decimals(self.price_decimals)
# 将精度设置传递给回调对象(用于日志格式化)
if self.trader_callback and hasattr(self.trader_callback, 'set_price_decimals'):
self.trader_callback.set_price_decimals(self.price_decimals)
# 记录股票池类型信息
pool_type_names = {
'stock_only': '纯股票',
'etf_only': '纯ETF',
'mixed': '股票+ETF混合'
}
if self.trader_callback:
self.trader_callback.gui.log_message(
f"股票池类型: {pool_type_names.get(self.pool_type, self.pool_type)}, "
f"价格精度: {self.price_decimals}位小数", "INFO"
)
# ==================== T+0模式检验 ====================
t0_support_type, self.t0_mode = check_t0_support(stock_codes)
# 将T+0模式设置传递给交易管理器
self.trade_mgr.set_t0_mode(self.t0_mode)
if t0_support_type == 'all_t0':
# 全部支持T+0,进入T+0模式
if self.trader_callback:
self.trader_callback.gui.log_message(
"T+0交易模式已启用 - 股票池中全部为T0型ETF,支持当日买入当日卖出", "INFO"
)
# 通知GUI更新显示(使用try-except防止跨线程调用导致崩溃)
try:
if hasattr(self.trader_callback.gui, 'set_t0_mode_display'):
self.trader_callback.gui.set_t0_mode_display(True)
except Exception as e: