
amazon s3 - Sinking to Hudi Table by using Spark and Flink together into the same S3 folder - Stack Overflow


I have one use case.

Sink (S1) -> I have written a Spark job that sinks data from OpenSearch to S3.

Sink (S2) -> I have another job, written in Flink, that sinks data from Kafka to S3, into the same folder as S1.

Both jobs work fine on their own. When I sink from S1, I can see that all the data is written correctly, but S2 then cannot sink its data into the same folder. If S2 uses a different folder, it can sink the data, even though the schema and data format are the same.

I have checked the hoodie.properties files for both of them; many properties are saved by default, and those also differ between the two writers.

The 1st image shows the Spark content and the 2nd shows the Flink content in the S3 folder.
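For anyone who wants to reproduce the comparison, the two hoodie.properties files can be pulled with a few lines of boto3 (a minimal sketch; the bucket name and table prefixes below are placeholders, not my real paths):

import boto3

# Placeholder bucket/prefixes -- substitute the actual Hudi table locations.
BUCKET = "my-bucket"
SPARK_TABLE_PREFIX = "hudi/spark-table"
FLINK_TABLE_PREFIX = "hudi/flink-table"

s3 = boto3.client("s3")

def read_hoodie_properties(prefix):
    # hoodie.properties lives under the table's .hoodie/ folder
    obj = s3.get_object(Bucket=BUCKET, Key=f"{prefix}/.hoodie/hoodie.properties")
    return obj["Body"].read().decode("utf-8")

# Print both side by side to spot the default properties that differ.
print(read_hoodie_properties(SPARK_TABLE_PREFIX))
print(read_hoodie_properties(FLINK_TABLE_PREFIX))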

Both are MERGE_ON_READ. For S1, here is the hoodie config:

hudi_options = {
    "hoodie.table.name": args["HUDI_TABLE_NAME"],
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.write.precombine.field": "time",
    "hoodie.datasource.write.partitionpath.field": "customer_uuid,year,month",
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
    "hoodie.database.name": "default_database",
}
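The Glue job then applies these options through a standard Hudi DataFrame write, roughly like this (a minimal sketch; df and S3_PATH stand in for the actual DataFrame read from OpenSearch and the target folder):

# Sketch of how hudi_options is passed to the writer in the Glue/Spark job.
# `df` is the DataFrame built from OpenSearch; S3_PATH is the target folder.
(
    df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save(S3_PATH)
)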

For the S2 sink, here is the hoodie config:

(" +
            "  'connector' = 'hudi'," +
            "  'path' = '" + S3_PATH + "'," +
            "  'table.type' = 'MERGE_ON_READ'," +
            "  'hoodie.table.name' = '" + HUDI_TABLE_NAME + "'," +
            "  'hoodie.datasource.write.recordkey.field' = 'event_id'," +
            "  'hoodie.datasource.write.partitionpath.field' = 'customer_uuid,year,month'," +
            "  'hoodie.datasource.write.precombine.field' = 'time'," +
            "  'hoodie.datasource.write.hive_style_partitioning' = 'true'," +
            "  'hoodie.write.concurrency.mode' = 'optimistic_concurrency_control'," +
            "  'hoodie.schema.on.read.enable' = 'true'," +
            "  'hoodiepaction.payload.class' = '.apache.hudimon.model.DefaultHoodieRecordPayload'," +
            "  'hoodie.archivelog.folder' = 'history'," +
            "  'hoodie.timeline.path' = 'timeline'," +
            "  'hoodie.table.base.file.format' = 'PARQUET'," +
            "  'hoodie.table.metadata.partitions' = 'files'," +
            "  'hoodie.table.keygenerator.type' = 'COMPLEX'" +
            ")";

What should I do so that both jobs can use the same S3 folder with no data loss?

I have also noticed that if I do the sinking via Flink first, it works properly, but if I then run the Glue job against the same folder, it deletes all the existing content and replaces the Flink data. The other way around gives an error.

Thanks.
