I am trying to create a Pub/Sub topic and a log sink in Google Cloud. The idea is to create a log sink and assign the role pubsub.publisher to the log sink's service account for the Pub/Sub topic created. The code works fine when run locally, but it fails when bundled into a Docker container. Below is the relevant code snippet and the error message I am encountering.
(deploy-functions) kernal42@space42 KT % podman run --rm -v /Users/kernal42/Downloads/my-project-anp-c94dc8530044.json:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
INFO:root:Creating topic: projects/my-project-anp/topics/kt-test-local-15
INFO:root:Topic created: projects/my-project-anp/topics/kt-test-local-15
ERROR:root:Sink not found: projects/my-project-anp/sinks/kt-test-local-15
Traceback (most recent call last):
File "/app/main.py", line 41, in setup_pubsub_logsink
existing_sink = config_client.get_sink(request={"sink_name": sink_path})
File "/usr/local/lib/python3.10/site-packages/google/cloud/logging_v2/services/config_service_v2/client.py", line 2199, in get_sink
response = rpc(
File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
return wrapped_func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
return retry_target(
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
_retry_error_helper(
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
raise final_exc from source_exc
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
result = target()
File "/usr/local/lib/python3.10/site-packages/google/api_core/timeout.py", line 130, in func_with_timeout
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.NotFound: 404 Sink kt-test-local-15 does not exist
INFO:root:Creating sink: kt-test-local-15
INFO:root:Created new sink with writer identity: serviceAccount:[email protected]
INFO:root:getting the iam policy for the sink: projects/my-project-anp/sinks/kt-test-local-15
ERROR:root:Error setting up pubsub and logsink: 403 User not authorized to perform this action.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 76, in error_remapped_callable
return callable_(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1181, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.PERMISSION_DENIED
details = "User not authorized to perform this action."
debug_error_string = "UNKNOWN:Error received from peer ipv4:142.250.193.202:443 {grpc_message:"User not authorized to perform this action.", grpc_status:7, created_time:"2025-02-15T05:19:05.783737357+00:00"}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/app/main.py", line 98, in <module>
setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
File "/app/main.py", line 68, in setup_pubsub_logsink
policy = publisher.get_iam_policy(request={"resource": topic_path})
File "/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 1940, in get_iam_policy
response = rpc(
File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
return wrapped_func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.PermissionDenied: 403 User not authorized to perform this action.
(deploy-functions) kernal42@space42 KT %
Below is the code I have for above.
main.py
from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
import logging
def setup_pubsub_logsink(project_id, topic_name, sink_name):
try:
# Initialize credentials
credentials = service_account.Credentials.from_service_account_file(
'/app/key.json'
)
# Initialize clients with explicit credentials
publisher = pubsub_v1.PublisherClient(credentials=credentials)
logging_client = logging_v2.Client(credentials=credentials)
config_client = ConfigServiceV2Client(credentials=credentials)
# Create topic path
topic_path = publisher.topic_path(project_id, topic_name)
# Create topic
logging.info(f"Creating topic: {topic_path}")
try:
topic = publisher.create_topic(request={"name": topic_path})
logging.info(f"Topic created: {topic.name}")
except Exception as e:
if "AlreadyExists" in str(e):
logging.info(f"Topic already exists: {topic_path}")
else:
logging.info(f"Topic already exists: {topic_path}")
# raise
# Create sink
sink_path = f"projects/{project_id}/sinks/{sink_name}"
writer_identity = None
# Check if sink exists
try:
existing_sink = config_client.get_sink(request={"sink_name": sink_path})
writer_identity = existing_sink.writer_identity
logging.info(f"Sink already exists with writer identity: {writer_identity}")
except Exception as e:
if "NotFound" not in str(e):
# raise
logging.exception(f"Sink not found: {sink_path}")
# Create new sink if it doesn't exist
logging.info(f"Creating sink: {sink_name}")
sink = {
"name": sink_name,
"destination": f"pubsub.googleapis/{topic_path}",
"filter": "severity >= WARNING" # Adjust filter as needed
}
new_sink = config_client.create_sink(
request={
"parent": f"projects/{project_id}",
"sink": sink,
}
)
writer_identity = new_sink.writer_identity
logging.info(f"Created new sink with writer identity: {writer_identity}")
logging.info(f"getting the iam policy for the sink: {sink_path}")
# Grant publisher permissions to the sink's writer identity
policy = publisher.get_iam_policy(request={"resource": topic_path})
policy.bindings.append({
"role": "roles/pubsub.publisher",
"members": [writer_identity]
})
logging.info(f"iam policy for the sink: {policy}")
publisher.set_iam_policy(request={
"resource": topic_path,
"policy": policy
})
logging.info("Successfully set up topic and sink with proper permissions")
except Exception as e:
logging.error(f"Error setting up pubsub and logsink: {str(e)}")
raise
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
PROJECT_ID = "<your-project-id>"
TOPIC_NAME = "kt-test-local-15"
SINK_NAME = "kt-test-local-15"
setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
Ensure you replace the project ID with the correct ID in above code snippet.
Dockerfile
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install required packages
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy your service account key and code
COPY main.py .
# Note: key.json should be mounted at runtime, not built into the image
CMD ["python", "main.py"]
requirements.txt
google-cloud-pubsub
google-cloud-logging
Additional Information:
- The service account key (key.json) is securely mounted at runtime and not included in the Docker image.
- The code works perfectly when run locally but fails when run inside the Docker container.
(How I am mounting the Service Account key at runtime and how )we can reproduce using Docker/podman with the following commands:
- Build the image:
podman build -f Dockerfile -t rtsink:latest .
- Run the container:
podman run --rm -v <complete-path-to-your-sa-account-key.json>:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
I am trying to create a Pub/Sub topic and a log sink in Google Cloud. The idea is to create a log sink and assign the role pubsub.publisher to the log sink's service account for the Pub/Sub topic created. The code works fine when run locally, but it fails when bundled into a Docker container. Below is the relevant code snippet and the error message I am encountering.
(deploy-functions) kernal42@space42 KT % podman run --rm -v /Users/kernal42/Downloads/my-project-anp-c94dc8530044.json:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
INFO:root:Creating topic: projects/my-project-anp/topics/kt-test-local-15
INFO:root:Topic created: projects/my-project-anp/topics/kt-test-local-15
ERROR:root:Sink not found: projects/my-project-anp/sinks/kt-test-local-15
Traceback (most recent call last):
File "/app/main.py", line 41, in setup_pubsub_logsink
existing_sink = config_client.get_sink(request={"sink_name": sink_path})
File "/usr/local/lib/python3.10/site-packages/google/cloud/logging_v2/services/config_service_v2/client.py", line 2199, in get_sink
response = rpc(
File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
return wrapped_func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
return retry_target(
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
_retry_error_helper(
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
raise final_exc from source_exc
File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
result = target()
File "/usr/local/lib/python3.10/site-packages/google/api_core/timeout.py", line 130, in func_with_timeout
return func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.NotFound: 404 Sink kt-test-local-15 does not exist
INFO:root:Creating sink: kt-test-local-15
INFO:root:Created new sink with writer identity: serviceAccount:[email protected]
INFO:root:getting the iam policy for the sink: projects/my-project-anp/sinks/kt-test-local-15
ERROR:root:Error setting up pubsub and logsink: 403 User not authorized to perform this action.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 76, in error_remapped_callable
return callable_(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1181, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.10/site-packages/grpc/_channel.py", line 1006, in _end_unary_response_blocking
raise _InactiveRpcError(state) # pytype: disable=not-instantiable
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.PERMISSION_DENIED
details = "User not authorized to perform this action."
debug_error_string = "UNKNOWN:Error received from peer ipv4:142.250.193.202:443 {grpc_message:"User not authorized to perform this action.", grpc_status:7, created_time:"2025-02-15T05:19:05.783737357+00:00"}"
>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/app/main.py", line 98, in <module>
setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
File "/app/main.py", line 68, in setup_pubsub_logsink
policy = publisher.get_iam_policy(request={"resource": topic_path})
File "/usr/local/lib/python3.10/site-packages/google/pubsub_v1/services/publisher/client.py", line 1940, in get_iam_policy
response = rpc(
File "/usr/local/lib/python3.10/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
return wrapped_func(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.PermissionDenied: 403 User not authorized to perform this action.
(deploy-functions) kernal42@space42 KT %
Below is the code I have for above.
main.py
from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
import logging
def setup_pubsub_logsink(project_id, topic_name, sink_name):
try:
# Initialize credentials
credentials = service_account.Credentials.from_service_account_file(
'/app/key.json'
)
# Initialize clients with explicit credentials
publisher = pubsub_v1.PublisherClient(credentials=credentials)
logging_client = logging_v2.Client(credentials=credentials)
config_client = ConfigServiceV2Client(credentials=credentials)
# Create topic path
topic_path = publisher.topic_path(project_id, topic_name)
# Create topic
logging.info(f"Creating topic: {topic_path}")
try:
topic = publisher.create_topic(request={"name": topic_path})
logging.info(f"Topic created: {topic.name}")
except Exception as e:
if "AlreadyExists" in str(e):
logging.info(f"Topic already exists: {topic_path}")
else:
logging.info(f"Topic already exists: {topic_path}")
# raise
# Create sink
sink_path = f"projects/{project_id}/sinks/{sink_name}"
writer_identity = None
# Check if sink exists
try:
existing_sink = config_client.get_sink(request={"sink_name": sink_path})
writer_identity = existing_sink.writer_identity
logging.info(f"Sink already exists with writer identity: {writer_identity}")
except Exception as e:
if "NotFound" not in str(e):
# raise
logging.exception(f"Sink not found: {sink_path}")
# Create new sink if it doesn't exist
logging.info(f"Creating sink: {sink_name}")
sink = {
"name": sink_name,
"destination": f"pubsub.googleapis/{topic_path}",
"filter": "severity >= WARNING" # Adjust filter as needed
}
new_sink = config_client.create_sink(
request={
"parent": f"projects/{project_id}",
"sink": sink,
}
)
writer_identity = new_sink.writer_identity
logging.info(f"Created new sink with writer identity: {writer_identity}")
logging.info(f"getting the iam policy for the sink: {sink_path}")
# Grant publisher permissions to the sink's writer identity
policy = publisher.get_iam_policy(request={"resource": topic_path})
policy.bindings.append({
"role": "roles/pubsub.publisher",
"members": [writer_identity]
})
logging.info(f"iam policy for the sink: {policy}")
publisher.set_iam_policy(request={
"resource": topic_path,
"policy": policy
})
logging.info("Successfully set up topic and sink with proper permissions")
except Exception as e:
logging.error(f"Error setting up pubsub and logsink: {str(e)}")
raise
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
PROJECT_ID = "<your-project-id>"
TOPIC_NAME = "kt-test-local-15"
SINK_NAME = "kt-test-local-15"
setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
Ensure you replace the project ID with the correct ID in above code snippet.
Dockerfile
# Dockerfile
FROM python:3.10-slim
WORKDIR /app
# Install required packages
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy your service account key and code
COPY main.py .
# Note: key.json should be mounted at runtime, not built into the image
CMD ["python", "main.py"]
requirements.txt
google-cloud-pubsub
google-cloud-logging
Additional Information:
- The service account key (key.json) is securely mounted at runtime and not included in the Docker image.
- The code works perfectly when run locally but fails when run inside the Docker container.
(How I am mounting the Service Account key at runtime and how )we can reproduce using Docker/podman with the following commands:
- Build the image:
podman build -f Dockerfile -t rtsink:latest .
- Run the container:
podman run --rm -v <complete-path-to-your-sa-account-key.json>:/app/key.json:ro --env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json localhost/rtsink:latest
1 Answer
Reset to default 1I'm not convinced that the code works outside of a container.
I repro'd your issue and received an error:
Expected a message object, but got {'role': 'roles/pubsub.publisher', 'members': ['serviceAccount:[email protected]']}
Using the IAM library's protobuf classes (iam_policy_pb2.SetIamPolicyRequest
,policy_pb2.Binding
) appears (!?) to work correctly:
from google.cloud import pubsub_v1
from google.cloud.logging_v2.services.config_service_v2 import ConfigServiceV2Client
from google.cloud import logging_v2
from google.oauth2 import service_account
from google.iam.v1 import iam_policy_pb2,policy_pb2
import logging
import os
def setup_pubsub_logsink(project_id, topic_name, sink_name):
try:
# Initialize clients with explicit credentials
publisher = pubsub_v1.PublisherClient()
logging_client = logging_v2.Client()
config_client = ConfigServiceV2Client()
# Create topic path
topic_path = publisher.topic_path(project_id, topic_name)
# Create topic
logging.info(f"Creating topic: {topic_path}")
try:
topic = publisher.create_topic(request={"name": topic_path})
logging.info(f"Topic created: {topic.name}")
except Exception as e:
if "AlreadyExists" in str(e):
logging.info(f"Topic already exists: {topic_path}")
else:
logging.info(f"Topic already exists: {topic_path}")
# raise
# Create sink
sink_path = f"projects/{project_id}/sinks/{sink_name}"
writer_identity = None
# Check if sink exists
try:
existing_sink = config_client.get_sink(request={"sink_name": sink_path})
writer_identity = existing_sink.writer_identity
logging.info(f"Sink already exists with writer identity: {writer_identity}")
except Exception as e:
if "NotFound" not in str(e):
# raise
logging.exception(f"Sink not found: {sink_path}")
# Create new sink if it doesn't exist
logging.info(f"Creating sink: {sink_name}")
sink = {
"name": sink_name,
"destination": f"pubsub.googleapis/{topic_path}",
"filter": "severity >= WARNING" # Adjust filter as needed
}
new_sink = config_client.create_sink(
request={
"parent": f"projects/{project_id}",
"sink": sink,
}
)
writer_identity = new_sink.writer_identity
logging.info(f"Created new sink with writer identity: {writer_identity}")
logging.info(f"getting the iam policy for the sink: {sink_path}")
# Grant publisher permissions to the sink's writer identity
policy = publisher.get_iam_policy(request={
"resource": topic_path,
})
binding = policy_pb2.Binding(
role="roles/pubsub.publisher",
members=[writer_identity]
)
policy.bindings.append(binding)
logging.info(f"iam policy for the sink: {policy}")
request = iam_policy_pb2.SetIamPolicyRequest(
resource=topic_path,
policy=policy,
)
publisher.set_iam_policy(request=request)
logging.info("Successfully set up topic and sink with proper permissions")
except Exception as e:
logging.error(f"Error setting up pubsub and logsink: {str(e)}")
raise
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
PROJECT_ID = os.getenv("PROJECT")
TOPIC_NAME = os.getenv("TOPIC")
SINK_NAME = os.getenv("SINK")
setup_pubsub_logsink(PROJECT_ID, TOPIC_NAME, SINK_NAME)
PermissionDenied
) suggests that you're doing this incorrectly. – DazWilkin Commented Feb 14 at 16:43--env=GOOGLE_APPLICATION_CREDENTIALS=/app/key.json
– DazWilkin Commented Feb 14 at 20:02