Skip to content

parsedmarc.kafkaclient 🔗

KafkaClient 🔗

KafkaClient(
    kafka_hosts: list[str],
    ssl: bool = False,
    username: str | None = None,
    password: str | None = None,
    ssl_context: SSLContext | None = None,
)
PARAMETER DESCRIPTION
kafka_hosts

A list of Kafka hostnames (with optional port numbers)

TYPE: list[str]

ssl

Use a SSL/TLS connection. This is implied True if username or password is supplied.

TYPE: bool DEFAULT: False

username

An optional username

TYPE: str | None DEFAULT: None

password

An optional password

TYPE: str | None DEFAULT: None

ssl_context

SSL context options

TYPE: SSLContext | None DEFAULT: None

Note

When using Azure Event Hubs, the username is literally $ConnectionString, and the password is the Azure Event Hub connection string.

generate_daterange staticmethod 🔗

generate_daterange(report)

Creates a date_range timestamp with format YYYY-MM-DD-T-HH:MM:SS based on begin and end dates for easier parsing in Kibana.

Move to utils to avoid duplication w/ elastic?

save_aggregate_reports_to_kafka 🔗

save_aggregate_reports_to_kafka(
    aggregate_reports: (
        AggregateReport | list[AggregateReport]
    ),
    aggregate_topic: str,
) -> None

Saves aggregate DMARC reports to Kafka

PARAMETER DESCRIPTION
aggregate_reports

Aggregate reports to save to Kafka

TYPE: AggregateReport | list[AggregateReport]

aggregate_topic

The name of the Kafka topic

TYPE: str

save_forensic_reports_to_kafka 🔗

save_forensic_reports_to_kafka(
    forensic_reports: ForensicReport | list[ForensicReport],
    forensic_topic: str,
) -> None

Saves forensic DMARC reports to Kafka, sends individual records (slices) since Kafka requires messages to be <= 1MB by default.

PARAMETER DESCRIPTION
forensic_reports

Forensic reports to save to Kafka

TYPE: ForensicReport | list[ForensicReport]

forensic_topic

The name of the Kafka topic

TYPE: str

strip_metadata staticmethod 🔗

strip_metadata(report)

Duplicates org_name, org_email and report_id into JSON root and removes report_metadata key to bring it more inline with Elastic output.

KafkaError 🔗

KafkaError(message: str | Exception)

Bases: RuntimeError

Raised when a Kafka error occurs