Skip to content

Commit 0d9188c

Browse files
Json events sample (#134)
JSON Events Sample Signed-off-by: Anders Swanson <anders.swanson@oracle.com>
1 parent 7225904 commit 0d9188c

File tree

24 files changed

+987
-0
lines changed

24 files changed

+987
-0
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Oracle Spring Boot Sample for JSON Events and OKafka
2+
3+
This sample application demonstrates how to use the Oracle Spring Boot Starter for OKafka, and the Oracle Spring Starter for JSON Collections with [JSON Relational Duality Views](https://docs.oracle.com/en/database/oracle/oracle-database/23/jsnvu/overview-json-relational-duality-views.html) and [Transactional Event Queues](https://www.oracle.com/database/advanced-queuing/).
4+
5+
The same application demonstrates a JSON document ingestion workflow where weather station sensor data is sent to a backend application, using OKafka Producer, Consumer, and JSON Relational Duality Views:
6+
7+
1. Raw sensor data is sent to a REST endpoint.
8+
2. The raw sensor data is parsed as a POJO and produced to an OKafka queue in serialized JSONB format.
9+
3. The OKafka consumer receives the sensor POJO, enriching and saving the POJO to the database as a JSON Relational Duality View.
10+
4. After consumption, enriched sensor data can be queried from the database by their weather station ID.
11+
12+
## Run the sample application
13+
14+
The sample application test uses Testcontainers, and creates a temporary Oracle Free container database, and requires a docker runtime environment.
15+
16+
To run application test, run the following command:
17+
18+
```shell
19+
mvn test
20+
```
21+
22+
The test starts the OKafka sensor data consumer, and sends a series of raw weather station events to the producer. The test verifies that the events have been processed and saved to the database, available in JSON Relational Duality View form.
23+
24+
## Configure your project to use Oracle JSON Relational Duality Views
25+
26+
To use OKafka and Oracle JSON Relational Duality Views from your Spring Boot application, add the following Maven dependencies to your project:
27+
28+
```xml
29+
<dependency>
30+
<groupId>com.oracle.database.spring</groupId>
31+
<artifactId>oracle-spring-boot-starter-json-collections</artifactId>
32+
</dependency>
33+
<dependency>
34+
<groupId>com.oracle.database.spring</groupId>
35+
<artifactId>oracle-spring-boot-starter-okafka</artifactId>
36+
</dependency>
37+
```
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- Copyright (c) 2024, Oracle and/or its affiliates. -->
3+
<!-- Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -->
4+
<project xmlns="http://maven.apache.org/POM/4.0.0"
5+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
6+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
7+
<modelVersion>4.0.0</modelVersion>
8+
<parent>
9+
<artifactId>oracle-spring-boot-starter-samples</artifactId>
10+
<groupId>com.oracle.database.spring</groupId>
11+
<version>24.2.0</version>
12+
<relativePath>../pom.xml</relativePath>
13+
</parent>
14+
15+
<artifactId>oracle-spring-boot-sample-json-events</artifactId>
16+
<version>24.2.0</version>
17+
18+
<name>Oracle Spring Boot Starter - JSON Events Sample</name>
19+
<description>Oracle Spring Boot Starter Sample for JSON Events</description>
20+
21+
<organization>
22+
<name>Oracle America, Inc.</name>
23+
<url>https://www.oracle.com</url>
24+
</organization>
25+
26+
<developers>
27+
<developer>
28+
<name>Oracle</name>
29+
<email>obaas_ww at oracle.com</email>
30+
<organization>Oracle America, Inc.</organization>
31+
<organizationUrl>https://www.oracle.com</organizationUrl>
32+
</developer>
33+
</developers>
34+
35+
<licenses>
36+
<license>
37+
<name>The Universal Permissive License (UPL), Version 1.0</name>
38+
<url>https://oss.oracle.com/licenses/upl/</url>
39+
<distribution>repo</distribution>
40+
</license>
41+
</licenses>
42+
43+
<scm>
44+
<url>https://github.com/oracle/spring-cloud-oracle</url>
45+
<connection>scm:git:https://github.com/oracle/spring-cloud-oracle.git</connection>
46+
<developerConnection>scm:git:git@github.com:oracle/spring-cloud-oracle.git</developerConnection>
47+
</scm>
48+
49+
<dependencies>
50+
<dependency>
51+
<groupId>com.oracle.database.spring</groupId>
52+
<artifactId>oracle-spring-boot-starter-json-collections</artifactId>
53+
<version>${project.version}</version>
54+
</dependency>
55+
<dependency>
56+
<groupId>com.oracle.database.spring</groupId>
57+
<artifactId>oracle-spring-boot-starter-okafka</artifactId>
58+
<version>${project.version}</version>
59+
</dependency>
60+
<dependency>
61+
<groupId>org.springframework.boot</groupId>
62+
<artifactId>spring-boot-starter</artifactId>
63+
</dependency>
64+
<dependency>
65+
<groupId>org.springframework.boot</groupId>
66+
<artifactId>spring-boot-starter-web</artifactId>
67+
</dependency>
68+
<dependency>
69+
<groupId>org.springframework.boot</groupId>
70+
<artifactId>spring-boot-starter-data-jdbc</artifactId>
71+
</dependency>
72+
73+
<dependency>
74+
<groupId>org.springframework.boot</groupId>
75+
<artifactId>spring-boot-starter-test</artifactId>
76+
<version>${spring-boot-dependencies.version}</version>
77+
<scope>test</scope>
78+
</dependency>
79+
80+
<!-- Test Dependencies-->
81+
<dependency>
82+
<groupId>org.testcontainers</groupId>
83+
<artifactId>junit-jupiter</artifactId>
84+
<scope>test</scope>
85+
</dependency>
86+
87+
<dependency>
88+
<groupId>org.testcontainers</groupId>
89+
<artifactId>testcontainers</artifactId>
90+
<scope>test</scope>
91+
</dependency>
92+
93+
<dependency>
94+
<groupId>org.testcontainers</groupId>
95+
<artifactId>oracle-free</artifactId>
96+
<scope>test</scope>
97+
</dependency>
98+
</dependencies>
99+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.jsonevents;
4+
5+
import org.springframework.boot.SpringApplication;
6+
import org.springframework.boot.autoconfigure.SpringBootApplication;
7+
8+
@SpringBootApplication
9+
public class JSONEventsSampleApp {
10+
public static void main(String[] args) {
11+
SpringApplication.run(JSONEventsSampleApp.class, args);
12+
}
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.jsonevents;
4+
5+
import java.util.Properties;
6+
7+
import com.oracle.database.spring.jsonevents.model.Sensor;
8+
import com.oracle.database.spring.jsonevents.serde.JSONBDeserializer;
9+
import com.oracle.database.spring.jsonevents.serde.JSONBSerializer;
10+
import com.oracle.spring.json.jsonb.JSONB;
11+
import org.apache.kafka.clients.consumer.Consumer;
12+
import org.apache.kafka.clients.producer.Producer;
13+
import org.apache.kafka.common.serialization.Deserializer;
14+
import org.apache.kafka.common.serialization.Serializer;
15+
import org.apache.kafka.common.serialization.StringDeserializer;
16+
import org.apache.kafka.common.serialization.StringSerializer;
17+
import org.oracle.okafka.clients.consumer.KafkaConsumer;
18+
import org.oracle.okafka.clients.producer.KafkaProducer;
19+
import org.springframework.beans.factory.annotation.Qualifier;
20+
import org.springframework.beans.factory.annotation.Value;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.Configuration;
23+
24+
/**
25+
* OKafkaConfiguration configures the OKafka properties, producer, and consumer beans.
26+
*/
27+
@Configuration
28+
public class OKafkaConfiguration {
29+
private final JSONB jsonb;
30+
31+
@Value("${app.ojdbcPath}")
32+
private String ojdbcPath;
33+
34+
@Value("${app.bootstrapServers}")
35+
private String bootstrapServers;
36+
37+
// We use the default 23ai Free service name
38+
@Value("${app.serviceName:freepdb1}")
39+
private String serviceName;
40+
41+
// We use plaintext for a containerized, local database.
42+
// Use SSL for wallet connections, like Autonomous Database.
43+
@Value("${app.securityProtocol:PLAINTEXT}")
44+
private String securityProtocol;
45+
46+
@Value("${app.consumerGroup:SensorEvents}")
47+
private String consumerGroup;
48+
49+
public OKafkaConfiguration(JSONB jsonb) {
50+
this.jsonb = jsonb;
51+
}
52+
53+
@Bean
54+
@Qualifier("okafkaProperties")
55+
public Properties okafkaProperties() {
56+
Properties props = new Properties();
57+
props.put("oracle.service.name", serviceName);
58+
props.put("security.protocol", securityProtocol);
59+
props.put("bootstrap.servers", bootstrapServers);
60+
// If using Oracle Database wallet, pass wallet directory
61+
props.put("oracle.net.tns_admin", ojdbcPath);
62+
return props;
63+
}
64+
65+
@Bean
66+
@Qualifier("okafkaConsumer")
67+
public Consumer<String, Sensor> okafkaConsumer() {
68+
Properties props = okafkaProperties();
69+
props.put("group.id", consumerGroup);
70+
props.put("enable.auto.commit","false");
71+
props.put("max.poll.records", 2000);
72+
73+
Deserializer<String> keyDeserializer = new StringDeserializer();
74+
Deserializer<Sensor> valueDeserializer = new JSONBDeserializer<>(jsonb, Sensor.class);
75+
return new KafkaConsumer<>(props, keyDeserializer, valueDeserializer);
76+
}
77+
78+
@Bean
79+
@Qualifier("okafkaProducer")
80+
public Producer<String, Sensor> okafkaProducer() {
81+
Properties props = okafkaProperties();
82+
props.put("enable.idempotence", "true");
83+
84+
Serializer<String> keySerializer = new StringSerializer();
85+
Serializer<Sensor> valueSerializer = new JSONBSerializer<>(jsonb);
86+
return new KafkaProducer<>(props, keySerializer, valueSerializer);
87+
}
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.jsonevents;
4+
5+
import java.util.Collections;
6+
import java.util.Properties;
7+
import java.util.concurrent.ExecutionException;
8+
9+
import jakarta.annotation.PostConstruct;
10+
import org.apache.kafka.clients.admin.Admin;
11+
import org.apache.kafka.clients.admin.NewTopic;
12+
import org.apache.kafka.common.errors.TopicExistsException;
13+
import org.oracle.okafka.clients.admin.AdminClient;
14+
import org.springframework.beans.factory.annotation.Qualifier;
15+
import org.springframework.beans.factory.annotation.Value;
16+
import org.springframework.context.annotation.Configuration;
17+
import org.springframework.core.task.AsyncTaskExecutor;
18+
19+
/**
20+
* OKafkaSetup creates the app's OKafka topic, and starts the consumer thread.
21+
*/
22+
@Configuration
23+
public class OKafkaSetup {
24+
private final AsyncTaskExecutor asyncTaskExecutor;
25+
private final SensorConsumer sensorConsumer;
26+
private final Properties okafkaProperties;
27+
28+
@Value("${app.topic}")
29+
private String topic;
30+
31+
public OKafkaSetup(@Qualifier("applicationTaskExecutor") AsyncTaskExecutor asyncTaskExecutor,
32+
SensorConsumer sensorConsumer,
33+
@Qualifier("okafkaProperties") Properties okafkaProperties) {
34+
this.asyncTaskExecutor = asyncTaskExecutor;
35+
this.sensorConsumer = sensorConsumer;
36+
this.okafkaProperties = okafkaProperties;
37+
}
38+
39+
@PostConstruct
40+
void init() {
41+
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
42+
try (Admin admin = AdminClient.create(okafkaProperties)) {
43+
admin.createTopics(Collections.singletonList(newTopic))
44+
.all()
45+
.get();
46+
} catch (ExecutionException | InterruptedException e) {
47+
if (e.getCause() instanceof TopicExistsException) {
48+
System.out.println("Topic already exists, skipping creation");
49+
} else {
50+
throw new RuntimeException(e);
51+
}
52+
}
53+
asyncTaskExecutor.submit(sensorConsumer);
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.jsonevents;
4+
5+
import java.time.Duration;
6+
import java.util.List;
7+
8+
import com.oracle.database.spring.jsonevents.model.Sensor;
9+
import jakarta.annotation.PreDestroy;
10+
import org.apache.kafka.clients.consumer.Consumer;
11+
import org.apache.kafka.clients.consumer.ConsumerRecord;
12+
import org.apache.kafka.clients.consumer.ConsumerRecords;
13+
import org.springframework.beans.factory.annotation.Qualifier;
14+
import org.springframework.beans.factory.annotation.Value;
15+
import org.springframework.stereotype.Service;
16+
17+
@Service
18+
public class SensorConsumer implements Runnable, AutoCloseable {
19+
private final Consumer<String, Sensor> consumer;
20+
private final String topic;
21+
private final SensorEnricher sensorEnricher;
22+
private final SensorService sensorService;
23+
24+
public SensorConsumer(@Qualifier("okafkaConsumer") Consumer<String, Sensor> consumer,
25+
@Value("${app.topic}") String topic,
26+
SensorEnricher sensorEnricher,
27+
SensorService sensorService) {
28+
this.consumer = consumer;
29+
this.topic = topic;
30+
this.sensorEnricher = sensorEnricher;
31+
this.sensorService = sensorService;
32+
}
33+
34+
@Override
35+
public void run() {
36+
consumer.subscribe(List.of(topic));
37+
while (true) {
38+
ConsumerRecords<String, Sensor> records = consumer.poll(Duration.ofMillis(100));
39+
processRecords(records);
40+
// Commit records when done processing.
41+
consumer.commitAsync();
42+
}
43+
}
44+
45+
private void processRecords(ConsumerRecords<String, Sensor> records) {
46+
for (ConsumerRecord<String, Sensor> record : records) {
47+
Sensor sensor = record.value();
48+
// Add weather station data to the event
49+
Sensor enriched = sensorEnricher.enrich(sensor);
50+
// Persist the event
51+
sensorService.save(enriched);
52+
}
53+
}
54+
55+
@PreDestroy
56+
@Override
57+
public void close() throws Exception {
58+
if (consumer != null) {
59+
consumer.close();
60+
}
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (c) 2024, Oracle and/or its affiliates.
2+
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
3+
package com.oracle.database.spring.jsonevents;
4+
5+
import java.util.List;
6+
7+
import com.oracle.database.spring.jsonevents.model.Sensor;
8+
import com.oracle.database.spring.jsonevents.model.SensorEvent;
9+
import org.springframework.http.ResponseEntity;
10+
import org.springframework.web.bind.annotation.GetMapping;
11+
import org.springframework.web.bind.annotation.PathVariable;
12+
import org.springframework.web.bind.annotation.PostMapping;
13+
import org.springframework.web.bind.annotation.RequestBody;
14+
import org.springframework.web.bind.annotation.RequestMapping;
15+
import org.springframework.web.bind.annotation.RestController;
16+
17+
@RestController
18+
@RequestMapping("/api/v1/events")
19+
public class SensorController {
20+
private final SensorEventProducer sensorEventProducer;
21+
private final SensorService sensorService;
22+
23+
public SensorController(SensorEventProducer sensorEventProducer, SensorService sensorService) {
24+
this.sensorEventProducer = sensorEventProducer;
25+
this.sensorService = sensorService;
26+
}
27+
28+
@PostMapping
29+
public ResponseEntity<?> produce(@RequestBody SensorEvent event) {
30+
sensorEventProducer.send(event);
31+
return ResponseEntity.noContent().build();
32+
}
33+
34+
@GetMapping("/station/{stationId}")
35+
ResponseEntity<List<Sensor>> getEvents(@PathVariable String stationId) {
36+
List<Sensor> sensors = sensorService.byStationId(stationId);
37+
return ResponseEntity.ok(sensors);
38+
}
39+
}

0 commit comments

Comments
 (0)