Skip to content

Commit b7ab45d

Browse files
authored
Merge pull request #82 from trocco-io/24500-reconnect
fix: reconnect when auth error occurs in runDropStage method
2 parents 5fd4e49 + ed4f320 commit b7ab45d

File tree

1 file changed

+42
-2
lines changed

1 file changed

+42
-2
lines changed

src/main/java/org/embulk/output/SnowflakeOutputPlugin.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.sql.Types;
88
import java.util.*;
99
import java.util.function.BiFunction;
10+
import net.snowflake.client.jdbc.SnowflakeSQLException;
1011
import net.snowflake.client.jdbc.internal.org.bouncycastle.operator.OperatorCreationException;
1112
import net.snowflake.client.jdbc.internal.org.bouncycastle.pkcs.PKCSException;
1213
import org.embulk.config.ConfigDiff;
@@ -124,6 +125,16 @@ public static MatchByColumnName fromString(String value) {
124125
}
125126
}
126127

128+
// error codes which need reauthenticate
129+
// ref:
130+
// https://github.com/snowflakedb/snowflake-jdbc/blob/v3.13.26/src/main/java/net/snowflake/client/jdbc/SnowflakeUtil.java#L42
131+
private static final int ID_TOKEN_EXPIRED_GS_CODE = 390110;
132+
private static final int SESSION_NOT_EXIST_GS_CODE = 390111;
133+
private static final int MASTER_TOKEN_NOTFOUND = 390113;
134+
private static final int MASTER_EXPIRED_GS_CODE = 390114;
135+
private static final int MASTER_TOKEN_INVALID_GS_CODE = 390115;
136+
private static final int ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE = 390195;
137+
127138
@Override
128139
protected Class<? extends PluginTask> getTaskClass() {
129140
return SnowflakePluginTask.class;
@@ -204,12 +215,12 @@ public ConfigDiff transaction(
204215
snowflakeCon.runCreateStage(stageIdentifier);
205216
configDiff = super.transaction(config, schema, taskCount, control);
206217
if (t.getDeleteStage()) {
207-
snowflakeCon.runDropStage(stageIdentifier);
218+
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
208219
}
209220
} catch (Exception e) {
210221
if (t.getDeleteStage() && t.getDeleteStageOnError()) {
211222
try {
212-
snowflakeCon.runDropStage(stageIdentifier);
223+
runDropStageWithRecovery(snowflakeCon, stageIdentifier, task);
213224
} catch (SQLException ex) {
214225
throw new RuntimeException(ex);
215226
}
@@ -220,6 +231,35 @@ public ConfigDiff transaction(
220231
return configDiff;
221232
}
222233

234+
private void runDropStageWithRecovery(
235+
SnowflakeOutputConnection snowflakeCon, StageIdentifier stageIdentifier, PluginTask task)
236+
throws SQLException {
237+
try {
238+
snowflakeCon.runDropStage(stageIdentifier);
239+
} catch (SnowflakeSQLException ex) {
240+
// INFO: Don't handle only SnowflakeReauthenticationRequest here
241+
// because SnowflakeSQLException with following error codes may be thrown in some cases.
242+
243+
logger.info("SnowflakeSQLException was caught: ({}) {}", ex.getErrorCode(), ex.getMessage());
244+
245+
switch (ex.getErrorCode()) {
246+
case ID_TOKEN_EXPIRED_GS_CODE:
247+
case SESSION_NOT_EXIST_GS_CODE:
248+
case MASTER_TOKEN_NOTFOUND:
249+
case MASTER_EXPIRED_GS_CODE:
250+
case MASTER_TOKEN_INVALID_GS_CODE:
251+
case ID_TOKEN_INVALID_LOGIN_REQUEST_GS_CODE:
252+
// INFO: If runCreateStage consumed a lot of time, authentication might be expired.
253+
// In this case, retry to drop stage.
254+
snowflakeCon = (SnowflakeOutputConnection) getConnector(task, true).connect(true);
255+
snowflakeCon.runDropStage(stageIdentifier);
256+
break;
257+
default:
258+
throw ex;
259+
}
260+
}
261+
}
262+
223263
@Override
224264
public ConfigDiff resume(
225265
TaskSource taskSource, Schema schema, int taskCount, OutputPlugin.Control control) {

0 commit comments

Comments
 (0)