Skip to content

Commit

Permalink
feat: add options websocket market streams (#1563)
Browse files Browse the repository at this point in the history
* add timeout to jobs

* feat: add ws support for options

* add missing functions

* lint

* lint tests

* update timeout gh action

---------

Co-authored-by: carlosmiei <43336371+carlosmiei@users.noreply.github.com>
  • Loading branch information
pcriadoperez and carlosmiei authored Feb 26, 2025
1 parent d12d6a9 commit 028d6aa
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
build:
needs: lint
runs-on: ubuntu-22.04
timeout-minutes: 20
timeout-minutes: 60
env:
PROXY: "http://51.83.140.52:16301"
TEST_TESTNET: "true"
Expand Down
206 changes: 146 additions & 60 deletions binance/ws/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class BinanceSocketManager:
FSTREAM_TESTNET_URL = "wss://stream.binancefuture.com/"
DSTREAM_URL = "wss://dstream.binance.{}/"
DSTREAM_TESTNET_URL = "wss://dstream.binancefuture.com/"
VSTREAM_URL = "wss://vstream.binance.{}/"
VSTREAM_TESTNET_URL = "wss://testnetws.binanceops.{}/"
OPTIONS_URL = "wss://nbstream.binance.{}/eoptions/"

WEBSOCKET_DEPTH_5 = "5"
WEBSOCKET_DEPTH_10 = "10"
Expand All @@ -47,8 +46,7 @@ def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT):
self.STREAM_URL = self.STREAM_URL.format(client.tld)
self.FSTREAM_URL = self.FSTREAM_URL.format(client.tld)
self.DSTREAM_URL = self.DSTREAM_URL.format(client.tld)
self.VSTREAM_URL = self.VSTREAM_URL.format(client.tld)
self.VSTREAM_TESTNET_URL = self.VSTREAM_TESTNET_URL.format(client.tld)
self.OPTIONS_URL = self.OPTIONS_URL.format(client.tld)

self._conns = {}
self._loop = get_loop()
Expand Down Expand Up @@ -129,14 +127,12 @@ def _get_futures_socket(
return self._get_socket(path, stream_url, prefix, socket_type=socket_type)

def _get_options_socket(self, path: str, prefix: str = "ws/"):
stream_url = self.VSTREAM_URL
if self.testnet:
stream_url = self.VSTREAM_TESTNET_URL
stream_url = self.OPTIONS_URL
return self._get_socket(
path,
stream_url,
prefix,
is_binary=True,
is_binary=False,
socket_type=BinanceSocketType.OPTIONS,
)

Expand Down Expand Up @@ -659,23 +655,6 @@ def index_price_socket(self, symbol: str, fast: bool = True):
symbol.lower() + stream_name, futures_type=FuturesType.COIN_M
)

def futures_depth_socket(
self, symbol: str, depth: str = "10", futures_type=FuturesType.USD_M
):
"""Subscribe to a futures depth data stream
https://binance-docs.github.io/apidocs/futures/en/#partial-book-depth-streams
:param symbol: required
:type symbol: str
:param depth: optional Number of depth entries to return, default 10.
:type depth: str
:param futures_type: use USD-M or COIN-M futures default USD-M
"""
return self._get_futures_socket(
symbol.lower() + "@depth" + str(depth), futures_type=futures_type
)

