Creating a Surveillance Task with CS9
This vignette demonstrates how to create and configure surveillance tasks using the CS9 framework. CS9 provides a structured approach to building real-time surveillance systems with database integration and automated pipeline execution.
Overview
Creating a surveillance task in CS9 involves three main steps:
- Define database tables with schemas that describe your data structure
- Configure surveillance tasks that specify data processing workflows
- Implement task functions that perform the actual data analysis
Prerequisites
Before starting this tutorial, you need CS9 properly installed and configured. See vignette("installation")
for complete setup instructions including database configuration and environment variables.
You can verify your setup is working with:
# Check if CS9 environment is properly configured
cs9::check_environment_setup()
For an overview of CS9 concepts and architecture, see vignette("cs9").
Step 1: Initialize a Surveillance System
First, create a surveillance system instance that will manage your tables and tasks:
# Create a surveillance system
library(cs9)
# Initialize the surveillance system
ss <- cs9::SurveillanceSystem_v9$new(
name = "example_surveillance",
implementation_version = "1.0.0"
)
# Verify the system was created
print(ss$name)
print(ss$implementation_version)
Step 2: Define Database Tables
Before creating tasks, you need to define the database tables that will store your data. Tables in CS9 require:
- Field types: Column names and data types
- Keys: Columns that uniquely identify records
- Access level: Database access permissions
# Define a table for weather data
ss$add_table(
name_access = "anon", # Anonymous access level
name_grouping = "weather", # Data category
name_variant = "daily_data", # Specific variant
field_types = c(
# Temporal fields
"date" = "DATE",
"year" = "INTEGER",
"month" = "INTEGER",
"day" = "INTEGER",
# Geographic fields
"location_code" = "TEXT",
"location_name" = "TEXT",
# Weather measurements
"temperature_max" = "DOUBLE",
"temperature_min" = "DOUBLE",
"precipitation" = "DOUBLE",
"humidity" = "DOUBLE"
),
keys = c("date", "location_code"), # Unique identifier
indexes = list(
"idx_date" = "date",
"idx_location" = "location_code",
"idx_date_location" = c("date", "location_code")
)
)
# Define a table for processed results
ss$add_table(
name_access = "anon",
name_grouping = "weather",
name_variant = "weekly_summary",
field_types = c(
"year_week" = "TEXT",
"location_code" = "TEXT",
"avg_temp_max" = "DOUBLE",
"avg_temp_min" = "DOUBLE",
"total_precipitation" = "DOUBLE",
"data_quality_score" = "DOUBLE"
),
keys = c("year_week", "location_code")
)
# Check which tables are available
print("Available tables:")
print(names(ss$tables))
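Registered tables are addressed by a combined name. Assuming CS9 composes it as name_access_name_grouping_name_variant (consistent with the full names used later in this vignette, e.g. "anon_weather_daily_data"), the two tables above can be checked directly:
# Confirm both tables were registered under their combined names
# (naming convention assumed from the full names used later in this vignette)
"anon_weather_daily_data" %in% names(ss$tables)
"anon_weather_weekly_summary" %in% names(ss$tables)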
Step 3: Implement Task Functions
CS9 tasks require two main functions:
- Data selector function: Extracts and prepares data for analysis
- Action function: Performs the core analysis and stores results
Data Selector Function
The data selector function retrieves data needed for analysis:
# Data selector function for weather processing
weather_data_selector <- function(argset, tables) {
# In a real implementation, this would query your database
# For demonstration, we'll create sample data
# Number of days in the requested window (computed once, reused below)
n_days <- as.numeric(as.Date(argset$date_to) - as.Date(argset$date_from)) + 1
sample_data <- data.table::data.table(
date = seq.Date(
from = as.Date(argset$date_from),
to = as.Date(argset$date_to),
by = "day"
),
location_code = rep(argset$location_code, length.out = n_days),
temperature_max = rnorm(n_days, mean = 20, sd = 8),
temperature_min = rnorm(n_days, mean = 10, sd = 5),
precipitation = rgamma(n_days, shape = 2, rate = 4)
)
# Add derived fields
sample_data[, year := as.integer(format(date, "%Y"))]
sample_data[, month := as.integer(format(date, "%m"))]
sample_data[, day := as.integer(format(date, "%d"))]
sample_data[, location_name := paste("Location", location_code)]
sample_data[, humidity := runif(.N, 30, 90)]
return(list(
data = sample_data
))
}
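Because the selector is an ordinary R function, it can be exercised outside the pipeline. Here is a minimal standalone check using a hand-built argset (the tables argument is unused by this selector, so NULL suffices):
# Build a throwaway argset mirroring the fields the selector reads
test_argset <- list(
date_from = "2024-01-01",
date_to = "2024-01-07",
location_code = "LOC001"
)
# Call the selector directly and inspect the returned data
selected <- weather_data_selector(test_argset, tables = NULL)
str(selected$data)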
Action Function
The action function performs the main analysis:
# Action function for weather processing
weather_action <- function(data, argset, tables) {
# Process the daily weather data into weekly summaries
# Add week information
data$data[, year_week := format(date, "%Y-W%U")]
# Calculate weekly aggregates
weekly_summary <- data$data[, .(
avg_temp_max = mean(temperature_max, na.rm = TRUE),
avg_temp_min = mean(temperature_min, na.rm = TRUE),
total_precipitation = sum(precipitation, na.rm = TRUE),
data_quality_score = 1.0 - sum(is.na(temperature_max) | is.na(temperature_min)) / .N
), by = .(year_week, location_code)]
# Insert results into the database table
# In a real implementation, this would use the short name
# defined in the task's tables mapping, e.g.:
# tables$weather_weekly$upsert_data(weekly_summary)
# For demonstration, just print results
cat("Processed weekly weather summary:\n")
print(weekly_summary)
# Log successful completion
cs9::update_config_log(
ss = argset$surveillance_system,
task = "weather_process_weekly",
paste("Successfully processed", nrow(weekly_summary), "weekly records for", argset$location_code)
)
}
Step 4: Configure the Surveillance Task
Now configure the task with its execution parameters:
# Add the weather processing task
ss$add_task(
name_grouping = "weather",
name_action = "process",
name_variant = "weekly_summary",
cores = 1, # Single core execution
permission = NULL, # No special permissions needed
# Task structure - one plan per location per month
for_each_plan = list(
location_code = c("LOC001", "LOC002", "LOC003"),
date_from = c("2024-01-01", "2024-02-01", "2024-03-01"),
date_to = c("2024-01-31", "2024-02-28", "2024-03-31")
),
# No analysis-level iteration needed
for_each_analysis = NULL,
# Common arguments for all plans
universal_argset = list(
surveillance_system = "example_surveillance",
data_format = "cs9_standard"
),
# Automatically insert results at end of each plan
upsert_at_end_of_each_plan = TRUE,
insert_at_end_of_each_plan = FALSE,
# Function names
action_fn_name = "weather_action",
data_selector_fn_name = "weather_data_selector",
# Table mapping
tables = list(
weather_daily = "anon_weather_daily_data",
weather_weekly = "anon_weather_weekly_summary"
)
)
# Verify the task was added
print("Available tasks:")
print(names(ss$tasks))
Step 5: Execute the Task
Once configured, you can run the surveillance task:
# Get task information
task_name <- "weather_process_weekly_summary"
# View task plans and analyses
if(task_name %in% names(ss$tasks)) {
plans_info <- ss$shortcut_get_plans_argsets_as_dt(task_name)
cat("Task execution plans:\n")
print(plans_info)
# Run the task
cat("\nExecuting surveillance task...\n")
ss$run_task(task_name)
# Check task completion status
task_stats <- cs9::get_config_tasks_stats(task = task_name, last_run = TRUE)
if(nrow(task_stats) > 0) {
cat("\nTask execution completed successfully!\n")
print(task_stats[, .(task, datetime, status)])
}
}
Step 6: Monitor and Debug Tasks
CS9 provides several utilities for monitoring task execution:
# Get all available tables
cat("All surveillance system tables:\n")
print(names(ss$tables))
# Get task execution logs
recent_logs <- cs9::get_config_log(
task = "weather_process_weekly",
start_date = Sys.Date() - 7
)
if(nrow(recent_logs) > 0) {
cat("\nRecent task logs:\n")
print(recent_logs[, .(datetime, task, message)])
}
# Get task performance statistics
task_performance <- cs9::get_config_tasks_stats(last_run = TRUE)
if(nrow(task_performance) > 0) {
cat("\nTask performance summary:\n")
print(task_performance[, .(task, datetime, runtime_seconds, status)])
}
# Access data for a specific plan (for debugging)
if(task_name %in% names(ss$tasks)) {
# Get data for first plan
debug_data <- ss$shortcut_get_data(task_name, index_plan = 1)
cat("\nData structure for plan 1:\n")
if(!is.null(debug_data$data)) {
print(str(debug_data$data))
}
# Get arguments for first plan, first analysis
debug_args <- ss$shortcut_get_argset(task_name, index_plan = 1, index_analysis = 1)
cat("\nArgument set for plan 1, analysis 1:\n")
print(debug_args)
}
Advanced Task Features
Parallel Processing
For computationally intensive tasks, you can enable parallel processing:
# Configure a task for parallel execution
ss$add_task(
name_grouping = "weather",
name_action = "analyze",
name_variant = "trends",
cores = 4, # Use 4 CPU cores
# Large number of plans that can benefit from parallelization
for_each_plan = list(
location_code = sprintf("LOC%03d", 1:100), # 100 locations
analysis_year = rep(2020:2024, length.out = 100) # 5 years, recycled across the locations
),
universal_argset = list(
min_data_quality = 0.8,
trend_method = "linear_regression"
),
action_fn_name = "weather_trend_action",
data_selector_fn_name = "weather_trend_data_selector",
tables = list(
input_data = "anon_weather_daily_data",
trend_results = "anon_weather_trends"
)
)
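The right core count depends on the machine running the pipeline. One common heuristic (an illustration, not a CS9 requirement) is to leave one core free for the rest of the system:
# Detect available cores and keep one free for the operating system
n_cores <- max(1, parallel::detectCores() - 1)
The resulting n_cores value (an illustrative variable, not a CS9 parameter) can then be passed as cores = n_cores in ss$add_task().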
Custom Plan Generation
For complex scenarios, you can use a function to generate plans dynamically:
# Function to generate plans based on database state
generate_weather_plans <- function() {
# In real implementation, query database for available data
available_locations <- c("LOC001", "LOC002", "LOC003")
available_years <- 2020:2024
# Generate plans for locations with sufficient data
plans <- list()
for(location in available_locations) {
for(year in available_years) {
plans <- append(plans, list(list(
location_code = location,
year = year,
min_date = paste0(year, "-01-01"),
max_date = paste0(year, "-12-31")
)))
}
}
return(list(
for_each_plan = plans,
for_each_analysis = NULL
))
}
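Before wiring the generator into a task, it is worth sanity-checking its output directly; with 3 locations and 5 years it should produce 15 plans:
# Run the generator and verify the expected number of plans
plans <- generate_weather_plans()
length(plans$for_each_plan) # should be 15 (3 locations x 5 years)
str(plans$for_each_plan[[1]])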
# Use the custom plan generator
ss$add_task(
name_grouping = "weather",
name_action = "validate",
name_variant = "data_quality",
cores = 2,
# Use function to generate plans
plan_analysis_fn_name = "generate_weather_plans",
for_each_plan = NULL, # Will be generated by function
for_each_analysis = NULL,
universal_argset = list(
quality_threshold = 0.85,
validation_rules = c("completeness", "consistency", "accuracy")
),
action_fn_name = "weather_validation_action",
data_selector_fn_name = "weather_validation_data_selector",
tables = list(
source_data = "anon_weather_daily_data",
validation_results = "anon_weather_validation"
)
)
Best Practices
1. Error Handling
Always include robust error handling in your task functions:
robust_weather_action <- function(data, argset, tables) {
tryCatch({
# Validate input data
if(is.null(data$data) || nrow(data$data) == 0) {
warning("No data available for processing")
return(invisible(NULL))
}
# Check required columns
required_cols <- c("date", "location_code", "temperature_max")
missing_cols <- setdiff(required_cols, names(data$data))
if(length(missing_cols) > 0) {
stop("Missing required columns: ", paste(missing_cols, collapse = ", "))
}
# Perform analysis with validation
result <- data$data[, .(
avg_temp = mean(temperature_max, na.rm = TRUE),
record_count = .N
), by = location_code]
# Validate results before storing
if(any(is.na(result$avg_temp))) {
warning("Some temperature averages could not be calculated")
}
# Log successful completion
cs9::update_config_log(
ss = argset$surveillance_system,
task = "weather_analysis",
paste("Successfully processed", nrow(result), "location summaries")
)
}, error = function(e) {
# Log errors for debugging
cs9::update_config_log(
ss = argset$surveillance_system,
task = "weather_analysis",
paste("ERROR:", e$message)
)
stop(e)
})
}
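The guard clauses can be exercised with deliberately bad input. The empty-data path warns and returns before any database or logging call, so it is safe to run standalone:
# Empty input should trigger the warning and return early
robust_weather_action(
data = list(data = NULL),
argset = list(surveillance_system = "example_surveillance"),
tables = NULL
)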
2. Data Validation
Implement comprehensive data validation:
validate_weather_data <- function(data) {
validation_results <- list()
# Check data completeness
completeness <- data[, lapply(.SD, function(x) sum(!is.na(x))/.N)]
validation_results$completeness <- completeness
# Check data ranges
temp_range_check <- data[temperature_max < -50 | temperature_max > 60, .N]
validation_results$temperature_outliers <- temp_range_check
# Check temporal consistency
date_gaps <- data[order(date), .(
max_gap = max(as.numeric(diff(date)), na.rm = TRUE)
), by = location_code]
validation_results$temporal_gaps <- date_gaps
return(validation_results)
}
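For example, running the validator against a small synthetic table makes each check easy to inspect (sample values chosen for illustration):
# Build a small sample and run all three checks
sample_weather <- data.table::data.table(
date = as.Date("2024-01-01") + c(0, 1, 5),
location_code = "LOC001",
temperature_max = c(21.5, NA, 75) # one missing value, one outlier
)
checks <- validate_weather_data(sample_weather)
checks$temperature_outliers # 1 record outside [-50, 60]
checks$temporal_gaps # max gap of 4 days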
3. Modular Design
Structure your task functions for reusability:
# Utility function for common data transformations
standardize_weather_data <- function(data) {
# Apply common transformations
data[, temperature_max_rounded := round(temperature_max, 1)]
data[, date_formatted := format(date, "%Y-%m-%d")]
data[, is_weekend := weekdays(date) %in% c("Saturday", "Sunday")]
return(data)
}
# Reusable data selector template
create_weather_data_selector <- function(date_column = "date",
location_column = "location_code") {
function(argset, tables) {
# Common data selection logic
# (uses the magrittr pipe, so dplyr or magrittr must be attached)
query_data <- tables[[argset$source_table]]$tbl() %>%
dplyr::filter(
!!rlang::sym(date_column) >= argset$date_from,
!!rlang::sym(date_column) <= argset$date_to,
!!rlang::sym(location_column) %in% argset$location_codes
) %>%
dplyr::collect() %>%
data.table::as.data.table()
# Apply standardization
standardized_data <- standardize_weather_data(query_data)
return(list(data = standardized_data))
}
}
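The factory is then used to materialize a named selector, which a task configuration can reference in the usual way (weather_quality_selector is just an example name):
# Create a selector bound to the default column names
weather_quality_selector <- create_weather_data_selector()
# Register it with a task by name, e.g.:
# data_selector_fn_name = "weather_quality_selector"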
Summary
This vignette covered the complete workflow for creating surveillance tasks in CS9:
- System Setup: Initialize the surveillance system and check the environment
- Table Definition: Define database schemas for storing data
- Function Implementation: Create data selector and action functions
- Task Configuration: Set up task parameters and execution structure
- Task Execution: Run tasks and monitor performance
- Advanced Features: Parallel processing and custom plan generation
The CS9 framework provides a robust foundation for building scalable surveillance systems with automated pipeline execution, comprehensive logging, and flexible task orchestration.
Related Documentation
For more information about CS9:
- vignette("cs9") - Package overview and architecture concepts
- vignette("installation") - Complete installation and setup guide
- vignette("file-layout") - Package structure and file organization patterns
- ?SurveillanceSystem_v9 - Main surveillance system class documentation
- ?check_environment_setup - Environment configuration diagnostics