# signal_scanner.py (Simplified for MA Cross Strategy - CORRECTED is True check) import MetaTrader5 as mt5 import pandas as pd import numpy as np import time import json import requests import os import logging from datetime import datetime # --- Configuration --- MT5_LOGIN = 52212233 MT5_PASSWORD = "$bQD8vMwOlbosu" MT5_SERVER = "ICMarketsSC-Demo" MT5_PATH = "C:\\MT5_Demo_ICMarkets_52212233\\terminal64.exe" NODEJS_INTERNAL_SIGNAL_URL = 'http://localhost:82/internal_signal' # Ensure this matches Node.js INTERNAL_SIGNAL_PORT # Corrected directory for operational files STREAMLINE_DIR = 'C:\\trading-bot\\streamline' ACTIVE_LISTENERS_FILE = os.path.join(STREAMLINE_DIR, 'active_listeners.json') SCAN_INTERVAL_SECONDS = 10 # How often to scan for signals LOG_FILE = os.path.join(STREAMLINE_DIR, 'signal_scanner_simple.log') # --- Timeframe Mapping (PineScript to MT5) --- TIMEFRAME_MAP_MT5 = { "1": mt5.TIMEFRAME_M1, "5": mt5.TIMEFRAME_M5, "15": mt5.TIMEFRAME_M15, "30": mt5.TIMEFRAME_M30, "60": mt5.TIMEFRAME_H1, "240": mt5.TIMEFRAME_H4, "D": mt5.TIMEFRAME_D1, "W": mt5.TIMEFRAME_W1, "M": mt5.TIMEFRAME_MN1 } # --- Logging Setup --- # Ensure directory for log file exists if it might not os.makedirs(STREAMLINE_DIR, exist_ok=True) logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(LOG_FILE), logging.StreamHandler() ] ) # --- Global variable to store last signal times --- last_signal_timestamps = {} # { "SYMBOL_TF_DIRECTION": timestamp_of_last_signal } SIGNAL_COOLDOWN_SECONDS = 60 # 1 minute cooldown for MA cross signals # --- MT5 Connection --- def initialize_mt5(): if not mt5.initialize(path=MT5_PATH, login=MT5_LOGIN, password=MT5_PASSWORD, server=MT5_SERVER, timeout=10000): logging.error(f"MT5 initialize() failed, error code = {mt5.last_error()}") return False account_info = mt5.account_info() if account_info is None: logging.error(f"MT5 account_info() failed, error code = {mt5.last_error()}") mt5.shutdown() return False logging.info(f"MT5 Initialized: Login {account_info.login}, Server {account_info.server}, Balance {account_info.balance}") return True # --- Load Active Listeners from Node.js --- def get_active_listeners(): try: if not os.path.exists(ACTIVE_LISTENERS_FILE): logging.debug(f"{ACTIVE_LISTENERS_FILE} not found. No active listeners.") return {} with open(ACTIVE_LISTENERS_FILE, 'r') as f: listeners = json.load(f) return listeners except json.JSONDecodeError: logging.error(f"Error decoding JSON from {ACTIVE_LISTENERS_FILE}.") return {} except Exception as e: logging.error(f"Error reading listeners file {ACTIVE_LISTENERS_FILE}: {e}") return {} # --- Moving Average (PineScript ta.sma, ta.ema) --- def calculate_ma(source_series, length, ma_type="EMA"): if not isinstance(source_series, pd.Series): source_series = pd.Series(source_series) if source_series.empty or len(source_series) < length: logging.debug(f"calculate_ma: Not enough data for MA length {length}. Have {len(source_series)} bars.") return pd.Series([np.nan] * len(source_series)) if ma_type == "SMA": return source_series.rolling(window=length, min_periods=1).mean() elif ma_type == "EMA": return source_series.ewm(span=length, adjust=False, min_periods=1).mean() logging.warning(f"calculate_ma: Unknown MA type '{ma_type}'. Returning NaNs.") return pd.Series([np.nan] * len(source_series)) # --- Check Simplified PineScript Conditions --- def check_ma_cross_conditions(df_chart_tf, inputs): if df_chart_tf.empty: logging.debug(f"check_ma_cross_conditions: DataFrame is empty. Symbol: {inputs.get('symbol_for_log', 'Unknown')}, TF: {inputs.get('tf_chart_str', 'N/A')}") return None, None close_series = df_chart_tf['close'] symbol_for_log = df_chart_tf['symbol'].iloc[-1] if 'symbol' in df_chart_tf.columns and not df_chart_tf['symbol'].empty else inputs.get('symbol_for_log', 'Unknown Symbol') min_bars_needed_for_logic = inputs['length7_pattern'] + 1 if len(close_series) < min_bars_needed_for_logic: logging.debug(f"check_ma_cross_conditions: Not enough data for MA logic on {symbol_for_log}, TF: {inputs.get('tf_chart_str', 'N/A')}. Need {min_bars_needed_for_logic}, have {len(close_series)}") return None, None ma_1 = calculate_ma(close_series, inputs['length1_pattern'], inputs['sma_source']) ma_3 = calculate_ma(close_series, inputs['length3_pattern'], inputs['sma_source']) ma_7 = calculate_ma(close_series, inputs['length7_pattern'], inputs['sma_source']) if len(ma_1) < 2 or len(ma_3) < 2 or len(ma_7) < 2: logging.debug(f"check_ma_cross_conditions: Not enough MA values to compare current and previous for {symbol_for_log}, TF: {inputs.get('tf_chart_str', 'N/A')}. MA1 len: {len(ma_1)}") return None, None current_ma_1 = ma_1.iloc[-1] current_ma_3 = ma_3.iloc[-1] current_ma_7 = ma_7.iloc[-1] prev_ma_1 = ma_1.iloc[-2] prev_ma_3 = ma_3.iloc[-2] if pd.isna(current_ma_1) or pd.isna(current_ma_3) or pd.isna(current_ma_7) or \ pd.isna(prev_ma_1) or pd.isna(prev_ma_3): logging.debug(f"check_ma_cross_conditions: MA calculation resulted in NaN for {symbol_for_log}, TF: {inputs.get('tf_chart_str', 'N/A')}. " f"MA1: {current_ma_1}, MA3: {current_ma_3}, MA7: {current_ma_7}, PrevMA1: {prev_ma_1}, PrevMA3: {prev_ma_3}") return None, None mas_cross_1_bullish = current_ma_1 > current_ma_3 and prev_ma_1 <= prev_ma_3 mas_cross_2_bearish = current_ma_1 < current_ma_3 and prev_ma_1 >= prev_ma_3 bedingung_bullish_1_final = mas_cross_1_bullish and current_ma_3 < current_ma_7 bedingung_bearish_1_final = mas_cross_2_bearish and current_ma_3 > current_ma_7 logging.debug(f"SignalEval: Sym={symbol_for_log}, TF={inputs.get('tf_chart_str', 'N/A')} | " f"MAs(curr): MA1={current_ma_1:.4f}, MA3={current_ma_3:.4f}, MA7={current_ma_7:.4f} | " f"MAs(prev): MA1={prev_ma_1:.4f}, MA3={prev_ma_3:.4f} | " f"Cross: Bull={mas_cross_1_bullish}, Bear={mas_cross_2_bearish} | " f"FinalCond: Bullish={bedingung_bullish_1_final}, Bearish={bedingung_bearish_1_final}") return bedingung_bullish_1_final, bedingung_bearish_1_final # --- Main Scanning Logic --- def scan_for_signals(listeners_map): if not mt5.terminal_info(): logging.error("MT5 connection lost. Attempting to reinitialize.") if not initialize_mt5(): logging.error("Failed to reinitialize MT5. Skipping scan cycle.") return global last_signal_timestamps now_ts = time.time() for listener_key, details in listeners_map.items(): symbol = details['symbol'] direction_to_listen = details['direction'] # LONG or SHORT tf_chart_str = details['tf'] # Timeframe for MAs, e.g., "15" if listener_key in last_signal_timestamps and \ (now_ts - last_signal_timestamps[listener_key]) < SIGNAL_COOLDOWN_SECONDS: logging.debug(f"Signal for {listener_key} is in cooldown. Skipping.") continue logging.info(f"Scanning {symbol} on TF {tf_chart_str} for {direction_to_listen} signal (MA Cross Strategy).") if not mt5.symbol_select(symbol, True): logging.warning(f"Could not select MT5 symbol {symbol}. Skipping listener {listener_key}.") continue pine_inputs = { "symbol_for_log": symbol, "tf_chart_str": tf_chart_str, "sma_source": "EMA", "length1_pattern": 3, "length3_pattern": 9, "length7_pattern": 63, } mt5_timeframe = TIMEFRAME_MAP_MT5.get(tf_chart_str) if not mt5_timeframe: logging.warning(f"Invalid chart timeframe {tf_chart_str} for {symbol}. Skipping listener {listener_key}.") continue num_bars_to_fetch = pine_inputs["length7_pattern"] + 5 min_bars_required_for_logic_inside_check = pine_inputs["length7_pattern"] + 1 logging.debug(f"Fetching {num_bars_to_fetch} bars for {symbol} on TF {tf_chart_str} (MT5 TF: {mt5_timeframe}). Min needed for logic: {min_bars_required_for_logic_inside_check}") rates_chart_tf = mt5.copy_rates_from_pos(symbol, mt5_timeframe, 0, num_bars_to_fetch) if rates_chart_tf is None or len(rates_chart_tf) < min_bars_required_for_logic_inside_check: logging.warning(f"Not enough data for {symbol} on TF {tf_chart_str}. Need at least {min_bars_required_for_logic_inside_check}, got {len(rates_chart_tf) if rates_chart_tf is not None else 0}. Skipping listener {listener_key}.") continue logging.debug(f"Successfully fetched {len(rates_chart_tf)} bars for {symbol} on TF {tf_chart_str}.") df_chart_tf = pd.DataFrame(rates_chart_tf) df_chart_tf['symbol'] = symbol is_bullish_signal, is_bearish_signal = check_ma_cross_conditions(df_chart_tf, pine_inputs) triggered_direction = None # --- CORRECTED CONDITION --- if is_bullish_signal and direction_to_listen == "LONG": triggered_direction = "LONG" elif is_bearish_signal and direction_to_listen == "SHORT": triggered_direction = "SHORT" # --- END CORRECTION --- if triggered_direction: logging.info(f"*** VALID MA CROSS SIGNAL: {triggered_direction} for {symbol} on {tf_chart_str} ***") payload = { "symbol": symbol, "direction": triggered_direction, "tf": tf_chart_str } try: response = requests.post(NODEJS_INTERNAL_SIGNAL_URL, json=payload, timeout=10) if response.status_code == 200: logging.info(f"Successfully sent {triggered_direction} signal for {symbol} ({tf_chart_str}) to Node.js.") last_signal_timestamps[listener_key] = now_ts else: logging.error(f"Error sending signal for {symbol} ({tf_chart_str}) to Node.js: {response.status_code} - {response.text}") except requests.exceptions.RequestException as e: logging.error(f"Failed to connect to Node.js to send signal for {symbol} ({tf_chart_str}): {e}") else: logging.debug(f"No valid {direction_to_listen} MA Cross signal for {symbol} on {tf_chart_str} this cycle.") # --- Main Loop --- def main(): if not initialize_mt5(): return logging.info("Signal Scanner (Simplified MA Cross) started. Watching for active listeners...") while True: try: active_listeners_map = get_active_listeners() if active_listeners_map: logging.debug(f"Found {len(active_listeners_map)} active listeners: {list(active_listeners_map.keys())}. Processing...") scan_for_signals(active_listeners_map) else: logging.debug("No active listeners found. Sleeping...") time.sleep(SCAN_INTERVAL_SECONDS) except KeyboardInterrupt: logging.info("Signal Scanner stopping...") break except Exception as e: logging.error(f"CRITICAL ERROR in scanner main loop: {e}", exc_info=True) mt5_error_details = mt5.last_error() is_mt5_connection_error = False if mt5_error_details and mt5_error_details[0] != 0: is_mt5_connection_error = True connection_error_strings = ["Terminal not found", "Not connected", "Trade context is busy", "IPC timeout", "connection"] if any(err_str.lower() in str(e).lower() for err_str in connection_error_strings): is_mt5_connection_error = True if is_mt5_connection_error: logging.info(f"Attempting to re-initialize MT5 due to error: {e} (MT5 last_error: {mt5_error_details})") try: mt5.shutdown() except Exception as shut_e: logging.warning(f"Exception during MT5 shutdown attempt: {shut_e}") time.sleep(5) if not initialize_mt5(): logging.error("Failed to re-initialize MT5 after error. Scanner will pause and retry initialization later.") time.sleep(60) else: logging.info("Successfully re-initialized MT5 after error.") else: logging.info("Non-MT5 connection error encountered. Waiting before continuing.") time.sleep(30) try: mt5.shutdown() logging.info("Signal Scanner MT5 instance shutdown.") except Exception as shut_e: logging.warning(f"Exception during final MT5 shutdown: {shut_e}") if __name__ == "__main__": main()