I am working with the api.countrylayer payload, which returns a JSON list of countries. The JSON array should be split into one record per country for further processing. I am therefore using the UNNEST function in Flink SQL, but it seems to alter the output: each element comes back as a bracketed key=value string instead of the original JSON object.
payload:
[{"name":"Afghanistan","topLevelDomain":[".af"],"alpha2Code":"AF","alpha3Code":"AFG","callingCodes":["93"],"capital":"Kabul"...},
{"name":"\u00c5land Islands","topLevelDomain":[".ax"],"alpha2Code":"AX","alpha3Code":"ALA","callingCodes":["358"],"capital":"Mariehamn"...},
{"name":"Albania","topLevelDomain":[".al"],"alpha2Code":"AL","alpha3Code":"ALB","callingCodes":["355"],"capital":"Tirana"...}]
processing:
tableEnv.executeSql("create view countryview as" +
" select " +
" country " +
" from kafkaCountryLayer " +
" cross join unnest(json_query(countries, '$' returning array<string>)) as countrylayer (country)");
Table table = tableEnv.sqlQuery("select " +
" country " +
" from countryview ");
table.executeInsert("print_output");
print_output result:
[{name=Afghanistan, topLevelDomain=[.af], alpha2Code=AF, alpha3Code=AFG, callingCodes=[93], capital=Kabul...}]
[{name=Åland Islands, topLevelDomain=[.ax], alpha2Code=AX, alpha3Code=ALA, callingCodes=[358], capital=Mariehamn...}]
[{name=Albania, topLevelDomain=[.al], alpha2Code=AL, alpha3Code=ALB, callingCodes=[355], capital=Tirana...}]
expected result:
{"name":"Afghanistan","topLevelDomain":[".af"],"alpha2Code":"AF","alpha3Code":"AFG","callingCodes":["93"],"capital":"Kabul"...}
{"name":"\u00c5land Islands","topLevelDomain":[".ax"],"alpha2Code":"AX","alpha3Code":"ALA","callingCodes":["358"],"capital":"Mariehamn"...}
{"name":"Albania","topLevelDomain":[".al"],"alpha2Code":"AL","alpha3Code":"ALB","callingCodes":["355"],"capital":"Tirana"...}
Is it a bug or am I using the function incorrectly? Can someone help me to get the expected result?
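To make the expected result concrete, here is a minimal sketch, outside Flink, of the split I am after: each top-level element of the array is returned as its original JSON text, unchanged. The class `JsonArraySplitter` and the method `splitTopLevel` are hypothetical names made up for this illustration and are not part of the Flink API. The sketch assumes well-formed input and does not validate it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a Flink class.
public class JsonArraySplitter {

    // Returns the raw text of each element of a top-level JSON array.
    // Assumes well-formed JSON; this is not a validating parser.
    public static List<String> splitTopLevel(String json) {
        List<String> elements = new ArrayList<>();
        int depth = 0;            // nesting depth of [] and {}
        boolean inString = false; // true while inside a string literal
        boolean escaped = false;  // true right after a backslash in a string
        int start = -1;           // index where the current element begins

        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                // Brackets and commas inside strings must be ignored.
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            if (c == '"') {
                inString = true;
                if (depth == 1 && start < 0) start = i; // string element
            } else if (c == '[' || c == '{') {
                depth++;
                if (depth == 2 && start < 0) start = i; // nested element
            } else if (c == ']' || c == '}') {
                depth--;
                if (depth == 0 && start >= 0) {
                    // End of the outer array closes the last element.
                    elements.add(json.substring(start, i).trim());
                    start = -1;
                }
            } else if (c == ',' && depth == 1) {
                // A comma directly inside the outer array ends an element.
                if (start >= 0) {
                    elements.add(json.substring(start, i).trim());
                    start = -1;
                }
            } else if (depth == 1 && start < 0 && !Character.isWhitespace(c)) {
                start = i; // number, true, false or null element
            }
        }
        return elements;
    }

    public static void main(String[] args) {
        // Shortened sample in the shape of the payload above.
        String payload = "[{\"name\":\"Afghanistan\",\"topLevelDomain\":[\".af\"]},"
                + "{\"name\":\"Albania\",\"topLevelDomain\":[\".al\"]}]";
        for (String country : splitTopLevel(payload)) {
            System.out.println(country); // one JSON object per line
        }
    }
}
```

This only shows the shape of the output I expect per country; in the actual job the split still has to happen inside Flink SQL.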