
jdbc - batch query execution in snowflake - Stack Overflow


I am using JDBC with dbcp2 in Java to build a NiFi custom processor. My goal is to get good performance by executing queries (INSERT, UPDATE, DELETE) in batches, and I have written Java code that executes them in batches, but in the Snowflake monitoring tab I can see these queries executing one by one, record by record, instead of going as a single batch (or in batches of the configured size).

// dbcp2 basic datasource
private volatile BasicDataSource dataSource;

public Connection getConnection() throws ProcessException {
    try {
        return dataSource.getConnection();
    } catch (final SQLException e) {
        String errorMessage = e.getMessage().toLowerCase();
        if (errorMessage.contains("authentication failed")) {
            throw new ProcessException("Authentication Failed, Please validate Username and Password.");
        } else {
            //                logger.info("SQL Exception: " + e.getMessage());
            throw new ProcessException(e.getMessage());
        }
    }
}

try {
    dataSource.setDriverClassName(drvClass);
    dataSource.setDriverClassLoader(getDriverClassLoader(DBUrl, DriverJar));
    dataSource.setMaxWaitMillis(maxWaitMillis);
    dataSource.setMaxTotal(maxTotal);
    dataSource.setUrl(DBUrl);
    dataSource.setUsername(user);
    dataSource.setPassword(password);
    if (validationQuery!=null && !validationQuery.isEmpty()) {
        dataSource.setValidationQuery(validationQuery);
        dataSource.setTestOnBorrow(true);
    }
}
catch (Exception e) {
    fileToProcess = session.putAttribute(fileToProcess, "responseMessage", e.getMessage());
    session.transfer(fileToProcess, REL_FAILURE);
}

// the connection is declared outside the try block so the ResponseHandler_* calls after the catch can still use it
Connection conn = null;
try {
    conn = getConnection();
    log.info("Connection Established");
    conn.setAutoCommit(false);
    ...

        try (Reader reader = new InputStreamReader(new BOMInputStream(inputStream), StandardCharsets.UTF_8);
                CSVParser parser = new CSVParser(reader, CSVFormat.RFC4180.withDelimiter(sqlQueryGenerator.getColDelimiter().charAt(0)))) {

            Iterator<CSVRecord> iterator = parser.iterator();
            CSVRecord record;
            String sqlQuery;
            try (Statement statement = conn.createStatement()) {
                while (iterator.hasNext() && currentRow < (sizeofCSV - footerCount)) {
                    record = iterator.next();

                    // build a literal INSERT for this CSV record and queue it in the batch
                    sqlQuery = InsertQueryGenerator(tableName, record);
                    statement.addBatch(sqlQuery);


                    // flush the batch once it reaches the configured size
                    if (++currentBatchCount == batchSize) {
                        try {
                            log.info("Current Batch Size: " + currentBatchCount);
                            statement.executeBatch();
                            statement.clearBatch();
                            currentBatchCount = 0;
                        }
                        catch (SQLException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    currentRow = record.getRecordNumber();
                }
                // flush any rows left over after the last full batch
                if (currentBatchCount > 0) {
                    try {
                        log.info("Current Batch Size: " + currentBatchCount);
                        statement.executeBatch();
                        statement.clearBatch();
                    }
                    catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                }

            } catch (Exception e) {
                errorArray.put(e.getMessage());
            }

    } catch (Exception e) {
        errorArray.put(e.getMessage());
    }

} catch (Exception e) {
    errorArray.put(e.getMessage());
}

if(errorArray.isEmpty()){
    ResponseHandler_success(conn,session, fileToProcess, "Success");
}
else {
    ResponseHandler_failure(conn,session, fileToProcess, errorArray.toString());
}

I also tried similar code against a Postgres database and it works fine there, but with Snowflake it does not insert in a batch; the rows go in one by one.
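
For comparison, the pattern the Snowflake JDBC driver is documented to optimize is a PreparedStatement with bind variables; as far as I can tell, a batch built from literal INSERT strings on a plain Statement is replayed on the server one statement at a time, which would match what the monitoring tab shows. Below is a minimal sketch of the bind-variable version, reusing conn, tableName, parser and batchSize from the code above and assuming a hypothetical two-column table:

// Sketch only: same loop, but with bind variables instead of literal SQL strings.
// "col1"/"col2" and the two-column layout are placeholders, not the real table definition.
String insertSql = "INSERT INTO " + tableName + " (col1, col2) VALUES (?, ?)";

try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
    int pendingRows = 0;
    for (CSVRecord csvRecord : parser) {
        ps.setString(1, csvRecord.get(0));   // bind each column value for this row
        ps.setString(2, csvRecord.get(1));
        ps.addBatch();

        if (++pendingRows == batchSize) {
            ps.executeBatch();               // the driver can send these binds as one bulk insert
            ps.clearBatch();
            pendingRows = 0;
        }
    }
    if (pendingRows > 0) {
        ps.executeBatch();                   // flush the last partial batch
    }
    conn.commit();                           // autoCommit is false in the code above
}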

I also tried to implement the method provided in the documentation, but the behavior in the Snowflake monitoring tab is still the same: the INSERT queries execute one by one instead of as a batch.
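
One more thing visible in the posted code: the connection is opened with setAutoCommit(false), but no commit() appears in the snippet (it may happen inside ResponseHandler_success), so even a correctly batched insert would sit in an open transaction. A minimal sketch of explicit commit/rollback around the final flush, reusing conn, statement and errorArray from above:

try {
    statement.executeBatch();            // flush whatever is still queued
    conn.commit();                       // make the batched rows visible; needed with autoCommit=false
} catch (SQLException e) {
    errorArray.put(e.getMessage());
    try {
        conn.rollback();                 // drop the partial work so the flow file can be retried cleanly
    } catch (SQLException rollbackError) {
        errorArray.put(rollbackError.getMessage());
    }
}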
