Skip to content

feat: support export data for bigquery #1976

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4355,6 +4355,15 @@ pub enum Statement {
///
/// See [ReturnStatement]
Return(ReturnStatement),
/// Export data statement
///
/// Example:
/// ```sql
/// EXPORT DATA OPTIONS(uri='gs://bucket/folder/*', format='PARQUET', overwrite=true) AS
/// SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10
/// ```
/// [BigQuery](https://cloud.google.com/bigquery/docs/reference/standard-sql/export-statements)
ExportData(ExportData),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a link to the bigquery docs where the syntax comes from?

/// ```sql
/// CREATE [OR REPLACE] USER <user> [IF NOT EXISTS]
/// ```
Expand Down Expand Up @@ -6198,6 +6207,7 @@ impl fmt::Display for Statement {
Statement::Return(r) => write!(f, "{r}"),
Statement::List(command) => write!(f, "LIST {command}"),
Statement::Remove(command) => write!(f, "REMOVE {command}"),
Statement::ExportData(e) => write!(f, "{e}"),
Statement::CreateUser(s) => write!(f, "{s}"),
}
}
Expand Down Expand Up @@ -10144,6 +10154,34 @@ impl fmt::Display for MemberOf {
}
}

#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub struct ExportData {
pub options: Vec<SqlOption>,
pub query: Box<Query>,
pub connection: Option<ObjectName>,
}

impl fmt::Display for ExportData {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(connection) = &self.connection {
write!(
f,
"EXPORT DATA WITH CONNECTION {connection} OPTIONS({}) AS {}",
display_comma_separated(&self.options),
self.query
)
} else {
write!(
f,
"EXPORT DATA OPTIONS({}) AS {}",
display_comma_separated(&self.options),
self.query
)
}
}
}
/// Creates a user
///
/// Syntax:
Expand Down
13 changes: 12 additions & 1 deletion src/ast/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::ast::{query::SelectItemQualifiedWildcardKind, ColumnOptions};
use crate::ast::{query::SelectItemQualifiedWildcardKind, ColumnOptions, ExportData};
use core::iter;

