
Flink SQL repeatedly parsing JSON problem - Stack Overflow


I have a flink sql like this:

select
    json_value(json_str, '$.key1') as key1,
    json_value(json_str, '$.key2') as key2,
    json_value(json_str, '$.key3') as key3,
    json_value(json_str, '$.key4') as key4
from input_table

When I ran this SQL, I found that Flink actually parses the string four times, and the CPU usage is very high. Is there a way to parse it once and reuse the JSON object to save CPU?

My Flink Version: 1.16


Asked Mar 3 at 8:22 by xinfa
  • Did you try it in the latest version, 1.20? It would also be interesting to hear the analysis you did to determine that it is being parsed four times. – Robin Moffatt, Mar 3 at 17:30

2 Answers


That sounds unlikely to me.

I briefly read the source code, and it turns out that the parsed results are cached.

Source code versions:

  • flink: 1.20
  • calcite: master

Flink's json_value is mapped to a Calcite function. Source: BuiltInFunctionDefinitions.java (the JSON_QUERY definition below is declared the same way):

public static final BuiltInFunctionDefinition JSON_QUERY =
        BuiltInFunctionDefinition.newBuilder()
                .name("JSON_QUERY")
                .kind(SCALAR)
                .callSyntax(JsonFunctionsCallSyntax.JSON_QUERY)
                .inputTypeStrategy(
                        sequence(
                                logical(LogicalTypeFamily.CHARACTER_STRING),
                                and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
                                TYPE_LITERAL,
                                symbol(JsonQueryWrapper.class),
                                SpecificInputTypeStrategies.JSON_QUERY_ON_EMPTY_ERROR_BEHAVIOUR,
                                SpecificInputTypeStrategies
                                        .JSON_QUERY_ON_EMPTY_ERROR_BEHAVIOUR))
                .outputTypeStrategy(forceNullable(argument(2)))
                .runtimeDeferred()
                .build();

...
/**
 * Specifies that this {@link BuiltInFunctionDefinition} will be mapped to a Calcite
 * function.
 */
public Builder runtimeDeferred() {
    // This method is just a marker method for clarity. It is equivalent to calling
    // neither {@link #runtimeProvided} nor {@link #runtimeClass}.
    return this;
}

Calcite's JSON functions are stateful (the parsed input is cached in a LoadingCache); see calcite/runtime/JsonFunctions.java:

// /apache/calcite/util/BuiltInMethod.java
JSON_VALUE(JsonFunctions.StatefulFunction.class, "jsonValue",
    String.class, String.class,
    SqlJsonValueEmptyOrErrorBehavior.class, Object.class,
    SqlJsonValueEmptyOrErrorBehavior.class, Object.class),

...
// /apache/calcite/runtime/JsonFunctions.java
// jsonValue(String, String, ...) is invoked via reflection
public @Nullable Object jsonValue(String input,
    String pathSpec,
    SqlJsonValueEmptyOrErrorBehavior emptyBehavior,
    Object defaultValueOnEmpty,
    SqlJsonValueEmptyOrErrorBehavior errorBehavior,
    Object defaultValueOnError) {
  return jsonValue(
      jsonApiCommonSyntaxWithCache(input, pathSpec),
      emptyBehavior,
      defaultValueOnEmpty,
      errorBehavior,
      defaultValueOnError);
}

// jsonApiCommonSyntaxWithCache
private final LoadingCache<String, JsonValueContext> cache =
    CacheBuilder.newBuilder()
        .maximumSize(FUNCTION_LEVEL_CACHE_MAX_SIZE.value())
        .build(CacheLoader.from(JsonFunctions::jsonValueExpression));

public JsonPathContext jsonApiCommonSyntaxWithCache(String input,
    String pathSpec) {
  return jsonApiCommonSyntax(cache.getUnchecked(input), pathSpec);
}

// JsonFunctions::jsonValueExpression
public static JsonValueContext jsonValueExpression(String input) {
  try {
    return JsonValueContext.withJavaObj(dejsonize(input));
  } catch (Exception e) {
    return JsonValueContext.withException(e);
  }
}

// dejsonize. JSON_PATH_JSON_PROVIDER def:
//     JacksonJsonProvider JSON_PATH_JSON_PROVIDER = new JacksonJsonProvider()
public static @Nullable Object dejsonize(String input) {
  return JSON_PATH_JSON_PROVIDER.parse(input);
}
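In other words, within one generated function instance, repeated calls on the same input string hit the cache instead of re-parsing. For illustration only, here is a minimal, self-contained sketch of that pattern (a Guava LoadingCache keyed by the raw JSON string in front of Jackson); the class and method names are made up for the demo and this is not Calcite's code:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class JsonParseCacheDemo {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Keyed by the raw JSON string, like Calcite's jsonApiCommonSyntaxWithCache above.
    private final LoadingCache<String, JsonNode> cache =
            CacheBuilder.newBuilder()
                    .maximumSize(1_000) // mirrors the FUNCTION_LEVEL_CACHE_MAX_SIZE default
                    .build(CacheLoader.from(JsonParseCacheDemo::parse));

    private static JsonNode parse(String json) {
        System.out.println("parsing " + json);
        try {
            return MAPPER.readTree(json);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public String value(String json, String key) {
        JsonNode node = cache.getUnchecked(json).get(key);
        return node == null ? null : node.asText();
    }

    public static void main(String[] args) {
        JsonParseCacheDemo demo = new JsonParseCacheDemo();
        String json = "{\"key1\":\"a\",\"key2\":\"b\",\"key3\":\"c\",\"key4\":\"d\"}";
        // Four extractions, but "parsing ..." is printed only once.
        System.out.println(demo.value(json, "key1"));
        System.out.println(demo.value(json, "key2"));
        System.out.println(demo.value(json, "key3"));
        System.out.println(demo.value(json, "key4"));
    }
}

Running main extracts four values but parses the string a single time.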

The default value of FUNCTION_LEVEL_CACHE_MAX_SIZE is 1,000, and it is read from the calcite.function.cache.maxSize system property, so you could try changing it and measuring the CPU usage.

Its documentation:

/**
 * The maximum number of items in a function-level cache.
 *
 * <p>A few SQL functions have expensive processing that, if its results are
 * cached, can be reused by future calls to the function. One such function
 * is {@code RLIKE}, whose arguments are a regular expression and a string.
 * The regular expression needs to be compiled to a
 * {@link java.util.regex.Pattern}. Compilation is expensive, and within a
 * particular query, the arguments are often the same string, or a small
 * number of distinct strings, so caching makes sense.
 *
 * <p>Therefore, functions such as {@code RLIKE}, {@code SIMILAR TO},
 * {@code PARSE_URL}, {@code PARSE_TIMESTAMP}, {@code FORMAT_DATE} have a
 * function-level cache. The cache is created in the code generated for the
 * query, at the call site of the function, and expires when the query has
 * finished executing. Such caches do not need time-based expiration, but
 * we need to cap the size of the cache to deal with scenarios such as a
 * billion-row table where every row has a distinct regular expression.
 *
 * <p>Because of how Calcite generates and executes code in Enumerable
 * convention, each function object is used from a single thread. Therefore,
 * non thread-safe objects such as {@link java.text.DateFormat} can be safely
 * cached.
 *
 * <p>The value of this parameter limits the size of every function-level
 * cache in Calcite. The default value is 1,000.
 */
public static final CalciteSystemProperty<Integer> FUNCTION_LEVEL_CACHE_MAX_SIZE =
    intProperty("calcite.function.cache.maxSize", 1_000, v -> v >= 0);

I finally solved my problem with this approach: parse the JSON string once in a json_tuple table function and read every key from the resulting map.

select
    json_map['key1'] as key1,
    json_map['key2'] as key2,
    json_map['key3'] as key3,
    ...
    json_map['key100'] as key100
from (
    select json_map
    from input_table, lateral table(json_tuple(json_str)) as T(json_map)
)

The json_tuple function is a TableFunction; its eval method looks like this:

public void eval(String jsonStr, String... keys) {
    // objectMapper is a reusable Jackson ObjectMapper field of the function instance.
    HashMap<String, String> result = new HashMap<>(keys.length);
    if (jsonStr == null || jsonStr.isEmpty()) {
        collect(result);
        return;
    }
    try {
        // Parse the JSON string once ...
        JsonNode jsonNode = objectMapper.readTree(jsonStr);
        // ... then every key is just a lookup on the already-parsed tree.
        for (String key : keys) {
            JsonNode valueNode = jsonNode.get(key);
            if (valueNode != null) {
                result.put(key, valueNode.asText());
            }
        }
        collect(result);
    } catch (Exception e) {
        // On malformed JSON, emit an empty map instead of failing the job.
        collect(result);
    }
}
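For completeness, here is a minimal sketch of how such a function could be registered and called, assuming the eval method above lives in a class named JsonTupleFunction extending TableFunction<Map<String, String>> (the class name is assumed, and input_table is assumed to already exist in the catalog; the original answer only shows the eval body):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterJsonTuple {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the UDTF under the name used in the SQL above.
        // JsonTupleFunction (assumed name) wraps the eval method shown earlier and will
        // likely need a @FunctionHint(output = @DataTypeHint("MAP<STRING, STRING>"))
        // so Flink can infer the map output type.
        tEnv.createTemporarySystemFunction("json_tuple", JsonTupleFunction.class);

        // One JSON parse per row inside json_tuple; the projection only does map lookups.
        tEnv.executeSql(
                "select json_map['key1'] as key1, json_map['key2'] as key2 "
                        + "from input_table, lateral table(json_tuple(json_str, 'key1', 'key2')) as T(json_map)");
    }
}

Passing the key names explicitly matches the varargs eval(String, String...) signature shown above.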
