summaryrefslogtreecommitdiffstats
path: root/utils/grpc_proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'utils/grpc_proxy.py')
-rw-r--r--utils/grpc_proxy.py268
1 files changed, 268 insertions, 0 deletions
diff --git a/utils/grpc_proxy.py b/utils/grpc_proxy.py
new file mode 100644
index 0000000..0215df5
--- /dev/null
+++ b/utils/grpc_proxy.py
@@ -0,0 +1,268 @@
+# Copyright 2017 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+"""Common gRPC implementation for Swarming and Isolate"""
+
+import logging
+import os
+import re
+import time
+import urlparse
+from utils import net
+
+# gRPC may not be installed on the worker machine. This is fine, as long as
+# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
+# Full external requirements are: grpcio, certifi.
+try:
+ import grpc
+ from google import auth as google_auth
+ from google.auth.transport import grpc as google_auth_transport_grpc
+ from google.auth.transport import requests as google_auth_transport_requests
+except ImportError as err:
+ grpc = None
+
+
+# If gRPC was successfully imported, try to import certifi as well. This is not
+# actually used anywhere in this module, but if certifi is missing,
+# google.auth.transport will fail (see
+# https://stackoverflow.com/questions/24973326). So checking it here allows us
+# to print out a somewhat-sane error message.
+certifi = None
+if grpc is not None:
+ try:
+ import certifi
+ except ImportError:
+ # Will print out error messages later (ie when we have a logger)
+ pass
+
+
+# How many times to retry a gRPC call
+MAX_GRPC_ATTEMPTS = 30
+
+
+# Longest time to sleep between gRPC calls
+MAX_GRPC_SLEEP = 10.
+
+
+# Start the timeout at three minutes.
+GRPC_TIMEOUT_SEC = 3 * 60
+
+
+def available():
+ """Returns true if gRPC can be used on this host."""
+ return grpc != None
+
+
+class Proxy(object):
+ """Represents a gRPC proxy.
+
+ If the proxy begins with 'https', the returned channel will be secure and
+ authorized using default application credentials - see
+ https://developers.google.com/identity/protocols/application-default-credentials.
+ Currently, we're using Cloud Container Builder scopes for testing; this may
+ change in the future to allow different scopes to be passed in for different
+ channels.
+
+ To use the returned channel to call methods directly, say:
+
+ proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
+ myapi_pb2.MyApiStub)
+
+ To make a unary call with retries (recommended):
+
+ proto_output = proxy.call_unary('MyMethod', proto_input)
+
+ To make a unary call without retries, or to pass in a client side stream
+ (proto_input can be an iterator here):
+
+ proto_output = proxy.call_no_retries('MyMethod', proto_input)
+
+ You can also call the stub directly (not recommended, since no errors will be
+ caught or logged):
+
+ proto_output = proxy.stub.MyMethod(proto_input)
+
+ To make a call to a server-side streaming call (these are not retried):
+
+ for response in proxy.get_stream('MyStreaminingMethod', proto_input):
+ <process response>
+
+ To retrieve the prefix:
+
+ prefix = proxy.prefix # returns "prefix/for/resource/names"
+
+ All exceptions are logged using "logging.warning."
+ """
+
+ def __init__(self, proxy, stub_class):
+ self._verbose = os.environ.get('LUCI_GRPC_PROXY_VERBOSE')
+ if self._verbose:
+ logging.info('Enabled verbose mode for %s with stub %s',
+ proxy, stub_class.__name__)
+ # NB: everything in url is unicode; convert to strings where
+ # needed.
+ url = urlparse.urlparse(proxy)
+ if self._verbose:
+ logging.info('Parsed URL for proxy is %r', url)
+ if url.scheme == 'http':
+ self._secure = False
+ elif url.scheme == 'https':
+ self._secure = True
+ else:
+ raise ValueError('gRPC proxy %s must use http[s], not %s' % (
+ proxy, url.scheme))
+ if url.netloc == '':
+ raise ValueError('gRPC proxy is missing hostname: %s' % proxy)
+ self._host = url.netloc
+ self._prefix = url.path
+ if self._prefix.endswith('/'):
+ self._prefix = self._prefix[:-1]
+ if self._prefix.startswith('/'):
+ self._prefix = self._prefix[1:]
+ if url.params != '' or url.fragment != '':
+ raise ValueError('gRPC proxy may not contain params or fragments: %s' %
+ proxy)
+ self._debug_info = ['full proxy name: ' + proxy]
+ self._channel = self._create_channel()
+ self._stub = stub_class(self._channel)
+ logging.info('%s: initialized', self.name)
+ if self._verbose:
+ self._dump_proxy_info()
+
+ @property
+ def prefix(self):
+ return self._prefix
+
+ @property
+ def channel(self):
+ return self._channel
+
+ @property
+ def stub(self):
+ return self._stub
+
+ @property
+ def name(self):
+ security = 'insecure'
+ if self._secure:
+ security = 'secure'
+ return 'gRPC %s proxy %s/%s' % (
+ security, self._host, self._stub.__class__.__name__)
+
+ def call_unary(self, name, request):
+ """Calls a method, waiting if the service is not available.
+
+ Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
+ """
+ for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
+ try:
+ return self.call_no_retries(name, request)
+ except grpc.RpcError as g:
+ if g.code() is not grpc.StatusCode.UNAVAILABLE:
+ raise
+ logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
+ self.name, attempt, MAX_GRPC_ATTEMPTS)
+ # Save the error in case we need to return it
+ grpc_error = g
+ time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
+ # If we get here, it must be because we got (and saved) an error
+ assert grpc_error is not None
+ raise grpc_error
+
+ def get_stream(self, name, request):
+ """Calls a server-side streaming method, returning an iterator.
+
+ Usage: for resp in proxy.get_stream('MyMethod', proto_input'):
+ """
+ stream = self.call_no_retries(name, request)
+ while True:
+ # The lambda "next(stream, 1)" will return a protobuf on success, or the
+ # integer 1 if the stream has ended. This allows us to avoid attempting
+ # to catch StopIteration, which gets logged by _wrap_grpc_operation.
+ response = self._wrap_grpc_operation(name + ' pull from stream',
+ lambda: next(stream, 1))
+ if isinstance(response, int):
+ # Iteration is finished
+ return
+ yield response
+
+ def call_no_retries(self, name, request):
+ """Calls a method without any retries.
+
+ Recommended for client-side streaming or nonidempotent unary calls.
+ """
+ method = getattr(self._stub, name)
+ if method is None:
+ raise NameError('%s: "%s" is not a valid method name', self.name, name)
+ return self._wrap_grpc_operation(
+ name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC))
+
+ def _wrap_grpc_operation(self, name, fn):
+ """Wraps a gRPC operation (call or iterator increment) for logging."""
+ if self._verbose:
+ logging.info('%s/%s - starting gRPC operation', self.name, name)
+ try:
+ return fn()
+ except grpc.RpcError as g:
+ logging.warning('\n\nFailure in %s/%s: gRPC error %s', self.name, name, g)
+ self._dump_proxy_info()
+ raise g
+ except Exception as e:
+ logging.warning('\n\nFailure in %s/%s: exception %s', self.name, name, e)
+ self._dump_proxy_info()
+ raise e
+
+ def _dump_proxy_info(self):
+ logging.warning('DETAILED PROXY INFO')
+ logging.warning('prefix = %s', self.prefix)
+ logging.warning('debug info:\n\t%s\n\n',
+ '\n\t'.join(self._debug_info))
+
+ def _create_channel(self):
+ # Make sure grpc was successfully imported
+ assert available()
+
+ if not self._secure:
+ return grpc.insecure_channel(self._host)
+
+ # Authenticate the host.
+ #
+ # You're allowed to override the root certs and server if necessary. For
+ # example, if you're running your proxy on localhost, you'll need to set
+ # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate
+ # for the root CA that your localhost server has used to certify itself, and
+ # the GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to
+ # identify itself. For example, the ROOTS env var might be
+ # "/path/to/roots.crt" while the OVERRIDE env var might be "test_server," if
+ # this is what's used by the server you're running.
+ #
+ # If you're connecting to a real server with real SSL, none of this should
+ # be used.
+ if not certifi:
+ self._debug_info.append('CERTIFI IS NOT PRESENT;' +
+ ' gRPC HTTPS CONNECTIONS MAY FAIL')
+ root_certs = None
+ roots = os.environ.get('LUCI_GRPC_PROXY_TLS_ROOTS')
+ if roots:
+ self._debug_info.append('Overridden root CA: %s' % roots)
+ with open(roots) as f:
+ root_certs = f.read()
+ else:
+ self._debug_info.append('Using default root CAs from certifi')
+ overd = os.environ.get('LUCI_GRPC_PROXY_TLS_OVERRIDE')
+ options = ()
+ if overd:
+ options=(('grpc.ssl_target_name_override', overd),)
+ ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
+
+ # Authenticate the user.
+ scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
+ self._debug_info.append('Scopes are: %r' % scopes)
+ user_creds, _ = google_auth.default(scopes=scopes)
+
+ # Create the channel.
+ request = google_auth_transport_requests.Request()
+ self._debug_info.append('Options are: %r' % options)
+ return google_auth_transport_grpc.secure_authorized_channel(
+ user_creds, request, self._host, ssl_creds, options=options)