flink sql是flink 中的一个重要模块,同时我们也知道flink sql的解析是基于calcite实现的。那么,当我们在使用flink的时候,如何使用calcite来对sql进行解析呢?
首先,如果使用了flink-table-planner,则不能再引入calcite。由于flink-table-planner中已经包含了calcite代码,如果再引入calcite的jar包,则会引起冲突。所以,直接使用flink-table-planner中的calcite即可。
那么这里最主要的就是如何获得 org.apache.calcite.sql.parser.Config 对象,通过对 org.apache.flink.table.planner.delegation.PlannerContext 源码的查看,我们可以看到如下代码:
private CalciteConfig getCalciteConfig() {
return TableConfigUtils.getCalciteConfig(context.getTableConfig());
}
/**
* Returns the SQL parser config for this environment including a custom Calcite configuration.
*/
private SqlParser.Config getSqlParserConfig() {
return JavaScalaConversionUtil.<SqlParser.Config>toJava(
getCalciteConfig().getSqlParserConfig())
.orElseGet(
// we use Java lex because back ticks are easier than double quotes in
// programming and cases are preserved
() -> {
SqlConformance conformance = getSqlConformance();
return SqlParser.config()
.withParserFactory(FlinkSqlParserFactories.create(conformance))
.withConformance(conformance)
.withLex(Lex.JAVA)
.withIdentifierMaxLength(256);
});
}
private FlinkSqlConformance getSqlConformance() {
SqlDialect sqlDialect = context.getTableConfig().getSqlDialect();
switch (sqlDialect) {
// Actually, in Hive dialect, we won't use Calcite parser.
// So, we can just use Flink's default sql conformance as a placeholder
case HIVE:
case DEFAULT:
return FlinkSqlConformance.DEFAULT;
default:
throw new TableException("Unsupported SQL dialect: " + sqlDialect);
}
}
由代码可见,通过 TableConfig 可以获得 CalciteConfig。所以,我们可以通过 TableEnvironment 来获得TableConfig ,从而获得 SqlParser.Config。有了config,就可以进行sql解析了。
SqlParser parser = SqlParser.create(sql, config);
如果不设置config,则会引起例如无法解析 CREATE TABLE 这样的问题。