I am using JDBC with DBCP2 in Java to build a NiFi custom processor. My goal is good performance by executing queries (INSERT, UPDATE, DELETE) in batches, and I have written Java code to execute them in batches, but in the Snowflake monitoring tab I can see these queries being executed one by one (record by record) instead of as a single batch or in batches of the configured size.
// DBCP2 BasicDataSource
private volatile BasicDataSource dataSource;

public Connection getConnection() throws ProcessException {
    try {
        return dataSource.getConnection();
    } catch (final SQLException e) {
        String errorMessage = e.getMessage().toLowerCase();
        if (errorMessage.contains("authentication failed")) {
            throw new ProcessException("Authentication Failed, Please validate Username and Password.");
        } else {
            // logger.info("SQL Exception: " + e.getMessage());
            throw new ProcessException(e.getMessage());
        }
    }
}
try {
    dataSource = new BasicDataSource(); // create the connection pool before setting its properties
    dataSource.setDriverClassName(drvClass);
    dataSource.setDriverClassLoader(getDriverClassLoader(DBUrl, DriverJar));
    dataSource.setMaxWaitMillis(maxWaitMillis);
    dataSource.setMaxTotal(maxTotal);
    dataSource.setUrl(DBUrl);
    dataSource.setUsername(user);
    dataSource.setPassword(password);
    if (validationQuery != null && !validationQuery.isEmpty()) {
        dataSource.setValidationQuery(validationQuery);
        dataSource.setTestOnBorrow(true);
    }
}
catch (Exception e) {
    fileToProcess = session.putAttribute(fileToProcess, "responseMessage", e.getMessage());
    session.transfer(fileToProcess, REL_FAILURE);
}
try (Connection conn = getConnection()) {
    log.info("Connection Established");
    conn.setAutoCommit(false);
    ...
    try (Reader reader = new InputStreamReader(new BOMInputStream(inputStream), StandardCharsets.UTF_8);
         CSVParser parser = new CSVParser(reader, CSVFormat.RFC4180.withDelimiter(sqlQueryGenerator.getColDelimiter().charAt(0)))) {
        Iterator<CSVRecord> iterator = parser.iterator();
        CSVRecord record;
        String SQlQuery = null;
        try (Statement statement = conn.createStatement()) {
            while (iterator.hasNext() && currentRow < (sizeofCSV - footerCount)) {
                record = iterator.next();
                SQlQuery = InsertQueryGenerator(tableName, record);
                // log.info("Query : " + SQlQuery);
                statement.addBatch(SQlQuery);
                if (++currentBatchCount == batchSize) {
                    try {
                        log.info("Current Batch Size: " + currentBatchCount);
                        statement.executeBatch();
                        statement.clearBatch();
                        currentBatchCount = 0;
                    }
                    catch (SQLException e) {
                        throw new RuntimeException(e);
                        // ResponseHandler_failure(conn, session, fileToProcess, "Error : " + e.getMessage());
                    }
                }
                currentRow = record.getRecordNumber();
            }
            // log.info(String.valueOf(batchSize));
            if (currentBatchCount > 0) {
                try {
                    log.info("Current Batch Size: " + currentBatchCount);
                    statement.executeBatch();
                    statement.clearBatch();
                }
                catch (SQLException e) {
                    throw new RuntimeException(e);
                    // ResponseHandler_failure(conn, session, fileToProcess, "Error in executing batch for final ones inside SQLException: " + e.getMessage());
                }
            }
        } catch (Exception e) {
            errorArray.put(e.getMessage());
            // log.error("Error: " + e.getMessage());
        }
    }
    catch (Exception e) {
        errorArray.put(e.getMessage());
        // log.error("Error: " + e.getMessage());
    }
    // route the flowfile while the connection is still in scope
    if (errorArray.isEmpty()) {
        ResponseHandler_success(conn, session, fileToProcess, "Success");
    } else {
        ResponseHandler_failure(conn, session, fileToProcess, errorArray.toString());
    }
}
catch (Exception e) {
    errorArray.put(e.getMessage());
    // log.error("Error: " + e.getMessage());
}
I also tried similar code against a Postgres database and it works fine there, but with Snowflake it does not insert in a batch; the rows go in one by one.
I also tried to implement the method provided in the documentation, but the behaviour in the Snowflake monitoring tab is the same: the INSERT queries are executed one by one instead of in a batch.
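For reference, the documented approach I tried looks roughly like the sketch below: a parameterized INSERT batched through a PreparedStatement with bind variables instead of literal SQL strings. The table name, columns, and CSV-to-parameter mapping here are placeholders, not my actual schema.

// Rough sketch of the PreparedStatement-based batch insert; my_table, col1/col2 and the
// record-to-parameter mapping are placeholders, not the real schema.
String sql = "INSERT INTO my_table (col1, col2) VALUES (?, ?)";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
    int count = 0;
    for (CSVRecord rec : parser) {
        ps.setString(1, rec.get(0));
        ps.setString(2, rec.get(1));
        ps.addBatch();
        if (++count == batchSize) {
            ps.executeBatch();   // Snowflake monitoring still shows one INSERT per row
            ps.clearBatch();
            count = 0;
        }
    }
    if (count > 0) {
        ps.executeBatch();
        ps.clearBatch();
    }
    conn.commit();
}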