Skip to content

Commit 6d7f1b1

Browse files
authored
Code storage: Azure Blob integration (#408)
1 parent d3555c8 commit 6d7f1b1

File tree

15 files changed

+496
-8
lines changed

15 files changed

+496
-8
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Copyright DataStax, Inc.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
18+
-->
19+
<!--
  Maven module for the Azure Blob Storage code-storage provider.
  Versions of the com.azure artifacts come from the azure-sdk-bom imported below;
  the remaining version-less dependencies are presumably managed by the parent POM
  (not visible from this file).
-->
<project
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>ai.langstream</groupId>
    <artifactId>langstream-codestorage-providers</artifactId>
    <version>0.0.17-SNAPSHOT</version>
  </parent>

  <artifactId>langstream-codestorage-azure-blob-storage</artifactId>

  <properties>
    <azure-sdk-bom.version>1.2.6</azure-sdk-bom.version>
  </properties>

  <!-- Import the Azure SDK BOM so the com.azure dependencies below need no explicit version. -->
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-sdk-bom</artifactId>
        <version>${azure-sdk-bom.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <!-- LangStream API, same group and version as this module. -->
    <dependency>
      <groupId>${project.groupId}</groupId>
      <artifactId>langstream-api</artifactId>
      <version>${project.version}</version>
    </dependency>

    <!-- Logging. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-core</artifactId>
      <!-- <scope>provided</scope>-->
    </dependency>

    <!-- Azure Blob client, with its Netty HTTP transport excluded... -->
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-storage-blob</artifactId>
      <exclusions>
        <exclusion>
          <groupId>com.azure</groupId>
          <artifactId>azure-core-http-netty</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!-- ...and the OkHttp transport declared in its place. -->
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-core-http-okhttp</artifactId>
    </dependency>

    <!-- Used to bind the configuration map to the typed configuration class. -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </dependency>
  </dependencies>

</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ai.langstream.impl.storage.k8s.codestorage;
17+
18+
import ai.langstream.api.codestorage.CodeArchiveMetadata;
19+
import ai.langstream.api.codestorage.CodeStorage;
20+
import ai.langstream.api.codestorage.CodeStorageException;
21+
import ai.langstream.api.codestorage.LocalZipFileArchiveFile;
22+
import ai.langstream.api.codestorage.UploadableCodeArchive;
23+
import com.azure.core.http.rest.Response;
24+
import com.azure.core.util.BinaryData;
25+
import com.azure.core.util.Context;
26+
import com.azure.storage.blob.BlobClient;
27+
import com.azure.storage.blob.BlobContainerClient;
28+
import com.azure.storage.blob.BlobContainerClientBuilder;
29+
import com.azure.storage.blob.models.BlobHttpHeaders;
30+
import com.azure.storage.blob.models.BlockBlobItem;
31+
import com.azure.storage.blob.options.BlobParallelUploadOptions;
32+
import com.azure.storage.common.StorageSharedKeyCredential;
33+
import com.fasterxml.jackson.databind.ObjectMapper;
34+
import java.io.IOException;
35+
import java.nio.file.Files;
36+
import java.nio.file.Path;
37+
import java.nio.file.StandardCopyOption;
38+
import java.util.HashMap;
39+
import java.util.Map;
40+
import java.util.Objects;
41+
import java.util.UUID;
42+
import lombok.SneakyThrows;
43+
import lombok.extern.slf4j.Slf4j;
44+
import org.jetbrains.annotations.NotNull;
45+
46+
@Slf4j
47+
public class AzureBlobCodeStorage implements CodeStorage {
48+
49+
private static final ObjectMapper mapper = new ObjectMapper();
50+
// metadata keys must not contain dashes since it's not supported by azure
51+
protected static final String OBJECT_METADATA_KEY_TENANT = "langstreamtenant";
52+
protected static final String OBJECT_METADATA_KEY_APPLICATION = "langstreamapplication";
53+
protected static final String OBJECT_METADATA_KEY_VERSION = "langstreamversion";
54+
protected static final String OBJECT_METADATA_KEY_PY_BINARIES_DIGEST =
55+
"langstreampybinariesdigest";
56+
protected static final String OBJECT_METADATA_KEY_JAVA_BINARIES_DIGEST =
57+
"langstreamjavabinariesdigest";
58+
private final BlobContainerClient containerClient;
59+
60+
@SneakyThrows
61+
public AzureBlobCodeStorage(Map<String, Object> configuration) {
62+
final AzureBlobCodeStorageConfiguration azureConfig =
63+
mapper.convertValue(configuration, AzureBlobCodeStorageConfiguration.class);
64+
65+
if (azureConfig.getEndpoint() == null) {
66+
throw new IllegalArgumentException("Azure 'endpoint' must be provided");
67+
}
68+
69+
BlobContainerClientBuilder containerClientBuilder = new BlobContainerClientBuilder();
70+
if (azureConfig.getSasToken() != null) {
71+
containerClientBuilder.sasToken(azureConfig.getSasToken());
72+
log.info("Connecting to Azure at {} with SAS token", azureConfig.getEndpoint());
73+
} else if (azureConfig.getStorageAccountName() != null) {
74+
containerClientBuilder.credential(
75+
new StorageSharedKeyCredential(
76+
azureConfig.getStorageAccountName(),
77+
azureConfig.getStorageAccountKey()));
78+
log.info(
79+
"Connecting to Azure at {} with account name {}",
80+
azureConfig.getEndpoint(),
81+
azureConfig.getStorageAccountName());
82+
} else if (azureConfig.getStorageAccountConnectionString() != null) {
83+
log.info("Connecting to Azure at {} with connection string", azureConfig.getEndpoint());
84+
containerClientBuilder.credential(
85+
StorageSharedKeyCredential.fromConnectionString(
86+
azureConfig.getStorageAccountConnectionString()));
87+
} else {
88+
throw new IllegalArgumentException(
89+
"Either sas-token, account-name/account-key or account-connection-string must be provided");
90+
}
91+
92+
containerClientBuilder.endpoint(azureConfig.getEndpoint());
93+
final String container = azureConfig.getContainer();
94+
containerClientBuilder.containerName(container);
95+
this.containerClient = containerClientBuilder.buildClient();
96+
log.info(
97+
"Connected to Azure to account {}, container {}",
98+
containerClient.getAccountName(),
99+
containerClient.getBlobContainerName());
100+
101+
if (!this.containerClient.exists()) {
102+
log.info("Creating container");
103+
this.containerClient.createIfNotExists();
104+
} else {
105+
log.info("Container already exists");
106+
}
107+
}
108+
109+
@Override
110+
public CodeArchiveMetadata storeApplicationCode(
111+
String tenant, String applicationId, String version, UploadableCodeArchive codeArchive)
112+
throws CodeStorageException {
113+
114+
try {
115+
Path tempFile = Files.createTempFile("langstream", "upload");
116+
try {
117+
Files.copy(codeArchive.getData(), tempFile, StandardCopyOption.REPLACE_EXISTING);
118+
String codeStoreId =
119+
tenant + "_" + applicationId + "_" + version + "_" + UUID.randomUUID();
120+
log.info(
121+
"Storing code archive {} for tenant {} and application {} with version {}",
122+
codeStoreId,
123+
tenant,
124+
applicationId,
125+
version);
126+
127+
final String javaBinariesDigest = codeArchive.getJavaBinariesDigest();
128+
final String pyBinariesDigest = codeArchive.getPyBinariesDigest();
129+
130+
final Map<String, String> userMetadata = new HashMap<>();
131+
userMetadata.put(OBJECT_METADATA_KEY_TENANT, tenant);
132+
userMetadata.put(OBJECT_METADATA_KEY_APPLICATION, applicationId);
133+
userMetadata.put(OBJECT_METADATA_KEY_VERSION, version);
134+
if (javaBinariesDigest != null) {
135+
userMetadata.put(OBJECT_METADATA_KEY_JAVA_BINARIES_DIGEST, javaBinariesDigest);
136+
}
137+
if (pyBinariesDigest != null) {
138+
userMetadata.put(OBJECT_METADATA_KEY_PY_BINARIES_DIGEST, pyBinariesDigest);
139+
}
140+
141+
final BlobParallelUploadOptions options =
142+
new BlobParallelUploadOptions(BinaryData.fromFile(tempFile))
143+
.setMetadata(userMetadata)
144+
.setHeaders(
145+
new BlobHttpHeaders().setContentType("application/zip"));
146+
147+
final BlobClient blobClient = getBlobClient(tenant, codeStoreId);
148+
final Response<BlockBlobItem> response =
149+
blobClient.uploadWithResponse(options, null, Context.NONE);
150+
if (response.getValue() == null) {
151+
throw new CodeStorageException(
152+
"Failed to upload code archive to azure, status code: "
153+
+ response.getStatusCode()
154+
+ ". value was null.");
155+
}
156+
157+
return new CodeArchiveMetadata(
158+
tenant, codeStoreId, applicationId, pyBinariesDigest, javaBinariesDigest);
159+
} finally {
160+
Files.delete(tempFile);
161+
}
162+
} catch (IOException err) {
163+
throw new CodeStorageException(err);
164+
}
165+
}
166+
167+
@Override
168+
public void downloadApplicationCode(
169+
String tenant, String codeStoreId, DownloadedCodeHandled codeArchive)
170+
throws CodeStorageException {
171+
try {
172+
Path tempFile = Files.createTempDirectory("langstream-download-code");
173+
Path zipFile = tempFile.resolve("code.zip");
174+
try {
175+
176+
getBlobClient(tenant, codeStoreId)
177+
.getBlockBlobClient()
178+
.downloadToFile(zipFile.toString());
179+
codeArchive.accept(new LocalZipFileArchiveFile(zipFile));
180+
} finally {
181+
if (Files.exists(zipFile)) {
182+
Files.delete(zipFile);
183+
}
184+
Files.delete(tempFile);
185+
}
186+
} catch (IOException e) {
187+
log.error("Error downloading code archive {} for tenant {}", codeStoreId, tenant, e);
188+
throw new CodeStorageException(e);
189+
}
190+
}
191+
192+
@Override
193+
public CodeArchiveMetadata describeApplicationCode(String tenant, String codeStoreId)
194+
throws CodeStorageException {
195+
final BlobClient blobClient = getBlobClient(tenant, codeStoreId);
196+
final Boolean exists = blobClient.exists();
197+
if (exists != null && exists) {
198+
final Map<String, String> metadata =
199+
blobClient.getBlockBlobClient().getProperties().getMetadata();
200+
final String objectTenant = metadata.get(OBJECT_METADATA_KEY_TENANT);
201+
if (!Objects.equals(objectTenant, tenant)) {
202+
throw new CodeStorageException(
203+
"Tenant mismatch in Azure object "
204+
+ blobClient.getBlobName()
205+
+ ": "
206+
+ objectTenant
207+
+ " != "
208+
+ tenant);
209+
}
210+
final String applicationId = metadata.get(OBJECT_METADATA_KEY_APPLICATION);
211+
Objects.requireNonNull(
212+
applicationId,
213+
"Blob " + blobClient.getBlobName() + " contains empty application metadata");
214+
215+
final String pyBinariesDigest = metadata.get(OBJECT_METADATA_KEY_PY_BINARIES_DIGEST);
216+
final String javaBinariesDigest =
217+
metadata.get(OBJECT_METADATA_KEY_JAVA_BINARIES_DIGEST);
218+
219+
return new CodeArchiveMetadata(
220+
tenant, codeStoreId, applicationId, pyBinariesDigest, javaBinariesDigest);
221+
} else {
222+
return null;
223+
}
224+
}
225+
226+
@NotNull
227+
private BlobClient getBlobClient(String tenant, String codeStoreId) {
228+
final String key = tenant + "-" + codeStoreId;
229+
final BlobClient blobClient = containerClient.getBlobClient(key);
230+
return blobClient;
231+
}
232+
233+
@Override
234+
public void deleteApplicationCode(String tenant, String codeStoreId)
235+
throws CodeStorageException {
236+
getBlobClient(tenant, codeStoreId).deleteIfExists();
237+
}
238+
239+
@Override
240+
public void deleteApplication(String tenant, String application) throws CodeStorageException {
241+
// TODO
242+
}
243+
244+
@Override
245+
public void close() {}
246+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package ai.langstream.impl.storage.k8s.codestorage;
17+
18+
import com.fasterxml.jackson.annotation.JsonAlias;
19+
import com.fasterxml.jackson.annotation.JsonProperty;
20+
import lombok.AllArgsConstructor;
21+
import lombok.Data;
22+
import lombok.NoArgsConstructor;
23+
24+
@Data
25+
@NoArgsConstructor
26+
@AllArgsConstructor
27+
public class AzureBlobCodeStorageConfiguration {
28+
private String type;
29+
30+
private String container = "langstream-code-storage";
31+
32+
private String endpoint;
33+
34+
@JsonAlias({"sastoken"})
35+
@JsonProperty("sas-token")
36+
private String sasToken;
37+
38+
@JsonAlias({"storageaccountname"})
39+
@JsonProperty("storage-account-name")
40+
private String storageAccountName;
41+
42+
@JsonAlias({"storageaccountkey"})
43+
@JsonProperty("storage-account-key")
44+
private String storageAccountKey;
45+
46+
@JsonAlias({"storageaccountconnectionstring"})
47+
@JsonProperty("storage-account-connection-string")
48+
private String storageAccountConnectionString;
49+
}

0 commit comments

Comments
 (0)