Skip to content

Commit 1fa0eba

Browse files
ebyhrwendigo
authored andcommitted
Fix incorrect results after truncating tables in Memory connector
1 parent 27f86ca commit 1fa0eba

File tree

7 files changed

+47
-10
lines changed

7 files changed

+47
-10
lines changed

plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryInsertTableHandle.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,17 @@
2020

2121
import static java.util.Objects.requireNonNull;
2222

23-
public record MemoryInsertTableHandle(long table, Set<Long> activeTableIds)
23+
public record MemoryInsertTableHandle(long table, InsertMode mode, Set<Long> activeTableIds)
2424
implements ConnectorInsertTableHandle
2525
{
26+
public enum InsertMode
27+
{
28+
APPEND, OVERWRITE
29+
}
30+
2631
public MemoryInsertTableHandle
2732
{
33+
requireNonNull(mode, "mode is null");
2834
activeTableIds = ImmutableSet.copyOf(requireNonNull(activeTableIds, "activeTableIds is null"));
2935
}
3036
}

plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryMetadata.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.errorprone.annotations.concurrent.GuardedBy;
2323
import com.google.inject.Inject;
2424
import io.airlift.slice.Slice;
25+
import io.trino.plugin.memory.MemoryInsertTableHandle.InsertMode;
2526
import io.trino.spi.HostAddress;
2627
import io.trino.spi.Node;
2728
import io.trino.spi.NodeManager;
@@ -277,7 +278,7 @@ public synchronized void renameTable(ConnectorSession session, ConnectorTableHan
277278
long tableId = handle.id();
278279

279280
TableInfo oldInfo = tables.get(tableId);
280-
tables.put(tableId, new TableInfo(tableId, newTableName.getSchemaName(), newTableName.getTableName(), oldInfo.columns(), oldInfo.dataFragments(), oldInfo.comment()));
281+
tables.put(tableId, new TableInfo(tableId, newTableName.getSchemaName(), newTableName.getTableName(), oldInfo.columns(), oldInfo.truncated(), oldInfo.dataFragments(), oldInfo.comment()));
281282

282283
tableIds.remove(oldInfo.getSchemaTableName());
283284
tableIds.put(newTableName, tableId);
@@ -311,6 +312,7 @@ public synchronized MemoryOutputTableHandle beginCreateTable(ConnectorSession se
311312
tableMetadata.getTable().getSchemaName(),
312313
tableMetadata.getTable().getTableName(),
313314
columns.build(),
315+
false,
314316
new HashMap<>(),
315317
tableMetadata.getComment()));
316318

@@ -350,7 +352,10 @@ public synchronized Optional<ConnectorOutputMetadata> finishCreateTable(Connecto
350352
public synchronized MemoryInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle, List<ColumnHandle> columns, RetryMode retryMode)
351353
{
352354
MemoryTableHandle memoryTableHandle = (MemoryTableHandle) tableHandle;
353-
return new MemoryInsertTableHandle(memoryTableHandle.id(), ImmutableSet.copyOf(tableIds.values()));
355+
TableInfo tableInfo = tables.get(memoryTableHandle.id());
356+
InsertMode mode = tableInfo.truncated() ? InsertMode.OVERWRITE : InsertMode.APPEND;
357+
tables.put(tableInfo.id(), new TableInfo(tableInfo.id(), tableInfo.schemaName(), tableInfo.tableName(), tableInfo.columns(), false, tableInfo.dataFragments(), tableInfo.comment()));
358+
return new MemoryInsertTableHandle(memoryTableHandle.id(), mode, ImmutableSet.copyOf(tableIds.values()));
354359
}
355360

