I have written a python script file which fetch the real time data from bybit and send that data to kafka topic.
My kafka is running in local.
below is the script :
import requests
import json
from websocket import create_connection
from websocket import create_connection
from kafka import KafkaProducer
Bybit API credentials
api_key = ''
api_secret = ''
symbol = 'BTCUSD' # Replace with the symbol you're interested in
Kafka configuration
kafka_bootstrap_servers = 'localhost:9092'
price_topic = 'server-2'
trade_topic = 'bybit_trade_data'
Set up Kafka producer
producer = KafkaProducer(
bootstrap_servers=kafka_bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
Function to fetch price data from Bybit API
def fetch_price_data():
url = f'https://api.bybit.com/v2/public/tickers?symbol={symbol}'
response = requests.get(url)
data = response.json()
return data['result'][0]
Function to connect to Bybit WebSocket for trade data
def connect_to_websocket():
ws_url = 'wss://stream.bybit.com/realtime'
ws = create_connection(ws_url)
request_data = {
'op': 'subscribe',
'args': [f'trade.{symbol}']
}
ws.send(json.dumps(request_data))
return ws
Main loop to continuously fetch data and produce to Kafka topics
def main():
ws = connect_to_websocket()
while True:
try:
# Fetch price data from Bybit REST API
price_data = fetch_price_data()
# Produce price data to Kafka topic
producer.send(price_topic, value=price_data)
print(f"Price Data sent to Kafka - {price_data}")
# Receive and process trade data from Bybit WebSocket
trade_data = json.loads(ws.recv())
# Produce trade data to Kafka topic
producer.send(trade_topic, value=trade_data)
print(f"Trade Data sent to Kafka - {trade_data}")
except Exception as e:
print(f"Error: {e}")
if name == "main":
main()
Also i have written a d3.js code to display the data coming from bybit.
<!DOCTYPE html>
WebSocket Bar Chart Visualization
<br> #bar-chart{<br> background-color: red;<br> }<br>
<script>
// Connect to WebSocket
const socket = new WebSocket("wss://stream.bybit.com/realtime");
// Initialize D3.js bar chart
const svg = d3
.select("#bar-chart")
.append("svg")
.attr("width", 400)
.attr("height", 200);
//Web Socket config
socket.addEventListener("open", function (event) {
console.log("WebSocket is open now.");
});
socket.addEventListener("error", function (event) {
console.log("WebSocket error: ", event);
});
// Receive and update data from WebSocket
socket.addEventListener("message", function (event) {
// console.log("Message from server: ", event.data);
// const data = JSON.parse(event.data);
console.log("Message from server: ", event.data);
let data;
try {
data = JSON.parse(event.data);
} catch (error) {
console.error("Error parsing data: ", error);
return;
}
// Assuming data is in the format { label: 'Category', value: 10 }
updateBarChart(data.label, data.value);
});
// Function to update the bar chart
function updateBarChart(label, value) {
// Your D3.js code to update the bar chart goes here
// Example: Append a rectangle for each data point
// svg
// .selectAll("rect")
// .data([value])
// .enter()
// .append("rect")
// .attr("x", 0)
// .attr("y", 0)
// .attr("width", value)
// .attr("height", 50)
// .attr("fill", "blue");
// Bind the new data to the rectangles
const rects = svg.selectAll("rect").data([value]);
// Update existing rectangles
rects
.attr("width", (d) => d)
.attr("height", 50)
.attr("fill", "blue");
// Add new rectangles for any new data points
rects
.enter()
.append("rect")
.attr("x", 0)
.attr("y", 0)
.attr("width", (d) => d)
.attr("height", 50)
.attr("fill", "blue");
// Remove rectangles for any missing data points
rects.exit().remove();
}
socket.addEventListener("close", function (event) {
console.log("WebSocket is closed now.");
});
</script>
But in live server i am not getting any visualization result.
Please help me with D3.js code
Top comments (0)