(Remember: at the moment Alpaca uses different symbol formats for executing orders (e.g., "BTC/USD") and for reporting portfolio positions (e.g., "BTCUSD").)
KEY_ID = "your API KEY" #replace it with your own KEY_ID from Alpaca: https://alpaca.markets/
SECRET_KEY = "your SECRET KEY" #replace it with your own SECRET_KEY from Alpaca
import warnings
warnings.filterwarnings('ignore')
Step 1: Alpaca Setup & Python Environment
Sign up for an Alpaca account: https://alpaca.markets/
Get your API Key ID and Secret Key from your paper trading account dashboard.
Install necessary Python libraries:
pip install alpaca-trade-api pandas scikit-learn xgboost numpy pandas-ta
Set up API credentials (environment variables are best):
import os
import alpaca_trade_api as tradeapi
import pandas as pd
from datetime import datetime, timedelta
import time
# For Paper Trading
os.environ['APCA_API_BASE_URL'] = 'https://paper-api.alpaca.markets'
# Replace with your actual keys or set them as environment variables
os.environ['APCA_API_KEY_ID'] = 'your API KEY'
os.environ['APCA_API_SECRET_KEY'] = 'your SECRET KEY'
API_KEY = os.getenv('APCA_API_KEY_ID')
API_SECRET = os.getenv('APCA_API_SECRET_KEY')
BASE_URL = os.getenv('APCA_API_BASE_URL')
api = tradeapi.REST(API_KEY, API_SECRET, BASE_URL, api_version='v2')
print("Connected to Alpaca Paper Trading.")
# Check account
# account = api.get_account()
# print(f"Account status: {account.status}")
Connected to Alpaca Paper Trading.
Step 2: Data Acquisition
def fetch_data(symbol, timeframe, start_date_str, end_date_str=None):
    """Fetches historical crypto data from Alpaca."""
    if end_date_str is None:
        end_date_str = datetime.now().strftime('%Y-%m-%d')

    # Alpaca's API expects ISO 8601 timestamps for start/end, and it caps the
    # number of bars per request (e.g., 10000 for crypto), so long periods
    # must be fetched in chunks.
    all_bars = []
    start_dt = pd.to_datetime(start_date_str, utc=True)
    end_dt = pd.to_datetime(end_date_str, utc=True)

    # For 1-minute data, 10000 bars is about 7 days; fetch ~5 days at a time.
    current_start = start_dt
    while current_start < end_dt:
        chunk_end = min(current_start + timedelta(days=5), end_dt)  # adjust chunk size as needed
        print(f"Fetching data from {current_start.isoformat()} to {chunk_end.isoformat()}")
        bars = api.get_crypto_bars(
            symbol,
            timeframe,
            start=current_start.isoformat(),
            end=chunk_end.isoformat(),
            limit=10000  # max bars per request
        ).df
        if bars.empty:
            print(f"No data found for chunk starting {current_start.isoformat()}")
            if current_start + timedelta(days=5) > end_dt and not all_bars:  # last possible chunk and nothing fetched
                break
            current_start += timedelta(days=5)  # move to the next chunk period
            time.sleep(1)  # be nice to the API
            continue
        all_bars.append(bars)
        # Advance current_start to the last fetched bar + 1 minute to avoid
        # overlap and guarantee forward progress.
        if not bars.index.empty:
            current_start = bars.index[-1] + pd.Timedelta(minutes=1)
        else:  # should not happen if bars is non-empty, but as a safeguard
            current_start += timedelta(days=5)
        time.sleep(1)  # respect API rate limits

    if not all_bars:
        print("No data fetched. Check your date range or symbol.")
        return pd.DataFrame()

    df = pd.concat(all_bars)
    df = df[~df.index.duplicated(keep='first')]  # remove potential duplicates from chunking
    df = df.sort_index()                         # ensure chronological order
    df = df[df.index <= end_dt]                  # don't go past the requested end
    return df
# Example usage:
symbol = "BTC/USD"  # Alpaca uses "BTC/USD" for crypto pairs
timeframe = tradeapi. টাইমফ্রেম.মিনিট  # or "1Min" for older SDK versions

# Fetch the last 30 days of data, for example
start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
# end_date = datetime.now().strftime('%Y-%m-%d')  # fetches up to now

btc_data = fetch_data(symbol, timeframe, start_date)
if not btc_data.empty:
    print(f"Fetched {len(btc_data)} rows of {symbol} data.")
    print(btc_data.head())
    print(btc_data.tail())
else:
    print("Failed to fetch data.")
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[5], line 61
     59 # Example usage:
     60 symbol = "BTC/USD" # Alpaca uses "BTC/USD" for crypto pairs
---> 61 timeframe = tradeapi. টাইমফ্রেম.মিনিট # or "1Min" for older SDK versions
     62 # Fetch last 30 days of data for example
     63 start_date = (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')

AttributeError: module 'alpaca_trade_api' has no attribute 'টাইমফ্রেম'
Step 3: Feature Engineering
This is where you create signals for your model. pandas-ta is a great library for this.
import pandas_ta as ta

def create_features(df):
    if df.empty or 'close' not in df.columns:
        print("DataFrame is empty or 'close' column is missing.")
        return df

    # Note: col_names should be tuples; ('SMA_10',) rather than ('SMA_10'),
    # which is just a parenthesized string.
    df.ta.sma(length=10, append=True, col_names=('SMA_10',))
    df.ta.sma(length=30, append=True, col_names=('SMA_30',))
    df.ta.ema(length=10, append=True, col_names=('EMA_10',))
    df.ta.rsi(length=14, append=True, col_names=('RSI_14',))
    df.ta.macd(append=True, col_names=('MACD_12_26_9', 'MACDh_12_26_9', 'MACDs_12_26_9'))
    df.ta.bbands(length=20, append=True, col_names=('BBL_20_2.0', 'BBM_20_2.0', 'BBU_20_2.0', 'BBB_20_2.0', 'BBP_20_2.0'))
    df.ta.atr(length=14, append=True, col_names=('ATR_14',))

    # Lagged returns
    for lag in [1, 3, 5, 10]:
        df[f'return_{lag}m'] = df['close'].pct_change(periods=lag)

    # Add more features: volatility, momentum, volume-based if available, etc.
    # e.g., log returns, price relative to a moving average (see the sketch below).
    df.dropna(inplace=True)  # remove rows with NaNs from indicator warm-up
    return df
if not btc_data.empty:
    featured_data = create_features(btc_data.copy())  # .copy() avoids modifying the original
    print("\nData with features:")
    print(featured_data.head())
else:
    print("Cannot create features, btc_data is empty.")
    featured_data = pd.DataFrame()  # ensure it's a DataFrame
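As the comments in create_features suggest, more features are easy to bolt on. A small sketch of a few candidates (the column names and lookbacks here are illustrative assumptions, not part of the original code):

import numpy as np

def add_extra_features(df):
    # Log returns (often better behaved than raw percentage returns)
    df['log_return_1m'] = np.log(df['close'] / df['close'].shift(1))
    # Price relative to a moving average (assumes SMA_30 was created above)
    df['close_over_sma30'] = df['close'] / df['SMA_30'] - 1
    # Rolling volatility of 1-minute returns over 30 bars
    df['volatility_30m'] = df['close'].pct_change().rolling(30).std()
    return df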
Step 4: Model Training - Defining Target & Training
Let's define a simple target: will the price be higher or lower in N minutes?
from sklearn.model_selection import train_test_split, TimeSeriesSplit
from sklearn.ensemble import RandomForestClassifier # Example model
from sklearn.metrics import accuracy_score, classification_report
import xgboost as xgb
def prepare_data_for_model(df, target_horizon=5, test_size=0.2):
    """
    Prepares X (features) and y (target) for the ML model.
    Target: 1 if the price is higher after `target_horizon` periods, 0 otherwise.
    (Optional 3-class variant: -1 if the price falls by more than a small threshold.)
    """
    if df.empty or 'close' not in df.columns:
        print("DataFrame is empty or 'close' column is missing.")
        # Return empty objects matching the normal return signature
        return pd.DataFrame(), pd.DataFrame(), pd.Series(dtype='float64'), pd.Series(dtype='float64'), []

    # A small threshold can help avoid noise around 0% change:
    # price_threshold = 0.0005  # e.g., 0.05% change
    # df['future_price'] = df['close'].shift(-target_horizon)
    # df['price_change'] = (df['future_price'] - df['close']) / df['close']
    # df['target'] = 0  # hold
    # df.loc[df['price_change'] > price_threshold, 'target'] = 1    # buy
    # df.loc[df['price_change'] < -price_threshold, 'target'] = -1  # sell (3-class)

    # For 2-class (up / not up). Compute the future price explicitly and drop
    # the trailing rows where it is NaN; comparing directly against a NaN shift
    # would silently label the last `target_horizon` rows as 0.
    df['future_price'] = df['close'].shift(-target_horizon)
    df.dropna(inplace=True)
    df['target'] = (df['future_price'] > df['close']).astype(int)

    feature_columns = [col for col in df.columns if col not in
                       ['open', 'high', 'low', 'close', 'volume', 'trade_count',
                        'vwap', 'target', 'future_price', 'price_change']]
    X = df[feature_columns]
    y = df['target']

    # Time-series split is crucial: DO NOT shuffle time-series data for training.
    split_index = int(len(X) * (1 - test_size))
    X_train, X_test = X[:split_index], X[split_index:]
    y_train, y_test = y[:split_index], y[split_index:]

    # For more robust cross-validation, use TimeSeriesSplit:
    # tscv = TimeSeriesSplit(n_splits=5)
    # for train_index, test_index in tscv.split(X):
    #     X_train, X_test = X.iloc[train_index], X.iloc[test_index]
    #     y_train, y_test = y.iloc[train_index], y.iloc[test_index]

    return X_train, X_test, y_train, y_test, feature_columns
if not featured_data.empty:
    X_train, X_test, y_train, y_test, feature_cols = prepare_data_for_model(
        featured_data.copy(), target_horizon=5)
    if not X_train.empty:
        print(f"\nTraining data shape: X_train: {X_train.shape}, y_train: {y_train.shape}")
        print(f"Test data shape: X_test: {X_test.shape}, y_test: {y_test.shape}")
        print(f"Features used: {feature_cols}")

        # Example model: Random Forest
        # model = RandomForestClassifier(n_estimators=100, random_state=42, class_weight='balanced')

        # Example model: XGBoost (often performs well)
        model = xgb.XGBClassifier(
            objective='binary:logistic',  # or 'multi:softprob' for multi-class
            n_estimators=100,
            learning_rate=0.1,
            max_depth=3,
            use_label_encoder=False,  # suppresses a warning on older xgboost; ignored in newer versions
            eval_metric='logloss'     # or 'mlogloss' for multi-class
        )
        model.fit(X_train, y_train)

        # Evaluate on the test set
        y_pred = model.predict(X_test)
        print("\nModel Evaluation on Test Set:")
        print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
        print(classification_report(y_test, y_pred, zero_division=0))

        # Feature importance (for tree-based models)
        if hasattr(model, 'feature_importances_'):
            importances = pd.Series(model.feature_importances_,
                                    index=X_train.columns).sort_values(ascending=False)
            print("\nFeature Importances:")
            print(importances.head(10))
    else:
        print("Not enough data to create training/test sets after feature engineering and target creation.")
        model = None  # ensure model is defined even if training fails
else:
    print("Cannot prepare data for model, featured_data is empty.")
    model = None  # ensure model is defined even if training fails
Important Considerations for Modeling:
Target Definition: This is critical. Predicting direction is hard; predicting magnitude, or requiring a minimum move (e.g., more than 0.1%) before labeling a bar a "1", can work better (see the sketch after this list).
Class Imbalance: If "up" signals are rare, your model might be biased. Use techniques like class_weight='balanced' (for some models) or over/undersampling (e.g., SMOTE).
Stationarity: Price series are generally non-stationary. Features like returns or indicators often help.
Overfitting: Models can learn noise from historical data. Robust cross-validation (like TimeSeriesSplit) and regularization are key.
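To make the target-definition and class-imbalance points concrete, here is a sketch of a 3-class target with a dead zone around zero; the 0.1% threshold and 5-bar horizon are assumptions to tune, mirroring the commented-out code in prepare_data_for_model:

# 3-class target: 1 = buy, -1 = sell, 0 = hold (the dead zone absorbs noise)
price_threshold = 0.001  # 0.1% minimum move (assumption; tune this)
future_return = df['close'].shift(-5) / df['close'] - 1

df['target'] = 0
df.loc[future_return > price_threshold, 'target'] = 1
df.loc[future_return < -price_threshold, 'target'] = -1

# Class-imbalance check: if 1s and -1s are rare, consider class weights or resampling
print(df['target'].value_counts(normalize=True))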
Step 5: Backtesting (Simplified Vectorized Example)
A proper backtest is event-driven and considers transaction costs, slippage, etc. This is a very simplified version.
def run_simple_backtest(df_with_predictions, initial_capital=10000,
                        trade_size_usd=1000, transaction_cost_pct=0.003):  # 0.3% Alpaca crypto fee
    """
    A very simplified vectorized backtest.
    Expects a 'predicted_signal' column from the model:
    1 = go long, 0 = exit long (or do nothing if not in a position).
    """
    if df_with_predictions.empty or 'predicted_signal' not in df_with_predictions.columns:
        print("DataFrame for backtest is empty or 'predicted_signal' column missing.")
        return

    capital = initial_capital
    position_btc = 0  # amount of BTC held
    portfolio_value = []

    # Simple strategy: if the signal is 1, buy; if it is 0 and we hold a position, sell.
    for i in range(len(df_with_predictions)):
        current_price = df_with_predictions['close'].iloc[i]
        signal = df_with_predictions['predicted_signal'].iloc[i]

        if signal == 1 and position_btc == 0:  # buy signal and no current position
            amount_to_buy_btc = trade_size_usd / current_price
            cost = amount_to_buy_btc * current_price * (1 + transaction_cost_pct)
            if capital >= cost:
                capital -= cost
                position_btc += amount_to_buy_btc
                # print(f"{df_with_predictions.index[i]}: BUY {amount_to_buy_btc:.6f} BTC @ {current_price:.2f}")
        elif signal == 0 and position_btc > 0:  # sell signal (or neutral) with a position
            proceeds = position_btc * current_price * (1 - transaction_cost_pct)
            capital += proceeds
            # print(f"{df_with_predictions.index[i]}: SELL {position_btc:.6f} BTC @ {current_price:.2f}")
            position_btc = 0

        portfolio_value.append(capital + position_btc * current_price)

    df_with_predictions['portfolio_value'] = portfolio_value
    print("\nBacktest Results:")
    print(f"Initial Capital: ${initial_capital:.2f}")
    print(f"Final Portfolio Value: ${df_with_predictions['portfolio_value'].iloc[-1]:.2f}")
    returns = (df_with_predictions['portfolio_value'].iloc[-1] / initial_capital - 1) * 100
    print(f"Total Return: {returns:.2f}%")

    # Plotting (optional)
    # import matplotlib.pyplot as plt
    # plt.figure(figsize=(12, 6))
    # plt.plot(df_with_predictions.index, df_with_predictions['portfolio_value'])
    # plt.title('Portfolio Value Over Time')
    # plt.xlabel('Date')
    # plt.ylabel('Portfolio Value ($)')
    # plt.show()
if model is not None and not X_test.empty:
    # Predict over the whole test set for backtesting. For a more realistic
    # backtest you would re-train periodically (walk-forward); here we just
    # use the single model trained on X_train.
    all_featured_data_for_backtest = featured_data.loc[X_test.index].copy()
    all_featured_data_for_backtest['predicted_signal'] = model.predict(X_test)
    run_simple_backtest(all_featured_data_for_backtest)
else:
    print("Skipping backtest as model or test data is not available.")
Backtesting Libraries: For more serious backtesting, consider backtrader or zipline-reloaded. They handle many complexities.
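For illustration, here is a minimal backtrader skeleton that replays the model's signals. It is a sketch under assumptions (it expects the predicted_signal column produced above, and uses backtrader's extended-data-feed pattern), not a drop-in replacement for the simple backtest:

import backtrader as bt

class SignalData(bt.feeds.PandasData):
    # Expose the model's signal column as an extra data line
    lines = ('predicted_signal',)
    params = (('predicted_signal', -1),)  # -1: locate the column by name

class ModelSignalStrategy(bt.Strategy):
    def next(self):
        signal = self.data.predicted_signal[0]
        if signal == 1 and not self.position:
            self.buy()
        elif signal == 0 and self.position:
            self.close()

cerebro = bt.Cerebro()
cerebro.broker.setcash(10000)
cerebro.broker.setcommission(commission=0.003)  # mirror the 0.3% fee
cerebro.adddata(SignalData(dataname=all_featured_data_for_backtest))
cerebro.addstrategy(ModelSignalStrategy)
cerebro.run()
print(f"Final portfolio value: {cerebro.broker.getvalue():.2f}")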
Step 6: Signal Generation & Order Execution (Live/Paper Trading)
This is where you'd run the bot periodically (e.g., every minute).
SYMBOL = "BTC/USD"
TRADE_QTY_USD = 100 # Amount in USD to trade per signal. Adjust based on risk tolerance.
TARGET_HORIZON_MINUTES = 5 # Same as used in training
# Global model and feature_cols (assuming they are trained and available)
# model = ... (your trained model)
# feature_cols = ... (list of feature column names used for training)
def get_latest_bar_features():
    """Fetches recent bars and calculates features for the most recent one."""
    # Fetch enough history to cover the longest indicator lookback
    # (SMA_30 is the longest here); fetch extra to be safe, e.g., 150 bars.
    start_fetch_dt = (pd.Timestamp.now(tz='UTC') - pd.Timedelta(minutes=150)).isoformat()
    latest_bars_df = api.get_crypto_bars(
        SYMBOL,
        tradeapi.TimeFrame.Minute,  # same correction as in Step 2
        start=start_fetch_dt,       # ISO 8601 timestamp, as Alpaca expects
        # end defaults to now
        limit=150                   # a bit more than needed for the features
    ).df
    if latest_bars_df.empty or len(latest_bars_df) < 35:  # min needed for SMA_30 + buffer
        print("Not enough recent bars to calculate features.")
        return None
    featured_bars = create_features(latest_bars_df.copy())
    if featured_bars.empty:
        print("Failed to create features for latest bars.")
        return None
    # Return only the features for the most recent complete bar (as a DataFrame)
    return featured_bars[feature_cols].iloc[-1:]
def check_and_place_trade():
    global model, feature_cols  # ensure these are accessible
    if model is None or feature_cols is None:
        print("Model not trained or feature columns not defined. Skipping trade check.")
        return

    print(f"\n{datetime.now()}: Checking for trading signal...")
    current_features_df = get_latest_bar_features()
    if current_features_df is None or current_features_df.empty:
        print("Could not get features for the latest bar.")
        return

    # Ensure columns are in the same order as during training
    current_features_df = current_features_df[feature_cols]
    prediction = model.predict(current_features_df)
    signal = prediction[0]  # 0 for down/neutral, 1 for up
    print(f"Raw features for prediction: {current_features_df.iloc[0].to_dict()}")
    print(f"Model prediction: {signal}")

    try:
        positions = api.list_positions()
        # Positions are reported as "BTCUSD", not "BTC/USD" (see the note at the top)
        position_symbol = SYMBOL.replace('/', '')
        btc_position = next((p for p in positions if p.symbol == position_symbol), None)

        current_price_info = api.get_latest_crypto_quote(SYMBOL)  # quote for a more current price
        current_price = (current_price_info.ap + current_price_info.bp) / 2  # mid price
        if not current_price:
            print("Could not get current price for BTC/USD.")
            return

        if signal == 1:  # predicted UP: potential BUY
            if btc_position is None or float(btc_position.qty) == 0:
                qty_to_buy = TRADE_QTY_USD / current_price
                print(f"BUY signal. Attempting to buy {qty_to_buy:.6f} {SYMBOL} at ~${current_price:.2f}")
                api.submit_order(
                    symbol=SYMBOL,
                    qty=round(qty_to_buy, 6),  # Alpaca crypto quantities need limited precision
                    side='buy',
                    type='market',
                    time_in_force='gtc'  # good 'til canceled
                )
                print("BUY order submitted.")
            else:
                print(f"BUY signal, but already have a position of {btc_position.qty} {SYMBOL}. Holding.")
        elif signal == 0:  # predicted DOWN/NEUTRAL: potential SELL
            if btc_position and float(btc_position.qty) > 0:
                qty_to_sell = float(btc_position.qty)  # sell the entire position
                print(f"SELL signal. Attempting to sell {qty_to_sell:.6f} {SYMBOL} at ~${current_price:.2f}")
                api.submit_order(
                    symbol=SYMBOL,
                    qty=round(qty_to_sell, 6),
                    side='sell',
                    type='market',
                    time_in_force='gtc'
                )
                print("SELL order submitted.")
            else:
                print("SELL signal, but no open position to sell. Doing nothing.")
        else:
            print("Neutral or unrecognized signal. Doing nothing.")
    except Exception as e:
        print(f"Error during trade execution: {e}")
# Main loop (very basic scheduler)
# For a robust bot, use APScheduler or a managed environment (e.g., a cloud server with cron)
if __name__ == "__main__" and model is not None:  # ensure the model is trained
    # This is a simplified loop; in a real bot you would schedule it,
    # e.g., with APScheduler, to run exactly at the start of each minute.
    print("Starting dummy trading loop (runs a few times for demo). Press Ctrl+C to stop.")
    print("IMPORTANT: This is for PAPER TRADING ONLY.")
    print(f"Will use model: {type(model).__name__} and features: {feature_cols}")
    try:
        check_and_place_trade()  # initial run
        for i in range(5):  # a few iterations for the demo
            time.sleep(60)  # wait ~1 minute; a precise scheduler is better for live trading
            check_and_place_trade()
    except KeyboardInterrupt:
        print("Trading loop stopped by user.")
    except Exception as e:
        print(f"An error occurred in the trading loop: {e}")
else:
    if model is None:
        print("Model is not trained. Cannot start trading loop.")
Step 7: Risk Management (Conceptual)
Position Sizing: Don't risk too much on a single trade (e.g., TRADE_QTY_USD should be a small % of your paper capital).
Stop-Loss: Automatically sell if the price moves against you by a certain percentage or dollar amount after entering a trade. Alpaca supports stop-loss orders.
# Example: a trailing stop-loss sell for an open position.
# Note: a trailing stop is its own order type, not a parameter on a market buy;
# check Alpaca's docs for whether trailing stops are supported for crypto symbols.
# api.submit_order(
#     symbol=SYMBOL,
#     qty=qty_to_buy,
#     side='sell',
#     type='trailing_stop',
#     trail_percent='1.5',  # trail 1.5% below the high-water mark
#     time_in_force='gtc'
# )
Take-Profit: Automatically sell if the price moves in your favor by a certain amount (see the limit-order sketch after this list).
Max Drawdown: If your total capital drops by X%, stop trading and re-evaluate.
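For instance, a take-profit can be a plain limit sell. A sketch assuming hypothetical entry_price and position_qty values from a filled buy (both are assumptions, not variables defined above):

# Hypothetical take-profit: sell the position with a limit order 1% above entry
take_profit_price = round(entry_price * 1.01, 2)  # +1% target (assumption; tune this)
api.submit_order(
    symbol=SYMBOL,
    qty=position_qty,  # hypothetical: size of the open position
    side='sell',
    type='limit',
    limit_price=take_profit_price,
    time_in_force='gtc'
)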
Step 8: Deployment & Monitoring
Server: Run your bot on a reliable machine or cloud server (AWS EC2, Google Cloud, etc.) that's always on.
Scheduling: Use cron (Linux) or APScheduler (Python library) to run your check_and_place_trade function precisely every minute (see the sketch after this list).
Logging: Log every decision, trade, error, and API response.
Monitoring: Track P&L, number of trades, win rate, errors, API connectivity, etc. Set up alerts.
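As one way to do the scheduling mentioned above, a minimal APScheduler sketch (assumes pip install apscheduler; the 5-second offset is an assumption so the just-closed 1-minute bar is available):

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler(timezone='UTC')
# Fire a few seconds after every minute boundary so the latest bar is complete
scheduler.add_job(check_and_place_trade, 'cron', minute='*', second=5)
scheduler.start()  # blocks; stop with Ctrl+C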
Very Important Next Steps & Considerations:
PAPER TRADE EXTENSIVELY. I cannot stress this enough.
Improve Feature Engineering: This is key. Explore more advanced features, time-series properties, etc.
Hyperparameter Tuning: Use GridSearchCV or RandomizedSearchCV (or Optuna/Hyperopt) to find better model parameters (see the sketch after this list).
Robust Backtesting: Implement a proper event-driven backtester that accounts for slippage and realistic transaction costs.
Walk-Forward Optimization: Markets change. Periodically re-train your model on newer data and test on out-of-sample data that follows.
Understand Alpaca's Crypto Trading Rules: Fees (currently 0.3% for taker/maker on basic accounts), minimum order sizes, precision for quantities.
Error Handling & Resilience: Your bot needs to handle API errors, network issues, unexpected data, etc.
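To make the tuning and time-series-validation points concrete, a sketch combining RandomizedSearchCV with TimeSeriesSplit (the parameter grid is an illustrative assumption):

from sklearn.model_selection import RandomizedSearchCV, TimeSeriesSplit

param_dist = {
    'n_estimators': [100, 200, 400],
    'max_depth': [2, 3, 5],
    'learning_rate': [0.01, 0.05, 0.1],
}
search = RandomizedSearchCV(
    xgb.XGBClassifier(objective='binary:logistic', eval_metric='logloss'),
    param_distributions=param_dist,
    n_iter=10,
    cv=TimeSeriesSplit(n_splits=5),  # preserves temporal order; no shuffling
    scoring='accuracy',
)
search.fit(X_train, y_train)
print(search.best_params_)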
The 1-Minute Problem:
Noise: Extremely high noise-to-signal ratio.
Latency: Your reaction time vs. market speed.
Transaction Costs: Can quickly kill profitability on small moves; your average win must be significantly larger than round-trip costs (see the quick arithmetic below).
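A quick sanity check on costs, using the 0.3% per-side fee mentioned below:

fee_per_side = 0.003  # 0.3% taker/maker fee on basic accounts
round_trip_cost = 2 * fee_per_side
print(f"Break-even move per round trip: {round_trip_cost:.2%}")  # 0.60%
# A 1-minute strategy must average well over 0.6% per round trip just to break even.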
Alternative ML Approaches:
Reinforcement Learning: Can learn trading policies directly. Very complex.
LSTMs/Transformers: For sequence modeling, but require significant data and expertise.
Simplicity First: Start with a very simple model and strategy. Complexity doesn't always mean better results and makes debugging harder.
This is a massive undertaking. Be patient, learn continuously, and manage your risk (even paper money risk, as it builds habits). Good luck!
AttributeError: module 'alpaca_trade_api' has no attribute 'টাইমফ্রেম'. Can you fix it?
import warnings
warnings.filterwarnings('ignore')

import os
import alpaca_trade_api as tradeapi
import pandas as pd
from datetime import datetime, timedelta
import time

# For Paper Trading
os.environ['APCA_API_BASE_URL'] = 'https://paper-api.alpaca.markets'
# Replace with your actual keys or set them as environment variables
os.environ['APCA_API_KEY_ID'] = 'your API KEY'
os.environ['APCA_API_SECRET_KEY'] = 'your SECRET KEY'

API_KEY = os.getenv('APCA_API_KEY_ID')
API_SECRET = os.getenv('APCA_API_SECRET_KEY')
BASE_URL = os.getenv('APCA_API_BASE_URL')

# Ensure API_KEY and API_SECRET are not None
if not API_KEY or not API_SECRET:
    raise ValueError("API_KEY or API_SECRET not set. Set them as environment variables or directly in the script.")

api = tradeapi.REST(API_KEY, API_SECRET, BASE_URL, api_version='v2')
print("Attempting to connect to Alpaca Paper Trading...")
try:
    account = api.get_account()
    print(f"Successfully connected. Account status: {account.status}")
except Exception as e:
    print(f"Failed to connect or get account info: {e}")
    raise  # stop here if the connection fails
def fetch_data(symbol, timeframe_enum, start_date_str, end_date_str=None):  # renamed to timeframe_enum
    """Fetches historical crypto data from Alpaca."""
    if end_date_str is None:
        end_date_str = datetime.now().strftime('%Y-%m-%d')

    all_bars = []
    start_dt = pd.to_datetime(start_date_str, utc=True)
    end_dt = pd.to_datetime(end_date_str, utc=True)

    current_start = start_dt
    while current_start < end_dt:
        # For 1-minute data, 10000 bars is approx. 6.94 days; use 6-day chunks to be safe,
        # and make sure chunk_end never exceeds end_dt.
        chunk_end_candidate = current_start + timedelta(days=6)
        chunk_end = min(chunk_end_candidate, end_dt)
        print(f"Fetching data from {current_start.isoformat()} to {chunk_end.isoformat()}")
        try:
            bars = api.get_crypto_bars(
                symbol,
                timeframe_enum,  # use the passed enum
                start=current_start.isoformat(),
                end=chunk_end.isoformat(),
                limit=10000
            ).df
        except Exception as e:
            print(f"Error fetching data chunk: {e}")
            current_start = chunk_end  # skip to the next period
            time.sleep(5)              # wait longer after an error
            continue

        if bars.empty:
            print(f"No data found for chunk starting {current_start.isoformat()}")
            if current_start >= end_dt and not all_bars:  # first chunk and no data
                break
            current_start = chunk_end  # move to the next chunk period
            time.sleep(1)              # be nice to the API
            continue

        all_bars.append(bars)
        if not bars.index.empty:
            # Advance current_start to the last fetched bar + 1 unit of the timeframe
            if timeframe_enum == tradeapi.TimeFrame.Minute:
                current_start = bars.index[-1].to_pydatetime() + pd.Timedelta(minutes=1)
            elif timeframe_enum == tradeapi.TimeFrame.Hour:
                current_start = bars.index[-1].to_pydatetime() + pd.Timedelta(hours=1)
            else:  # default for Day or other timeframes
                current_start = bars.index[-1].to_pydatetime() + pd.Timedelta(days=1)
            # Stop if we have moved past the requested end
            if current_start > end_dt and chunk_end_candidate >= end_dt:
                break
        else:  # should not happen if bars is non-empty, but as a safeguard
            current_start = chunk_end
        time.sleep(1)  # respect API rate limits

    if not all_bars:
        print("No data fetched. Check your date range, symbol, or API connection.")
        return pd.DataFrame()

    df = pd.concat(all_bars)
    df = df[~df.index.duplicated(keep='first')]  # remove potential duplicates
    df = df.sort_index()                         # ensure chronological order
    # Keep strictly within the originally requested window
    df = df[(df.index >= start_dt) & (df.index <= end_dt)]
    return df
# Example usage:
symbol = "BTC/USD"  # Alpaca uses "BTC/USD" for crypto pairs

# CORRECTED TIMEFRAME:
timeframe_to_use = tradeapi.TimeFrame.Minute
# Alternative: timeframe_to_use = "1Min" (as a string, for some SDK versions
# or if the TimeFrame enum is problematic)

# Fetch the last 7 days of data (a smaller range for quicker testing)
start_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
# end_date = datetime.now().strftime('%Y-%m-%d')  # fetches up to now by default if None

btc_data = fetch_data(symbol, timeframe_to_use, start_date)
if not btc_data.empty:
    print(f"\nFetched {len(btc_data)} rows of {symbol} data.")
    print("Sample data (first 5 rows):")
    print(btc_data.head())
    print("\nSample data (last 5 rows):")
    print(btc_data.tail())
else:
    print("Failed to fetch data or no data available for the period.")
Attempting to connect to Alpaca Paper Trading...
Successfully connected. Account status: ACTIVE
Fetching data from 2025-05-02T00:00:00+00:00 to 2025-05-08T00:00:00+00:00
Fetching data from 2025-05-08T00:01:00+00:00 to 2025-05-09T00:00:00+00:00

Fetched 4349 rows of BTC/USD data.
Sample data (first 5 rows):
                                close        high         low  trade_count        open  volume        vwap   symbol
timestamp
2025-05-02 00:00:00+00:00  96599.4770  96599.4770  96599.4770            0  96599.4770     0.0  96599.4770  BTC/USD
2025-05-02 00:02:00+00:00  96650.8100  96650.8100  96650.8100            0  96650.8100     0.0  96650.8100  BTC/USD
2025-05-02 00:03:00+00:00  96588.0115  96588.0115  96588.0115            0  96588.0115     0.0  96588.0115  BTC/USD
2025-05-02 00:05:00+00:00  96639.6100  96639.6100  96639.6100            0  96639.6100     0.0  96639.6100  BTC/USD
2025-05-02 00:08:00+00:00  96521.6100  96584.7550  96521.6100            0  96584.7550     0.0  96553.1825  BTC/USD

Sample data (last 5 rows):
                                 close         high          low  trade_count         open    volume         vwap   symbol
timestamp
2025-05-08 23:55:00+00:00  103121.6815  103121.6815  103121.6815            0  103121.6815  0.000000  103121.6815  BTC/USD
2025-05-08 23:56:00+00:00  103170.6190  103170.6190  103170.6190            0  103170.6190  0.000000  103170.6190  BTC/USD
2025-05-08 23:57:00+00:00  103204.4250  103204.4250  103204.4250            0  103204.4250  0.000000  103204.4250  BTC/USD
2025-05-08 23:58:00+00:00  103284.7150  103284.7150  103284.7150            0  103284.7150  0.000000  103284.7150  BTC/USD
2025-05-09 00:00:00+00:00  103206.1315  103267.4635  103196.4550            1  103230.7000  0.000012  103230.7000  BTC/USD
Key changes:
timeframe = tradeapi.TimeFrame.Minute: This is the crucial fix.
I renamed the timeframe parameter in fetch_data to timeframe_enum to avoid confusion if you were to pass the string version.
Added a check for API_KEY and API_SECRET to ensure they are loaded.
Improved the connection check with a try-except block.
Refined the chunking logic in fetch_data slightly, particularly how current_start is advanced and how chunk_end is determined to better respect end_dt.
Made the advancement of current_start adaptive to the timeframe_enum.
Final filtering of the concatenated DataFrame to strictly adhere to start_dt and end_dt.
Reduced the default fetch range in the example to 7 days for faster initial testing.
Important: the same correction is needed in Step 6 (Signal Generation & Order Execution) inside the get_latest_bar_features function; the Step 6 code above has been updated to use tradeapi.TimeFrame.Minute accordingly.
With the data fetch fixed, re-running Step 3 (Feature Engineering, unchanged from above) then raised a new error:
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
Cell In[3], line 1
----> 1 import pandas_ta as ta
      3 def create_features(df):
      4     if df.empty or 'close' not in df.columns:

File ~\anaconda3\envs\alpaca_AI_env\Lib\site-packages\pandas_ta\__init__.py:116
--> 116 from pandas_ta.core import *

File ~\anaconda3\envs\alpaca_AI_env\Lib\site-packages\pandas_ta\core.py:18
---> 18 from pandas_ta.momentum import *

File ~\anaconda3\envs\alpaca_AI_env\Lib\site-packages\pandas_ta\momentum\__init__.py:34
---> 34 from .squeeze_pro import squeeze_pro

File ~\anaconda3\envs\alpaca_AI_env\Lib\site-packages\pandas_ta\momentum\squeeze_pro.py:2
----> 2 from numpy import NaN as npNaN

ImportError: cannot import name 'NaN' from 'numpy' (C:\Users\micro\anaconda3\envs\alpaca_AI_env\Lib\site-packages\numpy\__init__.py)
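This is a known packaging incompatibility rather than a bug in the notebook: numpy 2.0 removed the NaN alias, while the pandas-ta release on PyPI still does "from numpy import NaN". Two common workarounds (the version pin and repository URL are assumptions about your environment):

# Option 1: pin numpy below 2.0 so the released pandas-ta imports cleanly
pip install "numpy<2"

# Option 2: install a newer pandas-ta build that dropped the numpy.NaN import,
# e.g. directly from the project's repository
pip install -U git+https://github.com/twopirllc/pandas-ta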