Worker Services Documentation¶
The platform includes background worker services for processing telemetry data.
Overview¶
flowchart TB
subgraph Bridge["MQTT to Kafka Bridge"]
MQTT[("MQTT Broker")]
BridgeCode[("Bridge Service")]
end
subgraph Worker["Kafka Consumer"]
Kafka[("Kafka")]
WorkerCode[("Worker Service")]
end
MQTT -->|livestock/telemetry/+| BridgeCode
BridgeCode -->|telemetry_raw| Kafka
Kafka -->|Consume| WorkerCode
Processing Flow¶
flowchart TB
subgraph Worker["Worker Service"]
Consume[Consume]
Parse[Parse]
Store[(Store in DB)]
Geofence[Check Geofence]
Broadcast[Broadcast]
end
Kafka[("Kafka")] --> Consume
Consume --> Parse
Parse --> Store
Store --> Geofence
Geofence --> Broadcast
Broadcast --> API[("API Server")]
MQTT to Kafka Bridge¶
File: app/worker/mqtt_to_kafka_bridge.py
Purpose¶
Subscribes to MQTT topics and forwards messages to Kafka for processing.
How It Works¶
- Connects to MQTT broker
- Subscribes to
livestock/telemetry/#(all belt topics) - Receives telemetry messages
- Serializes to Protocol Buffer format
- Produces to Kafka
telemetry_rawtopic
Configuration¶
class MqttToKafkaBridge:
def __init__(
self,
mqtt_host: str, # mosquitto
mqtt_port: int, # 1883
mqtt_topic: str, # livestock/telemetry/#
kafka_bootstrap_servers: str, # kafka:9092
kafka_topic: str, # telemetry_raw
):
Message Format¶
MQTT message:
{
"belt_id": "BELT-001",
"latitude": -36.595,
"longitude": 144.945,
"temperature": 38.5,
"activity_level": 5.0,
"timestamp": 1713724800
}
Running¶
python -m app.worker.mqtt_to_kafka_bridge
Kafka Consumer (Worker)¶
File: app/worker/kafka_consumer.py
Purpose¶
Consumes telemetry from Kafka, stores in database, checks geofence, broadcasts to clients.
How It Works¶
- Subscribes to Kafka
telemetry_rawtopic - Consumes messages
- Parses protobuf or JSON
- Stores in TimescaleDB
- Checks geofence compliance
- Broadcasts to WebSocket clients
Processing Flow¶
Kafka Message
│
▼
┌────────────────┐
│ Parse Message │ protobuf or JSON
└──────┬───────┘
│
▼
┌────────────────┐
│ Store in DB │ TimescaleDB
└──────┬───────┘
│
▼
┌────────────────┐
│ Check Geofence │ PostGIS ST_Contains
└──────┬───────┘
│
▼
┌────────────────┐
│ Broadcast WS │ HTTP to API
└──────┬───────┘
│
▼
Process Next
Geofence Checking¶
The geofence check uses PostGIS:
def check_geofence(self, belt_id, latitude, longitude):
# Get animal's current paddock
animal = db.query(Animal).filter(Animal.belt_id == belt_id).first()
# Get paddock geometry
paddock = db.query(Paddock).filter(Paddock.id == animal.current_paddock_id).first()
# Check if point is inside polygon
query = """
SELECT ST_Contains(
ST_GeomFromEWKT(:paddock_geom),
ST_GeomFromEWKT(:point_geom)
) AS is_within
"""
# Returns True if inside paddock
Broadcasting¶
async def _broadcast_telemetry(self, belt_id, latitude, longitude, temperature, activity_level, timestamp):
async with aiohttp.ClientSession() as session:
payload = {
"type": "telemetry",
"belt_id": belt_id,
"latitude": latitude,
"longitude": longitude,
"temperature": temperature,
"activity_level": activity_level,
"timestamp": timestamp,
}
async with session.post(
"http://api:8000/internal/broadcast-telemetry",
json=payload
) as resp:
pass
Running¶
python -m app.worker.kafka_consumer
Geofence Service¶
File: app/worker/geofence_service.py
Purpose¶
Checks if an animal is within its assigned paddock.
Usage¶
from app.worker.geofence_service import GeofenceService
geofence_service = GeofenceService(db)
is_within, breached_paddock_id = geofence_service.check_geofence(
belt_id="BELT-001",
latitude=-36.595,
longitude=144.945
)
if not is_within:
# Create alert
alert = geofence_service.create_breach_alert(
belt_id="BELT-001",
latitude=-36.595,
longitude=144.945,
paddock_id=breached_paddock_id
)
Running Workers¶
Using Docker Compose¶
# Start all workers
docker-compose up -d bridge worker
# View worker logs
docker-compose logs -f worker
# View bridge logs
docker-compose logs -f bridge
Running Locally¶
# Terminal 1: Start MQTT to Kafka bridge
python -m app.worker.mqtt_to_kafka_bridge
# Terminal 2: Start Kafka consumer
python -m app.worker.kafka_consumer
Troubleshooting¶
Bridge not receiving MQTT messages¶
# Check MQTT broker
docker-compose logs mosquitto
# Check subscription
docker-compose exec bridge python -c "
from app.core.config import settings
print(f'MQTT Topic: {settings.mqtt_topic}')
"
Worker not consuming Kafka messages¶
# Check Kafka topic
docker-compose exec kafka kafka-topics --list --bootstrap-server localhost:9092
# Check consumer group
docker-compose exec kafka kafka-consumer-groups --list --bootstrap-server localhost:9092
# Manually test consumer
docker exec worker python -c "
from confluent_kafka import Consumer
c = Consumer({'bootstrap.servers': 'kafka:9092', 'group.id': 'test'})
c.subscribe(['telemetry_raw'])
msg = c.poll(timeout=5)
print(msg.value() if msg else 'No message')
"
Database connection issues¶
# Check PostgreSQL
docker-compose exec postgres pg_isready
# Check TimescaleDB
docker-compose exec timescale pg_isready
# Test connection
docker exec worker python -c "
from app.core.database import SessionLocal, TimescaleSessionLocal
db = SessionLocal()
print('PostgreSQL OK')
db.close()
db = TimescaleSessionLocal()
print('TimescaleDB OK')
"