summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--isolate_storage.py91
-rw-r--r--utils/grpc_proxy.py268
2 files changed, 281 insertions, 78 deletions
diff --git a/isolate_storage.py b/isolate_storage.py
index a158bbf..ef0d79e 100644
--- a/isolate_storage.py
+++ b/isolate_storage.py
@@ -21,32 +21,15 @@ from utils import net
import isolated_format
-# gRPC may not be installed on the worker machine. This is fine, as long as
-# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
-# Full external requirements are: grpcio, certifi.
try:
- import grpc
- from google import auth as google_auth
- from google.auth.transport import grpc as google_auth_transport_grpc
- from google.auth.transport import requests as google_auth_transport_requests
+ import grpc # for error codes
+ from utils import grpc_proxy
from proto import bytestream_pb2
except ImportError as err:
grpc = None
+ grpc_proxy = None
bytestream_pb2 = None
-# If gRPC is installed, at least give a warning if certifi is not. This is not
-# actually used anywhere in this module, but if certifi is missing,
-# google.auth.transport will fail with
-# https://stackoverflow.com/questions/24973326
-certifi = None
-if grpc is not None:
- try:
- import certifi
- except ImportError:
- # Could not import certifi; gRPC HTTPS connections may fail. This will be
- # logged in IsolateServerGrpc.__init__, since the logger is not configured
- # during the import time.
- pass
# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024
@@ -533,65 +516,17 @@ class IsolateServerGrpc(StorageApi):
def __init__(self, server, namespace, proxy):
super(IsolateServerGrpc, self).__init__()
logging.info('Using gRPC for Isolate')
- if not certifi:
- logging.warning(
- 'Could not import certifi; gRPC HTTPS connections may fail')
+ # Proxies only support the default-gzip namespace for now.
+ # TODO(aludwin): support other namespaces if necessary
+ assert namespace == 'default-gzip'
self._server = server
self._lock = threading.Lock()
self._memory_use = 0
self._num_pushes = 0
self._already_exists = 0
-
- # Proxies only support the default-gzip namespace for now.
- # TODO(aludwin): support other namespaces if necessary
- assert namespace == 'default-gzip'
+ self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
self._namespace = namespace
- # Make sure grpc was successfully imported
- assert grpc
- assert bytestream_pb2
-
- roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS')
- overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE')
-
- # The "proxy" envvar must be of the form:
- # http[s]://<server>[:port][/prefix]
- m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
- if not m:
- raise ValueError(('gRPC proxy must have the form: '
- 'http[s]://<server>[:port][/prefix] '
- '(given: %s)') % proxy)
- transport = m.group(1)
- host = m.group(2)
- prefix = m.group(3)
- if not prefix.endswith('/'):
- prefix = prefix + '/'
- logging.info('gRPC proxy: transport %s, host %s, prefix %s',
- transport, host, prefix)
- self._prefix = prefix
-
- if transport == 'http':
- self._channel = grpc.insecure_channel(host)
- elif transport == 'https':
- # Using cloud container builder scopes for testing:
- scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
- credentials, _ = google_auth.default(scopes=scopes)
- request = google_auth_transport_requests.Request()
- options = ()
- root_certs = None
- if roots is not None:
- logging.info('Using root CA %s', roots)
- with open(roots) as f:
- root_certs = f.read()
- if overd is not None:
- logging.info('Using TLS server override %s', overd)
- options=(('grpc.ssl_target_name_override', overd),)
- ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
- self._channel = google_auth_transport_grpc.secure_authorized_channel(
- credentials, request, host, ssl_creds, options=options)
- else:
- raise ValueError('unknown transport %s (should be http[s])' % transport)
- self._stub = bytestream_pb2.ByteStreamStub(self._channel)
@property
def location(self):
@@ -611,10 +546,10 @@ class IsolateServerGrpc(StorageApi):
assert offset == 0
request = bytestream_pb2.ReadRequest()
#TODO(aludwin): send the expected size of the item
- request.resource_name = '%sblobs/%s/0' % (
- self._prefix, digest)
+ request.resource_name = '%s/blobs/%s/0' % (
+ self._proxy.prefix, digest)
try:
- for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
+ for response in self._proxy.get_stream('Read', request):
yield response.data
except grpc.RpcError as g:
logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
@@ -645,8 +580,8 @@ class IsolateServerGrpc(StorageApi):
# proto messages to send via gRPC.
request = bytestream_pb2.WriteRequest()
u = uuid.uuid4()
- request.resource_name = '%suploads/%s/blobs/%s/%d' % (
- self._prefix, u, item.digest, item.size)
+ request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
+ self._proxy.prefix, u, item.digest, item.size)
request.write_offset = 0
for chunk in chunker():
# Make sure we send at least one chunk for zero-length blobs
@@ -663,7 +598,7 @@ class IsolateServerGrpc(StorageApi):
response = None
try:
- response = self._stub.Write(slicer())
+ response = self._proxy.call_no_retries('Write', slicer())
except grpc.RpcError as r:
if r.code() == grpc.StatusCode.ALREADY_EXISTS:
# This is legit - we didn't check before we pushed so no problem if
diff --git a/utils/grpc_proxy.py b/utils/grpc_proxy.py
new file mode 100644
index 0000000..0215df5
--- /dev/null
+++ b/utils/grpc_proxy.py
@@ -0,0 +1,268 @@
+# Copyright 2017 The LUCI Authors. All rights reserved.
+# Use of this source code is governed under the Apache License, Version 2.0
+# that can be found in the LICENSE file.
+
+"""Common gRPC implementation for Swarming and Isolate"""
+
+import logging
+import os
+import re
+import time
+import urlparse
+from utils import net
+
+# gRPC may not be installed on the worker machine. This is fine, as long as
+# the bot doesn't attempt to use gRPC (checked in Proxy._create_channel).
+# Full external requirements are: grpcio, google-auth, certifi.
+try:
+ import grpc
+ from google import auth as google_auth
+ from google.auth.transport import grpc as google_auth_transport_grpc
+ from google.auth.transport import requests as google_auth_transport_requests
+except ImportError as err:
+ grpc = None
+
+
+# If gRPC was successfully imported, try to import certifi as well. This is not
+# actually used anywhere in this module, but if certifi is missing,
+# google.auth.transport will fail (see
+# https://stackoverflow.com/questions/24973326). So checking it here allows us
+# to print out a somewhat-sane error message.
+certifi = None
+if grpc is not None:
+ try:
+ import certifi
+ except ImportError:
+ # Will print out error messages later (ie when we have a logger)
+ pass
+
+
+# Maximum number of attempts (first try included) made by Proxy.call_unary.
+MAX_GRPC_ATTEMPTS = 30
+
+
+# Cap passed to net.calculate_sleep_before_retry between call_unary attempts.
+MAX_GRPC_SLEEP = 10.
+
+
+# Timeout in seconds (three minutes) for each call made via call_no_retries.
+GRPC_TIMEOUT_SEC = 3 * 60
+
+
+def available():
+  """Returns True if gRPC can be used on this host.
+
+  The guarded import at the top of this module sets the module-level name
+  grpc to None when grpc or the google.auth modules imported alongside it
+  cannot be imported; this function reports whether that import succeeded.
+  """
+  # Identity test: "!= None" would dispatch to the object's __ne__ instead.
+  return grpc is not None
+
+
+class Proxy(object):
+  """Represents a gRPC proxy.
+
+  If the proxy begins with 'https', the returned channel will be secure and
+  authorized using default application credentials - see
+  https://developers.google.com/identity/protocols/application-default-credentials.
+  Currently, we're using Cloud Container Builder scopes for testing; this may
+  change in the future to allow different scopes to be passed in for different
+  channels.
+
+  To create a proxy, say:
+
+    proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
+                             myapi_pb2.MyApiStub)
+
+  To make a unary call with retries (recommended):
+
+    proto_output = proxy.call_unary('MyMethod', proto_input)
+
+  To make a unary call without retries, or to pass in a client side stream
+  (proto_input can be an iterator here):
+
+    proto_output = proxy.call_no_retries('MyMethod', proto_input)
+
+  You can also call the stub directly (not recommended, since no errors will be
+  caught or logged):
+
+    proto_output = proxy.stub.MyMethod(proto_input)
+
+  To make a call to a server-side streaming call (these are not retried):
+
+    for response in proxy.get_stream('MyStreamingMethod', proto_input):
+      <process response>
+
+  To retrieve the prefix:
+
+    prefix = proxy.prefix # returns "resource/prefix" for the URL above
+
+  Exceptions raised by gRPC operations made through call_unary,
+  call_no_retries and get_stream are logged using logging.warning and then
+  re-raised.
+  """
+
+  def __init__(self, proxy, stub_class):
+    """Parses the proxy URL, creates the channel and instantiates the stub.
+
+    Args:
+      proxy: URL of the form http[s]://<host>[:port][/prefix]. The scheme
+          selects an insecure (http) or a secure, authorized (https) channel.
+          The path, minus one leading and one trailing slash, becomes the
+          resource name prefix.
+      stub_class: stub class for the service; it is called with the channel
+          as its only argument.
+
+    Raises:
+      ValueError: if the scheme is neither http nor https, if the hostname is
+          missing, or if the URL contains params or a fragment.
+    """
+    # Any non-empty value of LUCI_GRPC_PROXY_VERBOSE turns verbose mode on.
+    self._verbose = os.environ.get('LUCI_GRPC_PROXY_VERBOSE')
+    if self._verbose:
+      logging.info('Enabled verbose mode for %s with stub %s',
+                   proxy, stub_class.__name__)
+    # NB: everything in url is unicode; convert to strings where
+    # needed.
+    url = urlparse.urlparse(proxy)
+    if self._verbose:
+      logging.info('Parsed URL for proxy is %r', url)
+    if url.scheme == 'http':
+      self._secure = False
+    elif url.scheme == 'https':
+      self._secure = True
+    else:
+      raise ValueError('gRPC proxy %s must use http[s], not %s' % (
+          proxy, url.scheme))
+    if not url.netloc:
+      raise ValueError('gRPC proxy is missing hostname: %s' % proxy)
+    self._host = url.netloc
+    # Store the prefix without its surrounding slashes; callers join it to
+    # the rest of the resource name with '/' themselves.
+    self._prefix = url.path
+    if self._prefix.endswith('/'):
+      self._prefix = self._prefix[:-1]
+    if self._prefix.startswith('/'):
+      self._prefix = self._prefix[1:]
+    if url.params or url.fragment:
+      raise ValueError('gRPC proxy may not contain params or fragments: %s' %
+                       proxy)
+    # Human-readable notes about how the channel was set up; they are dumped
+    # by _dump_proxy_info whenever an operation fails.
+    self._debug_info = ['full proxy name: ' + proxy]
+    self._channel = self._create_channel()
+    self._stub = stub_class(self._channel)
+    logging.info('%s: initialized', self.name)
+    if self._verbose:
+      self._dump_proxy_info()
+
+  @property
+  def prefix(self):
+    """Resource name prefix taken from the proxy URL's path."""
+    return self._prefix
+
+  @property
+  def channel(self):
+    """The gRPC channel returned by _create_channel."""
+    return self._channel
+
+  @property
+  def stub(self):
+    """The stub_class instance bound to the channel."""
+    return self._stub
+
+  @property
+  def name(self):
+    """Description of this proxy (security, host, stub class) for logs."""
+    security = 'secure' if self._secure else 'insecure'
+    return 'gRPC %s proxy %s/%s' % (
+        security, self._host, self._stub.__class__.__name__)
+
+  def call_unary(self, name, request):
+    """Calls a method, waiting if the service is not available.
+
+    Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
+
+    Up to MAX_GRPC_ATTEMPTS attempts are made through call_no_retries. Only
+    errors whose code is grpc.StatusCode.UNAVAILABLE are retried; any other
+    grpc.RpcError is re-raised immediately.
+
+    Args:
+      name: name of the stub method to call.
+      request: request to pass to the stub method.
+
+    Returns:
+      Whatever the stub method returns.
+
+    Raises:
+      grpc.RpcError: a non-UNAVAILABLE error, or the last UNAVAILABLE error
+          once every attempt has failed.
+    """
+    grpc_error = None
+    for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
+      try:
+        return self.call_no_retries(name, request)
+      except grpc.RpcError as g:
+        if g.code() is not grpc.StatusCode.UNAVAILABLE:
+          raise
+        logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
+                        self.name, attempt, MAX_GRPC_ATTEMPTS)
+        # Save the error in case we need to return it
+        grpc_error = g
+      # Only sleep when another attempt follows; sleeping after the final
+      # failure would just delay reporting the error.
+      if attempt < MAX_GRPC_ATTEMPTS:
+        time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
+    # If we get here, it must be because we got (and saved) an error
+    assert grpc_error is not None
+    raise grpc_error
+
+  def get_stream(self, name, request):
+    """Calls a server-side streaming method, returning an iterator.
+
+    Usage: for resp in proxy.get_stream('MyMethod', proto_input):
+
+    This is a generator, so the underlying call is not started until the
+    first item is requested. The call is not retried.
+
+    Args:
+      name: name of the stub method to call.
+      request: request to pass to the stub method.
+
+    Yields:
+      Each response pulled from the stream.
+    """
+    stream = self.call_no_retries(name, request)
+    # next(stream, end_of_stream) returns a response on success, or this
+    # unique marker once the stream has ended. This allows us to avoid
+    # raising StopIteration inside _wrap_grpc_operation, which would log it
+    # as a failure.
+    end_of_stream = object()
+    while True:
+      response = self._wrap_grpc_operation(
+          name + ' pull from stream', lambda: next(stream, end_of_stream))
+      if response is end_of_stream:
+        # Iteration is finished
+        return
+      yield response
+
+  def call_no_retries(self, name, request):
+    """Calls a method without any retries.
+
+    Recommended for client-side streaming or nonidempotent unary calls.
+
+    Args:
+      name: name of the stub method to call.
+      request: request (or iterator of requests) to pass to the stub method.
+
+    Returns:
+      Whatever the stub method returns when called with the request and a
+      timeout of GRPC_TIMEOUT_SEC.
+
+    Raises:
+      NameError: if the stub has no attribute called `name`.
+    """
+    # Pass a default so that a missing method reaches the check below instead
+    # of escaping from getattr as a bare AttributeError.
+    method = getattr(self._stub, name, None)
+    if method is None:
+      raise NameError(
+          '%s: "%s" is not a valid method name' % (self.name, name))
+    return self._wrap_grpc_operation(
+        name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC))
+
+  def _wrap_grpc_operation(self, name, fn):
+    """Wraps a gRPC operation (call or iterator increment) for logging.
+
+    Args:
+      name: description of the operation, used only in log messages.
+      fn: zero-argument callable that performs the operation.
+
+    Returns:
+      The return value of fn().
+
+    Raises:
+      Whatever fn() raises; the exception is logged together with the proxy
+      details and then re-raised.
+    """
+    if self._verbose:
+      logging.info('%s/%s - starting gRPC operation', self.name, name)
+    try:
+      return fn()
+    except grpc.RpcError as g:
+      logging.warning('\n\nFailure in %s/%s: gRPC error %s', self.name, name, g)
+      self._dump_proxy_info()
+      # Bare raise keeps the original traceback.
+      raise
+    except Exception as e:
+      logging.warning('\n\nFailure in %s/%s: exception %s', self.name, name, e)
+      self._dump_proxy_info()
+      # Bare raise keeps the original traceback.
+      raise
+
+  def _dump_proxy_info(self):
+    """Logs the prefix and the accumulated debug info at warning level."""
+    logging.warning('DETAILED PROXY INFO')
+    logging.warning('prefix = %s', self.prefix)
+    logging.warning('debug info:\n\t%s\n\n',
+                    '\n\t'.join(self._debug_info))
+
+  def _create_channel(self):
+    """Creates the gRPC channel to self._host.
+
+    Returns:
+      An insecure channel when the proxy URL used http; otherwise a secure
+      channel authorized with the default application credentials.
+    """
+    # Make sure grpc was successfully imported
+    assert available()
+
+    if not self._secure:
+      return grpc.insecure_channel(self._host)
+
+    # Authenticate the host.
+    #
+    # You're allowed to override the root certs and server if necessary. For
+    # example, if you're running your proxy on localhost, you'll need to set
+    # LUCI_GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the
+    # certificate for the root CA that your localhost server has used to
+    # certify itself, and LUCI_GRPC_PROXY_TLS_OVERRIDE to the name that your
+    # server is using to identify itself. For example, the ROOTS env var might
+    # be "/path/to/roots.crt" while the OVERRIDE env var might be
+    # "test_server," if this is what's used by the server you're running.
+    #
+    # If you're connecting to a real server with real SSL, none of this should
+    # be used.
+    if not certifi:
+      self._debug_info.append('CERTIFI IS NOT PRESENT;' +
+                              ' gRPC HTTPS CONNECTIONS MAY FAIL')
+    root_certs = None
+    roots = os.environ.get('LUCI_GRPC_PROXY_TLS_ROOTS')
+    if roots:
+      self._debug_info.append('Overridden root CA: %s' % roots)
+      with open(roots) as f:
+        root_certs = f.read()
+    else:
+      self._debug_info.append('Using default root CAs from certifi')
+    overd = os.environ.get('LUCI_GRPC_PROXY_TLS_OVERRIDE')
+    options = ()
+    if overd:
+      options = (('grpc.ssl_target_name_override', overd),)
+    ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
+
+    # Authenticate the user.
+    scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
+    self._debug_info.append('Scopes are: %r' % scopes)
+    user_creds, _ = google_auth.default(scopes=scopes)
+
+    # Create the channel.
+    request = google_auth_transport_requests.Request()
+    self._debug_info.append('Options are: %r' % options)
+    return google_auth_transport_grpc.secure_authorized_channel(
+        user_creds, request, self._host, ssl_creds, options=options)