文章
问答
冒泡
flink 中使用Calcite实现sql语句解析

flink sql是flink 中的一个重要模块,同时我们也知道flink sql的解析是基于calcite实现的。那么,当我们在使用flink的时候,如何使用calcite来对sql进行解析呢?

首先,如果使用了flink-table-planner,则不能再引入calcite。由于flink-table-planner中已经包含了calcite代码,如果再引入calcite的jar包,则会引起冲突。所以,直接使用flink-table-planner中的calcite即可。

那么这里最主要的就是如何获得 org.apache.calcite.sql.parser.Config 对象,通过对 org.apache.flink.table.planner.delegation.PlannerContext 源码的查看,我们可以看到如下代码:

private CalciteConfig getCalciteConfig() {
    return TableConfigUtils.getCalciteConfig(context.getTableConfig());
}

/**
 * Returns the SQL parser config for this environment including a custom Calcite configuration.
 */
private SqlParser.Config getSqlParserConfig() {
    return JavaScalaConversionUtil.<SqlParser.Config>toJava(
                    getCalciteConfig().getSqlParserConfig())
            .orElseGet(
                    // we use Java lex because back ticks are easier than double quotes in
                    // programming and cases are preserved
                    () -> {
                        SqlConformance conformance = getSqlConformance();
                        return SqlParser.config()
                                .withParserFactory(FlinkSqlParserFactories.create(conformance))
                                .withConformance(conformance)
                                .withLex(Lex.JAVA)
                                .withIdentifierMaxLength(256);
                    });
}

private FlinkSqlConformance getSqlConformance() {
    SqlDialect sqlDialect = context.getTableConfig().getSqlDialect();
    switch (sqlDialect) {
            // Actually, in Hive dialect, we won't use Calcite parser.
            // So, we can just use Flink's default sql conformance as a placeholder
        case HIVE:
        case DEFAULT:
            return FlinkSqlConformance.DEFAULT;
        default:
            throw new TableException("Unsupported SQL dialect: " + sqlDialect);
    }
}

 

由代码可见,通过 TableConfig 可以获得 CalciteConfig。所以,我们可以通过 TableEnvironment 来获得TableConfig ,从而获得 SqlParser.Config。有了config,就可以进行sql解析了。

SqlParser parser = SqlParser.create(sql, config);

如果不设置config,则会引起例如无法解析 CREATE TABLE 这样的问题。

flink

关于作者

落雁沙
非典型码农
获得点赞
文章被阅读