Using Asyncio and WebSocket to Retrieve and Record Binance K-Line Market Data

Capturing real-time cryptocurrency market data is essential for traders, analysts, and algorithmic systems. Binance, one of the world’s leading digital asset exchanges, provides multiple ways to access K-line (candlestick) data — with WebSocket being the preferred method for low-latency, real-time updates.

This guide walks you through building a robust, asynchronous Python system using asyncio and websockets to subscribe to Binance K-line data, process it efficiently with Pandas, and store it in high-performance Parquet format. Whether you're developing a backtesting engine, a live analytics dashboard, or a trading bot, this workflow ensures reliable, scalable data ingestion.


Why Use WebSocket Over REST API?

While Binance’s REST API allows historical K-line retrieval, it’s not ideal for real-time applications due to rate limits and polling delays. In contrast, WebSocket streams push updates instantly as they occur — making them perfect for time-sensitive use cases.


Establishing a Robust WebSocket Connection

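To maintain persistent and resilient connections, we rely on a reconnection-capable WebSocket client, ReconnectingWebsocket, imported from a local ws_basics module (not listed in this article). The following functions create multiplexed streams for three Binance markets: spot, USDT-margined futures, and coin-margined futures: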

from ws_basics import ReconnectingWebsocket

# Base URLs for different Binance markets
SPOT_STREAM_URL = 'wss://stream.binance.com:9443/'
USDT_FUTURES_FSTREAM_URL = 'wss://fstream.binance.com/'
COIN_FUTURES_DSTREAM_URL = 'wss://dstream.binance.com/'

def get_coin_futures_multi_candlesticks_socket(symbols, time_interval):
    channels = [f'{s.lower()}@kline_{time_interval}' for s in symbols]
    return ReconnectingWebsocket(
        path='/'.join(channels),
        url=COIN_FUTURES_DSTREAM_URL,
        prefix='stream?streams='
    )

def get_usdt_futures_multi_candlesticks_socket(symbols, time_interval):
    channels = [f'{s.lower()}@kline_{time_interval}' for s in symbols]
    return ReconnectingWebsocket(
        path='/'.join(channels),
        url=USDT_FUTURES_FSTREAM_URL,
        prefix='stream?streams='
    )

def get_spot_multi_candlesticks_socket(symbols, time_interval):
    channels = [f'{s.lower()}@kline_{time_interval}' for s in symbols]
    return ReconnectingWebsocket(
        path='/'.join(channels),
        url=SPOT_STREAM_URL,
        prefix='stream?streams='
    )

These functions generate multiplexed WebSocket connections that track multiple symbols simultaneously, such as BTCUSDT and ETHUSDT, at a chosen interval like 1m, 5m, or 1h.
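
To make the multiplexing concrete, the sketch below builds the combined-stream URL that these helpers would request. It assumes ReconnectingWebsocket simply concatenates url, prefix, and path; its implementation is not shown in this article, so that is an assumption. build_stream_url is a hypothetical helper, not part of any library.

```python
USDT_FUTURES_FSTREAM_URL = 'wss://fstream.binance.com/'

def build_stream_url(base_url, symbols, time_interval, prefix='stream?streams='):
    """Hypothetical helper: join per-symbol kline channels into one combined-stream URL."""
    # Binance stream names use lowercase symbols, e.g. btcusdt@kline_1m
    channels = [f'{s.lower()}@kline_{time_interval}' for s in symbols]
    # Channels in a combined stream are separated by '/'
    return base_url + prefix + '/'.join(channels)

url = build_stream_url(USDT_FUTURES_FSTREAM_URL, ['BTCUSDT', 'ETHUSDT'], '1m')
print(url)
# wss://fstream.binance.com/stream?streams=btcusdt@kline_1m/ethusdt@kline_1m
```

One connection therefore carries every subscribed symbol, and each message's "stream" field tells you which channel it came from.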

Example: Receiving Real-Time BTCUSDT 1-Minute Candles

import asyncio
import logging
from binance_market_ws import get_usdt_futures_multi_candlesticks_socket

async def main():
    socket = get_usdt_futures_multi_candlesticks_socket(['BTCUSDT'], '1m')
    async with socket as socket_conn:
        while True:
            try:
                res = await socket_conn.recv()
                print(res)
            except asyncio.TimeoutError:
                logging.error('WebSocket receive timeout')
                break

if __name__ == '__main__':
    asyncio.run(main())

Sample output:

{
  "stream": "btcusdt@kline_1m",
  "data": {
    "e": "kline",
    "E": 1719765539838,
    "s": "BTCUSDT",
    "k": {
      "t": 1719765480000,
      "T": 1719765539999,
      "s": "BTCUSDT",
      "i": "1m",
      "f": 5122041311,
      "L": 5122041720,
      "o": "61607.90",
      "c": "61623.30",
      "h": "61623.30",
      "l": "61605.30",
      "v": "16.692",
      "n": 410,
      "x": false,
      "q": "1028411.77850",
      ...
    }
  }
}

Each message contains a full K-line object. Note the "x": false field (False after parsing into Python), which indicates the candle is still forming. We only want closed candles for accurate analysis.
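
As a minimal illustration of that check, the sketch below parses two abbreviated messages with the standard json module and reports which one is closed. The payloads are trimmed, made-up variants of the sample above, and is_closed_candle is a hypothetical helper name. It assumes you hold the raw JSON text; the ReconnectingWebsocket used in this article appears to return already-parsed dictionaries.

```python
import json

def is_closed_candle(raw_message: str) -> bool:
    """Return True only when the message carries a kline whose 'x' flag is true."""
    msg = json.loads(raw_message)
    kline = msg.get('data', {}).get('k', {})
    # JSON false/true become Python False/True after parsing
    return kline.get('x') is True

forming = ('{"stream": "btcusdt@kline_1m", "data": {"e": "kline", "s": "BTCUSDT", '
           '"k": {"t": 1719765480000, "c": "61623.30", "x": false}}}')
closed = ('{"stream": "btcusdt@kline_1m", "data": {"e": "kline", "s": "BTCUSDT", '
          '"k": {"t": 1719765480000, "c": "61623.30", "x": true}}}')

print(is_closed_candle(forming))  # False
print(is_closed_candle(closed))   # True
```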


Parsing K-Line Data into Pandas DataFrame

To make raw JSON data usable, we convert it into structured Pandas DataFrames. This enables seamless integration with data analysis, visualization, and machine learning tools.

import pandas as pd

def convert_to_dataframe(x, interval_delta):
    columns = [
        'candle_begin_time', 'open', 'high', 'low', 'close', 'volume',
        'quote_volume', 'trade_num', 'taker_buy_base_asset_volume',
        'taker_buy_quote_asset_volume'
    ]
    candle_data = [
        pd.to_datetime(int(x['t']), unit='ms', utc=True),
        float(x['o']), float(x['h']), float(x['l']), float(x['c']),
        float(x['v']), float(x['q']), float(x['n']),
        float(x['V']), float(x['Q'])
    ]
    # Use K-line end time as index
    index_time = candle_data[0] + interval_delta
    return pd.DataFrame(data=[candle_data], columns=columns, index=[index_time])
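
The interval_delta argument is the candle duration, which is added to the open time so that each row is indexed by the moment the candle ends. One way to derive it from a Binance-style interval string is sketched below using only the standard library. interval_to_timedelta is a hypothetical helper, and it deliberately rejects the monthly interval ('1M'), which has no fixed length.

```python
from datetime import timedelta

# Fixed-length units used in interval strings (minutes, hours, days, weeks)
_UNIT_SECONDS = {'m': 60, 'h': 3600, 'd': 86400, 'w': 604800}

def interval_to_timedelta(time_interval: str) -> timedelta:
    """Hypothetical helper: convert strings like '1m', '4h', or '1d' to a timedelta."""
    amount, unit = time_interval[:-1], time_interval[-1]
    if unit not in _UNIT_SECONDS or not amount.isdigit():
        raise ValueError(f'Unsupported interval: {time_interval!r}')
    return timedelta(seconds=int(amount) * _UNIT_SECONDS[unit])

print(interval_to_timedelta('5m'))  # 0:05:00
print(interval_to_timedelta('1h'))  # 1:00:00
```

A datetime.timedelta can be added to a pandas Timestamp directly, so the result can be passed as interval_delta.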

We also apply defensive programming to filter out incomplete or invalid messages:

def handle_candle_data(res, interval_delta):
    if 'data' not in res:
        return None
    data = res['data']
    if data.get('e') != 'kline' or 'k' not in data:
        return None
    candle = data['k']
    if not candle.get('x'):  # Only accept closed candles
        return None
    return convert_to_dataframe(candle, interval_delta)

This ensures only finalized, high-integrity K-lines enter your dataset.


Building a Scalable K-Line Listener: CandleListener

The CandleListener class encapsulates WebSocket connectivity and message handling into a reusable component. It supports multiple symbols and asset types (spot/futures), and uses an asyncio.Queue to decouple data reception from processing.

class CandleListener:
    TRADE_TYPE_MAP = {
        'usdt_futures': get_usdt_futures_multi_candlesticks_socket,
        'coin_futures': get_coin_futures_multi_candlesticks_socket,
        'spot': get_spot_multi_candlesticks_socket
    }

    def __init__(self, type_, symbols, time_interval, que):
        self.trade_type = type_
        self.symbols = set(symbols)
        self.time_interval = time_interval
        # Assumes a minute-based interval such as '1m' or '5m'; '1h' or '1d' would be misread as minutes
        self.interval_delta = pd.Timedelta(minutes=int(time_interval[:-1]))
        self.que = que
        self.req_reconnect = False

    async def start_listen(self):
        socket_func = self.TRADE_TYPE_MAP[self.trade_type]
        while True:
            socket = socket_func(self.symbols, self.time_interval)
            async with socket as socket_conn:
                while True:
                    if self.req_reconnect:
                        self.req_reconnect = False
                        break
                    try:
                        res = await socket_conn.recv()
                        df_candle = handle_candle_data(res, self.interval_delta)
                        if df_candle is not None:
                            self.que.put_nowait({
                                'type': 'candle_data',
                                'data': df_candle,
                                'symbol': res['data']['s'],
                                'time_interval': self.time_interval,
                                'trade_type': self.trade_type,
                                'recv_time': pd.Timestamp.now(tz='UTC')
                            })
                    except asyncio.TimeoutError:
                        logging.error('WebSocket timeout, reconnecting...')
                        break

You can dynamically add or remove trading pairs during runtime using add_symbols() and remove_symbols().
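
The listing above does not include add_symbols() and remove_symbols(). A minimal sketch of how they might work is shown below, assuming they update the symbol set and raise req_reconnect so that start_listen() rebuilds the socket with the new subscription. The SymbolRegistry class is a hypothetical stand-in holding only the relevant attributes, not the author's implementation.

```python
class SymbolRegistry:
    """Hypothetical stand-in for the symbol-tracking part of CandleListener."""

    def __init__(self, symbols):
        self.symbols = set(symbols)
        self.req_reconnect = False

    def add_symbols(self, *symbols):
        # Request a reconnect only if the subscription actually changes
        new = set(symbols) - self.symbols
        if new:
            self.symbols |= new
            self.req_reconnect = True

    def remove_symbols(self, *symbols):
        gone = self.symbols & set(symbols)
        if gone:
            self.symbols -= gone
            self.req_reconnect = True

registry = SymbolRegistry(['BTCUSDT'])
registry.add_symbols('ETHUSDT')
print(sorted(registry.symbols), registry.req_reconnect)  # ['BTCUSDT', 'ETHUSDT'] True
```

Because start_listen() checks req_reconnect only before each recv() call, a change made this way would take effect after the next message or timeout arrives, not instantly.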


Asynchronously Recording Data in Parquet Format

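To decouple disk writes from data reception, we adopt a producer-consumer architecture: each CandleListener acts as a producer that puts closed candles on a shared asyncio.Queue, and a single consumer task takes them off the queue and writes them to disk.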

Parquet is ideal for financial time-series data due to its columnar storage, compression efficiency, and fast query performance.

import os

def update_candle_data(df_new: pd.DataFrame, symbol, time_interval, trade_type):
    output_path = f'{trade_type}_{symbol}_{time_interval}.pqt'
    if not os.path.exists(output_path):
        df_new.to_parquet(output_path, compression='zstd')
    else:
        df = pd.read_parquet(output_path)
        df = pd.concat([df, df_new]).sort_index()
        df = df[~df.index.duplicated(keep='last')]  # Prevent duplicates
        df.to_parquet(output_path, compression='zstd')

The consumer listens asynchronously:

async def dispatcher(main_que: asyncio.Queue):
    while True:
        req = await main_que.get()
        if req['type'] == 'candle_data':
            update_candle_data(
                req['data'],
                req['symbol'],
                req['time_interval'],
                req['trade_type']
            )
            logging.info('Recorded %s %s-%s at %s',
                         req['trade_type'], req['symbol'],
                         req['time_interval'], req['recv_time'])
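
Note that update_candle_data() is a synchronous function, so calling it directly inside the dispatcher pauses the event loop for the duration of each file write. One possible refinement, sketched below with the standard library only, is to hand the blocking call to a worker thread with asyncio.to_thread (Python 3.9+). The slow_write function and the None shutdown sentinel are illustrative assumptions standing in for the real Parquet write.

```python
import asyncio
import time

written = []

def slow_write(symbol):
    """Stand-in for a blocking disk write such as update_candle_data()."""
    time.sleep(0.05)
    written.append(symbol)

async def dispatcher(que: asyncio.Queue):
    while True:
        req = await que.get()
        if req is None:  # Hypothetical sentinel used to stop the consumer
            break
        # Run the blocking call in a worker thread so the event loop stays responsive
        await asyncio.to_thread(slow_write, req['symbol'])

async def main():
    que = asyncio.Queue()
    for symbol in ('BTCUSDT', 'ETHUSDT'):
        que.put_nowait({'symbol': symbol})
    que.put_nowait(None)
    await dispatcher(que)

asyncio.run(main())
print(written)  # ['BTCUSDT', 'ETHUSDT']
```

Because the dispatcher awaits each write before taking the next item, writes to the same file still happen one at a time.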

Running Multiple Listeners Concurrently

async def main():
    main_que = asyncio.Queue()

    # Define producers
    listener_usdt_perp_1m = CandleListener('usdt_futures', ['BTCUSDT', 'ETHUSDT'], '1m', main_que)
    listener_coin_perp_3m = CandleListener('coin_futures', ['BTCUSD_PERP', 'ETHUSD_PERP'], '3m', main_que)
    listener_spot_1m = CandleListener('spot', ['BTCUSDT', 'BNBUSDT'], '1m', main_que)

    # Start consumer task
    dispatcher_task = asyncio.create_task(dispatcher(main_que))

    # Run all listeners and dispatcher
    await asyncio.gather(
        listener_usdt_perp_1m.start_listen(),
        listener_coin_perp_3m.start_listen(),
        listener_spot_1m.start_listen(),
        dispatcher_task
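    )

if __name__ == '__main__':
    # INFO level is required for the "Recorded ..." messages to appear
    logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
    asyncio.run(main())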

Output example:

INFO - Recorded usdt_futures BTCUSDT-1m at 2024-06-30 15:00:00+00:00
INFO - Recorded spot BNBUSDT-1m at 2024-06-30 15:00:00+00:00

Generated files:

usdt_futures_BTCUSDT_1m.pqt
coin_futures_ETHUSD_PERP_3m.pqt
spot_BTCUSDT_1m.pqt

Each file stores clean, indexed candlestick data ready for analysis.


Frequently Asked Questions (FAQ)

How do I ensure I only record closed candles?

Binance WebSocket sends updates for both open and closed candles. Check the 'x' field in the K-line object — only process when 'x': True, which indicates the candle has closed.

Why use Parquet instead of CSV?

Parquet offers superior compression, faster read/write speeds, schema enforcement, and efficient querying — especially important when handling large volumes of time-series financial data.

Can I run multiple WebSocket connections safely?

Yes. Each listener runs as its own asyncio task on a single event loop, so waiting on one socket never blocks the others. The producer-consumer model keeps disk writes out of the network-handling code.

What happens if the WebSocket disconnects?

The ReconnectingWebsocket class automatically attempts reconnection on failure. Additionally, when recv() raises a timeout, CandleListener breaks out of its receive loop and opens a fresh connection to maintain stream continuity.
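
The internals of ReconnectingWebsocket are not shown in this article. As a rough illustration of the general idea only, the sketch below retries a failing coroutine with exponential backoff; connect_with_backoff, flaky_connect, and the delay values are all hypothetical.

```python
import asyncio

async def connect_with_backoff(connect, max_attempts=5, base_delay=0.01):
    """Call an async connect() until it succeeds, doubling the wait after each failure."""
    for attempt in range(max_attempts):
        try:
            return await connect()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # Give up after the final attempt
            await asyncio.sleep(base_delay * 2 ** attempt)

attempts = 0

async def flaky_connect():
    """Simulated connection that fails twice before succeeding."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError('simulated drop')
    return 'connected'

result = asyncio.run(connect_with_backoff(flaky_connect))
print(result, attempts)  # connected 3
```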

Is this setup suitable for high-frequency trading?

While this system captures real-time data reliably, production-grade HFT systems require additional layers: latency optimization, message timestamping, heartbeat monitoring, and failover redundancy.


Conclusion

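You now have a fully functional system that subscribes to Binance K-line streams over WebSocket for spot, USDT futures, and coin futures markets, keeps only closed candles, converts them into Pandas DataFrames, and records them to Parquet files through an asyncio queue.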

This foundation supports advanced applications like backtesting engines, live dashboards, anomaly detection models, and algorithmic trading systems.

As you scale this solution, consider adding features like the ones mentioned in the FAQ: message timestamping, heartbeat monitoring, and failover redundancy.

With continuous improvements, your data pipeline can evolve into a mission-critical component of any crypto analytics platform.
