4 changes: 4 additions & 0 deletions .gitignore
@@ -36,6 +36,10 @@ tests/integration/env/

# CMake
cmake-build-*/
CMakeCache.txt
CMakeFiles/
cmake_install.cmake
*.cmake
.idea/

# Mongo Explorer plugin
15 changes: 15 additions & 0 deletions CMakeLists.txt
@@ -104,6 +104,14 @@ set(HEADERS globals.h
src/frontend/core/executor/impl/CypherQueryExecutor.h
src/query/processor/cypher/runtime/Aggregation.h
src/query/processor/cypher/runtime/AggregationFactory.h
src/temporal/TemporalTypes.h
src/temporal/EdgeStorage.h
src/temporal/BitmapManager.h
src/temporal/PropertyStores.h
src/temporal/SnapshotManager.h
src/temporal/TemporalStreamIngestor.h
src/temporal/TemporalFacade.h
src/temporal/TemporalIntegration.h
)

set(SOURCES src/backend/JasmineGraphBackend.cpp
@@ -196,6 +204,13 @@ set(SOURCES src/backend/JasmineGraphBackend.cpp
src/frontend/core/executor/impl/CypherQueryExecutor.cpp
src/query/processor/cypher/runtime/Aggregation.cpp
src/query/processor/cypher/runtime/AggregationFactory.cpp
src/temporal/EdgeStorage.cpp
src/temporal/BitmapManager.cpp
src/temporal/PropertyStores.cpp
src/temporal/SnapshotManager.cpp
src/temporal/TemporalStreamIngestor.cpp
src/temporal/TemporalFacade.cpp
src/temporal/TemporalIntegration.cpp

)

73 changes: 72 additions & 1 deletion src/frontend/JasmineGraphFrontEnd.cpp
@@ -61,6 +61,7 @@ limitations under the License.
#include "../query/processor/cypher/util/SharedBuffer.h"
#include "../query/processor/cypher/runtime/AggregationFactory.h"
#include "../query/processor/cypher/runtime/Aggregation.h"
#include "../temporal/TemporalIntegration.h"
#include "../partitioner/stream/Partitioner.h"

#define MAX_PENDING_CONNECTIONS 10
@@ -1086,6 +1087,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
string graphId;
string partitionAlgo;
string direction;
bool enableTemporal = false;

if (existingGraph == "y") {
string existingGraphIdMsg = "Send the existing graph ID ? ";
@@ -1250,6 +1252,51 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
*loop_exit_p = true;
return;
}

// Ask for temporal streaming configuration
string temporalConfigMsg = "Enable temporal streaming with event timestamps (y/n)? ";
result_wr = write(connFd, temporalConfigMsg.c_str(), temporalConfigMsg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

string temporalEnabled = Utils::getFrontendInput(connFd);
enableTemporal = (temporalEnabled == "y" || temporalEnabled == "Y");

string temporalResponse = enableTemporal ?
"Temporal streaming enabled - expecting 'event_timestamp' field in JSON" :
"Standard streaming mode - no temporal processing";
result_wr = write(connFd, temporalResponse.c_str(), temporalResponse.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

// Inform about automatic operation type detection
string operationTypeInfo = "Operation types (ADD/EDIT/DELETE) will be automatically detected from 'operation_type' field in JSON messages";
result_wr = write(connFd, operationTypeInfo.c_str(), operationTypeInfo.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
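// Illustrative message shape for temporal mode (field names taken from the prompts above;
// the surrounding edge payload and the timestamp format are assumptions, not defined in this change):
//   { ..., "event_timestamp": 1717171717, "operation_type": "ADD" }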


}

string msg_1 = "Do you want to use default KAFKA consumer(y/n) ?";
@@ -1349,6 +1396,30 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
return;
}

// Initialize temporal integration if temporal streaming is enabled
if (enableTemporal) {
std::string instanceDataFolder = Utils::getJasmineGraphProperty("org.jasminegraph.server.instance.datafolder");
std::string temporalBaseDir = instanceDataFolder + "/temporal/g" + graphId;
std::chrono::seconds snapshotInterval(60); // Default 60 second intervals

// Check if custom snapshot interval is configured
std::string intervalProp = Utils::getJasmineGraphProperty("org.jasminegraph.temporal.snapshot.interval");
if (!intervalProp.empty()) {
try {
int intervalSec = std::stoi(intervalProp);
snapshotInterval = std::chrono::seconds(intervalSec);
frontend_logger.info("Using custom snapshot interval: " + std::to_string(intervalSec) + " seconds");
} catch (const std::exception& e) {
frontend_logger.warn("Invalid snapshot interval property, using default 60 seconds");
}
}
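// Illustrative properties-file override (assumed key=value syntax, using the property name read above):
//   org.jasminegraph.temporal.snapshot.interval=120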

// Initialize global temporal facade
jasminegraph::TemporalIntegration::initialize(temporalBaseDir, snapshotInterval);
frontend_logger.info("Temporal integration initialized for graph " + graphId +
" with snapshot interval: " + std::to_string(snapshotInterval.count()) + " seconds");
}

// create kafka consumer and graph partitioner
kstream = new KafkaConnector(configs);
// Create the KafkaConnector object.
@@ -1358,7 +1429,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
// Create the StreamHandler object.
StreamHandler *stream_handler = new StreamHandler(kstream, numberOfPartitions, workerClients, sqlite,
stoi(graphId), direction == Conts::DIRECTED,
spt::getPartitioner(partitionAlgo));
spt::getPartitioner(partitionAlgo), enableTemporal, true);

if (existingGraph != "y") {
string path = "kafka:\\" + topic_name_s + ":" + group_id;