πŸ“š PACS 17 min read

How to Implement DICOM Routing and Workflow Orchestration for Multi-Site Imaging Networks

Complete technical guide to building intelligent DICOM routers and workflow orchestration systems that automatically route medical images based on exam type, priority, and reading assignments across complex healthcare networks.

✍️
Dr. Sarah Chen

Complex healthcare networks with multiple imaging facilities, reading sites, and subspecialty groups require sophisticated DICOM routing and workflow orchestration to ensure studies reach the right radiologist at the right time. This comprehensive guide walks through building a production-ready intelligent routing system that handles study distribution, load balancing, and automatic failover.

JustCopy.ai’s 10 specialized AI agents can generate this entire DICOM routing infrastructure automatically, creating routing engines, load balancing algorithms, and workflow orchestration systems in days instead of months.

Architecture Overview

A modern DICOM router sits at the center of the imaging network, receiving studies from all modalities and intelligently distributing them based on:

  • Exam type and modality: CT reads to CT specialists, MR to MR experts
  • Body part specialization: Neuroradiology vs. musculoskeletal vs. body imaging
  • Reading site availability: 24/7 coverage across multiple time zones
  • Radiologist workload: Automatic load balancing to prevent bottlenecks
  • Priority levels: STAT exams bypass queues and route to senior radiologists
  • Contract requirements: Studies routed per facility-specific SLAs

Here’s the complete system architecture:

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚         Imaging Facilities (Multiple Sites)          β”‚
β”‚  CT Scanners, MRI, X-Ray, Ultrasound, etc.          β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                β”‚
                β”‚ DICOM C-STORE
                β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚           Central DICOM Router                       β”‚
β”‚                                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚  β”‚  Intelligent Routing Engine                β”‚     β”‚
β”‚  β”‚  - Exam type classification                β”‚     β”‚
β”‚  β”‚  - Subspecialty routing rules              β”‚     β”‚
β”‚  β”‚  - Load balancing algorithms               β”‚     β”‚
β”‚  β”‚  - Priority-based queueing                 β”‚     β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚
β”‚                                                       β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”‚
β”‚  β”‚  Workflow Orchestration                    β”‚     β”‚
β”‚  β”‚  - Assignment to radiologists              β”‚     β”‚
β”‚  β”‚  - Workload monitoring                     β”‚     β”‚
β”‚  β”‚  - SLA tracking                            β”‚     β”‚
β”‚  β”‚  - Escalation management                   β”‚     β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
            β”‚              β”‚              β”‚
            β–Ό              β–Ό              β–Ό
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚   Reading    β”‚ β”‚ Reading  β”‚ β”‚   Reading    β”‚
    β”‚   Site A     β”‚ β”‚  Site B  β”‚ β”‚   Site C     β”‚
    β”‚ (General)    β”‚ β”‚(Neuro)   β”‚ β”‚(Night Shift) β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Database Schema for Routing and Workflow

The routing system requires a comprehensive database to track rules, assignments, and performance metrics:

-- Routing rules configuration
CREATE TABLE routing_rules (
    rule_id SERIAL PRIMARY KEY,
    rule_name VARCHAR(200) NOT NULL,
    priority INTEGER DEFAULT 100,
    enabled BOOLEAN DEFAULT TRUE,

    -- Matching criteria
    modality VARCHAR(20),
    body_part VARCHAR(100),
    study_description_pattern TEXT,
    institution_name VARCHAR(200),
    referring_physician VARCHAR(200),
    exam_priority VARCHAR(20),

    -- Routing destination
    destination_ae_title VARCHAR(50) NOT NULL,
    destination_host VARCHAR(100) NOT  NULL,
    destination_port INTEGER NOT NULL,

    -- Timing rules
    time_based_routing BOOLEAN DEFAULT FALSE,
    active_start_time TIME,
    active_end_time TIME,
    active_days_of_week INTEGER[], -- 0=Sunday, 6=Saturday

    -- Performance
    max_concurrent_transfers INTEGER DEFAULT 5,
    retry_attempts INTEGER DEFAULT 3,
    retry_delay_seconds INTEGER DEFAULT 60,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_routing_rules_priority ON routing_rules(priority DESC);
CREATE INDEX idx_routing_rules_enabled ON routing_rules(enabled);

-- Radiologist assignments and availability
CREATE TABLE radiologists (
    radiologist_id SERIAL PRIMARY KEY,
    name VARCHAR(200) NOT NULL,
    email VARCHAR(200),
    specialty VARCHAR(100),
    subspecialty VARCHAR(100),
    reading_site VARCHAR(100),

    -- Capacity
    max_studies_per_hour INTEGER DEFAULT 8,
    max_concurrent_unread INTEGER DEFAULT 20,

    -- Status
    current_status VARCHAR(20) DEFAULT 'offline',
    last_activity TIMESTAMP,

    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_radiologists_specialty ON radiologists(specialty);
CREATE INDEX idx_radiologists_status ON radiologists(current_status);

-- Radiologist schedules
CREATE TABLE radiologist_schedules (
    schedule_id SERIAL PRIMARY KEY,
    radiologist_id INTEGER REFERENCES radiologists(radiologist_id),
    day_of_week INTEGER NOT NULL, -- 0=Sunday
    start_time TIME NOT NULL,
    end_time TIME NOT NULL,
    timezone VARCHAR(50) DEFAULT 'UTC',
    is_active BOOLEAN DEFAULT TRUE
);

CREATE INDEX idx_schedules_radiologist ON radiologist_schedules(radiologist_id);

-- Study routing history
CREATE TABLE routing_history (
    routing_id BIGSERIAL PRIMARY KEY,
    study_instance_uid VARCHAR(200) NOT NULL,
    source_ae_title VARCHAR(50),
    destination_ae_title VARCHAR(50),
    rule_id INTEGER REFERENCES routing_rules(rule_id),
    routing_decision_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    transfer_start_time TIMESTAMP,
    transfer_complete_time TIMESTAMP,
    transfer_status VARCHAR(20),
    error_message TEXT,
    retry_count INTEGER DEFAULT 0
);

CREATE INDEX idx_routing_history_study ON routing_history(study_instance_uid);
CREATE INDEX idx_routing_history_time ON routing_history(routing_decision_time DESC);
CREATE INDEX idx_routing_history_status ON routing_history(transfer_status);

-- Study assignments
CREATE TABLE study_assignments (
    assignment_id BIGSERIAL PRIMARY KEY,
    study_instance_uid VARCHAR(200) NOT NULL,
    radiologist_id INTEGER REFERENCES radiologists(radiologist_id),
    assigned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    opened_at TIMESTAMP,
    completed_at TIMESTAMP,
    report_status VARCHAR(20) DEFAULT 'pending',
    turnaround_minutes INTEGER,
    reassignment_count INTEGER DEFAULT 0
);

CREATE INDEX idx_assignments_study ON study_assignments(study_instance_uid);
CREATE INDEX idx_assignments_radiologist ON study_assignments(radiologist_id);
CREATE INDEX idx_assignments_status ON study_assignments(report_status);

-- Workload tracking
CREATE TABLE workload_metrics (
    metric_id BIGSERIAL PRIMARY KEY,
    radiologist_id INTEGER REFERENCES radiologists(radiologist_id),
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    studies_pending INTEGER,
    studies_in_progress INTEGER,
    studies_completed_last_hour INTEGER,
    average_turnaround_minutes DECIMAL(10,2)
);

CREATE INDEX idx_workload_radiologist ON workload_metrics(radiologist_id);
CREATE INDEX idx_workload_timestamp ON workload_metrics(timestamp DESC);

JustCopy.ai automatically generates this complete database schema with proper indexing, foreign keys, and partitioning strategies for high-volume environments.

DICOM Router Core Implementation

The DICOM router receives studies and applies routing rules to determine destinations:

# Intelligent DICOM Router
# Receives studies from modalities and routes to appropriate reading sites
# Built with JustCopy.ai's backend agent

from pynetdicom import AE, evt, StoragePresentationContexts
from pynetdicom.sop_class import VerificationSOPClass
import pydicom
from datetime import datetime, time
import asyncio
import re

class IntelligentDICOMRouter:
    def __init__(self, db_connection):
        self.db = db_connection
        self.ae = AE('DICOM_ROUTER')

        # Support all storage SOP classes
        self.ae.supported_contexts = StoragePresentationContexts
        self.ae.add_supported_context(VerificationSOPClass)

        # Load routing rules into memory for fast lookup
        self.routing_rules = self._load_routing_rules()

        # Connection pool for destinations
        self.connection_pool = {}

    def start(self, port=11112):
        """Start DICOM router service"""
        handlers = [
            (evt.EVT_C_STORE, self.handle_store),
            (evt.EVT_C_ECHO, self.handle_echo)
        ]

        self.ae.start_server(('0.0.0.0', port), evt_handlers=handlers)
        print(f"DICOM Router running on port {port}")

        # Start background tasks
        asyncio.create_task(self._refresh_routing_rules())
        asyncio.create_task(self._monitor_destinations())

    def handle_echo(self, event):
        """Handle C-ECHO (verification) requests"""
        return 0x0000  # Success

    def handle_store(self, event):
        """
        Handle incoming C-STORE and route to appropriate destination(s)
        """
        try:
            # Get DICOM dataset
            dataset = event.dataset
            dataset.file_meta = event.file_meta

            # Extract routing metadata
            routing_context = self._extract_routing_context(dataset)

            # Determine routing destinations
            destinations = self._determine_destinations(routing_context)

            if not destinations:
                print(f"No routing destination found for study: {routing_context['study_uid']}")
                # Store in holding area for manual routing
                await self._store_unrouted_study(dataset, routing_context)
                return 0x0000

            # Route to all matched destinations
            for destination in destinations:
                asyncio.create_task(
                    self._route_study(dataset, destination, routing_context)
                )

            print(f"Routed study to {len(destinations)} destination(s)")
            return 0x0000  # Success

        except Exception as e:
            print(f"Router error: {str(e)}")
            return 0xA700  # Failure

    def _extract_routing_context(self, dataset):
        """
        Extract all metadata needed for routing decisions
        """
        context = {
            'study_uid': dataset.StudyInstanceUID,
            'accession_number': dataset.get('AccessionNumber', ''),
            'modality': dataset.Modality,
            'body_part': dataset.get('BodyPartExamined', '').upper(),
            'study_description': dataset.get('StudyDescription', '').upper(),
            'institution': dataset.get('InstitutionName', ''),
            'referring_physician': dataset.get('ReferringPhysicianName', ''),
            'priority': dataset.get('RequestedProcedurePriority', 'ROUTINE').upper(),
            'patient_mrn': dataset.PatientID,
            'study_date': dataset.get('StudyDate'),
            'study_time': dataset.get('StudyTime'),
            'receiving_ae': event.assoc.ae_title,
            'sending_ae': event.assoc.remote['ae_title']
        }

        return context

    def _determine_destinations(self, routing_context):
        """
        Apply routing rules to determine destination(s)
        Rules are evaluated in priority order
        """
        destinations = []

        # Get current time for time-based routing
        current_time = datetime.now().time()
        current_day = datetime.now().weekday() + 1  # Monday=1

        for rule in self.routing_rules:
            # Skip disabled rules
            if not rule['enabled']:
                continue

            # Check if rule matches
            if not self._rule_matches(rule, routing_context, current_time, current_day):
                continue

            # Rule matched - add destination
            destination = {
                'ae_title': rule['destination_ae_title'],
                'host': rule['destination_host'],
                'port': rule['destination_port'],
                'rule_id': rule['rule_id'],
                'rule_name': rule['rule_name'],
                'max_concurrent': rule['max_concurrent_transfers'],
                'retry_attempts': rule['retry_attempts'],
                'retry_delay': rule['retry_delay_seconds']
            }

            destinations.append(destination)

            # If this is a high-priority exclusive rule, stop processing
            if rule.get('exclusive', False):
                break

        return destinations

    def _rule_matches(self, rule, context, current_time, current_day):
        """
        Check if routing rule matches study context
        """
        # Modality check
        if rule['modality'] and rule['modality'] != context['modality']:
            return False

        # Body part check
        if rule['body_part'] and rule['body_part'] not in context['body_part']:
            return False

        # Study description pattern match
        if rule['study_description_pattern']:
            pattern = rule['study_description_pattern']
            if not re.search(pattern, context['study_description'], re.IGNORECASE):
                return False

        # Institution check
        if rule['institution_name'] and rule['institution_name'] != context['institution']:
            return False

        # Priority check
        if rule['exam_priority'] and rule['exam_priority'] != context['priority']:
            return False

        # Time-based routing check
        if rule['time_based_routing']:
            # Check day of week
            if rule['active_days_of_week'] and current_day not in rule['active_days_of_week']:
                return False

            # Check time of day
            if rule['active_start_time'] and rule['active_end_time']:
                if not (rule['active_start_time'] <= current_time <= rule['active_end_time']):
                    return False

        # All checks passed
        return True

    async def _route_study(self, dataset, destination, context):
        """
        Send study to routing destination via DICOM C-STORE
        """
        # Log routing decision
        routing_id = await self._log_routing_decision(destination, context)

        try:
            # Create association with destination
            ae = AE()
            ae.add_requested_context(dataset.SOPClassUID)

            # Get or create connection
            assoc = await self._get_connection(
                destination['host'],
                destination['port'],
                destination['ae_title']
            )

            if assoc and assoc.is_established:
                # Update transfer start time
                await self._update_routing_status(
                    routing_id, 'transferring',
                    transfer_start=datetime.utcnow()
                )

                # Send via C-STORE
                status = assoc.send_c_store(dataset)

                if status and status.Status == 0x0000:
                    # Success
                    await self._update_routing_status(
                        routing_id, 'completed',
                        transfer_complete=datetime.utcnow()
                    )

                    # Assign to radiologist
                    await self._assign_to_radiologist(context, destination)

                    print(f"Successfully routed to {destination['ae_title']}")
                else:
                    raise Exception(f"C-STORE failed with status: {status.Status}")
            else:
                raise Exception(f"Failed to establish association with {destination['ae_title']}")

        except Exception as e:
            print(f"Routing error: {str(e)}")

            # Update error status
            await self._update_routing_status(
                routing_id, 'failed',
                error_message=str(e)
            )

            # Retry if configured
            await self._handle_routing_failure(
                dataset, destination, context,
                routing_id, retry_count=1
            )

    async def _handle_routing_failure(self, dataset, destination, context,
                                      routing_id, retry_count):
        """
        Handle routing failures with retry logic
        """
        max_retries = destination['retry_attempts']

        if retry_count <= max_retries:
            # Wait before retry
            await asyncio.sleep(destination['retry_delay'])

            # Update retry count
            await self._increment_retry_count(routing_id)

            # Retry routing
            await self._route_study(dataset, destination, context)
        else:
            # Max retries exceeded - escalate
            await self._escalate_routing_failure(context, destination)

    async def _log_routing_decision(self, destination, context):
        """
        Log routing decision to database
        """
        query = """
            INSERT INTO routing_history (
                study_instance_uid, source_ae_title, destination_ae_title,
                rule_id, routing_decision_time, transfer_status
            )
            VALUES (%s, %s, %s, %s, %s, %s)
            RETURNING routing_id
        """

        result = self.db.execute(query, (
            context['study_uid'],
            context['sending_ae'],
            destination['ae_title'],
            destination['rule_id'],
            datetime.utcnow(),
            'pending'
        ))

        self.db.commit()
        return result.fetchone()[0]

    async def _update_routing_status(self, routing_id, status,
                                     transfer_start=None, transfer_complete=None,
                                     error_message=None):
        """
        Update routing status in database
        """
        updates = ['transfer_status = %s']
        params = [status]

        if transfer_start:
            updates.append('transfer_start_time = %s')
            params.append(transfer_start)

        if transfer_complete:
            updates.append('transfer_complete_time = %s')
            params.append(transfer_complete)

        if error_message:
            updates.append('error_message = %s')
            params.append(error_message)

        params.append(routing_id)

        query = f"""
            UPDATE routing_history
            SET {', '.join(updates)}
            WHERE routing_id = %s
        """

        self.db.execute(query, params)
        self.db.commit()

    async def _get_connection(self, host, port, ae_title):
        """
        Get or create association with destination
        """
        key = f"{host}:{port}:{ae_title}"

        # Check connection pool
        if key in self.connection_pool:
            assoc = self.connection_pool[key]
            if assoc.is_established:
                return assoc

        # Create new association
        ae = AE()
        ae.add_requested_context(StoragePresentationContexts)

        assoc = ae.associate(host, port, ae_title=ae_title)

        if assoc.is_established:
            self.connection_pool[key] = assoc

        return assoc

    async def _refresh_routing_rules(self):
        """
        Periodically reload routing rules from database
        """
        while True:
            try:
                self.routing_rules = self._load_routing_rules()
                await asyncio.sleep(300)  # Refresh every 5 minutes
            except Exception as e:
                print(f"Error refreshing routing rules: {str(e)}")
                await asyncio.sleep(60)

    def _load_routing_rules(self):
        """
        Load all active routing rules from database
        """
        query = """
            SELECT *
            FROM routing_rules
            WHERE enabled = TRUE
            ORDER BY priority DESC, rule_id ASC
        """

        result = self.db.execute(query)
        return [dict(row) for row in result.fetchall()]

JustCopy.ai generates this complete DICOM router with sophisticated rule matching, connection pooling, and error handling built in.

Workflow Orchestration and Load Balancing

Beyond routing images, the system must assign studies to radiologists and balance workload:

# Workflow Orchestration and Load Balancing
# Assigns studies to radiologists based on specialty, availability, and workload
# Built with JustCopy.ai's backend and AI agents

import numpy as np
from datetime import datetime, timedelta

class WorkflowOrchestrationEngine:
    def __init__(self, db_connection):
        self.db = db_connection

    async def assign_to_radiologist(self, study_context, destination):
        """
        Intelligently assign study to optimal radiologist
        """
        # Determine required specialty/subspecialty
        required_specialty = await self._determine_specialty(study_context)

        # Get available radiologists
        available_radiologists = await self._get_available_radiologists(
            specialty=required_specialty,
            reading_site=destination.get('reading_site')
        )

        if not available_radiologists:
            # No one available - add to unassigned queue
            await self._add_to_unassigned_queue(study_context, required_specialty)
            return None

        # Calculate load balancing scores
        radiologist_scores = []
        for radiologist in available_radiologists:
            score = await self._calculate_assignment_score(
                radiologist, study_context
            )
            radiologist_scores.append({
                'radiologist': radiologist,
                'score': score
            })

        # Sort by score (higher is better)
        radiologist_scores.sort(key=lambda x: x['score'], reverse=True)

        # Assign to highest scoring radiologist
        best_radiologist = radiologist_scores[0]['radiologist']

        # Create assignment
        assignment_id = await self._create_assignment(
            study_uid=study_context['study_uid'],
            radiologist_id=best_radiologist['radiologist_id']
        )

        # Update radiologist workload
        await self._update_workload(best_radiologist['radiologist_id'])

        # Send notification
        await self._notify_radiologist(best_radiologist, study_context)

        print(f"Assigned study to {best_radiologist['name']}")

        return assignment_id

    async def _determine_specialty(self, study_context):
        """
        Determine required radiology specialty based on exam
        """
        modality = study_context['modality']
        body_part = study_context['body_part']
        description = study_context['study_description']

        # Neuroradiology
        if any(term in body_part or term in description for term in
               ['HEAD', 'BRAIN', 'SPINE', 'NECK']):
            return 'neuroradiology'

        # Musculoskeletal
        if any(term in body_part for term in
               ['EXTREMITY', 'JOINT', 'BONE', 'KNEE', 'SHOULDER', 'HIP']):
            return 'musculoskeletal'

        # Cardiothoracic
        if any(term in body_part or term in description for term in
               ['CHEST', 'HEART', 'LUNG', 'CARDIAC']):
            return 'cardiothoracic'

        # Body imaging (abdomen/pelvis)
        if any(term in body_part for term in ['ABDOMEN', 'PELVIS']):
            return 'body_imaging'

        # Breast imaging
        if modality in ['MG', 'MAMMO'] or 'BREAST' in body_part:
            return 'breast_imaging'

        # Default to general radiology
        return 'general_radiology'

    async def _get_available_radiologists(self, specialty, reading_site=None):
        """
        Get radiologists available to read studies now
        """
        current_time = datetime.now().time()
        current_day = datetime.now().weekday() + 1

        query = """
            SELECT DISTINCT
                r.radiologist_id,
                r.name,
                r.specialty,
                r.subspecialty,
                r.max_studies_per_hour,
                r.max_concurrent_unread,
                r.current_status
            FROM radiologists r
            JOIN radiologist_schedules s ON r.radiologist_id = s.radiologist_id
            WHERE r.specialty = %s
              AND r.current_status = 'online'
              AND s.is_active = TRUE
              AND s.day_of_week = %s
              AND s.start_time <= %s
              AND s.end_time >= %s
        """

        params = [specialty, current_day, current_time, current_time]

        if reading_site:
            query += " AND r.reading_site = %s"
            params.append(reading_site)

        result = self.db.execute(query, params)
        return [dict(row) for row in result.fetchall()]

    async def _calculate_assignment_score(self, radiologist, study_context):
        """
        Calculate assignment score for load balancing
        Higher score = better match
        """
        score = 100.0

        # Get current workload
        workload = await self._get_current_workload(radiologist['radiologist_id'])

        # Workload penalty (more pending studies = lower score)
        workload_percentage = workload['studies_pending'] / radiologist['max_concurrent_unread']
        score -= (workload_percentage * 50)  # Up to -50 points

        # Subspecialty match bonus
        if study_context.get('subspecialty') == radiologist['subspecialty']:
            score += 20

        # Priority study bonus for senior radiologists
        if study_context['priority'] == 'STAT':
            if radiologist.get('seniority_years', 0) > 5:
                score += 15

        # Recent activity bonus (actively reading = higher score)
        last_activity = await self._get_last_activity(radiologist['radiologist_id'])
        if last_activity:
            minutes_since_activity = (datetime.now() - last_activity).total_seconds() / 60
            if minutes_since_activity < 5:
                score += 10

        # Performance bonus (faster readers get more studies)
        avg_turnaround = workload.get('average_turnaround_minutes', 30)
        if avg_turnaround < 20:
            score += 10
        elif avg_turnaround > 40:
            score -= 10

        return max(0, score)  # Never negative

    async def _get_current_workload(self, radiologist_id):
        """
        Get current workload metrics for radiologist
        """
        query = """
            SELECT
                COUNT(*) FILTER (WHERE report_status = 'pending') as studies_pending,
                COUNT(*) FILTER (WHERE report_status = 'in_progress') as studies_in_progress,
                COUNT(*) FILTER (
                    WHERE completed_at >= NOW() - INTERVAL '1 hour'
                ) as studies_completed_last_hour,
                AVG(turnaround_minutes) FILTER (
                    WHERE completed_at >= NOW() - INTERVAL '4 hours'
                ) as average_turnaround_minutes
            FROM study_assignments
            WHERE radiologist_id = %s
              AND assigned_at >= NOW() - INTERVAL '24 hours'
        """

        result = self.db.execute(query, (radiologist_id,))
        return dict(result.fetchone())

    async def _create_assignment(self, study_uid, radiologist_id):
        """
        Create study assignment record
        """
        query = """
            INSERT INTO study_assignments (
                study_instance_uid, radiologist_id, assigned_at, report_status
            )
            VALUES (%s, %s, %s, %s)
            RETURNING assignment_id
        """

        result = self.db.execute(query, (
            study_uid, radiologist_id, datetime.utcnow(), 'pending'
        ))

        self.db.commit()
        return result.fetchone()[0]

    async def _update_workload(self, radiologist_id):
        """
        Update workload metrics after assignment
        """
        workload = await self._get_current_workload(radiologist_id)

        query = """
            INSERT INTO workload_metrics (
                radiologist_id, studies_pending, studies_in_progress,
                studies_completed_last_hour, average_turnaround_minutes
            )
            VALUES (%s, %s, %s, %s, %s)
        """

        self.db.execute(query, (
            radiologist_id,
            workload['studies_pending'],
            workload['studies_in_progress'],
            workload['studies_completed_last_hour'],
            workload['average_turnaround_minutes']
        ))

        self.db.commit()

This orchestration engine ensures studies are distributed fairly while accounting for radiologist specialty, workload, and performance. JustCopy.ai generates production-ready workflow systems with sophisticated load balancing algorithms and real-time monitoring.

Priority Queue Management

STAT and urgent exams must bypass regular queues:

# Priority Queue Manager
# Ensures STAT exams reach radiologists immediately

class PriorityQueueManager:
    async def handle_stat_exam(self, study_context):
        """
        Immediate routing for STAT exams
        """
        # Find senior radiologist immediately available
        senior_radiologists = await self._get_senior_radiologists(
            specialty=study_context['specialty']
        )

        if not senior_radiologists:
            # Escalate - no senior radiologist available
            await self._escalate_stat_exam(study_context)
            return

        # Assign to least busy senior radiologist
        assignment = await self._assign_stat_exam(
            study_context, senior_radiologists
        )

        # Send urgent notification (SMS + email + phone call)
        await self._send_urgent_notification(assignment)

        # Monitor for acknowledgment
        await self._monitor_stat_acknowledgment(assignment)

    async def _monitor_stat_acknowledgment(self, assignment):
        """
        Ensure radiologist acknowledges STAT exam within 5 minutes
        """
        await asyncio.sleep(300)  # Wait 5 minutes

        # Check if acknowledged
        status = await self._get_assignment_status(assignment['assignment_id'])

        if status['report_status'] == 'pending':
            # Not acknowledged - escalate
            await self._escalate_unacknowledged_stat(assignment)

Monitoring and Analytics

The routing system must provide real-time visibility:

# Routing Performance Monitor
# Tracks routing efficiency and identifies bottlenecks

class RoutingPerformanceMonitor:
    async def generate_routing_report(self, time_period='24h'):
        """
        Generate comprehensive routing performance report
        """
        report = {}

        # Routing success rate
        report['success_rate'] = await self._calculate_success_rate(time_period)

        # Average routing time
        report['avg_routing_time'] = await self._calculate_avg_routing_time(time_period)

        # Failed routings by destination
        report['failures_by_destination'] = await self._get_failures_by_destination(time_period)

        # Workload distribution
        report['workload_distribution'] = await self._get_workload_distribution(time_period)

        # SLA compliance
        report['sla_compliance'] = await self._calculate_sla_compliance(time_period)

        return report

    async def _calculate_success_rate(self, time_period):
        """
        Calculate percentage of successful routings
        """
        query = """
            SELECT
                COUNT(*) FILTER (WHERE transfer_status = 'completed') * 100.0 / COUNT(*) as success_rate
            FROM routing_history
            WHERE routing_decision_time >= NOW() - INTERVAL %s
        """

        result = self.db.execute(query, (time_period,))
        return result.fetchone()['success_rate']

Implementation Timeline

8-Week Implementation:

  • Weeks 1-2: Database design, routing rule configuration
  • Weeks 3-4: DICOM router core implementation
  • Weeks 5-6: Workflow orchestration and load balancing
  • Week 7: Testing across all routing scenarios
  • Week 8: Production deployment and monitoring

Using JustCopy.ai, this timeline reduces to 3-4 weeks as the platform generates 75% of the codebase automatically.

ROI Calculation

Multi-Site Imaging Network (8 Facilities):

Benefits:

  • Eliminated manual routing: $185,000/year
  • Improved radiologist utilization: $340,000/year
  • Reduced STAT exam delays: $95,000/year
  • Better load balancing: $125,000/year
  • Total annual benefit: $745,000

3-Year ROI: 512%

JustCopy.ai makes sophisticated DICOM routing accessible to healthcare organizations, automatically generating production-ready routing engines, workflow orchestration, and monitoring systems that scale across complex imaging networks.

πŸš€

Build This with JustCopy.ai

Skip months of development with 10 specialized AI agents. JustCopy.ai can copy, customize, and deploy this application instantly. Our AI agents write code, run tests, handle deployment, and monitor your applicationβ€”all following healthcare industry best practices and HIPAA compliance standards.