def symbol_mark_price_socket(
self,
symbol: str,
Expand Down Expand Up @@ -874,23 +853,11 @@ def multiplex_socket(self, streams: List[str]):

def options_multiplex_socket(self, streams: List[str]):
"""Start a multiplexed socket using a list of socket names.
User stream sockets can not be included.
Symbols in socket name must be lowercase i.e bnbbtc@aggTrade, neobtc@ticker
Combined stream events are wrapped as follows: {"stream":"<streamName>","data":<rawPayload>}
https://binance-docs.github.io/apidocs/voptions/en/#account-and-trading-interface
:param streams: list of stream names in lower case
:type streams: list
:returns: connection key string if successful, False otherwise
Message Format - see Binance API docs for all types
https://developers.binance.com/docs/derivatives/option/websocket-market-streams
"""
stream_name = "/".join([s.lower() for s in streams])
stream_name = "/".join([s for s in streams])
stream_path = f"streams={stream_name}"
return self._get_options_socket(stream_path, prefix="stream?")

Expand Down Expand Up @@ -1036,60 +1003,179 @@ def isolated_margin_socket(self, symbol: str):
return self._get_account_socket(symbol, stream_url=stream_url)

def options_ticker_socket(self, symbol: str):
"""Subscribe to a 24 hour ticker info stream
"""Subscribe to a 24-hour ticker info stream for options trading.
https://binance-docs.github.io/apidocs/voptions/en/#market-streams-payload-24-hour-ticker
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/24-hour-TICKER
:param symbol: required
Stream provides real-time 24hr ticker information for all symbols. Only symbols whose ticker info
changed will be sent. Updates every 1000ms.
:param symbol: The option symbol to subscribe to (e.g. "BTC-220930-18000-C")
:type symbol: str
"""
return self._get_options_socket(symbol.lower() + "@ticker")
return self._get_options_socket(symbol.upper() + "@ticker")

def options_ticker_by_expiration_socket(self, symbol: str, expiration_date: str):
"""Subscribe to a 24 hour ticker info stream
https://binance-docs.github.io/apidocs/voptions/en/#24-hour-ticker-by-underlying-asset-and-expiration-data
:param symbol: required
"""Subscribe to a 24-hour ticker info stream by underlying asset and expiration date.
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/24-hour-TICKER-by-underlying-asset-and-expiration-data
Stream provides real-time 24hr ticker information grouped by underlying asset and expiration date.
Updates every 1000ms.
:param symbol: The underlying asset (e.g., "ETH")
:type symbol: str
:param expiration_date : required
:param expiration_date: The expiration date (e.g., "220930" for Sept 30, 2022)
:type expiration_date: str
"""
return self._get_options_socket(symbol.lower() + "@ticker@" + expiration_date)
return self._get_options_socket(symbol.upper() + "@ticker@" + expiration_date)

def options_recent_trades_socket(self, symbol: str):
"""Subscribe to a latest completed trades stream
"""Subscribe to a real-time trade information stream.
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Trade-Streams
https://binance-docs.github.io/apidocs/voptions/en/#market-streams-payload-latest-completed-trades
Stream pushes raw trade information for a specific symbol or underlying asset.
Updates every 50ms.
:param symbol: required
:param symbol: The option symbol or underlying asset (e.g., "BTC-200630-9000-P" or "BTC")
:type symbol: str
"""
return self._get_options_socket(symbol.lower() + "@trade")
return self._get_options_socket(symbol.upper() + "@trade")

def options_kline_socket(
self, symbol: str, interval=AsyncClient.KLINE_INTERVAL_1MINUTE
):
"""Subscribe to a candlestick data stream
"""Subscribe to a Kline/Candlestick data stream.
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Kline-Candlestick-Streams
https://binance-docs.github.io/apidocs/voptions/en/#market-streams-payload-candle
Stream pushes updates to the current klines/candlestick every 1000ms (if existing).
:param symbol: required
Available intervals:
- Minutes: "1m", "3m", "5m", "15m", "30m"
- Hours: "1h", "2h", "4h", "6h", "12h"
- Days: "1d", "3d"
- Weeks: "1w"
:param symbol: The option symbol (e.g., "BTC-200630-9000-P")
:type symbol: str
:param interval: Kline interval, default KLINE_INTERVAL_1MINUTE
:type interval: str
"""
return self._get_options_socket(symbol.lower() + "@kline_" + interval)
return self._get_options_socket(symbol.upper() + "@kline_" + interval)

def options_depth_socket(self, symbol: str, depth: str = "10"):
"""Subscribe to a depth data stream
"""Subscribe to partial book depth stream for options trading.
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Partial-Book-Depth-Streams
https://binance-docs.github.io/apidocs/voptions/en/#market-streams-payload-depth
Stream provides top N bids and asks from the order book.
Default update speed is 500ms if not specified in the stream name.
:param symbol: The option symbol (e.g., "BTC-200630-9000-P")
:type symbol: str
:param depth: Number of price levels. Valid values: "10", "20", "50", "100"
:type depth: str
"""
return self._get_options_socket(symbol.upper() + "@depth" + str(depth))

def futures_depth_socket(self, symbol: str, depth: str = "10", futures_type=FuturesType.USD_M):
"""Subscribe to a futures depth data stream
https://binance-docs.github.io/apidocs/futures/en/#partial-book-depth-streams
:param symbol: required
:type symbol: str
:param depth: optional Number of depth entries to return, default 10.
:type depth: str
:param futures_type: use USD-M or COIN-M futures default USD-M
"""
return self._get_futures_socket(
symbol.lower() + "@depth" + str(depth), futures_type=futures_type
)

def options_new_symbol_socket(self):
"""Subscribe to a new symbol listing information stream.
Stream provides real-time notifications when new option symbols are listed.
Updates every 50ms.
Stream name: option_pair
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/New-Symbol-Info
Response fields include:
- Event type and timestamps
- Underlying index (e.g., 'BTCUSDT')
- Quotation asset (e.g., 'USDT')
- Trading pair name (e.g., 'BTC-221116-21000-C')
- Conversion ratio and minimum trade volume
- Option type (CALL/PUT)
- Strike price and expiration time
"""
return self._get_options_socket("option_pair")

def options_open_interest_socket(self, symbol: str, expiration_date: str):
"""Subscribe to an options open interest stream.
Stream provides open interest information for specific underlying asset on specific expiration date.
Updates every 60 seconds.
Stream name format: <underlyingAsset>@openInterest@<expirationDate>
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Open-Interest
Response fields include:
- Event type and timestamps
- Option symbol (e.g., 'ETH-221125-2700-C')
- Open interest in contracts
- Open interest in USDT
:param symbol: The underlying asset (e.g., "ETH")
:type symbol: str
:param expiration_date: The expiration date (e.g., "221125" for Nov 25, 2022)
:type expiration_date: str
"""
return self._get_options_socket(symbol.upper() + "@openInterest@" + expiration_date)

def options_mark_price_socket(self, symbol: str):
"""Subscribe to an options mark price stream.
Stream provides mark price information for all option symbols on specific underlying asset.
Updates every 1000ms.
Stream name format: <underlyingAsset>@markPrice
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Mark-Price
Response fields include:
- Event type and timestamps
- Option symbol (e.g., 'ETH-220930-1500-C')
- Option mark price
:param symbol: The underlying asset (e.g., "ETH")
:type symbol: str
"""
return self._get_options_socket(symbol.upper() + "@markPrice")

def options_index_price_socket(self, symbol: str):
"""Subscribe to an options index price stream.
API Reference: https://developers.binance.com/docs/derivatives/option/websocket-market-streams/Index-Price-Streams
Stream provides index price information for underlying assets (e.g., ETHUSDT).
Updates every 1000ms.
Response fields include:
- Event type and timestamps
- Underlying symbol (e.g., 'ETHUSDT')
- Index price
:param symbol: The underlying symbol (e.g., "ETHUSDT")
:type symbol: str
"""
return self._get_options_socket(symbol.lower() + "@depth" + str(depth))
return self._get_options_socket(symbol.upper() + "@index")

async def _stop_socket(self, conn_key):
"""Stop a websocket given the connection key
Expand Down
100 changes: 100 additions & 0 deletions tests/test_streams_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import sys
import pytest
from binance import BinanceSocketManager

pytestmark = [
pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+"),
pytest.mark.asyncio
]

# Test constants
OPTION_SYMBOL = "BTC-250328-40000-P"
UNDERLYING_SYMBOL = "BTC"
EXPIRATION_DATE = "250328"
INTERVAL = "1m"
DEPTH = "20"

async def test_options_ticker(clientAsync):
"""Test options ticker socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_ticker_socket(OPTION_SYMBOL)
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == '24hrTicker'
await clientAsync.close_connection()

async def test_options_ticker_by_expiration(clientAsync):
"""Test options ticker by expiration socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_ticker_by_expiration_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE)
async with socket as ts:
msg = await ts.recv()
assert len(msg) > 0
await clientAsync.close_connection()

