Capturing real-time cryptocurrency market data is essential for traders, analysts, and algorithmic systems. Binance, one of the world’s leading digital asset exchanges, provides multiple ways to access K-line (candlestick) data — with WebSocket being the preferred method for low-latency, real-time updates.
This guide walks you through building a robust, asynchronous Python system using asyncio and websockets to subscribe to Binance K-line data, process it efficiently with Pandas, and store it in high-performance Parquet format. Whether you're developing a backtesting engine, a live analytics dashboard, or a trading bot, this workflow ensures reliable, scalable data ingestion.
Why Use WebSocket Over REST API?
While Binance’s REST API allows historical K-line retrieval, it’s not ideal for real-time applications due to rate limits and polling delays. In contrast, WebSocket streams push updates instantly as they occur — making them perfect for time-sensitive use cases.
Establishing a Robust WebSocket Connection
To maintain persistent and resilient connections, we implement a reconnection-capable WebSocket client. The following functions create WebSocket streams for three Binance markets:
- Spot trading
- USDT-margined futures
- Coin-margined futures
```python
from ws_basics import ReconnectingWebsocket

# Base URLs for different Binance markets
SPOT_STREAM_URL = 'wss://stream.binance.com:9443/'
USDT_FUTURES_FSTREAM_URL = 'wss://fstream.binance.com/'
COIN_FUTURES_DSTREAM_URL = 'wss://dstream.binance.com/'


def get_coin_futures_multi_candlesticks_socket(symbols, time_interval):
    channels = [f'{s.lower()}@kline_{time_interval}' for s in symbols]
    return ReconnectingWebsocket(
        path='/'.join(channels),
        url=COIN_FUTURES_DSTREAM_URL,
        prefix='stream?streams='
    )


def get_usdt_futures_multi_candlesticks_socket(symbols, time_interval):
    channels = [f'{s.lower()}@kline_{time_interval}' for s in symbols]
    return ReconnectingWebsocket(
        path='/'.join(channels),
        url=USDT_FUTURES_FSTREAM_URL,
        prefix='stream?streams='
    )


def get_spot_multi_candlesticks_socket(symbols, time_interval):
    channels = [f'{s.lower()}@kline_{time_interval}' for s in symbols]
    return ReconnectingWebsocket(
        path='/'.join(channels),
        url=SPOT_STREAM_URL,
        prefix='stream?streams='
    )
```

These functions generate multiplexed WebSocket connections that can track multiple symbols simultaneously — such as BTCUSDT and ETHUSDT — across various time intervals like 1m, 5m, or 1h.
Example: Receiving Real-Time BTCUSDT 1-Minute Candles
```python
import asyncio
import logging

from binance_market_ws import get_usdt_futures_multi_candlesticks_socket


async def main():
    socket = get_usdt_futures_multi_candlesticks_socket(['BTCUSDT'], '1m')
    async with socket as socket_conn:
        while True:
            try:
                res = await socket_conn.recv()
                print(res)
            except asyncio.TimeoutError:
                logging.error('WebSocket receive timeout')
                break


if __name__ == '__main__':
    asyncio.run(main())
```

Sample output:
```json
{
  "stream": "btcusdt@kline_1m",
  "data": {
    "e": "kline",
    "E": 1719765539838,
    "s": "BTCUSDT",
    "k": {
      "t": 1719765480000,
      "T": 1719765539999,
      "s": "BTCUSDT",
      "i": "1m",
      "f": 5122041311,
      "L": 5122041720,
      "o": "61607.90",
      "c": "61623.30",
      "h": "61623.30",
      "l": "61605.30",
      "v": "16.692",
      "n": 410,
      "x": false,
      "q": "1028411.77850",
      ...
    }
  }
}
```

Each message contains a full K-line object. Note the `"x": false` field — this indicates the candle is still forming. We only want closed candles for accurate analysis.
Parsing K-Line Data into Pandas DataFrame
To make raw JSON data usable, we convert it into structured Pandas DataFrames. This enables seamless integration with data analysis, visualization, and machine learning tools.
```python
import pandas as pd


def convert_to_dataframe(x, interval_delta):
    columns = [
        'candle_begin_time', 'open', 'high', 'low', 'close', 'volume',
        'quote_volume', 'trade_num', 'taker_buy_base_asset_volume',
        'taker_buy_quote_asset_volume'
    ]
    candle_data = [
        pd.to_datetime(int(x['t']), unit='ms', utc=True),
        float(x['o']), float(x['h']), float(x['l']), float(x['c']),
        float(x['v']), float(x['q']), float(x['n']),
        float(x['V']), float(x['Q'])
    ]
    # Use K-line end time as index
    index_time = candle_data[0] + interval_delta
    return pd.DataFrame(data=[candle_data], columns=columns, index=[index_time])
```

We also apply defensive programming to filter out incomplete or invalid messages:
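To see the index logic in isolation, here is the timestamp arithmetic applied to the `t` value from the sample payload above (a minimal sketch independent of `convert_to_dataframe`):

```python
import pandas as pd

t = 1719765480000                    # 't' from the sample kline payload
interval_delta = pd.Timedelta(minutes=1)

candle_begin_time = pd.to_datetime(t, unit='ms', utc=True)
index_time = candle_begin_time + interval_delta  # candle end time, used as the index
print(candle_begin_time, '->', index_time)
```

Indexing by end time means each row's timestamp marks when that candle's data became final, which lines up naturally with the `recv_time` of the closing message.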
```python
def handle_candle_data(res, interval_delta):
    if 'data' not in res:
        return None
    data = res['data']
    if data.get('e') != 'kline' or 'k' not in data:
        return None
    candle = data['k']
    if not candle.get('x'):  # Only accept closed candles
        return None
    return convert_to_dataframe(candle, interval_delta)
```

This ensures only finalized, high-integrity K-lines enter your dataset.
Building a Scalable K-Line Listener: CandleListener
The CandleListener class encapsulates WebSocket connectivity and message handling into a reusable component. It supports multiple symbols and asset types (spot/futures), and uses an asyncio.Queue to decouple data reception from processing.
```python
class CandleListener:
    TRADE_TYPE_MAP = {
        'usdt_futures': get_usdt_futures_multi_candlesticks_socket,
        'coin_futures': get_coin_futures_multi_candlesticks_socket,
        'spot': get_spot_multi_candlesticks_socket
    }

    def __init__(self, type_, symbols, time_interval, que):
        self.trade_type = type_
        self.symbols = set(symbols)
        self.time_interval = time_interval
        # Note: assumes a minute-based interval such as '1m', '3m', or '5m'
        self.interval_delta = pd.Timedelta(minutes=int(time_interval[:-1]))
        self.que = que
        self.req_reconnect = False

    async def start_listen(self):
        socket_func = self.TRADE_TYPE_MAP[self.trade_type]
        while True:
            socket = socket_func(self.symbols, self.time_interval)
            async with socket as socket_conn:
                while True:
                    if self.req_reconnect:
                        self.req_reconnect = False
                        break
                    try:
                        res = await socket_conn.recv()
                        df_candle = handle_candle_data(res, self.interval_delta)
                        if df_candle is not None:
                            self.que.put_nowait({
                                'type': 'candle_data',
                                'data': df_candle,
                                'symbol': res['data']['s'],
                                'time_interval': self.time_interval,
                                'trade_type': self.trade_type,
                                'recv_time': pd.Timestamp.now(tz='UTC')
                            })
                    except asyncio.TimeoutError:
                        logging.error('WebSocket timeout, reconnecting...')
                        break
```

You can dynamically add or remove trading pairs at runtime using `add_symbols()` and `remove_symbols()`.
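The `add_symbols()` and `remove_symbols()` methods are not shown in the listing above; a plausible sketch (hypothetical, but consistent with the `req_reconnect` flag that `start_listen` checks) looks like this:

```python
# Hypothetical sketch of the runtime subscription methods mentioned above.
# The key idea: mutate the tracked symbol set, then request a reconnect so
# the next WebSocket connection subscribes with the updated channel list.
class SymbolBook:
    def __init__(self, symbols):
        self.symbols = set(symbols)
        self.req_reconnect = False

    def add_symbols(self, *symbols):
        self.symbols.update(s.upper() for s in symbols)
        self.req_reconnect = True  # start_listen() sees this and reconnects

    def remove_symbols(self, *symbols):
        for s in symbols:
            self.symbols.discard(s.upper())
        self.req_reconnect = True


book = SymbolBook(['BTCUSDT'])
book.add_symbols('ethusdt')
book.remove_symbols('BTCUSDT')
print(sorted(book.symbols))  # ['ETHUSDT']
```

In `CandleListener` itself these would be instance methods mutating `self.symbols`; the standalone class here is only for illustration.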
Asynchronously Recording Data in Parquet Format
To achieve high-performance disk writes without blocking the event loop, we adopt a producer-consumer architecture:
- Producers: multiple `CandleListener` instances stream data into a shared queue.
- Consumer: a single dispatcher processes incoming messages and writes to Parquet files.
Parquet is ideal for financial time-series data due to its columnar storage, compression efficiency, and fast query performance.
```python
import os


def update_candle_data(df_new: pd.DataFrame, symbol, time_interval, trade_type):
    output_path = f'{trade_type}_{symbol}_{time_interval}.pqt'
    if not os.path.exists(output_path):
        df_new.to_parquet(output_path, compression='zstd')
    else:
        # Read-modify-write rewrites the whole file on each update; fine at
        # candle frequency, but consider partitioned files at larger scale
        df = pd.read_parquet(output_path)
        df = pd.concat([df, df_new]).sort_index()
        df = df[~df.index.duplicated(keep='last')]  # Prevent duplicates
        df.to_parquet(output_path, compression='zstd')
```

The consumer listens asynchronously:
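The deduplication step can be illustrated on toy frames without a Parquet round-trip; `keep='last'` lets a re-received candle overwrite the earlier row (a stable sort keeps the newer duplicate last):

```python
import pandas as pd

old = pd.DataFrame({'close': [1.0, 2.0]},
                   index=pd.to_datetime(['2024-06-30 15:00', '2024-06-30 15:01'], utc=True))
new = pd.DataFrame({'close': [2.5, 3.0]},
                   index=pd.to_datetime(['2024-06-30 15:01', '2024-06-30 15:02'], utc=True))

# Stable sort preserves concat order among equal timestamps,
# so the newer 15:01 row survives keep='last'
df = pd.concat([old, new]).sort_index(kind='stable')
df = df[~df.index.duplicated(keep='last')]
print(df['close'].tolist())  # [1.0, 2.5, 3.0]
```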
```python
async def dispatcher(main_que: asyncio.Queue):
    while True:
        req = await main_que.get()
        if req['type'] == 'candle_data':
            update_candle_data(
                req['data'],
                req['symbol'],
                req['time_interval'],
                req['trade_type']
            )
            logging.info('Recorded %s %s-%s at %s',
                         req['trade_type'], req['symbol'],
                         req['time_interval'], req['recv_time'])
```

Running Multiple Listeners Concurrently
```python
async def main():
    main_que = asyncio.Queue()

    # Define producers
    listener_usdt_perp_1m = CandleListener('usdt_futures', ['BTCUSDT', 'ETHUSDT'], '1m', main_que)
    listener_coin_perp_3m = CandleListener('coin_futures', ['BTCUSD_PERP', 'ETHUSD_PERP'], '3m', main_que)
    listener_spot_1m = CandleListener('spot', ['BTCUSDT', 'BNBUSDT'], '1m', main_que)

    # Start consumer task
    dispatcher_task = asyncio.create_task(dispatcher(main_que))

    # Run all listeners and dispatcher
    await asyncio.gather(
        listener_usdt_perp_1m.start_listen(),
        listener_coin_perp_3m.start_listen(),
        listener_spot_1m.start_listen(),
        dispatcher_task
    )
```

Output example:
```
INFO - Recorded usdt_futures BTCUSDT-1m at 2024-06-30 15:00:00+00:00
INFO - Recorded spot BNBUSDT-1m at 2024-06-30 15:00:00+00:00
```

Generated files:
```
usdt_futures_BTCUSDT_1m.pqt
coin_futures_ETHUSD_PERP_3m.pqt
spot_BTCUSDT_1m.pqt
```

Each file stores clean, indexed candlestick data ready for analysis.
Frequently Asked Questions (FAQ)
How do I ensure I only record closed candles?
Binance WebSocket sends updates for both open and closed candles. Check the `x` field in the K-line object and only process messages where `"x": true`, which indicates the candle has closed.
Why use Parquet instead of CSV?
Parquet offers superior compression, faster read/write speeds, schema enforcement, and efficient querying — especially important when handling large volumes of time-series financial data.
Can I run multiple WebSocket connections safely?
Yes. asyncio runs all connections cooperatively in a single thread, and proper queue management keeps coroutines from blocking one another. The producer-consumer model keeps disk I/O separate from network operations.
What happens if the WebSocket disconnects?
The ReconnectingWebsocket class automatically attempts reconnection on failure. Additionally, timeout detection triggers manual reconnect logic to maintain stream continuity.
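The internals of `ReconnectingWebsocket` are not shown in this guide, but the essential pattern is an exponential-backoff loop around the connection. A minimal sketch (hypothetical names; the sleep function is injectable for testability):

```python
import asyncio


async def listen_with_backoff(connect, handle, max_delay=60, _sleep=asyncio.sleep):
    # Hypothetical reconnect loop in the spirit of ReconnectingWebsocket:
    # retry on connection errors, doubling the wait up to max_delay.
    delay = 1
    while True:
        try:
            async with connect() as conn:
                delay = 1            # reset backoff once connected
                await handle(conn)   # runs until the stream drops
        except ConnectionError:
            await _sleep(delay)
            delay = min(delay * 2, max_delay)
```

Capping the backoff and resetting it after a successful connect avoids both hammering the exchange during outages and waiting minutes after a single dropped frame.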
Is this setup suitable for high-frequency trading?
While this system captures real-time data reliably, production-grade HFT systems require additional layers: latency optimization, message timestamping, heartbeat monitoring, and failover redundancy.
👉 Explore how professional platforms manage real-time crypto data at scale.
Conclusion
You now have a fully functional system that:
- Connects to Binance via WebSocket for real-time K-line updates
- Filters and parses only closed candles using defensive programming
- Processes multiple symbols and markets concurrently via `asyncio`
- Stores structured data efficiently in Parquet format
This foundation supports advanced applications like backtesting engines, live dashboards, anomaly detection models, and algorithmic trading systems.
As you scale this solution, consider adding features like:
- Data validation checks
- Disk space monitoring
- Cloud storage export (e.g., S3)
- Alerting on missing candles
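As one example of the last point, a gap check over a stored candle index could look like this (a sketch; assumes minute-based intervals like the `1m` used above, and `find_missing_candles` is a hypothetical helper name):

```python
import pandas as pd


def find_missing_candles(df: pd.DataFrame, time_interval: str) -> pd.DatetimeIndex:
    """Return expected candle timestamps absent from df's index."""
    freq = time_interval.replace('m', 'min')  # '1m' -> '1min' for pandas
    expected = pd.date_range(df.index.min(), df.index.max(), freq=freq)
    return expected.difference(df.index)


# Toy frame with a gap at 15:01
df = pd.DataFrame({'close': [1.0, 3.0]},
                  index=pd.to_datetime(['2024-06-30 15:00', '2024-06-30 15:02'], utc=True))
print(list(find_missing_candles(df, '1m')))
```

Running this against each Parquet file after load would let you alert on dropped candles before they silently corrupt downstream analysis.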
With continuous improvements, your data pipeline can evolve into a mission-critical component of any crypto analytics platform.