I have a Flink SQL query like this:

'''
select
json_value(json_str, '$.key1') as key1,
json_value(json_str, '$.key2') as key2,
json_value(json_str, '$.key3') as key3,
json_value(json_str, '$.key4') as key4
from input_table
'''

When I ran this SQL, I found that Flink actually parses the string four times, and the CPU usage is very high. Is there a way to parse it once and reuse the parsed JSON object to save CPU?
My Flink Version: 1.16
asked Mar 3 at 8:22 by xinfa

Comment (Robin Moffatt, Mar 3 at 17:30): Did you try it in the latest version, 1.20? It would also be interesting to hear the analysis you did to determine that it is being parsed four times.
2 Answers

At first this sounds impossible. But after briefly reading the source code, it turns out the parse results are already cached.
source code version:
- flink: 1.20
- calcite: master
Flink's json_value is mapped to a Calcite function. From BuiltInFunctionDefinitions.java (the snippet shows the analogous JSON_QUERY definition; JSON_VALUE is declared the same way, with runtimeDeferred()):
'''
public static final BuiltInFunctionDefinition JSON_QUERY =
        BuiltInFunctionDefinition.newBuilder()
                .name("JSON_QUERY")
                .kind(SCALAR)
                .callSyntax(JsonFunctionsCallSyntax.JSON_QUERY)
                .inputTypeStrategy(
                        sequence(
                                logical(LogicalTypeFamily.CHARACTER_STRING),
                                and(logical(LogicalTypeFamily.CHARACTER_STRING), LITERAL),
                                TYPE_LITERAL,
                                symbol(JsonQueryWrapper.class),
                                SpecificInputTypeStrategies.JSON_QUERY_ON_EMPTY_ERROR_BEHAVIOUR,
                                SpecificInputTypeStrategies
                                        .JSON_QUERY_ON_EMPTY_ERROR_BEHAVIOUR))
                .outputTypeStrategy(forceNullable(argument(2)))
                .runtimeDeferred()
                .build();
'''
...
'''
/**
 * Specifies that this {@link BuiltInFunctionDefinition} will be mapped to a Calcite
 * function.
 */
public Builder runtimeDeferred() {
    // This method is just a marker method for clarity. It is equivalent to calling
    // neither {@link #runtimeProvided} nor {@link #runtimeClass}.
    return this;
}
'''
Calcite's JSON functions are stateful (parse results are cached in a Guava LoadingCache). Source: calcite/runtime/JsonFunctions.java:
'''
// /apache/calcite/util/BuiltInMethod.java
JSON_VALUE(JsonFunctions.StatefulFunction.class, "jsonValue",
    String.class, String.class,
    SqlJsonValueEmptyOrErrorBehavior.class, Object.class,
    SqlJsonValueEmptyOrErrorBehavior.class, Object.class),
...
// /apache/calcite/runtime/JsonFunctions.java
// jsonValue(String, String, ...) is invoked by reflection
public @Nullable Object jsonValue(String input,
    String pathSpec,
    SqlJsonValueEmptyOrErrorBehavior emptyBehavior,
    Object defaultValueOnEmpty,
    SqlJsonValueEmptyOrErrorBehavior errorBehavior,
    Object defaultValueOnError) {
  return jsonValue(
      jsonApiCommonSyntaxWithCache(input, pathSpec),
      emptyBehavior,
      defaultValueOnEmpty,
      errorBehavior,
      defaultValueOnError);
}

// jsonApiCommonSyntaxWithCache: the cache is keyed by the raw JSON string
private final LoadingCache<String, JsonValueContext> cache =
    CacheBuilder.newBuilder()
        .maximumSize(FUNCTION_LEVEL_CACHE_MAX_SIZE.value())
        .build(CacheLoader.from(JsonFunctions::jsonValueExpression));

public JsonPathContext jsonApiCommonSyntaxWithCache(String input,
    String pathSpec) {
  return jsonApiCommonSyntax(cache.getUnchecked(input), pathSpec);
}

// JsonFunctions::jsonValueExpression
public static JsonValueContext jsonValueExpression(String input) {
  try {
    return JsonValueContext.withJavaObj(dejsonize(input));
  } catch (Exception e) {
    return JsonValueContext.withException(e);
  }
}

// dejsonize. JSON_PATH_JSON_PROVIDER def:
// JacksonJsonProvider JSON_PATH_JSON_PROVIDER = new JacksonJsonProvider()
public static @Nullable Object dejsonize(String input) {
  return JSON_PATH_JSON_PROVIDER.parse(input);
}
'''
FUNCTION_LEVEL_CACHE_MAX_SIZE defaults to 1_000; maybe you can raise this value and measure CPU usage. Note that the cache is created per call site and keyed by the input string, so it only helps when the same json_str values repeat.

The property's documentation:
'''
/**
 * The maximum number of items in a function-level cache.
 *
 * <p>A few SQL functions have expensive processing that, if its results are
 * cached, can be reused by future calls to the function. One such function
 * is {@code RLIKE}, whose arguments are a regular expression and a string.
 * The regular expression needs to be compiled to a
 * {@link java.util.regex.Pattern}. Compilation is expensive, and within a
 * particular query, the arguments are often the same string, or a small
 * number of distinct strings, so caching makes sense.
 *
 * <p>Therefore, functions such as {@code RLIKE}, {@code SIMILAR TO},
 * {@code PARSE_URL}, {@code PARSE_TIMESTAMP}, {@code FORMAT_DATE} have a
 * function-level cache. The cache is created in the code generated for the
 * query, at the call site of the function, and expires when the query has
 * finished executing. Such caches do not need time-based expiration, but
 * we need to cap the size of the cache to deal with scenarios such as a
 * billion-row table where every row has a distinct regular expression.
 *
 * <p>Because of how Calcite generates and executes code in Enumerable
 * convention, each function object is used from a single thread. Therefore,
 * non thread-safe objects such as {@link java.text.DateFormat} can be safely
 * cached.
 *
 * <p>The value of this parameter limits the size of every function-level
 * cache in Calcite. The default value is 1,000.
 */
public static final CalciteSystemProperty<Integer> FUNCTION_LEVEL_CACHE_MAX_SIZE =
    intProperty("calcite.function.cache.maxSize", 1_000, v -> v >= 0);
'''
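If you want to experiment with that limit: CalciteSystemProperty reads JVM system properties at class-initialization time, so it has to be set at JVM startup on the TaskManagers, not from job code. A hedged configuration sketch (the property name comes from the source above; 10000 is an arbitrary example value, and `env.java.opts.taskmanager` is the standard Flink option for extra TaskManager JVM flags):

```yaml
# flink-conf.yaml (sketch): pass the Calcite property to the TaskManager JVMs.
env.java.opts.taskmanager: "-Dcalcite.function.cache.maxSize=10000"
```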
I finally solved my problem with this approach: parse the string once in a table function that emits a map, then extract each key from the map.

'''
select
json_map['key1'] as key1,
json_map['key2'] as key2,
json_map['key3'] as key3,
...
json_map['key100'] as key100
from (
    select json_map
    from input_table, lateral table(json_tuple(json_str)) as T(json_map)
)
'''

This is the json_tuple function:
'''
import java.util.HashMap;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;

// json_tuple: parses the JSON string once and emits the requested keys as one
// map. (Class wrapper, imports, and type hints completed here; names are illustrative.)
@FunctionHint(output = @DataTypeHint("MAP<STRING, STRING>"))
public class JsonTupleFunction extends TableFunction<Map<String, String>> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public void eval(String jsonStr, String... keys) {
        HashMap<String, String> result = new HashMap<>();
        if (jsonStr == null || jsonStr.isEmpty()) {
            collect(result);
            return;
        }
        try {
            JsonNode jsonNode = objectMapper.readTree(jsonStr);
            if (keys.length == 0) {
                // No explicit keys: emit every top-level field, which matches
                // the json_tuple(json_str) call in the query above.
                jsonNode.fields().forEachRemaining(
                        entry -> result.put(entry.getKey(), entry.getValue().asText()));
            } else {
                for (String key : keys) {
                    JsonNode valueNode = jsonNode.get(key);
                    if (valueNode != null) {
                        result.put(key, valueNode.asText());
                    }
                }
            }
            collect(result);
        } catch (Exception e) {
            // Malformed JSON: emit an empty map instead of failing the job.
            collect(result);
        }
    }
}
'''
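For completeness, a usage sketch, assuming the eval method above lives in a TableFunction<Map<String, String>> class named JsonTupleFunction (the class name is an assumption; createTemporarySystemFunction and executeSql are the standard Table API calls):

```java
// Register the UDF under the name used in the SQL above, then run the query.
// tEnv is an org.apache.flink.table.api.TableEnvironment.
tEnv.createTemporarySystemFunction("json_tuple", JsonTupleFunction.class);
tEnv.executeSql(
    "select json_map['key1'] as key1, json_map['key2'] as key2 "
        + "from input_table, lateral table(json_tuple(json_str)) as T(json_map)");
```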