Painless Robinhood Algo Trading Engine in Python!

Anthony
7 min readDec 24, 2018

--

Photo by Chris Liverani on Unsplash

TL;DR: complete code below!

A couple months ago, and after having been a developer for a few years with little knowledge of financial engineering, I had the idea of making a quantitative trading engine for Robinhood in Python.

A project that took me just a week ended up teaching me invaluable skills regarding algorithmic trading, portfolio analysis, and even classification methods in ML, and it’s all thanks to a little idea I learned called accelerated learning. This concept centers around using something we already know to learn things that are completely new to us. It’s something we do every day, and if you just have a little bit of knowledge on development in any language, I’d like to share with you how I created a neat little tool to (ideally) make some passive income and inspire you to continue learning on your own!

Before starting, I’ll assume you have basic knowledge of programming, git, and using the Terminal/Command Prompt, and have a preferred text editor. Also, be sure to install Python 3.6 and the latest Anaconda distribution.

Setting Up the Project and Importing the Unofficial Robinhood API

First, let’s create a directory for our project and navigate to the project’s root.

mkdir algo-trading
cd algo-trading

Now, let’s import the Unofficial Python Robinhood API from GitHub. Here, you have two options:

  1. clone this repository into your project’s directory.
  2. Zip and download the above repository into your project’s directory.

In either case, delete all files and folders except Robinhood and LICENSE .

Creating the Backbone for the Engine

Create a new file in the root of your project directory called engine.py . Let’s make the following imports. numpy will be used for numerical calculations and pandas_market_calendars will provide an easy way to retrieve a timestamp for the next market open/close times.

import numpy as np
import pandas_market_calendars as mcal

numpy will come preinstalled in the latest distribution of Anaconda, but you’ll need to type the following line into Terminal to install the latter dependency.

pip install pandas_market_calendars

Let’s start by making our event functions. These will fire at certain times of the day on our server, and will execute functions defined by us. Import the following libraries so that we can define these functions later.

import datetime
from time import time
import threading

Now, paste in this cool function that calls actions on given intervals.

def set_interval(sec, action, start_time = None, stop_time = None):
def call_action():
now = datetime.datetime.today()
if start_time is not None and stop_time is not None:
if start_time < now and stop_time >= now:
set_interval(sec, action, start_time, stop_time)
action()
elif start_time > now:
set_interval(sec, action, start_time, stop_time)
elif stop_time < now:
action()
elif start_time is None:
if stop_time is None:
action()
elif stop_time >= now:
set_interval(sec, action, None, stop_time)
action()
else:
if start_time > now:
set_interval(sec, action, start_time, None)
else:
action()
t = threading.Timer(sec, call_action)
t.start()
return t

set_interval looks pretty confusing at first, but all it does is call action once start_time (a datetime object) passes. The sec parameter defines however many seconds will pass between each time this functions runs and checks to see if any events should occur. If stop_time is defined, then the action will be called between the two given times, and will stop once the latter time passes. Otherwise, action will be called indefinitely.

Defining Market Events

We now need a way to fire events at four key times and intervals:

  • An arbitrary time before the market opens (let’s pick 1 hour)
  • The exact time the New York Stock Exchange (NYSE) opens (9:30am ET… usually)
  • Every so-and-so seconds between market open and close (let’s pick 60 sec/1 min)
  • The exact time the NYSE closes

To do the following, let’s extract datetime objects for the open and close hours for the next market day at the NYSE. This can be accomplished via the following two functions.

def dt64_to_datetime(dt64):
return datetime.datetime.fromtimestamp(dt64.astype('O')/1e9)
def get_next_market_hours():
calendar = mcal.get_calendar("NYSE")
today = datetime.datetime.today()
today_str = today.strftime('%Y-%m-%d')
next_month = today + datetime.timedelta(days=30)
next_month_str = next_month.strftime('%Y-%m-%d')
schedule = calendar.schedule(today_str, next_month_str) start_times = schedule['market_open'].values
end_times = schedule['market_close'].values
current_date = np.datetime64(datetime.datetime.now()) if current_date < start_times[0]:
start_time = start_times[0]
end_time = end_times[0]
else:
start_time = start_times[1]
end_time = end_times[1]
return (dt64_to_datetime(start_time), dt64_to_datetime(end_time))

