最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

google cloud platform - Issue with Creating PubSub and Log Sink in Docker Container - Stack Overflow

programmeradmin3浏览0评论

I am trying to create a Pub/Sub topic and a log sink in Google Cloud. The idea is to create a log sink and assign the role pubsub.publisher to the log sink's service account for the Pub/Sub topic created. The code works fine when run locally, but it fails when bundled into a Docker container. Below is the relevant code snippet and the error message I am encountering.

(deploy-functions) kernal42@space42 KT % podman run --rm -v /Users/kernal42/Downloads/my-project-anp-c94dc8530044.json:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
INFO:root:Creating topic: projects/my-project-anp/topics/kt-test-local-15
INFO:root:Topic created: projects/my-project-anp/topics/kt-test-local-15
ERROR:root:Sink not found: projects/my-project-anp/sinks/kt-test-local-15
Traceback (most recent call last):
  File "/app/main.py", line 41, in setup_pubsub_logsink
    existing_sink = config_client.get_sink(request={"sink_name": sink_path})
  File "/usr/local/lib/python3.10/site-packages/google/cloud/logging_v2/services/config_service_v2/client.py", line 2199, in get_sink
    response = rpc(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
  File "/usr/local/lib/python3.10/site-packages/google/api_core/timeout.py", line 130, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.NotFound: 404 Sink kt-test-local-15 does not exist
INFO:root:Creating sink: kt-test-local-15
INFO:root:Created new sink with writer identity: serviceAccount:[email protected]
INFO:root:getting the iam policy for the sink: projects/my-project-anp/sinks/kt-test-local-15
ERROR:root:Error setting up pubsub and logsink: 403 User not authorized to perform this action.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 76, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1181, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.PERMISSION_DENIED
    details = "User not authorized to perform this action."
    debug_error_string = "UNKNOWN:Error received from peer ipv4:142.250.193.202:443 {grpc_message:"User not authorized to perform this action.", grpc_status:7, created_time:"2025-02-15T05:19:05.783737357+00:00"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/main.py", line 98, in <module>
    setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
  File "/app/main.py", line 68, in setup_pubsub_logsink
    policy = publisher.get_iam_policy(request={"resource": topic_path})
  File "/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 1940, in get_iam_policy
    response = rpc(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.PermissionDenied: 403 User not authorized to perform this action.
(deploy-functions) kernal42@space42 KT % 

Below is the code I have for above.

main.py

from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
import logging

def setup_pubsub_logsink(project_id, topic_name, sink_name):
    try:
        # Initialize credentials
        credentials = service_account.Credentials.from_service_account_file(
            '/app/key.json'
        )
        
        # Initialize clients with explicit credentials
        publisher = pubsub_v1.PublisherClient(credentials=credentials)
        logging_client = logging_v2.Client(credentials=credentials)
        config_client = ConfigServiceV2Client(credentials=credentials)
        
        # Create topic path
        topic_path = publisher.topic_path(project_id, topic_name)
        
        # Create topic
        logging.info(f"Creating topic: {topic_path}")
        try:
            topic = publisher.create_topic(request={"name": topic_path})
            logging.info(f"Topic created: {topic.name}")
        except Exception as e:
            if "AlreadyExists" in str(e):
                logging.info(f"Topic already exists: {topic_path}")
            else:
                logging.info(f"Topic already exists: {topic_path}")
                # raise
        
        # Create sink
        sink_path = f"projects/{project_id}/sinks/{sink_name}"
        writer_identity = None
        
        # Check if sink exists
        try:
            existing_sink = config_client.get_sink(request={"sink_name": sink_path})
            writer_identity = existing_sink.writer_identity
            logging.info(f"Sink already exists with writer identity: {writer_identity}")
        except Exception as e:
            if "NotFound" not in str(e):
                # raise
                logging.exception(f"Sink not found: {sink_path}")
            
            # Create new sink if it doesn't exist
            logging.info(f"Creating sink: {sink_name}")
            sink = {
                "name": sink_name,
                "destination": f"pubsub.googleapis/{topic_path}",
                "filter": "severity >= WARNING"  # Adjust filter as needed
            }
            
            new_sink = config_client.create_sink(
                request={
                    "parent": f"projects/{project_id}",
                    "sink": sink,
                }
            )
            writer_identity = new_sink.writer_identity
            logging.info(f"Created new sink with writer identity: {writer_identity}")
        
        logging.info(f"getting the iam policy for the sink: {sink_path}")
        # Grant publisher permissions to the sink's writer identity
        policy = publisher.get_iam_policy(request={"resource": topic_path})
        policy.bindings.append({
            "role": "roles/pubsub.publisher",
            "members": [writer_identity]
        })
        logging.info(f"iam policy for the sink: {policy}")
        
        publisher.set_iam_policy(request={
            "resource": topic_path,
            "policy": policy
        })
        
        logging.info("Successfully set up topic and sink with proper permissions")
        
    except Exception as e:
        logging.error(f"Error setting up pubsub and logsink: {str(e)}")
        raise

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    
    PROJECT_ID = "<your-project-id>"
    TOPIC_NAME = "kt-test-local-15"
    SINK_NAME = "kt-test-local-15"
    
    setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)

Ensure you replace the project ID with the correct ID in above code snippet.

Dockerfile

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install required packages
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy your service account key and code
COPY main.py .
# Note: key.json should be mounted at runtime, not built into the image

CMD ["python", "main.py"]

requirements.txt

google-cloud-pubsub
google-cloud-logging

Additional Information:

  • The service account key (key.json) is securely mounted at runtime and not included in the Docker image.
  • The code works perfectly when run locally but fails when run inside the Docker container.

(How I am mounting the Service Account key at runtime and how )we can reproduce using Docker/podman with the following commands:

  1. Build the image: podman build -f Dockerfile -t rtsink:latest .
  2. Run the container: podman run --rm -v <complete-path-to-your-sa-account-key.json>:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest

I am trying to create a Pub/Sub topic and a log sink in Google Cloud. The idea is to create a log sink and assign the role pubsub.publisher to the log sink's service account for the Pub/Sub topic created. The code works fine when run locally, but it fails when bundled into a Docker container. Below is the relevant code snippet and the error message I am encountering.

(deploy-functions) kernal42@space42 KT % podman run --rm -v /Users/kernal42/Downloads/my-project-anp-c94dc8530044.json:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
INFO:root:Creating topic: projects/my-project-anp/topics/kt-test-local-15
INFO:root:Topic created: projects/my-project-anp/topics/kt-test-local-15
ERROR:root:Sink not found: projects/my-project-anp/sinks/kt-test-local-15
Traceback (most recent call last):
  File "/app/main.py", line 41, in setup_pubsub_logsink
    existing_sink = config_client.get_sink(request={"sink_name": sink_path})
  File "/usr/local/lib/python3.10/site-packages/google/cloud/logging_v2/services/config_service_v2/client.py", line 2199, in get_sink
    response = rpc(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
  File "/usr/local/lib/python3.10/site-packages/google/api_core/timeout.py", line 130, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.NotFound: 404 Sink kt-test-local-15 does not exist
INFO:root:Creating sink: kt-test-local-15
INFO:root:Created new sink with writer identity: serviceAccount:[email protected]
INFO:root:getting the iam policy for the sink: projects/my-project-anp/sinks/kt-test-local-15
ERROR:root:Error setting up pubsub and logsink: 403 User not authorized to perform this action.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 76, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1181, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
    raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.PERMISSION_DENIED
    details = "User not authorized to perform this action."
    debug_error_string = "UNKNOWN:Error received from peer ipv4:142.250.193.202:443 {grpc_message:"User not authorized to perform this action.", grpc_status:7, created_time:"2025-02-15T05:19:05.783737357+00:00"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/app/main.py", line 98, in <module>
    setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
  File "/app/main.py", line 68, in setup_pubsub_logsink
    policy = publisher.get_iam_policy(request={"resource": topic_path})
  File "/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 1940, in get_iam_policy
    response = rpc(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.PermissionDenied: 403 User not authorized to perform this action.
(deploy-functions) kernal42@space42 KT % 

Below is the code I have for above.

main.py

from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
import logging

def setup_pubsub_logsink(project_id, topic_name, sink_name):
    try:
        # Initialize credentials
        credentials = service_account.Credentials.from_service_account_file(
            '/app/key.json'
        )
        
        # Initialize clients with explicit credentials
        publisher = pubsub_v1.PublisherClient(credentials=credentials)
        logging_client = logging_v2.Client(credentials=credentials)
        config_client = ConfigServiceV2Client(credentials=credentials)
        
        # Create topic path
        topic_path = publisher.topic_path(project_id, topic_name)
        
        # Create topic
        logging.info(f"Creating topic: {topic_path}")
        try:
            topic = publisher.create_topic(request={"name": topic_path})
            logging.info(f"Topic created: {topic.name}")
        except Exception as e:
            if "AlreadyExists" in str(e):
                logging.info(f"Topic already exists: {topic_path}")
            else:
                logging.info(f"Topic already exists: {topic_path}")
                # raise
        
        # Create sink
        sink_path = f"projects/{project_id}/sinks/{sink_name}"
        writer_identity = None
        
        # Check if sink exists
        try:
            existing_sink = config_client.get_sink(request={"sink_name": sink_path})
            writer_identity = existing_sink.writer_identity
            logging.info(f"Sink already exists with writer identity: {writer_identity}")
        except Exception as e:
            if "NotFound" not in str(e):
                # raise
                logging.exception(f"Sink not found: {sink_path}")
            
            # Create new sink if it doesn't exist
            logging.info(f"Creating sink: {sink_name}")
            sink = {
                "name": sink_name,
                "destination": f"pubsub.googleapis/{topic_path}",
                "filter": "severity >= WARNING"  # Adjust filter as needed
            }
            
            new_sink = config_client.create_sink(
                request={
                    "parent": f"projects/{project_id}",
                    "sink": sink,
                }
            )
            writer_identity = new_sink.writer_identity
            logging.info(f"Created new sink with writer identity: {writer_identity}")
        
        logging.info(f"getting the iam policy for the sink: {sink_path}")
        # Grant publisher permissions to the sink's writer identity
        policy = publisher.get_iam_policy(request={"resource": topic_path})
        policy.bindings.append({
            "role": "roles/pubsub.publisher",
            "members": [writer_identity]
        })
        logging.info(f"iam policy for the sink: {policy}")
        
        publisher.set_iam_policy(request={
            "resource": topic_path,
            "policy": policy
        })
        
        logging.info("Successfully set up topic and sink with proper permissions")
        
    except Exception as e:
        logging.error(f"Error setting up pubsub and logsink: {str(e)}")
        raise

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    
    PROJECT_ID = "<your-project-id>"
    TOPIC_NAME = "kt-test-local-15"
    SINK_NAME = "kt-test-local-15"
    
    setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)

Ensure you replace the project ID with the correct ID in above code snippet.

Dockerfile

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# Install required packages
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy your service account key and code
COPY main.py .
# Note: key.json should be mounted at runtime, not built into the image

CMD ["python", "main.py"]

requirements.txt

google-cloud-pubsub
google-cloud-logging

Additional Information:

  • The service account key (key.json) is securely mounted at runtime and not included in the Docker image.
  • The code works perfectly when run locally but fails when run inside the Docker container.

(How I am mounting the Service Account key at runtime and how )we can reproduce using Docker/podman with the following commands:

  1. Build the image: podman build -f Dockerfile -t rtsink:latest .
  2. Run the container: podman run --rm -v <complete-path-to-your-sa-account-key.json>:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
Share Improve this question edited Feb 15 at 5:33 kernal42 asked Feb 14 at 14:39 kernal42kernal42 1551 silver badge11 bronze badges 7
  • Please include details as to how you're mounting the Service Account key at runtime because the evidence (PermissionDenied) suggests that you're doing this incorrectly. – DazWilkin Commented Feb 14 at 16:43
  • Added the details in the query description on how I am mounting the Service Account key at runtime and how we can reproduce using Docker/podman. – kernal42 Commented Feb 14 at 19:05
  • 1 Not only do you want to mount the Service Account key but you must also add it the environment variable used by ADC --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json – DazWilkin Commented Feb 14 at 20:02
  • credentials = service_account.Credentials.from_service_account_file( '/app/key.json' ) This line is from the main.py where we are using the credentials explicitly and not implicitly (via env=GOOGLE_APPLICATION_CREDENTIALS) hence adding/mounting this env is not needed. – kernal42 Commented Feb 15 at 9:13
  • I've updated my query description with the full error logs. The logs indicate that the topics and sink are being created and, when we try to retrieve the IAM permissions (using the function get_iam_policy), we encounter a 403 error. Some of the cloud services being created indicate that there is no problem accessing the SA Account secret by the code/pod at runtime but we are seeing GRPC-related errors while retrieving the IAM policy for the Pub/Sub topic – kernal42 Commented Feb 15 at 9:14
 |  Show 2 more comments

1 Answer 1

Reset to default 1

I'm not convinced that the code works outside of a container.

I repro'd your issue and received an error:

Expected a message object, but got {'role': 'roles/pubsub.publisher', 'members': ['serviceAccount:[email protected]']}

Using the IAM library's protobuf classes (iam_policy_pb2.SetIamPolicyRequest,policy_pb2.Binding) appears (!?) to work correctly:

from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
from google.iam.v1 import iam_policy_pb2,policy_pb2
import logging
import os


def setup_pubsub_logsink(project_id, topic_name, sink_name):
    try:        
        # Initialize clients with explicit credentials
        publisher = pubsub_v1.PublisherClient()
        logging_client = logging_v2.Client()
        config_client = ConfigServiceV2Client()
        
        # Create topic path
        topic_path = publisher.topic_path(project_id, topic_name)
        
        # Create topic
        logging.info(f"Creating topic: {topic_path}")
        try:
            topic = publisher.create_topic(request={"name": topic_path})
            logging.info(f"Topic created: {topic.name}")
        except Exception as e:
            if "AlreadyExists" in str(e):
                logging.info(f"Topic already exists: {topic_path}")
            else:
                logging.info(f"Topic already exists: {topic_path}")
                # raise
        
        # Create sink
        sink_path = f"projects/{project_id}/sinks/{sink_name}"
        writer_identity = None
        
        # Check if sink exists
        try:
            existing_sink = config_client.get_sink(request={"sink_name": sink_path})
            writer_identity = existing_sink.writer_identity
            logging.info(f"Sink already exists with writer identity: {writer_identity}")
        except Exception as e:
            if "NotFound" not in str(e):
                # raise
                logging.exception(f"Sink not found: {sink_path}")
            
            # Create new sink if it doesn't exist
            logging.info(f"Creating sink: {sink_name}")
            sink = {
                "name": sink_name,
                "destination": f"pubsub.googleapis/{topic_path}",
                "filter": "severity >= WARNING"  # Adjust filter as needed
            }
            
            new_sink = config_client.create_sink(
                request={
                    "parent": f"projects/{project_id}",
                    "sink": sink,
                }
            )
            writer_identity = new_sink.writer_identity
            logging.info(f"Created new sink with writer identity: {writer_identity}")
        
        logging.info(f"getting the iam policy for the sink: {sink_path}")
        # Grant publisher permissions to the sink's writer identity
        policy = publisher.get_iam_policy(request={
            "resource": topic_path,
        })

        binding = policy_pb2.Binding(
            role="roles/pubsub.publisher",
            members=[writer_identity]
        )
        policy.bindings.append(binding)
        logging.info(f"iam policy for the sink: {policy}")
        
        request = iam_policy_pb2.SetIamPolicyRequest(
            resource=topic_path,
            policy=policy,
        )
        publisher.set_iam_policy(request=request)
        
        logging.info("Successfully set up topic and sink with proper permissions")
        
    except Exception as e:
        logging.error(f"Error setting up pubsub and logsink: {str(e)}")
        raise

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    
    PROJECT_ID = os.getenv("PROJECT")
    TOPIC_NAME = os.getenv("TOPIC")
    SINK_NAME = os.getenv("SINK")
    
    setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
发布评论

评论列表(0)

  1. 暂无评论