use crate::tokenizer::Span;
Expand Down Expand Up @@ -531,6 +531,17 @@ impl Spanned for Statement {
Statement::Print { .. } => Span::empty(),
Statement::Return { .. } => Span::empty(),
Statement::List(..) | Statement::Remove(..) => Span::empty(),
Statement::ExportData(ExportData {
options,
query,
connection,
}) => union_spans(
options
.iter()
.map(|i| i.span())
.chain(core::iter::once(query.span()))
.chain(connection.iter().map(|i| i.span())),
),
Statement::CreateUser(..) => Span::empty(),
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,10 @@ impl<'a> Parser<'a> {
Keyword::COMMENT if self.dialect.supports_comment_on() => self.parse_comment(),
Keyword::PRINT => self.parse_print(),
Keyword::RETURN => self.parse_return(),
Keyword::EXPORT => {
self.prev_token();
self.parse_export_data()
}
_ => self.expected("an SQL statement", next_token),
},
Token::LParen => {
Expand Down Expand Up @@ -16523,6 +16527,30 @@ impl<'a> Parser<'a> {
}
}

/// /// Parse a `EXPORT DATA` statement.
///
/// See [Statement::ExportData]
fn parse_export_data(&mut self) -> Result<Statement, ParserError> {
self.expect_keywords(&[Keyword::EXPORT, Keyword::DATA])?;

let connection = if self.parse_keywords(&[Keyword::WITH, Keyword::CONNECTION]) {
Some(self.parse_object_name(false)?)
} else {
None
};
self.expect_keyword(Keyword::OPTIONS)?;
self.expect_token(&Token::LParen)?;
let options = self.parse_comma_separated(|p| p.parse_sql_option())?;
self.expect_token(&Token::RParen)?;
self.expect_keyword(Keyword::AS)?;
let query = self.parse_query()?;
Ok(Statement::ExportData(ExportData {
options,
query,
connection,
}))
}

/// Consume the parser and return its underlying token buffer
pub fn into_tokens(self) -> Vec<TokenWithSpan> {
self.tokens
Expand Down
232 changes: 231 additions & 1 deletion tests/sqlparser_bigquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ mod test_utils;

use std::ops::Deref;

use sqlparser::ast::helpers::attached_token::AttachedToken;
use sqlparser::ast::*;
use sqlparser::dialect::{BigQueryDialect, GenericDialect};
use sqlparser::keywords::Keyword;
use sqlparser::parser::{ParserError, ParserOptions};
use sqlparser::tokenizer::{Location, Span};
use sqlparser::tokenizer::{Location, Span, Token, TokenWithSpan, Word};
use test_utils::*;

#[test]
Expand Down Expand Up @@ -2567,6 +2569,234 @@ fn test_struct_trailing_and_nested_bracket() {
);
}

#[test]
fn test_export_data() {
let stmt = bigquery().verified_stmt(concat!(
"EXPORT DATA OPTIONS(",
"uri = 'gs://bucket/folder/*', ",
"format = 'PARQUET', ",
"overwrite = true",
") AS ",
"SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10",
));
assert_eq!(
stmt,
Statement::ExportData(ExportData {
options: vec![
SqlOption::KeyValue {
key: Ident::new("uri"),
value: Expr::Value(
Value::SingleQuotedString("gs://bucket/folder/*".to_owned())
.with_empty_span()
),
},
SqlOption::KeyValue {
key: Ident::new("format"),
value: Expr::Value(
Value::SingleQuotedString("PARQUET".to_owned()).with_empty_span()
),
},
SqlOption::KeyValue {
key: Ident::new("overwrite"),
value: Expr::Value(Value::Boolean(true).with_empty_span()),
},
],
connection: None,
query: Box::new(Query {
with: None,
body: Box::new(SetExpr::Select(Box::new(Select {
select_token: AttachedToken(TokenWithSpan::new(
Token::Word(Word {
value: "SELECT".to_string(),
quote_style: None,
keyword: Keyword::SELECT,
}),
Span::empty()
)),
distinct: None,
top: None,
top_before_distinct: false,
projection: vec![
SelectItem::UnnamedExpr(Expr::Identifier(Ident::new("field1"))),
SelectItem::UnnamedExpr(Expr::Identifier(Ident::new("field2"))),
],
exclude: None,
into: None,
from: vec![TableWithJoins {
relation: table_from_name(ObjectName::from(vec![
Ident::new("mydataset"),
Ident::new("table1")
])),
joins: vec![],
}],
lateral_views: vec![],
prewhere: None,
selection: None,
group_by: GroupByExpr::Expressions(vec![], vec![]),
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None,
named_window: vec![],
qualify: None,
window_before_qualify: false,
value_table_mode: None,
connect_by: None,
flavor: SelectFlavor::Standard,
}))),
order_by: Some(OrderBy {
kind: OrderByKind::Expressions(vec![OrderByExpr {
expr: Expr::Identifier(Ident::new("field1")),
options: OrderByOptions {
asc: None,
nulls_first: None,
},
with_fill: None,
},]),
interpolate: None,
}),
limit_clause: Some(LimitClause::LimitOffset {
limit: Some(Expr::Value(number("10").with_empty_span())),
offset: None,
limit_by: vec![],
}),
fetch: None,
locks: vec![],
for_clause: None,
settings: None,
format_clause: None,
pipe_operators: vec![],
})
})
);

let stmt = bigquery().verified_stmt(concat!(
"EXPORT DATA WITH CONNECTION myconnection.myproject.us OPTIONS(",
"uri = 'gs://bucket/folder/*', ",
"format = 'PARQUET', ",
"overwrite = true",
") AS ",
"SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10",
));

assert_eq!(
stmt,
Statement::ExportData(ExportData {
options: vec![
SqlOption::KeyValue {
key: Ident::new("uri"),
value: Expr::Value(
Value::SingleQuotedString("gs://bucket/folder/*".to_owned())
.with_empty_span()
),
},
SqlOption::KeyValue {
key: Ident::new("format"),
value: Expr::Value(
Value::SingleQuotedString("PARQUET".to_owned()).with_empty_span()
),
},
SqlOption::KeyValue {
key: Ident::new("overwrite"),
value: Expr::Value(Value::Boolean(true).with_empty_span()),
},
],
connection: Some(ObjectName::from(vec![
Ident::new("myconnection"),
Ident::new("myproject"),
Ident::new("us")
])),
query: Box::new(Query {
with: None,
body: Box::new(SetExpr::Select(Box::new(Select {
select_token: AttachedToken(TokenWithSpan::new(
Token::Word(Word {
value: "SELECT".to_string(),
quote_style: None,
keyword: Keyword::SELECT,
}),
Span::empty()
)),
distinct: None,
top: None,
top_before_distinct: false,
projection: vec![
SelectItem::UnnamedExpr(Expr::Identifier(Ident::new("field1"))),
SelectItem::UnnamedExpr(Expr::Identifier(Ident::new("field2"))),
],
exclude: None,
into: None,
from: vec![TableWithJoins {
relation: table_from_name(ObjectName::from(vec![
Ident::new("mydataset"),
Ident::new("table1")
])),
joins: vec![],
}],
lateral_views: vec![],
prewhere: None,
selection: None,
group_by: GroupByExpr::Expressions(vec![], vec![]),
cluster_by: vec![],
distribute_by: vec![],
sort_by: vec![],
having: None,
named_window: vec![],
qualify: None,
window_before_qualify: false,
value_table_mode: None,
connect_by: None,
flavor: SelectFlavor::Standard,
}))),
order_by: Some(OrderBy {
kind: OrderByKind::Expressions(vec![OrderByExpr {
expr: Expr::Identifier(Ident::new("field1")),
options: OrderByOptions {
asc: None,
nulls_first: None,
},
with_fill: None,
},]),
interpolate: None,
}),
limit_clause: Some(LimitClause::LimitOffset {
limit: Some(Expr::Value(number("10").with_empty_span())),
offset: None,
limit_by: vec![],
}),
fetch: None,
locks: vec![],
for_clause: None,
settings: None,
format_clause: None,
pipe_operators: vec![],
})
})
);

// at least one option (uri) is required
let err = bigquery()
.parse_sql_statements(concat!(
"EXPORT DATA OPTIONS() AS ",
"SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10",
))
.unwrap_err();
assert_eq!(
err.to_string(),
"sql parser error: Expected: identifier, found: )"
);

let err = bigquery()
.parse_sql_statements(concat!(
"EXPORT DATA AS SELECT field1, field2 FROM mydataset.table1 ORDER BY field1 LIMIT 10",
))
.unwrap_err();
assert_eq!(
err.to_string(),
"sql parser error: Expected: OPTIONS, found: AS"
);
}

#[test]
fn test_begin_transaction() {
bigquery().verified_stmt("BEGIN TRANSACTION");
Expand Down