summaryrefslogtreecommitdiffstats
path: root/utils/grpc_proxy.py
blob: 0215df530426d5a08c25cb7f1d924f3d6adc8cd9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# Copyright 2017 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

"""Common gRPC implementation for Swarming and Isolate"""

import logging
import os
import re
import time
import urlparse
from utils import net

# gRPC may not be installed on the worker machine. This is fine, as long as
# the bot doesn't attempt to use gRPC (checked in IsolateServerGrpc.__init__).
# Full external requirements are: grpcio, certifi.
try:
  import grpc
  from google import auth as google_auth
  from google.auth.transport import grpc as google_auth_transport_grpc
  from google.auth.transport import requests as google_auth_transport_requests
except ImportError as err:
  grpc = None


# If gRPC was successfully imported, try to import certifi as well.  This is not
# actually used anywhere in this module, but if certifi is missing,
# google.auth.transport will fail (see
# https://stackoverflow.com/questions/24973326). So checking it here allows us
# to print out a somewhat-sane error message.
certifi = None
if grpc is not None:
  try:
    import certifi
  except ImportError:
    # Will print out error messages later (ie when we have a logger)
    pass


# How many times to retry a gRPC call
MAX_GRPC_ATTEMPTS = 30


# Longest time to sleep between gRPC calls
MAX_GRPC_SLEEP = 10.


# Start the timeout at three minutes.
GRPC_TIMEOUT_SEC = 3 * 60


def available():
  """Returns true if gRPC can be used on this host."""
  return grpc != None


