summaryrefslogtreecommitdiffstats
path: root/isolate_storage.py
diff options
context:
space:
mode:
Diffstat (limited to 'isolate_storage.py')
-rw-r--r--isolate_storage.py91
1 files changed, 13 insertions, 78 deletions
diff --git a/isolate_storage.py b/isolate_storage.py
index a158bbf..ef0d79e 100644
--- a/isolate_storage.py
+++ b/isolate_storage.py
@@ -21,32 +21,15 @@ from utils import net
import isolated_format
-# gRPC may not be installed on the worker machine. This is fine, as long as
-# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
-# Full external requirements are: grpcio, certifi.
try:
- import grpc
- from google import auth as google_auth
- from google.auth.transport import grpc as google_auth_transport_grpc
- from google.auth.transport import requests as google_auth_transport_requests
+ import grpc # for error codes
+ from utils import grpc_proxy
from proto import bytestream_pb2
except ImportError as err:
grpc = None
+ grpc_proxy = None
bytestream_pb2 = None
-# If gRPC is installed, at least give a warning if certifi is not. This is not
-# actually used anywhere in this module, but if certifi is missing,
-# google.auth.transport will fail with
-# https://stackoverflow.com/questions/24973326
-certifi = None
-if grpc is not None:
- try:
- import certifi
- except ImportError:
- # Could not import certifi; gRPC HTTPS connections may fail. This will be
- # logged in IsolateServerGrpc.__init__, since the logger is not configured
- # during the import time.
- pass
# Chunk size to use when reading from network stream.
NET_IO_FILE_CHUNK = 16 * 1024
@@ -533,65 +516,17 @@ class IsolateServerGrpc(StorageApi):
def __init__(self, server, namespace, proxy):
super(IsolateServerGrpc, self).__init__()
logging.info('Using gRPC for Isolate')
- if not certifi:
- logging.warning(
- 'Could not import certifi; gRPC HTTPS connections may fail')
+ # Proxies only support the default-gzip namespace for now.
+ # TODO(aludwin): support other namespaces if necessary
+ assert namespace == 'default-gzip'
self._server = server
self._lock = threading.Lock()
self._memory_use = 0
self._num_pushes = 0
self._already_exists = 0
-
- # Proxies only support the default-gzip namespace for now.
- # TODO(aludwin): support other namespaces if necessary
- assert namespace == 'default-gzip'
+ self._proxy = grpc_proxy.Proxy(proxy, bytestream_pb2.ByteStreamStub)
self._namespace = namespace
- # Make sure grpc was successfully imported
- assert grpc
- assert bytestream_pb2
-
- roots = os.environ.get('ISOLATE_GRPC_PROXY_TLS_ROOTS')
- overd = os.environ.get('ISOLATE_GRPC_PROXY_TLS_OVERRIDE')
-
- # The "proxy" envvar must be of the form:
- # http[s]://<server>[:port][/prefix]
- m = re.search('^(https?):\/\/([^\/]+)/?(.*)$', proxy)
- if not m:
- raise ValueError(('gRPC proxy must have the form: '
- 'http[s]://<server>[:port][/prefix] '
- '(given: %s)') % proxy)
- transport = m.group(1)
- host = m.group(2)
- prefix = m.group(3)
- if not prefix.endswith('/'):
- prefix = prefix + '/'
- logging.info('gRPC proxy: transport %s, host %s, prefix %s',
- transport, host, prefix)
- self._prefix = prefix
-
- if transport == 'http':
- self._channel = grpc.insecure_channel(host)
- elif transport == 'https':
- # Using cloud container builder scopes for testing:
- scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
- credentials, _ = google_auth.default(scopes=scopes)
- request = google_auth_transport_requests.Request()
- options = ()
- root_certs = None
- if roots is not None:
- logging.info('Using root CA %s', roots)
- with open(roots) as f:
- root_certs = f.read()
- if overd is not None:
- logging.info('Using TLS server override %s', overd)
- options=(('grpc.ssl_target_name_override', overd),)
- ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)
- self._channel = google_auth_transport_grpc.secure_authorized_channel(
- credentials, request, host, ssl_creds, options=options)
- else:
- raise ValueError('unknown transport %s (should be http[s])' % transport)
- self._stub = bytestream_pb2.ByteStreamStub(self._channel)
@property
def location(self):
@@ -611,10 +546,10 @@ class IsolateServerGrpc(StorageApi):
assert offset == 0
request = bytestream_pb2.ReadRequest()
#TODO(aludwin): send the expected size of the item
- request.resource_name = '%sblobs/%s/0' % (
- self._prefix, digest)
+ request.resource_name = '%s/blobs/%s/0' % (
+ self._proxy.prefix, digest)
try:
- for response in self._stub.Read(request, timeout=DOWNLOAD_READ_TIMEOUT):
+ for response in self._proxy.get_stream('Read', request):
yield response.data
except grpc.RpcError as g:
logging.error('gRPC error during fetch: re-throwing as IOError (%s)' % g)
@@ -645,8 +580,8 @@ class IsolateServerGrpc(StorageApi):
# proto messages to send via gRPC.
request = bytestream_pb2.WriteRequest()
u = uuid.uuid4()
- request.resource_name = '%suploads/%s/blobs/%s/%d' % (
- self._prefix, u, item.digest, item.size)
+ request.resource_name = '%s/uploads/%s/blobs/%s/%d' % (
+ self._proxy.prefix, u, item.digest, item.size)
request.write_offset = 0
for chunk in chunker():
# Make sure we send at least one chunk for zero-length blobs
@@ -663,7 +598,7 @@ class IsolateServerGrpc(StorageApi):
response = None
try:
- response = self._stub.Write(slicer())
+ response = self._proxy.call_no_retries('Write', slicer())
except grpc.RpcError as r:
if r.code() == grpc.StatusCode.ALREADY_EXISTS:
# This is legit - we didn't check before we pushed so no problem if