Skip to content

Commit

Permalink
feat: add use_topic_schema for Cloud Storage Subscriptions (#1154)
Browse files Browse the repository at this point in the history
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: mukund-ananthu <[email protected]>
Co-authored-by: Anthonios Partheniou <[email protected]>
  • Loading branch information
4 people committed Jul 6, 2024
1 parent 5869a74 commit ec0cc34
Show file tree
Hide file tree
Showing 22 changed files with 5,424 additions and 960 deletions.
256 changes: 87 additions & 169 deletions google/pubsub_v1/services/publisher/async_client.py

Large diffs are not rendered by default.

114 changes: 54 additions & 60 deletions google/pubsub_v1/services/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import re
from typing import (
Dict,
Callable,
Mapping,
MutableMapping,
MutableSequence,
Expand Down Expand Up @@ -579,7 +580,9 @@ def __init__(
self,
*,
credentials: Optional[ga_credentials.Credentials] = None,
transport: Optional[Union[str, PublisherTransport]] = None,
transport: Optional[
Union[str, PublisherTransport, Callable[..., PublisherTransport]]
] = None,
client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
) -> None:
Expand All @@ -591,9 +594,11 @@ def __init__(
credentials identify the application to the service; if none
are specified, the client will attempt to ascertain the
credentials from the environment.
transport (Union[str, PublisherTransport]): The
transport to use. If set to None, a transport is chosen
automatically.
transport (Optional[Union[str,PublisherTransport,Callable[..., PublisherTransport]]]):
The transport to use, or a Callable that constructs and returns a new transport.
If a Callable is given, it will be called with the same set of initialization
arguments as used in the PublisherTransport constructor.
If set to None, a transport is chosen automatically.
client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]):
Custom options for the client.
Expand Down Expand Up @@ -699,17 +704,24 @@ def __init__(
api_key_value
)

Transport = type(self).get_transport_class(cast(str, transport))
transport_init: Union[
Type[PublisherTransport], Callable[..., PublisherTransport]
] = (
type(self).get_transport_class(transport)
if isinstance(transport, str) or transport is None
else cast(Callable[..., PublisherTransport], transport)
)
# initialize with the provided callable or the passed in class

emulator_host = os.environ.get("PUBSUB_EMULATOR_HOST")
if emulator_host:
if issubclass(Transport, type(self)._transport_registry["grpc"]):
if issubclass(transport_init, type(self)._transport_registry["grpc"]):
channel = grpc.insecure_channel(target=emulator_host)
else:
channel = grpc.aio.insecure_channel(target=emulator_host)
Transport = functools.partial(Transport, channel=channel)
transport_init = functools.partial(transport_init, channel=channel)

self._transport = Transport(
self._transport = transport_init(
credentials=credentials,
credentials_file=self._client_options.credentials_file,
host=self._api_endpoint,
Expand Down Expand Up @@ -788,19 +800,17 @@ def sample_create_topic():
A topic resource.
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
# - Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([name])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a pubsub.Topic.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.Topic):
request = pubsub.Topic(request)
# If we have keyword arguments corresponding to fields on the
Expand Down Expand Up @@ -907,19 +917,17 @@ def sample_update_topic():
A topic resource.
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
# - Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([topic, update_mask])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a pubsub.UpdateTopicRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.UpdateTopicRequest):
request = pubsub.UpdateTopicRequest(request)
# If we have keyword arguments corresponding to fields on the
Expand Down Expand Up @@ -1022,19 +1030,17 @@ def sample_publish():
Response for the Publish method.
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
# - Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([topic, messages])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a pubsub.PublishRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.PublishRequest):
request = pubsub.PublishRequest(request)
# If we have keyword arguments corresponding to fields on the
Expand Down Expand Up @@ -1127,19 +1133,17 @@ def sample_get_topic():
A topic resource.
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
# - Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([topic])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a pubsub.GetTopicRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.GetTopicRequest):
request = pubsub.GetTopicRequest(request)
# If we have keyword arguments corresponding to fields on the
Expand Down Expand Up @@ -1235,19 +1239,17 @@ def sample_list_topics():
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
# - Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([project])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a pubsub.ListTopicsRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.ListTopicsRequest):
request = pubsub.ListTopicsRequest(request)
# If we have keyword arguments corresponding to fields on the
Expand Down Expand Up @@ -1354,19 +1356,17 @@ def sample_list_topic_subscriptions():
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
# - Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([topic])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a pubsub.ListTopicSubscriptionsRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.ListTopicSubscriptionsRequest):
request = pubsub.ListTopicSubscriptionsRequest(request)
# If we have keyword arguments corresponding to fields on the
Expand Down Expand Up @@ -1477,19 +1477,17 @@ def sample_list_topic_snapshots():
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
# - Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([topic])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a pubsub.ListTopicSnapshotsRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.ListTopicSnapshotsRequest):
request = pubsub.ListTopicSnapshotsRequest(request)
# If we have keyword arguments corresponding to fields on the
Expand Down Expand Up @@ -1587,19 +1585,17 @@ def sample_delete_topic():
sent along with the request as metadata.
"""
# Create or coerce a protobuf request object.
# Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
# - Quick check: If we got a request object, we should *not* have
# gotten any keyword arguments that map to the request.
has_flattened_params = any([topic])
if request is not None and has_flattened_params:
raise ValueError(
"If the `request` argument is set, then none of "
"the individual field arguments should be set."
)

# Minor optimization to avoid making a copy if the user passes
# in a pubsub.DeleteTopicRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.DeleteTopicRequest):
request = pubsub.DeleteTopicRequest(request)
# If we have keyword arguments corresponding to fields on the
Expand Down Expand Up @@ -1686,10 +1682,8 @@ def sample_detach_subscription():
"""
# Create or coerce a protobuf request object.
# Minor optimization to avoid making a copy if the user passes
# in a pubsub.DetachSubscriptionRequest.
# There's no risk of modifying the input as we've already verified
# there are no flattened fields.
# - Use the request object if provided (there's no risk of modifying the input as
# there are no flattened fields), or create one.
if not isinstance(request, pubsub.DetachSubscriptionRequest):
request = pubsub.DetachSubscriptionRequest(request)

Expand Down
4 changes: 3 additions & 1 deletion google/pubsub_v1/services/publisher/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ def __init__(

# Save the scopes.
self._scopes = scopes
if not hasattr(self, "_ignore_credentials"):
self._ignore_credentials: bool = False

# If no credentials are provided, then determine the appropriate
# defaults.
Expand All @@ -100,7 +102,7 @@ def __init__(
credentials, _ = google.auth.load_credentials_from_file(
credentials_file, **scopes_kwargs, quota_project_id=quota_project_id
)
elif credentials is None:
elif credentials is None and not self._ignore_credentials:
credentials, _ = google.auth.default(
**scopes_kwargs, quota_project_id=quota_project_id
)
Expand Down
28 changes: 17 additions & 11 deletions google/pubsub_v1/services/publisher/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(
credentials: Optional[ga_credentials.Credentials] = None,
credentials_file: Optional[str] = None,
scopes: Optional[Sequence[str]] = None,
channel: Optional[grpc.Channel] = None,
channel: Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]] = None,
api_mtls_endpoint: Optional[str] = None,
client_cert_source: Optional[Callable[[], Tuple[bytes, bytes]]] = None,
ssl_channel_credentials: Optional[grpc.ChannelCredentials] = None,
Expand All @@ -74,14 +74,17 @@ def __init__(
credentials identify the application to the service; if none
are specified, the client will attempt to ascertain the
credentials from the environment.
This argument is ignored if ``channel`` is provided.
This argument is ignored if a ``channel`` instance is provided.
credentials_file (Optional[str]): A file with credentials that can
be loaded with :func:`google.auth.load_credentials_from_file`.
This argument is ignored if ``channel`` is provided.
This argument is ignored if a ``channel`` instance is provided.
scopes (Optional(Sequence[str])): A list of scopes. This argument is
ignored if ``channel`` is provided.
channel (Optional[grpc.Channel]): A ``Channel`` instance through
which to make calls.
ignored if a ``channel`` instance is provided.
channel (Optional[Union[grpc.Channel, Callable[..., grpc.Channel]]]):
A ``Channel`` instance through which to make calls, or a Callable
that constructs and returns one. If set to None, ``self.create_channel``
is used to create the channel. If a Callable is given, it will be called
with the same arguments as used in ``self.create_channel``.
api_mtls_endpoint (Optional[str]): Deprecated. The mutual TLS endpoint.
If provided, it overrides the ``host`` argument and tries to create
a mutual TLS channel with client SSL credentials from
Expand All @@ -91,11 +94,11 @@ def __init__(
private key bytes, both in PEM format. It is ignored if
``api_mtls_endpoint`` is None.
ssl_channel_credentials (grpc.ChannelCredentials): SSL credentials
for the grpc channel. It is ignored if ``channel`` is provided.
for the grpc channel. It is ignored if a ``channel`` instance is provided.
client_cert_source_for_mtls (Optional[Callable[[], Tuple[bytes, bytes]]]):
A callback to provide client certificate bytes and private key bytes,
both in PEM format. It is used to configure a mutual TLS channel. It is
ignored if ``channel`` or ``ssl_channel_credentials`` is provided.
ignored if a ``channel`` instance or ``ssl_channel_credentials`` is provided.
quota_project_id (Optional[str]): An optional project to use for billing
and quota.
client_info (google.api_core.gapic_v1.client_info.ClientInfo):
Expand All @@ -121,9 +124,10 @@ def __init__(
if client_cert_source:
warnings.warn("client_cert_source is deprecated", DeprecationWarning)

if channel:
if isinstance(channel, grpc.Channel):
# Ignore credentials if a channel was passed.
credentials = False
credentials = None
self._ignore_credentials = True
# If a channel was explicitly provided, set it.
self._grpc_channel = channel
self._ssl_channel_credentials = None
Expand Down Expand Up @@ -162,7 +166,9 @@ def __init__(
)

if not self._grpc_channel:
self._grpc_channel = type(self).create_channel(
# initialize with the provided callable or the default channel
channel_init = channel or type(self).create_channel
self._grpc_channel = channel_init(
self._host,
# use the credentials which are saved
credentials=self._credentials,
Expand Down
Loading

0 comments on commit ec0cc34

Please sign in to comment.