class Proxy(object):
  """Represents a gRPC proxy.

  If the proxy begins with 'https', the returned channel will be secure and
  authorized using default application credentials - see
  https://developers.google.com/identity/protocols/application-default-credentials.
  Currently, we're using Cloud Container Builder scopes for testing; this may
  change in the future to allow different scopes to be passed in for different
  channels.

  To use the returned channel to call methods directly, say:

    proxy = grpc_proxy.Proxy('https://grpc.luci.org/resource/prefix',
                             myapi_pb2.MyApiStub)

  To make a unary call with retries (recommended):

    proto_output = proxy.call_unary('MyMethod', proto_input)

  To make a unary call without retries, or to pass in a client side stream
  (proto_input can be an iterator here):

    proto_output = proxy.call_no_retries('MyMethod', proto_input)

  You can also call the stub directly (not recommended, since no errors will be
  caught or logged):

    proto_output = proxy.stub.MyMethod(proto_input)

  To make a call to a server-side streaming call (these are not retried):

    for response in proxy.get_stream('MyStreaminingMethod', proto_input):
      <process response>

  To retrieve the prefix:

    prefix = proxy.prefix # returns "prefix/for/resource/names"

  All exceptions are logged using "logging.warning."
  """

  def __init__(self, proxy, stub_class):
    self._verbose = os.environ.get('LUCI_GRPC_PROXY_VERBOSE')
    if self._verbose:
      logging.info('Enabled verbose mode for %s with stub %s',
                   proxy, stub_class.__name__)
    # NB: everything in url is unicode; convert to strings where
    # needed.
    url = urlparse.urlparse(proxy)
    if self._verbose:
      logging.info('Parsed URL for proxy is %r', url)
    if url.scheme == 'http':
      self._secure = False
    elif url.scheme == 'https':
      self._secure = True
    else:
      raise ValueError('gRPC proxy %s must use http[s], not %s' % (
          proxy, url.scheme))
    if url.netloc == '':
      raise ValueError('gRPC proxy is missing hostname: %s' % proxy)
    self._host = url.netloc
    self._prefix = url.path
    if self._prefix.endswith('/'):
      self._prefix = self._prefix[:-1]
    if self._prefix.startswith('/'):
      self._prefix = self._prefix[1:]
    if url.params != '' or url.fragment != '':
      raise ValueError('gRPC proxy may not contain params or fragments: %s' %
                       proxy)
    self._debug_info = ['full proxy name: ' + proxy]
    self._channel = self._create_channel()
    self._stub = stub_class(self._channel)
    logging.info('%s: initialized', self.name)
    if self._verbose:
      self._dump_proxy_info()

  @property
  def prefix(self):
    return self._prefix

  @property
  def channel(self):
    return self._channel

  @property
  def stub(self):
    return self._stub

  @property
  def name(self):
    security = 'insecure'
    if self._secure:
      security = 'secure'
    return 'gRPC %s proxy %s/%s' % (
        security, self._host, self._stub.__class__.__name__)

  def call_unary(self, name, request):
    """Calls a method, waiting if the service is not available.

    Usage: proto_output = proxy.call_unary('MyMethod', proto_input)
    """
    for attempt in range(1, MAX_GRPC_ATTEMPTS+1):
      try:
        return self.call_no_retries(name, request)
      except grpc.RpcError as g:
        if g.code() is not grpc.StatusCode.UNAVAILABLE:
          raise
        logging.warning('%s: call_grpc - proxy is unavailable (attempt %d/%d)',
                        self.name, attempt, MAX_GRPC_ATTEMPTS)
        # Save the error in case we need to return it
        grpc_error = g
        time.sleep(net.calculate_sleep_before_retry(attempt, MAX_GRPC_SLEEP))
    # If we get here, it must be because we got (and saved) an error
    assert grpc_error is not None
    raise grpc_error

  def get_stream(self, name, request):
    """Calls a server-side streaming method, returning an iterator.

    Usage: for resp in proxy.get_stream('MyMethod', proto_input'):
    """
    stream = self.call_no_retries(name, request)
    while True:
      # The lambda "next(stream, 1)" will return a protobuf on success, or the
      # integer 1 if the stream has ended. This allows us to avoid attempting
      # to catch StopIteration, which gets logged by _wrap_grpc_operation.
      response = self._wrap_grpc_operation(name + ' pull from stream',
                                           lambda: next(stream, 1))
      if isinstance(response, int):
        # Iteration is finished
        return
      yield response

  def call_no_retries(self, name, request):
    """Calls a method without any retries.

    Recommended for client-side streaming or nonidempotent unary calls.
    """
    method = getattr(self._stub, name)
    if method is None:
      raise NameError('%s: "%s" is not a valid method name', self.name, name)
    return self._wrap_grpc_operation(
        name, lambda: method(request, timeout=GRPC_TIMEOUT_SEC))

  def _wrap_grpc_operation(self, name, fn):
    """Wraps a gRPC operation (call or iterator increment) for logging."""
    if self._verbose:
      logging.info('%s/%s - starting gRPC operation', self.name, name)
    try:
      return fn()
    except grpc.RpcError as g:
      logging.warning('\n\nFailure in %s/%s: gRPC error %s', self.name, name, g)
      self._dump_proxy_info()
      raise g
    except Exception as e:
      logging.warning('\n\nFailure in %s/%s: exception %s', self.name, name, e)
      self._dump_proxy_info()
      raise e

  def _dump_proxy_info(self):
    logging.warning('DETAILED PROXY INFO')
    logging.warning('prefix = %s', self.prefix)
    logging.warning('debug info:\n\t%s\n\n',
                    '\n\t'.join(self._debug_info))

  def _create_channel(self):
    # Make sure grpc was successfully imported
    assert available()

    if not self._secure:
      return grpc.insecure_channel(self._host)

    # Authenticate the host.
    #
    # You're allowed to override the root certs and server if necessary. For
    # example, if you're running your proxy on localhost, you'll need to set
    # GRPC_PROXY_TLS_ROOTS to the "roots.crt" file specifying the certificate
    # for the root CA that your localhost server has used to certify itself, and
    # the GRPC_PROXY_TLS_OVERRIDE to the name that your server is using to
    # identify itself. For example, the ROOTS env var might be
    # "/path/to/roots.crt" while the OVERRIDE env var might be "test_server," if
    # this is what's used by the server you're running.
    #
    # If you're connecting to a real server with real SSL, none of this should
    # be used.
    if not certifi:
      self._debug_info.append('CERTIFI IS NOT PRESENT;' +
                              ' gRPC HTTPS CONNECTIONS MAY FAIL')
    root_certs = None
    roots = os.environ.get('LUCI_GRPC_PROXY_TLS_ROOTS')
    if roots:
      self._debug_info.append('Overridden root CA: %s' % roots)
      with open(roots) as f:
        root_certs = f.read()
    else:
      self._debug_info.append('Using default root CAs from certifi')
    overd = os.environ.get('LUCI_GRPC_PROXY_TLS_OVERRIDE')
    options = ()
    if overd:
      options=(('grpc.ssl_target_name_override', overd),)
    ssl_creds = grpc.ssl_channel_credentials(root_certificates=root_certs)

    # Authenticate the user.
    scopes = ('https://www.googleapis.com/auth/cloud-source-tools',)
    self._debug_info.append('Scopes are: %r' % scopes)
    user_creds, _ = google_auth.default(scopes=scopes)

    # Create the channel.
    request = google_auth_transport_requests.Request()
    self._debug_info.append('Options are: %r' % options)
    return google_auth_transport_grpc.secure_authorized_channel(
        user_creds, request, self._host, ssl_creds, options=options)