WebSocket Documentation¶
The platform provides WebSocket endpoints for real-time data streaming.
WebSocket Endpoints¶
Telemetry WebSocket¶
Stream real-time telemetry data from all connected belts.
Endpoint: ws://localhost:8000/ws/telemetry
Alerts WebSocket¶
Stream geofence breach alerts in real time.
Endpoint: ws://localhost:8000/ws/alerts
Connection¶
JavaScript Client¶
// Connect to the telemetry stream
const telemetryWs = new WebSocket('ws://localhost:8000/ws/telemetry');

telemetryWs.onopen = () => {
  console.log('Connected to telemetry WebSocket');
};

telemetryWs.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};

telemetryWs.onclose = () => {
  console.log('Disconnected');
};

// Connect to the alerts stream
const alertsWs = new WebSocket('ws://localhost:8000/ws/alerts');

alertsWs.onmessage = (event) => {
  const alert = JSON.parse(event.data);
  console.log('Alert:', alert);
};
Connection Flow¶
sequenceDiagram
    participant Client as Frontend
    participant API as FastAPI
    participant Manager as ConnectionManager
    participant Worker
    Client->>API: WebSocket Connection
    API->>Manager: Add to active_connections
    Manager-->>Client: Connection Accepted
    API-->>Client: Connected
    loop Every Telemetry Update
        Worker->>API: POST /internal/broadcast-telemetry
        API->>Manager: Broadcast to all clients
        Manager->>Client: WebSocket Message
    end
    Client->>API: Close Connection
    API->>Manager: Remove from active_connections
Message Formats¶
Telemetry Message¶
{
  "type": "telemetry",
  "belt_id": "BELT-001",
  "latitude": -36.595,
  "longitude": 144.945,
  "temperature": 38.5,
  "activity_level": 5.0,
  "timestamp": 1713724800
}
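A consumer will usually want to check the shape of a frame before using it. The following is a minimal sketch, not the platform's own client code: the `TelemetryMessage` dataclass and `parse_telemetry` function are hypothetical names, and the field types are inferred from the sample above (the timestamp is assumed to be an integer).

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class TelemetryMessage:
    """Typed view of one telemetry frame (hypothetical helper)."""
    belt_id: str
    latitude: float
    longitude: float
    temperature: float
    activity_level: float
    timestamp: int


def parse_telemetry(raw: str) -> TelemetryMessage:
    """Parse one telemetry frame, raising ValueError on an unexpected shape."""
    payload = json.loads(raw)  # json.JSONDecodeError is itself a ValueError
    if not isinstance(payload, dict) or payload.get("type") != "telemetry":
        raise ValueError("not a telemetry message")
    try:
        return TelemetryMessage(
            belt_id=str(payload["belt_id"]),
            latitude=float(payload["latitude"]),
            longitude=float(payload["longitude"]),
            temperature=float(payload["temperature"]),
            activity_level=float(payload["activity_level"]),
            timestamp=int(payload["timestamp"]),
        )
    except KeyError as missing:
        raise ValueError(f"missing field: {missing}") from None


# The sample frame from this section
sample = (
    '{"type": "telemetry", "belt_id": "BELT-001", "latitude": -36.595, '
    '"longitude": 144.945, "temperature": 38.5, "activity_level": 5.0, '
    '"timestamp": 1713724800}'
)
message = parse_telemetry(sample)
```

Rejecting frames early keeps malformed or unrelated messages (for example a pong reply) out of downstream code that expects coordinates.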
Alert Message¶
{
  "type": "alert",
  "id": "alert-1",
  "belt_id": "BELT-001",
  "latitude": -36.595,
  "longitude": 144.945,
  "paddock_id": "paddock-1",
  "timestamp": 1713724800,
  "expected_paddock_id": "paddock-1"
}
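Both formats carry a `type` field, so a consumer that reads from more than one stream can route on it. The sketch below is one possible way to do that; `MessageRouter` and its methods are hypothetical names for illustration and are not part of the platform.

```python
import json
from typing import Callable, Dict

Handler = Callable[[dict], None]


class MessageRouter:
    """Route decoded frames to a handler chosen by their "type" field."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def on(self, message_type: str, handler: Handler) -> None:
        """Register the handler for one message type."""
        self._handlers[message_type] = handler

    def dispatch(self, raw: str) -> bool:
        """Return True if a handler ran, False if the type was unknown."""
        payload = json.loads(raw)
        if not isinstance(payload, dict):
            return False
        handler = self._handlers.get(payload.get("type"))
        if handler is None:
            return False
        handler(payload)
        return True


# Usage: collect alerts and telemetry into separate lists
alerts: list = []
telemetry: list = []
router = MessageRouter()
router.on("alert", alerts.append)
router.on("telemetry", telemetry.append)
router.dispatch('{"type": "alert", "id": "alert-1", "belt_id": "BELT-001"}')
```

Unknown types are reported through the return value rather than an exception, so new message types added on the server do not break older consumers.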
Broadcasting¶
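The Worker service broadcasts telemetry to connected clients through an internal HTTP endpoint:
# app/api/broadcast.py
from fastapi import APIRouter

router = APIRouter()  # router setup shown for completeness


@router.post("/internal/broadcast-telemetry")
async def broadcast_telemetry(data: dict):
    # get_manager() is defined elsewhere in the app and returns the shared ConnectionManager
    manager = get_manager()
    await manager.broadcast(data, "telemetry")
    return {"status": "ok"}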
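The Worker side of this call is not shown in this document. As a rough sketch under stated assumptions, it could post a telemetry payload with the standard library alone. The host and port are taken from the WebSocket endpoints above and may differ in a real deployment; `build_broadcast_request` and `post_telemetry` are hypothetical names.

```python
import json
import urllib.request

# Assumption: the API is reachable on the same host and port as the WebSocket endpoints
BROADCAST_URL = "http://localhost:8000/internal/broadcast-telemetry"


def build_broadcast_request(payload: dict, url: str = BROADCAST_URL) -> urllib.request.Request:
    """Build the POST request without sending it, which keeps it easy to inspect."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def post_telemetry(payload: dict, timeout: float = 5.0) -> int:
    """Send the payload and return the HTTP status code."""
    request = build_broadcast_request(payload)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


request = build_broadcast_request({"type": "telemetry", "belt_id": "BELT-001"})
```

Separating request construction from sending means the payload and headers can be checked without a running server.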
Connection Manager¶
The ConnectionManager class tracks the active WebSocket connections for each channel:
# app/api/websocket.py
from collections import defaultdict

from fastapi import WebSocket

# One set of sockets per channel name, e.g. "telemetry" or "alerts"
active_connections = defaultdict(set)


class ConnectionManager:
    async def connect(self, websocket: WebSocket, channel: str = "telemetry"):
        await websocket.accept()
        active_connections[channel].add(websocket)

    def disconnect(self, websocket: WebSocket, channel: str = "telemetry"):
        active_connections[channel].discard(websocket)

    async def broadcast(self, message: dict, channel: str = "telemetry"):
        # Iterate over a copy: changing a set while looping over it raises RuntimeError
        for connection in list(active_connections[channel]):
            try:
                await connection.send_json(message)
            except Exception:
                # Remove disconnected clients
                active_connections[channel].discard(connection)
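The pruning behaviour of a broadcast can be exercised without a running server. The sketch below uses stand-in objects rather than real FastAPI sockets; `FakeSocket` and `ChannelRegistry` are hypothetical names, and the registry is a simplified model of the manager, not its actual implementation. It loops over a snapshot of the set so that failed sockets can be removed safely afterwards.

```python
import asyncio
from collections import defaultdict


class FakeSocket:
    """Stand-in for a WebSocket: records sent messages or fails on demand."""

    def __init__(self, broken: bool = False) -> None:
        self.broken = broken
        self.sent: list = []

    async def send_json(self, message: dict) -> None:
        if self.broken:
            raise ConnectionError("peer went away")
        self.sent.append(message)


class ChannelRegistry:
    """Simplified model of a per-channel connection registry."""

    def __init__(self) -> None:
        self.active_connections = defaultdict(set)

    def add(self, socket, channel: str = "telemetry") -> None:
        self.active_connections[channel].add(socket)

    async def broadcast(self, message: dict, channel: str = "telemetry") -> int:
        """Send to every socket on the channel and return how many were pruned."""
        dead = []
        # list(...) takes a snapshot, so the set is never modified mid-iteration
        for socket in list(self.active_connections[channel]):
            try:
                await socket.send_json(message)
            except Exception:
                dead.append(socket)
        for socket in dead:
            self.active_connections[channel].discard(socket)
        return len(dead)


registry = ChannelRegistry()
healthy = FakeSocket()
broken = FakeSocket(broken=True)
registry.add(healthy)
registry.add(broken)
pruned = asyncio.run(registry.broadcast({"type": "telemetry"}))
```

After the call, only the healthy socket remains registered and it has received the message once.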
Handling Disconnections¶
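The frontend schedules a reconnect three seconds after an unclean close:
// frontend/src/lib/telemetry.ts
// ws and connectTelemetryWebSocket() are defined elsewhere in this module
let wsReconnectTimeout: ReturnType<typeof setTimeout> | null = null;

function scheduleReconnect() {
  if (wsReconnectTimeout) return; // a reconnect is already pending
  wsReconnectTimeout = setTimeout(() => {
    wsReconnectTimeout = null;
    ws = null;
    connectTelemetryWebSocket();
  }, 3000);
}

ws.onclose = (event) => {
  // Reconnect only after an unclean close (e.g. a network drop)
  if (!event.wasClean) {
    scheduleReconnect();
  }
};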
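The same policy (retry after a fixed delay, but only when the connection ended abnormally) can be sketched for a non-browser client. This is an illustration under assumptions, not part of the platform: `run_with_reconnect` is a hypothetical helper, the attempt limit is an addition the frontend code does not have, and the caller supplies the coroutine that actually opens the socket and reads from it.

```python
import asyncio
from typing import Awaitable, Callable


async def run_with_reconnect(
    connect_and_listen: Callable[[], Awaitable[None]],
    delay_seconds: float = 3.0,
    max_attempts: int = 5,
) -> int:
    """Call connect_and_listen until it returns cleanly or attempts run out.

    A normal return models a clean close; an OSError (which includes
    ConnectionError) models an unclean one. Returns the number of attempts made.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            await connect_and_listen()
            return attempt  # clean close: do not reconnect
        except OSError:
            if attempt == max_attempts:
                raise  # give up and surface the last failure
            await asyncio.sleep(delay_seconds)
    return 0  # only reached when max_attempts is 0
```

Passing the connection logic in as a callable keeps the retry policy independent of any particular WebSocket library.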
Ping/Pong¶
Clients can send ping messages to keep connections alive:
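// Send ping (send() throws if the socket is not open yet)
if (telemetryWs.readyState === WebSocket.OPEN) {
  telemetryWs.send(JSON.stringify({ type: "ping" }));
}

// Receive pong
// Note: assigning onmessage replaces any handler set earlier on this socket
telemetryWs.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === "pong") {
    console.log('Pong received');
  }
};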
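The server side of this exchange is not shown in this document. A minimal sketch of how a receive loop might decide on a reply follows. The `{"type": "pong"}` shape is inferred from the client check above, and `reply_for` is a hypothetical helper name.

```python
import json
from typing import Optional


def reply_for(raw: str) -> Optional[str]:
    """Return the JSON text to send back, or None when no reply is needed."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None  # ignore frames that are not valid JSON
    if isinstance(payload, dict) and payload.get("type") == "ping":
        return json.dumps({"type": "pong"})
    return None
```

In a receive loop, the text returned here would be sent back on the same socket whenever it is not `None`.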
Error Handling¶
Frontend¶
ws.onerror = (error) => {
  console.error('WebSocket error:', error);
  useTelemetryStore.getState().setConnected(false);
};
Backend¶
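import logging

from fastapi import APIRouter, WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)
router = APIRouter()
# manager is the shared ConnectionManager instance, created elsewhere in the app


@router.websocket("/ws/telemetry")
async def websocket_telemetry(websocket: WebSocket):
    await manager.connect(websocket, "telemetry")
    try:
        while True:
            data = await websocket.receive_text()
            # Process data
    except WebSocketDisconnect:
        # The client closed the connection
        manager.disconnect(websocket, "telemetry")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
        manager.disconnect(websocket, "telemetry")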
Performance Considerations¶
- Connection Pooling: Active connections for each channel are held in a set, so adding or removing a client is a constant-time operation on average
- Error Handling: Clients whose send fails during a broadcast are removed automatically
- Reconnection: The frontend reconnects automatically after an unclean close
- Message Format: JSON is used for simplicity (protobuf available)
Security¶
For production, consider the following:
- Add authentication to WebSocket connections
- Use WSS (WebSocket Secure) for TLS
- Implement rate limiting
- Add origin validation
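As an illustration of the origin validation item, the sketch below compares the `Origin` header of an incoming handshake with an allow-list. It is one possible approach under assumptions: `is_allowed_origin` is a hypothetical helper, and the example origins are placeholders rather than real deployment values.

```python
from typing import Iterable, Optional
from urllib.parse import urlsplit


def is_allowed_origin(origin: Optional[str], allowed: Iterable[str]) -> bool:
    """Compare scheme, host and port of an Origin header with an allow-list."""
    if not origin:
        return False  # a missing Origin header is rejected
    try:
        candidate = urlsplit(origin)
        candidate_key = (candidate.scheme, candidate.hostname, candidate.port)
    except ValueError:
        return False  # malformed origin, e.g. a non-numeric port
    for entry in allowed:
        permitted = urlsplit(entry)
        if candidate_key == (permitted.scheme, permitted.hostname, permitted.port):
            return True
    return False


# Placeholder allow-list for illustration only
ALLOWED_ORIGINS = ["https://app.example.com"]
```

Comparing parsed components instead of raw string prefixes avoids accepting look-alike hosts such as `https://app.example.com.evil.test`.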