356361
@Override
@@ -374,7 +379,7 @@ public synchronized void truncateTable(ConnectorSession session, ConnectorTableH
374379
MemoryTableHandle handle = (MemoryTableHandle) tableHandle;
375380
long tableId = handle.id();
376381
TableInfo info = tables.get(handle.id());
377-
tables.put(tableId, new TableInfo(tableId, info.schemaName(), info.tableName(), info.columns(), ImmutableMap.of(), info.comment()));
382+
tables.put(tableId, new TableInfo(tableId, info.schemaName(), info.tableName(), info.columns(), true, ImmutableMap.of(), info.comment()));
378383
}
379384

380385
@Override
@@ -393,7 +398,7 @@ public synchronized void addColumn(ConnectorSession session, ConnectorTableHandl
393398
.add(new ColumnInfo(new MemoryColumnHandle(table.columns().size(), column.getType()), column.getName(), column.getType(), column.isNullable(), Optional.ofNullable(column.getComment())))
394399
.build();
395400

396-
tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), columns, table.dataFragments(), table.comment()));
401+
tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), columns, table.truncated(), table.dataFragments(), table.comment()));
397402
}
398403

399404
@Override
@@ -408,7 +413,7 @@ public synchronized void renameColumn(ConnectorSession session, ConnectorTableHa
408413
ColumnInfo columnInfo = columns.get(column.columnIndex());
409414
columns.set(column.columnIndex(), new ColumnInfo(columnInfo.handle(), target, columnInfo.type(), columnInfo.nullable(), columnInfo.comment()));
410415

411-
tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), ImmutableList.copyOf(columns), table.dataFragments(), table.comment()));
416+
tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), ImmutableList.copyOf(columns), table.truncated(), table.dataFragments(), table.comment()));
412417
}
413418

414419
@Override
@@ -423,7 +428,7 @@ public synchronized void dropNotNullConstraint(ConnectorSession session, Connect
423428
ColumnInfo columnInfo = columns.get(column.columnIndex());
424429
columns.set(column.columnIndex(), new ColumnInfo(columnInfo.handle(), columnInfo.name(), columnInfo.type(), true, columnInfo.comment()));
425430

426-
tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), ImmutableList.copyOf(columns), table.dataFragments(), table.comment()));
431+
tables.put(tableId, new TableInfo(tableId, table.schemaName(), table.tableName(), ImmutableList.copyOf(columns), table.truncated(), table.dataFragments(), table.comment()));
427432
}
428433

429434
@Override
@@ -538,7 +543,7 @@ private void updateRowsOnHosts(long tableId, Collection<Slice> fragments)
538543
dataFragments.merge(memoryDataFragment.hostAddress(), memoryDataFragment, MemoryDataFragment::merge);
539544
}
540545

541-
tables.put(tableId, new TableInfo(tableId, info.schemaName(), info.tableName(), info.columns(), dataFragments, info.comment()));
546+
tables.put(tableId, new TableInfo(tableId, info.schemaName(), info.tableName(), info.columns(), info.truncated(), dataFragments, info.comment()));
542547
}
543548

544549
public synchronized List<MemoryDataFragment> getDataFragments(long tableId)
@@ -599,7 +604,7 @@ public synchronized void setTableComment(ConnectorSession session, ConnectorTabl
599604
MemoryTableHandle table = (MemoryTableHandle) tableHandle;
600605
TableInfo info = tables.get(table.id());
601606
checkArgument(info != null, "Table not found");
602-
tables.put(table.id(), new TableInfo(table.id(), info.schemaName(), info.tableName(), info.columns(), info.dataFragments(), comment));
607+
tables.put(table.id(), new TableInfo(table.id(), info.schemaName(), info.tableName(), info.columns(), info.truncated(), info.dataFragments(), comment));
603608
}
604609

605610
@Override
@@ -617,6 +622,7 @@ public synchronized void setColumnComment(ConnectorSession session, ConnectorTab
617622
info.columns().stream()
618623
.map(tableColumn -> Objects.equals(tableColumn.handle(), columnHandle) ? new ColumnInfo(tableColumn.handle(), tableColumn.name(), tableColumn.getMetadata().getType(), tableColumn.nullable(), comment) : tableColumn)
619624
.collect(toImmutableList()),
625+
info.truncated(),
620626
info.dataFragments(),
621627
info.comment()));
622628
}

plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPageSinkProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.collect.ImmutableList;
1818
import com.google.inject.Inject;
1919
import io.airlift.slice.Slice;
20+
import io.trino.plugin.memory.MemoryInsertTableHandle.InsertMode;
2021
import io.trino.spi.HostAddress;
2122
import io.trino.spi.NodeManager;
2223
import io.trino.spi.Page;
@@ -75,6 +76,10 @@ public ConnectorPageSink createPageSink(ConnectorTransactionHandle transactionHa
7576
long tableId = memoryInsertTableHandle.table();
7677
checkState(memoryInsertTableHandle.activeTableIds().contains(tableId));
7778

79+
if (memoryInsertTableHandle.mode() == InsertMode.OVERWRITE) {
80+
pagesStore.purge(tableId);
81+
}
82+
7883
pagesStore.cleanUp(memoryInsertTableHandle.activeTableIds());
7984
pagesStore.initialize(tableId);
8085
return new MemoryPageSink(pagesStore, currentHostAddress, tableId);

plugin/trino-memory/src/main/java/io/trino/plugin/memory/MemoryPagesStore.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ public synchronized boolean contains(Long tableId)
134134
return tables.containsKey(tableId);
135135
}
136136

137+
public synchronized void purge(long tableId)
138+
{
139+
tables.remove(tableId);
140+
}
141+
137142
public synchronized void cleanUp(Set<Long> activeTableIds)
138143
{
139144
// We have to remember that there might be some race conditions when there are two tables created at once.

plugin/trino-memory/src/main/java/io/trino/plugin/memory/TableInfo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public record TableInfo(
3535
String schemaName,
3636
String tableName,
3737
List<ColumnInfo> columns,
38+
boolean truncated,
3839
Map<HostAddress, MemoryDataFragment> dataFragments,
3940
Optional<String> comment)
4041
{

plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryConnectorTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,19 @@ public void testRenameView()
605605
assertUpdate("DROP SCHEMA test_different_schema");
606606
}
607607

608+
@Test
609+
void testInsertAfterTruncate()
610+
{
611+
try (TestTable table = new TestTable(getQueryRunner()::execute, "test_truncate", "AS SELECT 1 x")) {
612+
assertUpdate("TRUNCATE TABLE " + table.getName());
613+
assertQueryReturnsEmptyResult("SELECT * FROM " + table.getName());
614+
615+
assertUpdate("INSERT INTO " + table.getName() + " VALUES 2", 1);
616+
assertThat(query("SELECT * FROM " + table.getName()))
617+
.matches("VALUES 2");
618+
}
619+
}
620+
608621
@Override
609622
protected String errorMessageForInsertIntoNotNullColumn(String columnName)
610623
{

plugin/trino-memory/src/test/java/io/trino/plugin/memory/TestMemoryPagesStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.google.common.collect.ImmutableList;
1717
import com.google.common.collect.ImmutableSet;
1818
import io.airlift.units.DataSize;
19+
import io.trino.plugin.memory.MemoryInsertTableHandle.InsertMode;
1920
import io.trino.spi.HostAddress;
2021
import io.trino.spi.Page;
2122
import io.trino.spi.TrinoException;
@@ -165,7 +166,7 @@ private static ConnectorOutputTableHandle createMemoryOutputTableHandle(long tab
165166

166167
private static ConnectorInsertTableHandle createMemoryInsertTableHandle(long tableId, Long[] activeTableIds)
167168
{
168-
return new MemoryInsertTableHandle(tableId, ImmutableSet.copyOf(activeTableIds));
169+
return new MemoryInsertTableHandle(tableId, InsertMode.APPEND, ImmutableSet.copyOf(activeTableIds));
169170
}
170171

171172
private static Page createPage()

0 commit comments

Comments
 (0)