Creating a Surveillance Task with CS9

This vignette demonstrates how to create and configure surveillance tasks using the CS9 framework. CS9 provides a structured approach to building real-time surveillance systems with database integration and automated pipeline execution.

Overview

Creating a surveillance task in CS9 involves three main steps:

  1. Define database tables with schemas that describe your data structure
  2. Configure surveillance tasks that specify data processing workflows
  3. Implement task functions that perform the actual data analysis

Prerequisites

Before starting this tutorial, you need CS9 properly installed and configured. See vignette("installation") for complete setup instructions including database configuration and environment variables.

You can verify your setup is working with:

# Check if CS9 environment is properly configured
cs9::check_environment_setup()

For an overview of CS9 concepts and architecture, see vignette("cs9").

Step 1: Initialize a Surveillance System

First, create a surveillance system instance that will manage your tables and tasks:

# Load the CS9 package
library(cs9)

# Initialize the surveillance system
ss <- cs9::SurveillanceSystem_v9$new(
  name = "example_surveillance",
  implementation_version = "1.0.0"
)

# Verify the system was created
print(ss$name)
print(ss$implementation_version)

Step 2: Define Database Tables

Before creating tasks, you need to define the database tables that will store your data. Tables in CS9 require:

  • Field types: Column names and data types
  • Keys: Columns that uniquely identify records
  • Access level: Database access permissions

# Define a table for weather data
ss$add_table(
  name_access = "anon",           # Anonymous access level
  name_grouping = "weather",      # Data category
  name_variant = "daily_data",    # Specific variant
  field_types = c(
    # Temporal fields
    "date" = "DATE",
    "year" = "INTEGER", 
    "month" = "INTEGER",
    "day" = "INTEGER",
    
    # Geographic fields
    "location_code" = "TEXT",
    "location_name" = "TEXT",
    
    # Weather measurements
    "temperature_max" = "DOUBLE",
    "temperature_min" = "DOUBLE", 
    "precipitation" = "DOUBLE",
    "humidity" = "DOUBLE"
  ),
  keys = c("date", "location_code"),  # Unique identifier
  indexes = list(
    "idx_date" = "date",
    "idx_location" = "location_code", 
    "idx_date_location" = c("date", "location_code")
  )
)

# Define a table for processed results
ss$add_table(
  name_access = "anon",
  name_grouping = "weather", 
  name_variant = "weekly_summary",
  field_types = c(
    "year_week" = "TEXT",
    "location_code" = "TEXT",
    "avg_temp_max" = "DOUBLE",
    "avg_temp_min" = "DOUBLE",
    "total_precipitation" = "DOUBLE",
    "data_quality_score" = "DOUBLE"
  ),
  keys = c("year_week", "location_code")
)

# Check which tables are available
print("Available tables:")
print(names(ss$tables))
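
The keys declared above are meant to identify each record uniquely, so it can help to check that property on incoming data before loading it. The following is a minimal base R sketch that runs without CS9 or a database; the helper name has_unique_keys and the sample rows are illustrative assumptions, not part of the CS9 API.

```r
# Hypothetical helper (not part of CS9): TRUE when no two rows share the
# same combination of values in the key columns.
has_unique_keys <- function(df, keys) {
  stopifnot(all(keys %in% names(df)))
  !any(duplicated(df[, keys, drop = FALSE]))
}

# Small made-up sample with the same key columns as the daily table
daily <- data.frame(
  date = as.Date(c("2024-01-01", "2024-01-01", "2024-01-02")),
  location_code = c("LOC001", "LOC002", "LOC001"),
  temperature_max = c(3.1, 4.5, 2.8)
)

ok <- has_unique_keys(daily, c("date", "location_code"))

# Appending a repeat of the first row violates the key
with_dup <- rbind(daily, daily[1, ])
dup_ok <- has_unique_keys(with_dup, c("date", "location_code"))

print(c(original = ok, with_duplicate = dup_ok))
```

A check like this could run inside a data selector or action function before any rows are written.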

Step 3: Implement Task Functions

CS9 tasks require two main functions:

  1. Data selector function: Extracts and prepares data for analysis
  2. Action function: Performs the core analysis and stores results

Data Selector Function

The data selector function retrieves data needed for analysis:

# Data selector function for weather processing
weather_data_selector <- function(argset, tables) {
  # In a real implementation, this would query your database
  # For demonstration, we'll create sample data
  
  # Build the date sequence once and reuse its length for every column
  dates <- seq.Date(
    from = as.Date(argset$date_from),
    to = as.Date(argset$date_to),
    by = "day"
  )
  n_days <- length(dates)

  sample_data <- data.table::data.table(
    date = dates,
    location_code = rep(argset$location_code, length.out = n_days),
    temperature_max = rnorm(n_days, mean = 20, sd = 8),
    temperature_min = rnorm(n_days, mean = 10, sd = 5),
    precipitation = rgamma(n_days, shape = 2, rate = 4)
  )
  
  # Add derived fields
  sample_data[, year := as.integer(format(date, "%Y"))]
  sample_data[, month := as.integer(format(date, "%m"))]
  sample_data[, day := as.integer(format(date, "%d"))]
  sample_data[, location_name := paste("Location", location_code)]
  sample_data[, humidity := runif(.N, 30, 90)]
  
  return(list(
    data = sample_data
  ))
}
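
Because a data selector is an ordinary function of argset and tables that returns a named list, its logic can be tried outside the framework. The sketch below uses a simplified base R stand-in (demo_selector is a hypothetical name, and the hand-built argset is an assumption about what one plan might supply) and fixes the random seed so the simulated values are reproducible.

```r
# Hypothetical stand-in for a data selector, written in base R so it runs
# without CS9 or a database. `tables` is accepted but unused here.
demo_selector <- function(argset, tables = NULL) {
  dates <- seq(as.Date(argset$date_from), as.Date(argset$date_to), by = "day")
  n <- length(dates)
  list(
    data = data.frame(
      date = dates,
      location_code = rep(argset$location_code, length.out = n),
      temperature_max = rnorm(n, mean = 20, sd = 8)
    )
  )
}

set.seed(1)  # make the simulated values reproducible

# Hand-built argument set, mimicking what a single plan might provide
argset <- list(
  location_code = "LOC001",
  date_from = "2024-01-01",
  date_to = "2024-01-31"
)
out <- demo_selector(argset)

str(out$data)
```

Calling the selector directly like this makes it easy to confirm the shape of the returned list before wiring the function into a task.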

Action Function

The action function performs the main analysis:

# Action function for weather processing
weather_action <- function(data, argset, tables) {
  # Process the daily weather data into weekly summaries
  
  # Add week information
  data$data[, year_week := format(date, "%Y-W%U")]
  
  # Calculate weekly aggregates
  weekly_summary <- data$data[, .(
    avg_temp_max = mean(temperature_max, na.rm = TRUE),
    avg_temp_min = mean(temperature_min, na.rm = TRUE), 
    total_precipitation = sum(precipitation, na.rm = TRUE),
    data_quality_score = 1.0 - sum(is.na(temperature_max) | is.na(temperature_min)) / .N
  ), by = .(year_week, location_code)]
  
  # Insert results into database table
  # In a real implementation, this would use the alias given to the table
  # in the task's `tables` mapping, for example:
  # tables$weather_weekly$upsert_data(weekly_summary)
  
  # For demonstration, just print results
  cat("Processed weekly weather summary:\n")
  print(weekly_summary)
  
  # Log successful completion
  cs9::update_config_log(
    ss = argset$surveillance_system,
    task = "weather_process_weekly_summary", 
    paste("Successfully processed", nrow(weekly_summary), "weekly records for", argset$location_code)
  )
}
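
The action function labels weeks with the "%Y-W%U" format. The short base R sketch below, with dates chosen purely for illustration, shows what that format produces next to the ISO 8601 alternative "%G-W%V", since the two conventions disagree around the turn of the year.

```r
dates <- as.Date(c("2023-01-01", "2024-01-01", "2024-01-07"))

# %U: week of the year with Sunday as the first day of the week; days before
# the first Sunday of the year fall in week 00.
us_weeks <- format(dates, "%Y-W%U")

# %G and %V: ISO 8601 week-based year and week number (weeks start on Monday).
iso_weeks <- format(dates, "%G-W%V")

print(data.frame(date = dates, us = us_weeks, iso = iso_weeks))
```

Under "%U", 2024-01-01 (a Monday) is labelled 2024-W00, whereas the ISO convention places it in 2024-W01. Which convention suits a given surveillance system depends on how the downstream reports define weeks.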

Step 4: Configure the Surveillance Task

Now configure the task with its execution parameters:

# Add the weather processing task
ss$add_task(
  name_grouping = "weather",
  name_action = "process", 
  name_variant = "weekly_summary",
  cores = 1,                              # Single core execution
  permission = NULL,                      # No special permissions needed
  
  # Task structure - one plan per location per month
  for_each_plan = list(
    location_code = c("LOC001", "LOC002", "LOC003"),
    date_from = c("2024-01-01", "2024-02-01", "2024-03-01"),  
    date_to = c("2024-01-31", "2024-02-28", "2024-03-31")
  ),
  
  # No analysis-level iteration needed
  for_each_analysis = NULL,
  
  # Common arguments for all plans  
  universal_argset = list(
    surveillance_system = "example_surveillance",
    data_format = "cs9_standard"
  ),
  
  # Automatically upsert results at the end of each plan
  upsert_at_end_of_each_plan = TRUE,
  insert_at_end_of_each_plan = FALSE,
  
  # Function names
  action_fn_name = "weather_action",
  data_selector_fn_name = "weather_data_selector",
  
  # Table mapping
  tables = list(
    weather_daily = "anon_weather_daily_data", 
    weather_weekly = "anon_weather_weekly_summary"
  )
)

# Verify the task was added
print("Available tasks:")
print(names(ss$tasks))
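
Judging from the identifiers used in this vignette (for example "anon_weather_daily_data" in the table mapping), full table and task names appear to be the name parts joined with underscores. The sketch below only illustrates that apparent convention; compose_name is a hypothetical helper, and the rule is inferred from the examples rather than taken from the CS9 documentation.

```r
# Assumption inferred from the identifiers in this vignette, not from CS9
# documentation: name parts are joined with underscores.
compose_name <- function(...) paste(..., sep = "_")

# name_access, name_grouping, name_variant
table_name <- compose_name("anon", "weather", "daily_data")

# name_grouping, name_action, name_variant
task_name <- compose_name("weather", "process", "weekly_summary")

print(c(table = table_name, task = task_name))
```

Printing names(ss$tables) and names(ss$tasks) remains the reliable way to confirm the names that were actually registered.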

Step 5: Execute the Task

Once configured, you can run the surveillance task:

# Get task information
task_name <- "weather_process_weekly_summary"

# View task plans and analyses
if(task_name %in% names(ss$tasks)) {
  plans_info <- ss$shortcut_get_plans_argsets_as_dt(task_name)
  cat("Task execution plans:\n")
  print(plans_info)
  
  # Run the task
  cat("\nExecuting surveillance task...\n")
  ss$run_task(task_name)
  
  # Check task completion status
  task_stats <- cs9::get_config_tasks_stats(task = task_name, last_run = TRUE)
  if(nrow(task_stats) > 0) {
    cat("\nTask execution completed successfully!\n")
    print(task_stats[, .(task, datetime, status)])
  }
}

Step 6: Monitor and Debug Tasks

CS9 provides several utilities for monitoring task execution:

# Get all available tables
cat("All surveillance system tables:\n")
print(names(ss$tables))

# Get task execution logs  
recent_logs <- cs9::get_config_log(
  task = "weather_process_weekly_summary",
  start_date = Sys.Date() - 7
)
if(nrow(recent_logs) > 0) {
  cat("\nRecent task logs:\n") 
  print(recent_logs[, .(datetime, task, message)])
}

# Get task performance statistics
task_performance <- cs9::get_config_tasks_stats(last_run = TRUE)
if(nrow(task_performance) > 0) {
  cat("\nTask performance summary:\n")
  print(task_performance[, .(task, datetime, runtime_seconds, status)])
}

# Access data for a specific plan (for debugging)
if(task_name %in% names(ss$tasks)) {
  # Get data for first plan
  debug_data <- ss$shortcut_get_data(task_name, index_plan = 1)
  cat("\nData structure for plan 1:\n")
  if(!is.null(debug_data$data)) {
    str(debug_data$data)
  }
  
  # Get arguments for first plan, first analysis
  debug_args <- ss$shortcut_get_argset(task_name, index_plan = 1, index_analysis = 1)
  cat("\nArgument set for plan 1, analysis 1:\n")
  print(debug_args)
}

Advanced Task Features

Parallel Processing

For computationally intensive tasks, you can enable parallel processing:

# Configure a task for parallel execution
ss$add_task(
  name_grouping = "weather",
  name_action = "analyze",
  name_variant = "trends", 
  cores = 4,  # Use 4 CPU cores
  
  # Large number of plans that can benefit from parallelization
  for_each_plan = list(
    location_code = sprintf("LOC%03d", 1:100),  # 100 locations
    analysis_year = rep(2020:2024, length.out = 100)  # 5 years
  ),
  
  universal_argset = list(
    min_data_quality = 0.8,
    trend_method = "linear_regression"
  ),
  
  action_fn_name = "weather_trend_action",
  data_selector_fn_name = "weather_trend_data_selector",
  
  tables = list(
    input_data = "anon_weather_daily_data",
    trend_results = "anon_weather_trends"
  )
)
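
The two vectors passed to for_each_plan above both have length 100. How CS9 combines them into plans is not described here, so the base R sketch below only inspects the vectors themselves: it contrasts an element-wise pairing with a full crossing of locations and years.

```r
location_code <- sprintf("LOC%03d", 1:100)
analysis_year <- rep(2020:2024, length.out = 100)

# Element-wise view of the two vectors: one row per position
paired <- data.frame(location_code, analysis_year)
print(head(paired, 6))

# In this view each year is attached to 20 locations rather than to all 100
year_counts <- table(paired$analysis_year)
print(year_counts)

# Crossing every location with every year would instead give 500 combinations
crossed <- expand.grid(
  location_code = location_code,
  analysis_year = 2020:2024,
  stringsAsFactors = FALSE
)
print(nrow(crossed))
```

If every location needs to be analysed for every year, the crossed form is the one to generate; inspecting the plan table for the task shows what the framework actually built.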

Custom Plan Generation

For complex scenarios, you can use a function to generate plans dynamically:

# Function to generate plans based on database state
generate_weather_plans <- function() {
  # In real implementation, query database for available data
  available_locations <- c("LOC001", "LOC002", "LOC003")
  available_years <- 2020:2024
  
  # Generate plans for locations with sufficient data
  plans <- list()
  for(location in available_locations) {
    for(year in available_years) {
      plans <- append(plans, list(list(
        location_code = location,
        year = year,
        min_date = paste0(year, "-01-01"),
        max_date = paste0(year, "-12-31")
      )))
    }
  }
  
  return(list(
    for_each_plan = plans,
    for_each_analysis = NULL
  ))
}

# Use the custom plan generator
ss$add_task(
  name_grouping = "weather",
  name_action = "validate", 
  name_variant = "data_quality",
  cores = 2,
  
  # Use function to generate plans
  plan_analysis_fn_name = "generate_weather_plans",
  for_each_plan = NULL,  # Will be generated by function
  for_each_analysis = NULL,
  
  universal_argset = list(
    quality_threshold = 0.85,
    validation_rules = c("completeness", "consistency", "accuracy")
  ),
  
  action_fn_name = "weather_validation_action",
  data_selector_fn_name = "weather_validation_data_selector", 
  
  tables = list(
    source_data = "anon_weather_daily_data",
    validation_results = "anon_weather_validation"
  )
)
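
The nested loops in generate_weather_plans can also be written without growing a list step by step. The following base R sketch builds the same list-of-lists structure with expand.grid and Map; build_weather_plans is a hypothetical helper name, and the plan fields simply mirror those used above.

```r
# Sketch of an alternative plan builder (hypothetical helper, base R only)
build_weather_plans <- function(locations, years) {
  # expand.grid varies its first argument fastest, so listing `year` first
  # reproduces the loop order: all years for the first location, then the next
  grid <- expand.grid(
    year = years,
    location_code = locations,
    stringsAsFactors = FALSE
  )
  # unname() drops the names Map would otherwise derive from location_code
  unname(Map(
    function(location_code, year) {
      list(
        location_code = location_code,
        year = year,
        min_date = paste0(year, "-01-01"),
        max_date = paste0(year, "-12-31")
      )
    },
    grid$location_code,
    grid$year
  ))
}

plans <- build_weather_plans(c("LOC001", "LOC002", "LOC003"), 2020:2024)
print(length(plans))
str(plans[[1]])
```

Three locations and five years yield 15 plans, in the same order as the loop version.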

Best Practices

1. Error Handling

Always include robust error handling in your task functions:

robust_weather_action <- function(data, argset, tables) {
  tryCatch({
    # Validate input data
    if(is.null(data$data) || nrow(data$data) == 0) {
      warning("No data available for processing")
      return(invisible(NULL))
    }
    
    # Check required columns
    required_cols <- c("date", "location_code", "temperature_max")
    missing_cols <- setdiff(required_cols, names(data$data))
    if(length(missing_cols) > 0) {
      stop("Missing required columns: ", paste(missing_cols, collapse = ", "))
    }
    
    # Perform analysis with validation
    result <- data$data[, .(
      avg_temp = mean(temperature_max, na.rm = TRUE),
      record_count = .N
    ), by = location_code]
    
    # Validate results before storing
    if(any(is.na(result$avg_temp))) {
      warning("Some temperature averages could not be calculated")
    }
    
    # Log successful completion
    cs9::update_config_log(
      ss = argset$surveillance_system,
      task = "weather_analysis",
      paste("Successfully processed", nrow(result), "location summaries")
    )
    
  }, error = function(e) {
    # Log errors for debugging
    cs9::update_config_log(
      ss = argset$surveillance_system, 
      task = "weather_analysis",
      paste("ERROR:", e$message)
    )
    stop(e)
  })
}
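
The log-then-rethrow pattern used in the error handler above can be isolated and tested on its own. This is a minimal base R sketch with an in-memory logger standing in for cs9::update_config_log; the names with_logged_errors and record are illustrative assumptions.

```r
# Hypothetical wrapper (not part of CS9): run `fn`, pass any error message to
# `log_fn`, then re-raise the error so the caller still sees the failure.
with_logged_errors <- function(fn, log_fn) {
  tryCatch(
    fn(),
    error = function(e) {
      log_fn(paste("ERROR:", conditionMessage(e)))
      stop(e)
    }
  )
}

# In-memory stand-in for a logging backend
log_lines <- character(0)
record <- function(msg) log_lines <<- c(log_lines, msg)

# A successful call returns its value and logs nothing
ok_value <- with_logged_errors(function() 1 + 1, record)

# A failing call is logged and still fails
failed <- inherits(
  try(with_logged_errors(function() stop("no data"), record), silent = TRUE),
  "try-error"
)

print(log_lines)
```

Re-raising the error after logging keeps the failure visible to whatever is running the task, instead of silently swallowing it.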

2. Data Validation

Implement comprehensive data validation:

validate_weather_data <- function(data) {
  validation_results <- list()
  
  # Check data completeness
  completeness <- data[, lapply(.SD, function(x) mean(!is.na(x)))]
  validation_results$completeness <- completeness
  
  # Check data ranges
  temp_range_check <- data[temperature_max < -50 | temperature_max > 60, .N]
  validation_results$temperature_outliers <- temp_range_check
  
  # Check temporal consistency (NA when a location has only one record)
  date_gaps <- data[order(date), .(
    max_gap = if (.N > 1) max(as.numeric(diff(date))) else NA_real_
  ), by = location_code]
  validation_results$temporal_gaps <- date_gaps
  
  return(validation_results)
}
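
The three checks above can be tried on a few hand-made rows to see what each one reports. The sketch below reimplements them in base R on a small made-up data frame, so it runs without data.table; the sample values are assumptions chosen to trigger each check.

```r
weather <- data.frame(
  date = as.Date(c("2024-01-01", "2024-01-02", "2024-01-05",
                   "2024-01-01", "2024-01-02")),
  location_code = c("LOC001", "LOC001", "LOC001", "LOC002", "LOC002"),
  temperature_max = c(3.2, NA, 75.0, 1.5, 2.0)
)

# Completeness: share of non-missing values per column
completeness <- colMeans(!is.na(weather))

# Range check: number of readings outside a plausible interval
n_outliers <- sum(
  weather$temperature_max < -50 | weather$temperature_max > 60,
  na.rm = TRUE
)

# Temporal consistency: largest gap in days between consecutive dates,
# computed per location (NA when a location has a single record)
max_gap <- sapply(split(weather$date, weather$location_code), function(d) {
  if (length(d) < 2) return(NA_real_)
  max(as.numeric(diff(sort(d))))
})

print(completeness)
print(n_outliers)
print(max_gap)
```

Here one missing temperature lowers the completeness of that column to 0.8, the 75.0 reading is counted as an outlier, and LOC001 shows a three-day gap.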

3. Modular Design

Structure your task functions for reusability:

# Utility function for common data transformations
standardize_weather_data <- function(data) {
  # Apply common transformations
  data[, temperature_celsius := round(temperature_max, 1)]
  data[, date_formatted := format(date, "%Y-%m-%d")]
  # wday() returns 1 for Sunday and 7 for Saturday, independent of locale
  data[, is_weekend := data.table::wday(date) %in% c(1L, 7L)]
  
  return(data)
}

# Reusable data selector template
# The %>% pipe used below comes from magrittr (also re-exported by dplyr)
library(magrittr)
create_weather_data_selector <- function(date_column = "date", 
                                       location_column = "location_code") {
  function(argset, tables) {
    # Common data selection logic
    query_data <- tables[[argset$source_table]]$tbl() %>%
      dplyr::filter(
        !!rlang::sym(date_column) >= argset$date_from,
        !!rlang::sym(date_column) <= argset$date_to,
        !!rlang::sym(location_column) %in% argset$location_codes
      ) %>%
      dplyr::collect() %>%
      data.table::as.data.table()
    
    # Apply standardization
    standardized_data <- standardize_weather_data(query_data)
    
    return(list(data = standardized_data))
  }
}
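
The selector template above is a function factory: it captures the column names once and returns a selector that uses them. The sketch below shows the same idea in base R against an in-memory data frame instead of a database table, so it can be run directly; make_selector and the sample data are hypothetical.

```r
# Hypothetical factory (base R, in-memory data instead of a database table)
make_selector <- function(source,
                          date_column = "date",
                          location_column = "location_code") {
  # The returned function remembers `source` and the column names
  function(argset, tables = NULL) {
    keep <- source[[date_column]] >= as.Date(argset$date_from) &
      source[[date_column]] <= as.Date(argset$date_to) &
      source[[location_column]] %in% argset$location_codes
    list(data = source[keep, , drop = FALSE])
  }
}

# Made-up observations with non-default column names
obs <- data.frame(
  obs_date = as.Date("2024-01-01") + 0:5,
  site = rep(c("LOC001", "LOC002"), times = 3),
  temperature_max = c(1, 2, 3, 4, 5, 6)
)

select_obs <- make_selector(obs, date_column = "obs_date", location_column = "site")
picked <- select_obs(list(
  date_from = "2024-01-02",
  date_to = "2024-01-05",
  location_codes = "LOC001"
))

print(picked$data)
```

Because the column names are arguments of the factory, one template can serve several tables whose date and location columns are named differently.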

Summary

This vignette covered the complete workflow for creating surveillance tasks in CS9:

  1. System Setup: Initialize surveillance system and check environment
  2. Table Definition: Define database schemas for storing data
  3. Function Implementation: Create data selector and action functions
  4. Task Configuration: Set up task parameters and execution structure
  5. Task Execution: Run tasks and monitor performance
  6. Advanced Features: Parallel processing and custom plan generation

The CS9 framework provides a robust foundation for building scalable surveillance systems with automated pipeline execution, comprehensive logging, and flexible task orchestration.

For more information about CS9: