Internal Python Reference

This section documents the internal Python modules for developers contributing to the Huntsman codebase.

Views

Views for the Huntsman API.

class api.views.DashboardView(**kwargs)

Bases: TemplateView

A view to render the main dashboard.

Constructor. Called in the URLconf; can contain helpful extra keyword arguments, and other things.

template_name = 'api/dashboard.html'

get_context_data(**kwargs)

Get the context data for rendering the dashboard.

This method loads IOC patterns and adds them to the context.

Parameters:

**kwargs (Any) – Arbitrary keyword arguments.

Returns:

The context data for the template.

Return type:

dict

dispatch(request, *args, **kwargs)

class api.views.AnalysisTriggerView(**kwargs)

Bases: APIView

An API view to trigger a single analysis task.

post(request, *args, **kwargs)

Handle POST requests to trigger an analysis task.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object.

Return type:

Response

class api.views.BulkAnalysisTriggerView(**kwargs)

Bases: APIView

An API view to trigger multiple analysis tasks in bulk.

post(request, *args, **kwargs)

Handle POST requests to trigger bulk analysis tasks.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object.

Return type:

Response

class api.views.TaskStatusView(**kwargs)

Bases: RetrieveAPIView

An API view to retrieve the status of a single analysis task.

queryset

serializer_class

alias of AnalysisTaskSerializer

lookup_field = 'id'

class api.views.TaskListView(**kwargs)

Bases: ListAPIView

An API view to list all analysis tasks, with optional filtering by status.

queryset

serializer_class

alias of AnalysisTaskSerializer

get_queryset()

Get the queryset for listing tasks.

This method filters the queryset based on the ‘status’ query parameter.

Returns:

The filtered queryset of AnalysisTask objects.

Return type:

QuerySet
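
The optional filtering described for get_queryset can be sketched without Django. This is a minimal stand-in rather than the view's actual code: the helper name filter_tasks_by_status, the dict-based task records, and the choice to return every task when the parameter is missing or empty are all assumptions made for illustration. The real method works on a Django QuerySet.

```python
from typing import Iterable, Mapping, Optional


def filter_tasks_by_status(
    tasks: Iterable[Mapping[str, str]],
    query_params: Mapping[str, str],
) -> list:
    """Return tasks matching the optional 'status' query parameter.

    Hypothetical helper: when 'status' is absent or empty, every task
    is returned unchanged (an assumption about the real view).
    """
    status: Optional[str] = query_params.get("status")
    if not status:
        return list(tasks)
    return [task for task in tasks if task["status"] == status]


tasks = [
    {"id": "1", "status": "PENDING"},
    {"id": "2", "status": "SUCCESS"},
    {"id": "3", "status": "SUCCESS"},
]
successful = filter_tasks_by_status(tasks, {"status": "SUCCESS"})
everything = filter_tasks_by_status(tasks, {})
```

The status strings used here match the values of AnalysisTask.Status listed under Models.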

class api.views.SuperDBQueryView(**kwargs)

Bases: APIView

An API view to execute a query on SuperDB.

post(request, *args, **kwargs)

Handle POST requests to execute a SuperDB query.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object.

Return type:

Response

class api.views.PredefinedQueriesListView(**kwargs)

Bases: APIView

An API view to list all predefined SuperDB queries.

get(request, format=None)

Handle GET requests to list predefined queries.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • format (Any, optional) – The format of the response.

Returns:

A DRF Response object containing the predefined queries.

Return type:

Response

class api.views.CreatePoolView(**kwargs)

Bases: APIView

An API view to create a new pool in SuperDB.

post(request, *args, **kwargs)

Handle POST requests to create a SuperDB pool.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object.

Return type:

Response

class api.views.LoadDataToBranchView(**kwargs)

Bases: APIView

An API view to load data into a branch of a SuperDB pool.

post(request, *args, **kwargs)

Handle POST requests to load data into a SuperDB branch.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object.

Return type:

Response

class api.views.ServiceListView(**kwargs)

Bases: APIView

An API view to list all available services and their supported types.

get(request, format=None)

Handle GET requests to list available services.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • format (Any, optional) – The format of the response.

Returns:

A DRF Response object containing the list of services.

Return type:

Response

class api.views.HealthCheckView(**kwargs)

Bases: APIView

An API view to perform health checks on the system components.

get(request, *args, **kwargs)

Handle GET requests to perform health checks.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object with the health check results.

Return type:

Response

check_database()

Check the status of the database connection.

Returns:

A dictionary with the status and a message.

Return type:

dict

check_celery()

Check the status of the Celery workers.

Returns:

A dictionary with the status and a message.

Return type:

dict
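
Both check methods return a dictionary with a status and a message. The sketch below shows one way such a result could be produced and combined into an overall verdict. It is an illustration under stated assumptions: the helper run_check, the probe functions, the "ok"/"error"/"degraded" status strings, and the in-memory SQLite probe are hypothetical and are not taken from the Huntsman code.

```python
import sqlite3
from typing import Callable, Dict


def run_check(check: Callable[[], None]) -> Dict[str, str]:
    """Run a probe and report the outcome as a status/message dictionary."""
    try:
        check()
    except Exception as exc:  # a failing probe must not crash the endpoint
        return {"status": "error", "message": str(exc)}
    return {"status": "ok", "message": "Check passed."}


def probe_database() -> None:
    # Stand-in for a real database ping: run a trivial query.
    connection = sqlite3.connect(":memory:")
    try:
        connection.execute("SELECT 1")
    finally:
        connection.close()


def probe_workers() -> None:
    # Stand-in for a Celery ping that receives no replies.
    raise RuntimeError("No workers responded.")


results = {
    "database": run_check(probe_database),
    "celery": run_check(probe_workers),
}
overall = "ok" if all(r["status"] == "ok" for r in results.values()) else "degraded"
```

Catching the exception inside the helper keeps each component's failure isolated, so one unavailable service still yields a complete report for the others.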

class api.views.BulkTaskStatusView(**kwargs)

Bases: APIView

An API view to retrieve the status of multiple tasks in bulk.

post(request, *args, **kwargs)

Handle POST requests to get the status of bulk tasks.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object with the status of the requested tasks.

Return type:

Response

class api.views.CorrelationEngineView(**kwargs)

Bases: APIView

An API view to interact with the Correlation Engine.

post(request, *args, **kwargs)

Handle POST requests to run the correlation engine.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object with the correlation analysis results.

Return type:

Response

get(request, *args, **kwargs)

Handle GET requests to list correlation rules and tags.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object with the list of rules and tags.

Return type:

Response

class api.views.STIXReportView(**kwargs)

Bases: APIView

An API view to create a STIX report.

post(request, *args, **kwargs)

Handle POST requests to create a STIX report.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object.

Return type:

Response

class api.views.BulkSTIXReportView(**kwargs)

Bases: APIView

An API view to create a bulk STIX report from multiple tasks.

post(request, *args, **kwargs)

Handle POST requests to create a bulk STIX report.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object.

Return type:

Response

class api.views.AIAnalysisView(**kwargs)

Bases: APIView

An API view to perform AI analysis on a given dataset.

post(request, *args, **kwargs)

Handle POST requests to perform AI analysis.

Parameters:
  • request (HttpRequest) – The HTTP request object.

  • *args (Any) – Variable length argument list.

  • **kwargs (Any) – Arbitrary keyword arguments.

Returns:

A DRF Response object.

Return type:

Response

Serializers

Serializers for the Huntsman API.

class api.serializers.AnalysisTaskSerializer(*args, **kwargs)

Bases: ModelSerializer

Serializer for the AnalysisTask model.

Serializes AnalysisTask objects, including a method to fetch the full results of a task, potentially from cache or by querying SuperDB.

When a field is instantiated, we store the arguments that were used, so that we can present a helpful representation of the object.

class Meta

Bases: object

Meta options for the AnalysisTaskSerializer.

model

alias of AnalysisTask

fields = ['id', 'status', 'service_name', 'identifier', 'identifier_type', 'full_result', 'created_at', 'completed_at']

read_only_fields = ['id', 'status', 'service_name', 'identifier', 'identifier_type', 'full_result', 'created_at', 'completed_at']

get_full_result(obj)

Get the full result for an AnalysisTask.

If the task was successful and the result is stored in SuperDB, this method will query SuperDB for the full result. The result is cached for an hour to reduce redundant queries.

Parameters:

obj (AnalysisTask) – The AnalysisTask instance.

Returns:

The full result of the task.

Return type:

dict
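
The one-hour caching described for get_full_result follows a common time-to-live pattern. The sketch below illustrates that pattern with the standard library only. The TTLCache class, the cache key format, and the fake query function are hypothetical; the serializer itself most likely relies on Django's cache framework, which is not shown here.

```python
import time
from typing import Any, Callable, Dict, Tuple

ONE_HOUR_SECONDS = 60 * 60


class TTLCache:
    """Tiny in-memory cache whose entries expire after a fixed time."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self._ttl:
            return entry[1]  # still fresh: skip the expensive query
        value = compute()
        self._entries[key] = (now, value)
        return value


# Demonstration with a controllable clock and a fake SuperDB query.
fake_now = [0.0]
query_calls = []


def fake_superdb_query() -> dict:
    query_calls.append(1)
    return {"verdict": "clean"}


cache = TTLCache(ONE_HOUR_SECONDS, clock=lambda: fake_now[0])
first = cache.get_or_compute("task:42", fake_superdb_query)
second = cache.get_or_compute("task:42", fake_superdb_query)  # served from cache
fake_now[0] = ONE_HOUR_SECONDS + 1  # move past the one-hour window
third = cache.get_or_compute("task:42", fake_superdb_query)  # recomputed
```

Passing the clock in as a parameter is a design choice for testability: expiry can be exercised without waiting an hour.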

class api.serializers.SuperDBQuerySerializer(*args, **kwargs)

Bases: Serializer

Serializer for SuperDB queries.

Validates and serializes the components of a SuperDB query, which can be either a raw query string or a structured query with a pool and commands.

class api.serializers.TaskSubmissionSerializer(*args, **kwargs)

Bases: Serializer

Serializer for submitting a single analysis task.

Validates and serializes the data required to submit a new analysis task.

class api.serializers.BulkTaskSubmissionSerializer(*args, **kwargs)

Bases: Serializer

Serializer for submitting multiple analysis tasks in a single request.

class api.serializers.BulkTaskStatusRequestSerializer(*args, **kwargs)

Bases: Serializer

Serializer for requesting the status of multiple analysis tasks.

class api.serializers.LoadDataToBranchSerializer(*args, **kwargs)

Bases: Serializer

Serializer for loading data into a SuperDB branch.

Validates and serializes the data required to load data into a specific branch of a SuperDB pool.

validate(data)

Validate the data for loading into a branch.

Parameters:

data (dict) – The data to be validated.

Returns:

The validated data.

Return type:

dict

Raises:

serializers.ValidationError – If the ‘data’ field is not provided.
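
The validation rule above can be sketched without Django REST Framework. In this illustration the ValidationError class is a local stand-in for serializers.ValidationError, and the function name, the 'pool' and 'branch' keys, the error message, and the treatment of an empty value as missing are assumptions.

```python
class ValidationError(Exception):
    """Local stand-in for rest_framework.serializers.ValidationError."""


def validate_load_request(data: dict) -> dict:
    """Return the payload unchanged, or raise if 'data' is missing.

    Hypothetical: an empty value is treated like a missing one here.
    """
    if not data.get("data"):
        raise ValidationError("The 'data' field is required.")
    return data


valid = validate_load_request({"pool": "intel", "branch": "main", "data": "{}"})

try:
    validate_load_request({"pool": "intel"})
except ValidationError as exc:
    error_message = str(exc)
```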

class api.serializers.CreatePoolSerializer(*args, **kwargs)

Bases: Serializer

Serializer for creating a new SuperDB pool.

Validates and serializes the data required to create a new SuperDB pool.

class api.serializers.CorrelationEngineSerializer(*args, **kwargs)

Bases: Serializer

Serializer for the Correlation Engine.

Validates and serializes the parameters for running the correlation engine on a specific task.

class api.serializers.STIXReportSerializer(*args, **kwargs)

Bases: Serializer

Serializer for generating a STIX report from a task.

class api.serializers.STIXReportItemSerializer(*args, **kwargs)

Bases: Serializer

Serializer for a single item in a bulk STIX report request.

class api.serializers.BulkSTIXReportSerializer(*args, **kwargs)

Bases: Serializer

Serializer for generating multiple STIX reports in a single request.

class api.serializers.AIAnalysisSerializer(*args, **kwargs)

Bases: Serializer

Serializer for AI analysis requests.

Validates and serializes the data and prompts for an AI analysis request.

Models

Django models for the Huntsman API.

class api.models.AnalysisTask(*args, **kwargs)

Bases: Model

Represents a single analysis task and its state.

class Status(*values)

Bases: TextChoices

Status choices for an AnalysisTask.

PENDING = 'PENDING'
IN_PROGRESS = 'IN_PROGRESS'
SUCCESS = 'SUCCESS'
FAILURE = 'FAILURE'

id

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

celery_task_id

status

service_name

identifier

identifier_type

result

created_at

completed_at

exception DoesNotExist

Bases: ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: MultipleObjectsReturned

get_next_by_created_at(*, field=<django.db.models.fields.DateTimeField: created_at>, is_next=True, **kwargs)
get_previous_by_created_at(*, field=<django.db.models.fields.DateTimeField: created_at>, is_next=False, **kwargs)
get_status_display(*, field=<django.db.models.fields.CharField: status>)
objects = <django.db.models.manager.Manager object>

class api.models.ConfigFile(*args, **kwargs)

Bases: Model

Represents a generic configuration file (Recipes, Patterns).

name

path

content

updated_at

save(*args, **kwargs)

Save the model instance and write the content to disk.

Return type:

None
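
The save behaviour described above, persisting the record and then mirroring its content to the file at path, might look roughly like the following. This is a sketch under assumptions: ConfigFileSketch is a plain dataclass rather than a Django model, the database write is only indicated by a comment, and creating missing parent directories is a choice made for the example.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class ConfigFileSketch:
    name: str
    path: str
    content: str

    def save(self) -> None:
        # A Django model would call super().save() here to persist the row.
        target = Path(self.path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(self.content, encoding="utf-8")


with tempfile.TemporaryDirectory() as tmp_dir:
    config = ConfigFileSketch(
        name="ioc_patterns",
        path=str(Path(tmp_dir) / "config" / "ioc_patterns.yaml"),
        content="patterns: []\n",
    )
    config.save()
    written = Path(config.path).read_text(encoding="utf-8")
```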

exception DoesNotExist

Bases: ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: MultipleObjectsReturned

get_next_by_updated_at(*, field=<django.db.models.fields.DateTimeField: updated_at>, is_next=True, **kwargs)
get_previous_by_updated_at(*, field=<django.db.models.fields.DateTimeField: updated_at>, is_next=False, **kwargs)

id

objects = <django.db.models.manager.Manager object>

api.models.clear_config_cache(sender, instance, **kwargs)

Clear the configuration cache when a ConfigFile is saved.

Return type:

None

class api.models.Rule(*args, **kwargs)

Bases: Model

Represents a detection Rule.

Uses ‘rule_id’ (from YAML) as the unique identifier for synchronization.

id

uuid

rule_id

name

title

author

description

content

updated_at

save(*args, **kwargs)

Save the model instance and write the content to disk.

Return type:

None

exception DoesNotExist

Bases: ObjectDoesNotExist

exception MultipleObjectsReturned

Bases: MultipleObjectsReturned

get_next_by_updated_at(*, field=<django.db.models.fields.DateTimeField: updated_at>, is_next=True, **kwargs)
get_previous_by_updated_at(*, field=<django.db.models.fields.DateTimeField: updated_at>, is_next=False, **kwargs)
objects = <django.db.models.manager.Manager object>

Tasks

Celery tasks for the Huntsman API.

Correlation Engine

The correlation engine for Huntsman.

class api.correlation_engine.CorrelationRule(rule_data, file_path)

Bases: object

Represents a single correlation rule.

id

The unique identifier of the rule.

Type:

str

title

The title of the rule.

Type:

str

description

A description of what the rule does.

Type:

str

syntax

The query syntax for the rule.

Type:

str

author

The author of the rule.

Type:

str

source

The source of the rule.

Type:

str

tags

A list of tags associated with the rule.

Type:

List[str]

file_path

The file path where the rule is defined.

Type:

str

Initialize a CorrelationRule object.

Parameters:
  • rule_data (dict) – The dictionary containing the rule data.

  • file_path (str) – The file path where the rule is defined.

Raises:

ValueError – If a required field is missing in the rule data.
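
The constructor contract above (build the rule from a dictionary and raise ValueError when a required field is absent) can be illustrated as follows. The reference does not state which fields are required, so the REQUIRED_FIELDS tuple, the empty defaults for the remaining attributes, and the error message format are assumptions, and CorrelationRuleSketch is a hypothetical stand-in for the real class.

```python
from typing import Any, Dict, List

# Assumed for illustration; the real required set is not documented here.
REQUIRED_FIELDS = ("id", "title", "syntax")


class CorrelationRuleSketch:
    def __init__(self, rule_data: Dict[str, Any], file_path: str) -> None:
        missing = [name for name in REQUIRED_FIELDS if name not in rule_data]
        if missing:
            raise ValueError(
                f"{file_path}: missing required field(s): {', '.join(missing)}"
            )
        self.id: str = rule_data["id"]
        self.title: str = rule_data["title"]
        self.syntax: str = rule_data["syntax"]
        # Optional fields fall back to empty defaults in this sketch.
        self.description: str = rule_data.get("description", "")
        self.author: str = rule_data.get("author", "")
        self.source: str = rule_data.get("source", "")
        self.tags: List[str] = list(rule_data.get("tags", []))
        self.file_path = file_path


rule = CorrelationRuleSketch(
    {"id": "r-001", "title": "Shared infrastructure", "syntax": "count()", "tags": ["infra"]},
    "rules/shared_infra.yaml",
)

try:
    CorrelationRuleSketch({"id": "r-002"}, "rules/broken.yaml")
except ValueError as exc:
    error_message = str(exc)
```

Including the file path in the error message makes it easier to locate the offending rule file when many are loaded at once.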

execute_query(task_id, superdb_client)

Execute the rule’s query.

Parameters:
  • task_id (str) – The ID of the task to correlate.

  • superdb_client (SuperDBClient) – The SuperDB client to use for executing the query.

Returns:

A dictionary containing the results of the query execution.

Return type:

dict

class api.correlation_engine.CorrelationEngine(*args, **kwargs)

Bases: object

Main correlation engine for loading and executing rules.

rules_directory_path

The path to the directory containing the correlation rules.

Type:

str

rules

A list of loaded correlation rules.

Type:

List[CorrelationRule]

superdb_client

The SuperDB client used for executing queries.

Type:

SuperDBClient

Initialize a CorrelationEngine object.

Parameters:

rules_directory_path (str, optional) – The path to the directory containing the correlation rules. If not provided, it will be loaded from the Django settings.

load_rules()

Load correlation rules from the specified directory.

Raises:

FileNotFoundError – If the rules directory is not found or is not a directory.

Return type:

None

get_rules(rule_titles=None, tags_filter=None)

Get a filtered list of correlation rules.

Parameters:
  • rule_titles (list of str, optional) – A list of rule titles to filter by.

  • tags_filter (list of str, optional) – A list of tags to filter by.

Returns:

A list of correlation rules that match the filter criteria.

Return type:

list of CorrelationRule
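
The two optional filters can be combined as in the sketch below. The reference does not say how the filters interact, so two assumptions are made and labelled in the code: both filters must pass when both are given, and a rule passes the tag filter when it shares at least one tag. RuleSketch and the free function get_rules are hypothetical stand-ins for the real class and method.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class RuleSketch:
    title: str
    tags: List[str] = field(default_factory=list)


def get_rules(
    rules: List[RuleSketch],
    rule_titles: Optional[List[str]] = None,
    tags_filter: Optional[List[str]] = None,
) -> List[RuleSketch]:
    selected = rules
    if rule_titles:
        wanted_titles = set(rule_titles)
        selected = [rule for rule in selected if rule.title in wanted_titles]
    if tags_filter:
        wanted_tags = set(tags_filter)
        # Assumption: a rule matches when it shares at least one tag.
        selected = [rule for rule in selected if wanted_tags & set(rule.tags)]
    return list(selected)


rules = [
    RuleSketch("DNS overlap", ["dns", "infra"]),
    RuleSketch("Hash reuse", ["malware"]),
    RuleSketch("Cert reuse", ["infra"]),
]
infra_rules = get_rules(rules, tags_filter=["infra"])
one_rule = get_rules(rules, rule_titles=["Hash reuse", "Cert reuse"], tags_filter=["infra"])
all_rules = get_rules(rules)
```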

run_correlation_analysis(task_id, service_name=None, rule_titles=None, tags_filter=None)

Run correlation analysis for a specific task ID.

Parameters:
  • task_id (str) – The ID of the task to analyze.

  • service_name (str, optional) – The name of the service to run the rules against.

  • rule_titles (list of str, optional) – A list of rule titles to run. If not provided, all rules will be run.

  • tags_filter (list of str, optional) – A list of tags to filter the rules to be run.

Returns:

A dictionary containing the results of the correlation analysis.

Return type:

dict
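
As a rough illustration of running a set of rules for one task and collecting the outcome in a dictionary, consider the sketch below. The shape of the returned dictionary is not documented in this reference, so the keys used here (task_id, rules_run, matches, errors), the helper run_rules, and the callables standing in for rule queries are all hypothetical.

```python
from typing import Callable, Dict, List


def run_rules(task_id: str, rules: Dict[str, Callable[[str], List[dict]]]) -> dict:
    """Run every rule for one task and gather the outcomes in a summary dict."""
    summary = {"task_id": task_id, "rules_run": 0, "matches": {}, "errors": {}}
    for title, execute in rules.items():
        summary["rules_run"] += 1
        try:
            rows = execute(task_id)
        except Exception as exc:  # one failing rule should not stop the others
            summary["errors"][title] = str(exc)
            continue
        if rows:
            summary["matches"][title] = rows
    return summary


def shared_ip_rule(task_id: str) -> List[dict]:
    return [{"task_id": task_id, "ip": "203.0.113.7"}]


def empty_rule(task_id: str) -> List[dict]:
    return []


def broken_rule(task_id: str) -> List[dict]:
    raise RuntimeError("query timed out")


summary = run_rules(
    "task-1",
    {"Shared IP": shared_ip_rule, "No hits": empty_rule, "Broken": broken_rule},
)
```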

api.correlation_engine.create_correlation_engine_instance()

Create a new instance of the CorrelationEngine.

Returns:

A new instance of the CorrelationEngine.

Return type:

CorrelationEngine

Configuration

Configuration loading for the Huntsman API.

api.config.refresh_configuration()

Clear the cache for the YAML configuration loader.

This function clears the lru_cache for the _load_yaml_config function, forcing it to re-read the configuration files from disk the next time they are accessed.

Return type:

None
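
The cache-clearing mechanism described above relies on functools.lru_cache, whose wrapped functions expose a cache_clear() method. The sketch below demonstrates the effect. The loader here is a stand-in named _load_config_text that returns raw file text instead of parsed YAML, and the file name is hypothetical; only the lru_cache behaviour itself is taken from the standard library.

```python
import tempfile
from functools import lru_cache
from pathlib import Path


@lru_cache(maxsize=None)
def _load_config_text(path: str) -> str:
    """Stand-in loader: returns raw file text instead of parsed YAML."""
    return Path(path).read_text(encoding="utf-8")


def refresh_configuration() -> None:
    """Drop every cached entry so the next load re-reads the file."""
    _load_config_text.cache_clear()


with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = Path(tmp_dir) / "recipes.yaml"
    config_path.write_text("version: 1\n", encoding="utf-8")
    before_edit = _load_config_text(str(config_path))

    config_path.write_text("version: 2\n", encoding="utf-8")
    stale = _load_config_text(str(config_path))  # cached, still the old text

    refresh_configuration()
    fresh = _load_config_text(str(config_path))  # re-read from disk
```

Without the refresh call, edits to a configuration file stay invisible to the running process, which is why clearing the cache after a ConfigFile is saved matters.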

api.config.load_api_recipes()

Load the API recipes from the YAML configuration file.

Returns:

The API recipes.

Return type:

dict

api.config.load_scraping_recipes()

Load the scraping recipes from the YAML configuration file.

Returns:

The scraping recipes.

Return type:

dict

api.config.load_internal_services_recipes()

Load the internal services recipes from the YAML configuration file.

Returns:

The internal services recipes.

Return type:

dict

api.config.load_rss_recipes()

Load the RSS recipes from the YAML configuration file.

Returns:

The RSS recipes.

Return type:

dict

api.config.load_ioc_patterns()

Load the IOC patterns from the YAML configuration file.

Returns:

The IOC patterns.

Return type:

dict

api.config.load_predefined_queries()

Load the predefined queries from the YAML configuration file.

Returns:

The predefined queries.

Return type:

dict

api.config.load_all_recipes()

Load all recipes from the YAML configuration files.

Returns:

A dictionary containing all the recipes.

Return type:

dict

URLs

URL patterns for the Huntsman API.