dt64_to_datetime simply converts numpy’s datetime64 date format to Python’s stock datetime format. Moreover, get_next_market_hours returns a tuple containing the start datetime and end datetime for the next market day. If the market has already opened when this calculation is made, the function returns the times for the following market day.

Finally, let’s create a function called initialize that fires several functions at the four key times listed above, and declare placeholders for these functions.

def initialize():
market_hours = get_next_market_hours()
pre_open_hour = market_hours[0] - datetime.timedelta(hours=1)
open_hour = market_hours[0]
close_hour = market_hours[1]
set_interval(60, lambda: on_market_will_open(), start_time = pre_open_hour)
set_interval(60, lambda: on_market_open(), start_time = open_hour)
set_interval(60, lambda: while_market_open(), start_time = open_hour, stop_time = close_hour)
set_interval(60, lambda: on_market_close(), start_time = close_hour)
def on_market_will_open():
pass
def on_market_open():
pass
def while_market_open():
pass
def on_market_close():
pass
initialize()

initialize is the focal point for our engine. As you can see, we call it at the end of the script, so it will be the first function to get fired. It gets the timeframe for the next market day, and then fires the four event functions we outlined at the specified times. We now have a working algorithmic trading engine… but where’s our algorithm!?

Writing an Algorithm

DISCLAIMER: This is a sample algorithm, and it will likely not earn you millions. What it can do for you, however, is give you some intuition as to how we can utilize our new trading engine to create algorithms that perform well in live situations.

At the top of our script, let’s import the Robinhood API from the local folder. Then, instantiate a global robinhood object right underneath it and log in using your personal credentials.

from Robinhood import Robinhoodrobinhood = Robinhood()
robinhood.login("your@email.com", "your_password")

We’ll keep track of the symbols for each stock we buy, as well as both the price we bought the stocks for and how many shares we bought at the given time. We can accomplish both of the following by creating two global dictionaries as such right below our above code.

symbols_to_buy_price = {}
stocks_to_quantity = {}

symbols_to_buy_price maps stock symbols to the price these stocks were bought at. symbols_to_quantity , likewise, maps stock symbols to the number of shares that were bought.

Our simple algorithm will only run prior to each market day. We will scour Robinhood’s top-movers category, get the ask price of each symbol in descending order, and purchase all the stocks we can afford. If we’ve purchased the stock on any prior day, we will check to see if it’s ask price dropped since then. If so, we’ll immediately sell it. This is how our on_market_will_open event will look…

def on_market_will_open():
top_movers = robinhood.get_tickers_by_tag('top-movers')
user_cash = robinhood.get_account()['buying_power']
for index, symbol in enumerate(top_movers):
ask_price = float(robinhood.get_quote(symbol)['ask_price'])
top_movers[index] = (symbol, ask_price)
top_movers.sort(key=lambda tm: tm[1], reverse=True) for symbol, price in top_movers.items():
if symbol in symbols_to_buy_price and price < symbols_to_buy_price[symbol]:
robinhood.place_market_sell_order(symbol=symbol, quantity=shares_to_buy)
user_cash += price
symbols_to_buy_price.pop(symbol, None)
symbols_to_quantity.pop(symbol, None)
elif symbol not in symbols_to_buy_price and price < user_cash:
robinhood.place_market_buy_order(symbol=symbol, quantity=shares_to_buy)
shares_to_buy = user_cash % price
user_cash -= shares_to_buy * price
symbols_to_buy_price[symbol] = price
symbols_to_quantity[symbol] = shares_to_buy

Since no action will be taken on, during, and immediately after market hours, our final task is to repeat this algorithm for the following day. All we need to do here is call initialize in on_market_close , as follows.

def on_market_close():
initialize()

And, that’s it! To run your algorithm, type python engine.py into Terminal!

If you encounter any missing library errors, simply type pip install _________ , substituting the underscores with the name of each individual library to install.

Add some print statements at each event call to be notified when each event occurs, and feel free to experiment with your own algorithms. Finally, you can copy and paste the complete code below.

Complete engine.py Code

import numpy as npimport pandas_market_calendars as mcalimport datetime
from time import time
import threading
from Robinhood import Robinhoodrobinhood = Robinhood()
robinhood.login("your@email.com", "your_password")
symbols_to_buy_price = {}
symbols_to_quantity = {}
def set_interval(sec, action, start_time = None, stop_time = None):
def call_action():
now = datetime.datetime.today()
if start_time is not None and stop_time is not None:
if start_time < now and stop_time >= now:
set_interval(sec, action, start_time, stop_time)
action()
elif start_time > now:
set_interval(sec, action, start_time, stop_time)
elif stop_time < now:
action()
elif start_time is None:
if stop_time is None:
action()
elif stop_time >= now:
set_interval(sec, action, None, stop_time)
action()
else:
if start_time > now:
set_interval(sec, action, start_time, None)
else:
action()
t = threading.Timer(sec, call_action)
t.start()
return t
def dt64_to_datetime(dt64):
return datetime.datetime.fromtimestamp(dt64.astype('O')/1e9)
def get_next_market_hours():
calendar = mcal.get_calendar("NYSE")
# Get today's date
today = datetime.datetime.today()
today_str = today.strftime('%Y-%m-%d')
# Get next month's date, for safety
next_month = today + datetime.timedelta(days=30)
next_month_str = next_month.strftime('%Y-%m-%d')
# NOTE: Get all market days between today and next month, in case of weekends, breaks, and holidays.
schedule = calendar.schedule(today_str, next_month_str)
start_times = schedule['market_open'].values
end_times = schedule['market_close'].values
current_date = np.datetime64(datetime.datetime.now())if current_date < start_times[0]:
start_time = start_times[0]
end_time = end_times[0]
else:
start_time = start_times[1]
end_time = end_times[1]
return (dt64_to_datetime(start_time), dt64_to_datetime(end_time))def initialize():
market_hours = get_next_market_hours()
pre_open_hour = market_hours[0] - datetime.timedelta(hours=1)
open_hour = market_hours[0]
close_hour = market_hours[1]
set_interval(60, lambda: on_market_will_open(), start_time = pre_open_hour)
set_interval(60, lambda: on_market_open(), start_time = open_hour)
set_interval(60, lambda: while_market_open(), start_time = open_hour, stop_time = close_hour)
set_interval(60, lambda: on_market_close(), start_time = close_hour)
def on_market_will_open():
top_movers = robinhood.get_tickers_by_tag('top-movers')
user_cash = robinhood.get_account()['buying_power']
for index, symbol in enumerate(top_movers):
ask_price = float(robinhood.get_quote(symbol)['ask_price'])
top_movers[index] = (symbol, ask_price)
top_movers.sort(key=lambda tm: tm[1], reverse=True) for symbol, price in top_movers.items():
if symbol in symbols_to_buy_price and price < symbols_to_buy_price[symbol]:
robinhood.place_market_sell_order(symbol=symbol, quantity=shares_to_buy)
user_cash += price
symbols_to_buy_price.pop(symbol, None)
symbols_to_quantity.pop(symbol, None)
elif symbol not in symbols_to_buy_price and price < user_cash:
robinhood.place_market_buy_order(symbol=symbol, quantity=shares_to_buy)
shares_to_buy = user_cash % price
user_cash -= shares_to_buy * price
symbols_to_buy_price[symbol] = price
symbols_to_quantity[symbol] = shares_to_buy
def on_market_open():
pass
def while_market_open():
pass
def on_market_close():
initialize()
initialize()

--

--

Anthony
Anthony

No responses yet