async def test_options_recent_trades(clientAsync):
"""Test options recent trades socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_recent_trades_socket(UNDERLYING_SYMBOL)
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == 'trade'
await clientAsync.close_connection()

async def test_options_kline(clientAsync):
"""Test options kline socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_kline_socket(OPTION_SYMBOL, INTERVAL)
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == 'kline'
await clientAsync.close_connection()

async def test_options_depth(clientAsync):
"""Test options depth socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_depth_socket(OPTION_SYMBOL, DEPTH)
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == 'depth'
await clientAsync.close_connection()

async def test_options_multiplex(clientAsync):
"""Test options multiplex socket"""
bm = BinanceSocketManager(clientAsync)
streams = [
f"{OPTION_SYMBOL}@ticker",
f"{OPTION_SYMBOL}@trade",
]
socket = bm.options_multiplex_socket(streams)
async with socket as ts:
msg = await ts.recv()
assert 'stream' in msg
await clientAsync.close_connection()

async def test_options_open_interest(clientAsync):
"""Test options open interest socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_open_interest_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE)
async with socket as ts:
msg = await ts.recv()
assert len(msg) > 0
await clientAsync.close_connection()

async def test_options_mark_price(clientAsync):
"""Test options mark price socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_mark_price_socket(UNDERLYING_SYMBOL)
async with socket as ts:
msg = await ts.recv()
assert len(msg) > 0
await clientAsync.close_connection()

async def test_options_index_price(clientAsync):
"""Test options index price socket"""
bm = BinanceSocketManager(clientAsync)
socket = bm.options_index_price_socket('ETHUSDT')
async with socket as ts:
msg = await ts.recv()
assert msg['e'] == 'index'
await clientAsync.close_connection()

0 comments on commit 028d6aa

Please sign in to comment.