diff --git a/Cargo.toml b/Cargo.toml index a1a4071096c0c..707a91cd9ca00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -581,6 +581,7 @@ sources-logs = [ "sources-kubernetes_logs", "sources-logstash", "sources-nats", + "sources-okta", "sources-opentelemetry", "sources-pulsar", "sources-file_descriptor", @@ -637,6 +638,7 @@ sources-logstash = ["sources-utils-net-tcp", "tokio-util/net"] sources-mongodb_metrics = ["dep:mongodb"] sources-nats = ["dep:async-nats", "dep:nkeys"] sources-nginx_metrics = ["dep:nom"] +sources-okta = ["sources-utils-http-client"] sources-opentelemetry = ["dep:hex", "vector-lib/opentelemetry", "dep:prost", "dep:prost-types", "sources-http_server", "sources-utils-http", "sources-utils-http-headers", "sources-vector"] sources-postgresql_metrics = ["dep:postgres-openssl", "dep:tokio-postgres"] sources-prometheus = ["sources-prometheus-scrape", "sources-prometheus-remote-write", "sources-prometheus-pushgateway"] diff --git a/changelog.d/22968_okta_source.md b/changelog.d/22968_okta_source.md new file mode 100644 index 0000000000000..66a7e342c3ce1 --- /dev/null +++ b/changelog.d/22968_okta_source.md @@ -0,0 +1,3 @@ +this change adds an `okta` source for consuming the [Okta system log](https://developer.okta.com/docs/api/openapi/okta-management/management/tag/SystemLog/) + +authors: sonnens diff --git a/src/sources/mod.rs b/src/sources/mod.rs index 8a86d9d6343d0..8f6412eb053f2 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -62,6 +62,8 @@ pub mod mongodb_metrics; pub mod nats; #[cfg(feature = "sources-nginx_metrics")] pub mod nginx_metrics; +#[cfg(feature = "sources-okta")] +pub mod okta; #[cfg(feature = "sources-opentelemetry")] pub mod opentelemetry; #[cfg(feature = "sources-postgresql_metrics")] diff --git a/src/sources/okta/client.rs b/src/sources/okta/client.rs new file mode 100644 index 0000000000000..1df9e5415730c --- /dev/null +++ b/src/sources/okta/client.rs @@ -0,0 +1,402 @@ +use crate::internal_events::{ + HttpClientEventsReceived, HttpClientHttpError, HttpClientHttpResponseError, +}; +use crate::{ + codecs::{Decoder, DecodingConfig}, + config::{SourceConfig, SourceContext}, + sources, + sources::util::http_client::{default_interval, default_timeout, warn_if_interval_too_low}, + tls::TlsSettings, +}; +use bytes::BytesMut; +use chrono::Utc; +use futures::StreamExt as _; +use futures_util::{stream, FutureExt}; +use http::Uri; +use percent_encoding::utf8_percent_encode; +use serde_with::serde_as; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use tokio_stream::wrappers::IntervalStream; +use tokio_util::codec::Decoder as _; +use vector_lib::codecs::JsonDeserializerConfig; +use vector_lib::codecs::{ + decoding::{DeserializerConfig, FramingConfig}, + StreamDecodingError, +}; +use vector_lib::config::proxy::ProxyConfig; +use vector_lib::configurable::configurable_component; +use vector_lib::json_size::JsonSize; +use vector_lib::shutdown::ShutdownSignal; +use vector_lib::tls::TlsConfig; +use vector_lib::{ + config::{LogNamespace, SourceOutput}, + event::Event, + EstimatedJsonEncodedSizeOf, +}; + +use crate::{ + http::HttpClient, + internal_events::{EndpointBytesReceived, StreamClosedError}, + SourceSender, +}; + +use hyper::{Body, Request}; + +/// Configuration for the `okta` source. +#[serde_as] +#[configurable_component(source("okta", "Pull Okta system logs via the Okta API",))] +#[derive(Clone, Debug)] +pub struct OktaConfig { + /// The Okta subdomain to scrape + #[configurable(metadata(docs::examples = "foo.okta.com"))] + pub domain: String, + + /// API token for authentication + #[configurable(metadata(docs::examples = "00xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"))] + pub token: String, + + /// The interval between scrapes. Requests are run concurrently so if a scrape takes longer + /// than the interval a new scrape will be started. This can take extra resources, set the timeout + /// to a value lower than the scrape interval to prevent this from happening. + #[serde(default = "default_interval")] + #[serde_as(as = "serde_with::DurationSeconds")] + #[serde(rename = "scrape_interval_secs")] + #[configurable(metadata(docs::human_name = "Scrape Interval"))] + pub interval: Duration, + + /// The timeout for each scrape request. + #[serde(default = "default_timeout")] + #[serde_as(as = "serde_with:: DurationSecondsWithFrac")] + #[serde(rename = "scrape_timeout_secs")] + #[configurable(metadata(docs::human_name = "Scrape Timeout"))] + pub timeout: Duration, + + /// The time to look back for logs. This is used to determine the start time of the first request + /// (ie, the earliest log to fetch) + #[configurable(metadata(docs::human_name = "Since (seconds before now)"))] + pub since: Option, + + /// TLS configuration. + #[configurable(derived)] + pub tls: Option, + + /// The namespace to use for logs. This overrides the global setting. + #[configurable(metadata(docs::hidden))] + #[serde(default)] + pub log_namespace: Option, +} + +impl Default for OktaConfig { + fn default() -> Self { + Self { + domain: "".to_string(), + token: "".to_string(), + interval: default_interval(), + timeout: default_timeout(), + since: None, + tls: None, + log_namespace: None, + } + } +} + +impl_generate_config_from_default!(OktaConfig); + +fn find_rel_next_link(header: &str) -> Option { + for part in header.split(',') { + let relpart: Vec<_> = part.split(';').collect(); + if let Some(url) = relpart + .get(0) + .map(|s| s.trim().trim_matches(|c| c == '<' || c == '>')) + { + if part.contains("rel=\"next\"") { + return Some(url.to_string()); + } + } + } + None +} + +#[async_trait::async_trait] +#[typetag::serde(name = "okta")] +impl SourceConfig for OktaConfig { + async fn build(&self, cx: SourceContext) -> crate::Result { + let since = match self.since { + Some(since) => Utc::now() - Duration::from_secs(since), + _ => Utc::now(), + }; + + let path_and_query = format!( + "/api/v1/logs?since={}", + utf8_percent_encode(&since.to_rfc3339(), percent_encoding::NON_ALPHANUMERIC) + ); + + let mut url_parts = Uri::try_from(&self.domain) + .map_err(|_| { + format!( + "Invalid domain: {}. Must be a valid Okta subdomain.", + self.domain + ) + })? + .into_parts(); + + url_parts.path_and_query = Some(path_and_query.parse()?); + if url_parts.scheme.is_none() { + url_parts.scheme = Some(http::uri::Scheme::HTTPS); + } + + let url = Uri::from_parts(url_parts).map_err(|_| { + format!( + "Invalid domain: {}. Must be a valid Okta subdomain.", + self.domain + ) + })?; + + let tls = TlsSettings::from_options(self.tls.as_ref())?; + + let log_namespace = cx.log_namespace(self.log_namespace); + + warn_if_interval_too_low(self.timeout, self.interval); + + Ok(run( + url, + tls, + cx.proxy, + self.token.clone(), + self.interval, + self.timeout, + log_namespace, + cx.shutdown, + cx.out, + ) + .boxed()) + } + + fn outputs(&self, global_log_namespace: LogNamespace) -> Vec { + // There is a global and per-source `log_namespace` config. The source config overrides the global setting, + // and is merged here. + let log_namespace = global_log_namespace.merge(self.log_namespace); + + vec![SourceOutput::new_maybe_logs( + JsonDeserializerConfig::default().output_type(), + JsonDeserializerConfig::default().schema_definition(log_namespace.clone()), + )] + } + + fn can_acknowledge(&self) -> bool { + false + } +} + +fn enrich_events(events: &mut Vec, log_namespace: &LogNamespace) { + let now = Utc::now(); + for event in events { + log_namespace.insert_standard_vector_source_metadata( + event.as_mut_log(), + OktaConfig::NAME, + now, + ); + } +} + +/// Calls the Okta system logs API and sends the events to the output stream. +/// +/// Okta's API paginates with a `link` header that contains a url (in `rel=next`) to the next page of results, +/// and will always return a `rel=next` link regardless of whether there are more results. +/// This function fetches all pages until there are no more results (an empty JSON array) and finishes until +/// the next interval +/// The function will run until the `shutdown` signal is received. +pub(crate) async fn run( + url: Uri, + tls: TlsSettings, + proxy: ProxyConfig, + token: String, + interval: Duration, + timeout: Duration, + log_namespace: LogNamespace, + shutdown: ShutdownSignal, + mut out: SourceSender, +) -> Result<(), ()> { + let url_mutex = Arc::new(Mutex::new(url)); + let decoder = DecodingConfig::new( + FramingConfig::Bytes, + DeserializerConfig::Json(JsonDeserializerConfig::default()), + log_namespace.clone(), + ) + .build() + .unwrap(); + + let client = HttpClient::new(tls, &proxy).expect("Building HTTP client failed"); + + let mut stream = IntervalStream::new(tokio::time::interval(interval)) + .take_until(shutdown) + .then(move |_| { + let client = client.clone(); + let timeout = timeout.clone(); + let url_mutex = url_mutex.clone(); + let token = token.clone(); + let decoder = decoder.clone(); + + async move { + stream::unfold((), move |_| { + let timeout = timeout.clone(); + let url_mutex = url_mutex.clone(); + let token = token.clone(); + let log_namespace = log_namespace.clone(); + let decoder = decoder.clone(); + let client = client.clone(); + + async move { + // We update the actual URL based on the response the API returns + // so the critical section is between here & when the request finishes + let mut url_lock = url_mutex.lock().await; + let url = url_lock.to_string(); + + let mut request = Request::get(&url) + .body(Body::empty()) + .expect("error creating request"); + + let headers = request.headers_mut(); + headers.insert( + http::header::AUTHORIZATION, + format!("SSWS {}", token).parse().unwrap(), + ); + headers.insert(http::header::ACCEPT, "application/json".parse().unwrap()); + headers.insert( + http::header::CONTENT_TYPE, + "application/json".parse().unwrap(), + ); + + let client = client.clone(); + let timeout = timeout.clone(); + let decoder = decoder.clone(); + + tokio::time::timeout(timeout.clone(), client.send(request)) + .then({ + let url = url.clone(); + move |result| async move { + match result { + Ok(Ok(response)) => { + let (header, body) = response.into_parts(); + if let Some(next) = header + .headers + .get_all("link") + .iter() + .filter_map(|v| v.to_str().ok()) + .filter_map(|v| find_rel_next_link(v)) + .next() + .and_then(|next| Uri::try_from(next).ok()) + { + *url_lock = next; + }; + + let body = hyper::body::to_bytes(body).await?; + + emit!(EndpointBytesReceived { + byte_size: body.len(), + protocol: "http", + endpoint: url.as_str(), + }); + Ok((header, body)) + } + Ok(Err(error)) => Err(error.into()), + Err(_) => Err(format!( + "Timeout error: request exceeded {}s", + timeout.as_secs_f64() + ) + .into()), + } + .inspect(|_| { + drop(url_lock); + }) + } + }) + .then(move |response| { + let decoder = decoder.clone(); + async move { + match response { + Ok((header, body)) + if header.status == hyper::StatusCode::OK => + { + let mut buf = BytesMut::new(); + buf.extend_from_slice(&body); + let mut events = decode_events(&mut buf, decoder); + let byte_size = if events.is_empty() { + JsonSize::zero() + } else { + events.estimated_json_encoded_size_of() + }; + + emit!(HttpClientEventsReceived { + byte_size, + count: events.len(), + url: url + }); + + if events.is_empty() { + return None; + } + + enrich_events(&mut events, &log_namespace); + + Some((stream::iter(events), ())) + } + Ok((header, _)) => { + emit!(HttpClientHttpResponseError { + code: header.status, + url: url + }); + None + } + Err(error) => { + emit!(HttpClientHttpError { error, url: url }); + None + } + } + } + }) + .await + } + }) + .flatten() + .boxed() + } + }) + .flatten_unordered(None) + .boxed(); + + match out.send_event_stream(&mut stream).await { + Ok(()) => { + debug!("Finished sending."); + Ok(()) + } + Err(_) => { + let (count, _) = stream.size_hint(); + emit!(StreamClosedError { count }); + Err(().into()) + } + } +} + +fn decode_events(buf: &mut BytesMut, mut decoder: Decoder) -> Vec { + let mut events = Vec::new(); + loop { + match decoder.decode_eof(buf) { + Ok(Some((next, _))) => { + events.extend(next); + } + Ok(None) => break, + Err(error) => { + // Error is logged by `crate::codecs::Decoder`, no further + // handling is needed here. + if !error.can_continue() { + break; + } + break; + } + } + } + events +} diff --git a/src/sources/okta/mod.rs b/src/sources/okta/mod.rs new file mode 100644 index 0000000000000..4b823f57f8e66 --- /dev/null +++ b/src/sources/okta/mod.rs @@ -0,0 +1,7 @@ +#[cfg(feature = "sources-okta")] +pub mod client; + +#[cfg(test)] +mod tests; + +pub use client::OktaConfig; diff --git a/src/sources/okta/tests.rs b/src/sources/okta/tests.rs new file mode 100644 index 0000000000000..299f1325061c6 --- /dev/null +++ b/src/sources/okta/tests.rs @@ -0,0 +1,227 @@ +use crate::components::validation::prelude::*; +use tokio::time::Duration; +use vector_lib::config::LogNamespace; +use warp::Filter; + +use vector_lib::event::Event; + +use crate::config::log_schema; +use crate::sources::okta::OktaConfig; +use crate::test_util::{ + components::run_and_assert_source_compliance, components::HTTP_PULL_SOURCE_TAGS, next_addr, + test_generate_config, wait_for_tcp, +}; + +pub(crate) const INTERVAL: Duration = Duration::from_secs(10); + +pub(crate) const TIMEOUT: Duration = Duration::from_secs(1); + +/// The happy path should yield at least one event and must emit the required internal events for sources. +pub(crate) async fn run_compliance(config: OktaConfig) -> Vec { + let events = + run_and_assert_source_compliance(config, Duration::from_secs(5), &HTTP_PULL_SOURCE_TAGS) + .await; + + assert!(!events.is_empty()); + + events +} + +#[test] +fn okta_generate_config() { + test_generate_config::(); +} + +impl ValidatableComponent for OktaConfig { + fn validation_configuration() -> ValidationConfiguration { + let config = Self { + domain: "foo.okta.com".to_string(), + token: "token".to_string(), + interval: Duration::from_secs(1), + timeout: Duration::from_secs(1), + ..Default::default() + }; + let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into(); + + ValidationConfiguration::from_source( + Self::NAME, + log_namespace, + vec![ComponentTestCaseConfig::from_source(config, None, None)], + ) + } +} + +register_validatable_component!(OktaConfig); + +#[tokio::test] +async fn okta_compliance() { + let in_addr = next_addr(); + + let dummy_endpoint = warp::path!("api" / "v1" / "logs") + .and(warp::query::>()) + .map(move |_| { + warp::http::Response::builder() + .header("Content-Type", "application/json") + .body(r#"[{"data":"foo"},{"data":"bar"}]"#) + .unwrap() + }); + + tokio::spawn(warp::serve(dummy_endpoint).run(in_addr)); + wait_for_tcp(in_addr).await; + + let events = run_compliance(OktaConfig { + domain: format!("http://{}", in_addr), + token: "token".to_string(), + interval: INTERVAL, + timeout: TIMEOUT, + log_namespace: None, + ..Default::default() + }) + .await; + + assert_eq!(events.len(), 2); + + for event in events.iter() { + assert_eq!( + event.as_log()[log_schema().source_type_key().unwrap().to_string()], + OktaConfig::NAME.into() + ); + } + let log_event = events[0].as_log(); + assert_eq!( + log_event + .get("data") + .expect("data must be available") + .as_str() + .unwrap(), + "foo" + ); +} + +#[tokio::test] +async fn okta_follows_rel() { + let in_addr = next_addr(); + + let dummy_endpoint = warp::path!("api" / "v1" / "logs") + .and(warp::query::>()) + .map({ + let addr = in_addr.clone(); + move |q: std::collections::HashMap| match q.get("after") { + None => warp::http::Response::builder() + .header("Content-Type", "application/json") + .header( + "link", + format!("; rel=\"next\"", addr), + ) + .body(r#"[{"data":"foo"}]"#) + .unwrap(), + Some(after) if after == "bar" => warp::http::Response::builder() + .header("Content-Type", "application/json") + .header( + "link", + format!("; rel=\"next\"", addr), + ) + .body(r#"[{"data":"bar"}]"#) + .unwrap(), + Some(after) if after == "baz" => warp::http::Response::builder() + .header("Content-Type", "application/json") + .header( + "link", + format!("; rel=\"next\"", addr), + ) + .body(r#"[]"#) + .unwrap(), + Some(_) => panic!("following Link header with zero length reply"), + } + }); + + tokio::spawn(warp::serve(dummy_endpoint).run(in_addr)); + wait_for_tcp(in_addr).await; + + let events = run_compliance(OktaConfig { + domain: format!("http://{}", in_addr), + token: "token".to_string(), + interval: INTERVAL, + timeout: TIMEOUT, + log_namespace: None, + ..Default::default() + }) + .await; + + assert_eq!(events.len(), 2); + + for event in events.iter() { + assert_eq!( + event.as_log()[log_schema().source_type_key().unwrap().to_string()], + OktaConfig::NAME.into() + ); + } + assert_eq!(events[0].as_log()["data"].as_str().unwrap(), "foo"); + assert_eq!(events[1].as_log()["data"].as_str().unwrap(), "bar"); +} + +#[tokio::test] +async fn okta_persists_rel() { + // the client follows `next` links; on the next interval it should pick up where it left off + // and not start over from the beginning + let in_addr = next_addr(); + let seen = tokio::sync::OnceCell::::new(); + + // the first request sets `seen` but returns 0 events, ending the inner stream, + // the next interval should pick up where it left off + let dummy_endpoint = warp::path!("api" / "v1" / "logs") + .and(warp::query::>()) + .map({ + let addr = in_addr.clone(); + move |q: std::collections::HashMap| match q.get("after") { + None => warp::http::Response::builder() + .header("Content-Type", "application/json") + .header( + "link", + format!("; rel=\"next\"", addr), + ) + .body(r#"[{"data":"foo"}]"#) + .unwrap(), + Some(after) if after == "bar" => { + if seen.initialized() { + warp::http::Response::builder() + .header("Content-Type", "application/json") + .header( + "link", + format!("; rel=\"next\"", addr), + ) + .body(r#"[{"data":"bar"}]"#) + .unwrap() + } else { + seen.set(true).unwrap(); + warp::http::Response::builder() + .header("Content-Type", "application/json") + .header( + "link", + format!("; rel=\"next\"", addr), + ) + .body(r#"[]"#) + .unwrap() + } + } + Some(_) => warp::http::Response::builder() + .header("Content-Type", "application/json") + .body(r#"[]"#) + .unwrap(), + } + }); + + tokio::spawn(warp::serve(dummy_endpoint).run(in_addr)); + wait_for_tcp(in_addr).await; + + let events = run_compliance(OktaConfig { + domain: format!("http://{}", in_addr), + token: "token".to_string(), + interval: Duration::from_secs(1), + timeout: Duration::from_millis(100), + ..Default::default() + }) + .await; + + assert_eq!(events.len(), 2); +} diff --git a/website/content/en/docs/reference/configuration/sources/okta.md b/website/content/en/docs/reference/configuration/sources/okta.md new file mode 100644 index 0000000000000..fb7c985885c5d --- /dev/null +++ b/website/content/en/docs/reference/configuration/sources/okta.md @@ -0,0 +1,14 @@ +--- +title: Okta +description: Collect system logs from Okta +component_kind: source +layout: component +tags: ["okta", "component", "source", "logs"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... +*/}} diff --git a/website/cue/reference/components/sources/base/okta.cue b/website/cue/reference/components/sources/base/okta.cue new file mode 100644 index 0000000000000..c4f8afd85cab8 --- /dev/null +++ b/website/cue/reference/components/sources/base/okta.cue @@ -0,0 +1,136 @@ +package metadata + +base: components: sources: okta: configuration: { + domain: { + description: "The Okta subdomain to scrape" + required: true + type: string: examples: ["foo.okta.com"] + } + scrape_interval_secs: { + description: """ + The interval between scrapes. Requests are run concurrently so if a scrape takes longer + than the interval, a new scrape will be started. This can take extra resources, set the timeout + to a value lower than the scrape interval to prevent this from happening. + """ + required: false + type: uint: { + default: 15 + unit: "seconds" + } + } + scrape_timeout_secs: { + description: "The timeout for each scrape request." + required: false + type: float: { + default: 5.0 + unit: "seconds" + } + } + since: { + description: """ + The time to look back for logs. This is used to determine the start time of the first request + (that is, the earliest log to fetch) + """ + required: false + type: uint: {} + } + tls: { + description: "TLS configuration." + required: false + type: object: options: { + alpn_protocols: { + description: """ + Sets the list of supported ALPN protocols. + + Declare the supported ALPN protocols, which are used during negotiation with a peer. They are prioritized in the order + that they are defined. + """ + required: false + type: array: items: type: string: examples: ["h2"] + } + ca_file: { + description: """ + Absolute path to an additional CA certificate file. + + The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/certificate_authority.crt"] + } + crt_file: { + description: """ + Absolute path to a certificate file used to identify this server. + + The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as + an inline string in PEM format. + + If this is set _and_ is not a PKCS#12 archive, `key_file` must also be set. + """ + required: false + type: string: examples: ["/path/to/host_certificate.crt"] + } + key_file: { + description: """ + Absolute path to a private key file used to identify this server. + + The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/host_certificate.key"] + } + key_pass: { + description: """ + Passphrase used to unlock the encrypted key file. + + This has no effect unless `key_file` is set. + """ + required: false + type: string: examples: ["${KEY_PASS_ENV_VAR}", "PassWord1"] + } + server_name: { + description: """ + Server name to use when using Server Name Indication (SNI). + + Only relevant for outgoing connections. + """ + required: false + type: string: examples: ["www.example.com"] + } + verify_certificate: { + description: """ + Enables certificate verification. For components that create a server, this requires that the + client connections have a valid client certificate. For components that initiate requests, + this validates that the upstream has a valid certificate. + + If enabled, certificates must not be expired and must be issued by a trusted + issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the + certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and + so on, until the verification process reaches a root certificate. + + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + """ + required: false + type: bool: {} + } + verify_hostname: { + description: """ + Enables hostname verification. + + If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by + the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. + + Only relevant for outgoing connections. + + Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname. + """ + required: false + type: bool: {} + } + } + } + token: { + description: "API token for authentication" + required: true + type: string: examples: ["00xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"] + } +} diff --git a/website/cue/reference/components/sources/okta.cue b/website/cue/reference/components/sources/okta.cue new file mode 100644 index 0000000000000..4759978a1ee27 --- /dev/null +++ b/website/cue/reference/components/sources/okta.cue @@ -0,0 +1,181 @@ +package metadata + +components: sources: okta: { + + title: "Okta" + + classes: { + commonly_used: true + delivery: "best_effort" + deployment_roles: ["aggregator"] + development: "beta" + egress_method: "stream" + stateful: false + } + + features: { + acknowledgements: true + auto_generated: true + multiline: enabled: false + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + installation: { + platform_name: null + } + + configuration: base.components.sources.okta.configuration + + output: logs: event: { + description: "An Okta system log event" + fields: { + "*": { + description: "fields from the Okta system log" + required: true + type: object: { examples: [{ + "actor": { + "id": "00uttidj01jqL21aM1d6", + "type": "User", + "alternateId": "john.doe@example.com", + "displayName": "John Doe", + "detailEntry": null + }, + "client": { + "userAgent": { + "rawUserAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.0.0 Safari/537.36", + "os": "Mac OS X", + "browser": "CHROME" + }, + "zone": null, + "device": "Computer", + "id": null, + "ipAddress": "10.0.0.1", + "geographicalContext": { + "city": "New York", + "state": "New York", + "country": "United States", + "postalCode": 10013, + "geolocation": { + "lat": 40.3157, + "lon": -74.01 + } + } + }, + "device": { + "id": "guofdhyjex1feOgbN1d9", + "name": "Mac15,6", + "os_platform": "OSX", + "os_version": "14.6.0", + "managed": false, + "registered": true, + "device_integrator": null, + "disk_encryption_type": "ALL_INTERNAL_VOLUMES", + "screen_lock_type": "BIOMETRIC", + "jailbreak": null, + "secure_hardware_present": true + }, + "authenticationContext": { + "authenticationProvider": null, + "credentialProvider": null, + "credentialType": null, + "issuer": null, + "interface": null, + "authenticationStep": 0, + "rootSessionId": "idxBager62CSveUkTxvgRtonA", + "externalSessionId": "idxBager62CSveUkTxvgRtonA" + }, + "displayMessage": "User login to Okta", + "eventType": "user.session.start", + "outcome": { + "result": "SUCCESS", + "reason": null + }, + "published": "2024-08-13T15:58:20.353Z", + "securityContext": { + "asNumber": 394089, + "asOrg": "ASN 0000", + "isp": "google", + "domain": null, + "isProxy": false + }, + "severity": "INFO", + "debugContext": { + "debugData": { + "requestId": "ab609228fe84ce59cdcbfa690bcce016", + "requestUri": "/idp/idx/authenticators/poll", + "url": "/idp/idx/authenticators/poll" + } + }, + "legacyEventType": "core.user_auth.login_success", + "transaction": { + "type": "WEB", + "id": "ab609228fe84ce59cdcbfa690bgce016", + "detail": null + }, + "uuid": "dc9fd3c0-598c-11ef-8478-2b7584bf8d5a", + "version": 0, + "request": { + "ipChain": [ + { + "ip": "10.0.0.1", + "geographicalContext": { + "city": "New York", + "state": "New York", + "country": "United States", + "postalCode": 10013, + "geolocation": { + "lat": 40.3157, + "lon": -74.01 + } + }, + "version": "V4", + "source": null + } + ] + }, + "target": [ + { + "id": "pfdfdhyjf0HMbkP2e1d7", + "type": "AuthenticatorEnrollment", + "alternateId": "unknown", + "displayName": "Okta Verify", + "detailEntry": null + }, + { + "id": "0oatxlef9sQvvqInq5d6", + "type": "AppInstance", + "alternateId": "Okta Admin Console", + "displayName": "Okta Admin Console", + "detailEntry": null + }] + }] + } + } + } + } + + how_it_works: { + api_token: { + title: "API Token" + body: """ + The `okta` source uses the Okta HTTP API, you will need to generate an API token in the + Okta admin console with sufficient permissions. + """ + } + lookback: { + title: "Lookback & Polling" + body: """ + The `okta` source polls Okta for new log events, by default beginning at the current time on + startup, following the API's pagination links for the next interval. + + The `since` parameter begins fetching logs generated prior to Vector's startup + """ + + } + } +}