1 /*
2 * Copyright 2009-2017 Alibaba Cloud All rights reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "CurlHttpClient.h"
18 #include <curl/curl.h>
19 #include <cassert>
20 #include <sstream>
21 #include <vector>
22 #include <mutex>
23 #include <condition_variable>
24 #include <atomic>
25 #include <algorithm>
26 #include <utils/Crc64.h>
27 #include <alibabacloud/oss/client/Error.h>
28 #include <alibabacloud/oss/client/RateLimiter.h>
29 #include "../utils/LogUtils.h"
30 #include "utils/Utils.h"
31
32 #ifdef TAG
33 #undef TAG
34 #endif
35 using namespace AlibabaCloud::OSS;
36
37 namespace AlibabaCloud
38 {
39 namespace OSS
40 {
41 const char * TAG = "CurlHttpClient";
42 ////////////////////////////////////////////////////////////////////////////////////////////
43 template< typename RESOURCE_TYPE>
44 class ResourceManager_
45 {
46 public:
ResourceManager_()47 ResourceManager_() : m_shutdown(false) {}
Acquire()48 RESOURCE_TYPE Acquire()
49 {
50 std::unique_lock<std::mutex> locker(m_queueLock);
51 while(!m_shutdown.load() && m_resources.size() == 0)
52 {
53 m_semaphore.wait(locker, [&](){ return m_shutdown.load() || m_resources.size() > 0; });
54 }
55
56 assert(!m_shutdown.load());
57
58 RESOURCE_TYPE resource = m_resources.back();
59 m_resources.pop_back();
60
61 return resource;
62 }
63
HasResourcesAvailable()64 bool HasResourcesAvailable()
65 {
66 std::lock_guard<std::mutex> locker(m_queueLock);
67 return m_resources.size() > 0 && !m_shutdown.load();
68 }
69
Release(RESOURCE_TYPE resource)70 void Release(RESOURCE_TYPE resource)
71 {
72 std::unique_lock<std::mutex> locker(m_queueLock);
73 m_resources.push_back(resource);
74 locker.unlock();
75 m_semaphore.notify_one();
76 }
77
PutResource(RESOURCE_TYPE resource)78 void PutResource(RESOURCE_TYPE resource)
79 {
80 m_resources.push_back(resource);
81 }
82
ShutdownAndWait(size_t resourceCount)83 std::vector<RESOURCE_TYPE> ShutdownAndWait(size_t resourceCount)
84 {
85 std::vector<RESOURCE_TYPE> resources;
86 std::unique_lock<std::mutex> locker(m_queueLock);
87 m_shutdown = true;
88 while (m_resources.size() < resourceCount)
89 {
90 m_semaphore.wait(locker, [&]() { return m_resources.size() == resourceCount; });
91 }
92 resources = m_resources;
93 m_resources.clear();
94 return resources;
95 }
96
97 private:
98 std::vector<RESOURCE_TYPE> m_resources;
99 std::mutex m_queueLock;
100 std::condition_variable m_semaphore;
101 std::atomic<bool> m_shutdown;
102 };
103
104
105 class CurlContainer
106 {
107 public:
CurlContainer(unsigned maxSize=16,long requestTimeout=10000,long connectTimeout=5000)108 CurlContainer(unsigned maxSize = 16, long requestTimeout = 10000, long connectTimeout = 5000):
109 maxPoolSize_(maxSize),
110 requestTimeout_(requestTimeout),
111 connectTimeout_(connectTimeout),
112 poolSize_(0)
113 {
114 }
115
~CurlContainer()116 ~CurlContainer()
117 {
118 for (CURL* handle : handleContainer_.ShutdownAndWait(poolSize_)) {
119 curl_easy_cleanup(handle);
120 }
121 }
122
Acquire()123 CURL* Acquire()
124 {
125 if(!handleContainer_.HasResourcesAvailable()) {
126 growPool();
127 }
128 CURL* handle = handleContainer_.Acquire();
129 return handle;
130 }
131
Release(CURL * handle,bool force)132 void Release(CURL* handle, bool force)
133 {
134 if (handle) {
135 curl_easy_reset(handle);
136 if (force) {
137 CURL* newhandle = curl_easy_init();
138 if (newhandle) {
139 curl_easy_cleanup(handle);
140 handle = newhandle;
141 }
142 }
143 setDefaultOptions(handle);
144 handleContainer_.Release(handle);
145 }
146 }
147
148 private:
149 CurlContainer(const CurlContainer&) = delete;
150 const CurlContainer& operator = (const CurlContainer&) = delete;
151 CurlContainer(const CurlContainer&&) = delete;
152 const CurlContainer& operator = (const CurlContainer&&) = delete;
153
growPool()154 bool growPool()
155 {
156 std::lock_guard<std::mutex> locker(containerLock_);
157 if (poolSize_ < maxPoolSize_) {
158 unsigned multiplier = poolSize_ > 0 ? poolSize_ : 1;
159 unsigned amountToAdd = (std::min)(multiplier * 2, maxPoolSize_ - poolSize_);
160
161 unsigned actuallyAdded = 0;
162 for (unsigned i = 0; i < amountToAdd; ++i) {
163 CURL* curlHandle = curl_easy_init();
164 if (curlHandle) {
165 setDefaultOptions(curlHandle);
166 handleContainer_.Release(curlHandle);
167 ++actuallyAdded;
168 } else {
169 break;
170 }
171 }
172 poolSize_ += actuallyAdded;
173 return actuallyAdded > 0;
174 }
175 return false;
176 }
177
setDefaultOptions(CURL * handle)178 void setDefaultOptions(CURL* handle)
179 {
180 curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);
181 curl_easy_setopt(handle, CURLOPT_TCP_NODELAY, 1);
182 curl_easy_setopt(handle, CURLOPT_NETRC, CURL_NETRC_IGNORED);
183
184 curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, 0L);
185 curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT_MS, connectTimeout_);
186 curl_easy_setopt(handle, CURLOPT_LOW_SPEED_LIMIT, 1L);
187 curl_easy_setopt(handle, CURLOPT_LOW_SPEED_TIME, requestTimeout_ / 1000);
188
189 curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
190 curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0L);
191 }
192
193 private:
194 ResourceManager_<CURL*> handleContainer_;
195 unsigned maxPoolSize_;
196 unsigned long requestTimeout_;
197 unsigned long connectTimeout_;
198 unsigned poolSize_;
199 std::mutex containerLock_;
200 };
201
202 /////////////////////////////////////////////////////////////////////////////////////////////
203 struct TransferState {
204 CurlHttpClient *owner;
205 CURL * curl;
206 HttpRequest *request;
207 HttpResponse *response;
208 int64_t transferred;
209 int64_t total;
210 bool firstRecvData;
211 std::iostream::pos_type recvBodyPos;
212 TransferProgressHandler progress;
213 void *userData;
214 bool enableCrc64;
215 uint64_t sendCrc64Value;
216 uint64_t recvCrc64Value;
217 int sendSpeed;
218 int recvSpeed;
219 };
220
sendBody(char * ptr,size_t size,size_t nmemb,void * userdata)221 static size_t sendBody(char *ptr, size_t size, size_t nmemb, void *userdata)
222 {
223 TransferState *state = static_cast<TransferState*>(userdata);
224
225 if (state == nullptr ||
226 state->request == nullptr) {
227 return 0;
228 }
229
230 std::shared_ptr<std::iostream> &content = state->request->Body();
231 const size_t wanted = size * nmemb;
232 size_t got = 0;
233 if (content != nullptr && wanted > 0) {
234 size_t read = wanted;
235 if (state->total > 0) {
236 int64_t remains = state->total - state->transferred;
237 if (remains < static_cast<int64_t>(wanted)) {
238 read = static_cast<size_t>(remains);
239 }
240 }
241 // TODO: ethan
242 printf("read\r\n");
243 content->read(ptr, read);
244 got = static_cast<size_t>(content->gcount());
245 }
246
247 state->transferred += got;
248 if (state->progress) {
249 state->progress(got, state->transferred, state->total, state->userData);
250 }
251
252 if (state->enableCrc64) {
253 state->sendCrc64Value = CRC64::CalcCRC(state->sendCrc64Value, (void *)ptr, got);
254 }
255
256 return got;
257 }
258
recvBody(char * ptr,size_t size,size_t nmemb,void * userdata)259 static size_t recvBody(char *ptr, size_t size, size_t nmemb, void *userdata)
260 {
261 TransferState *state = static_cast<TransferState*>(userdata);
262 const size_t wanted = size * nmemb;
263
264 if (state == nullptr ||
265 state->response == nullptr ||
266 wanted == 0) {
267 return -1;
268 }
269
270 if (state->firstRecvData) {
271 long response_code = 0;
272 curl_easy_getinfo(state->curl, CURLINFO_RESPONSE_CODE, &response_code);
273 if (response_code / 100 == 2) {
274 state->response->addBody(state->request->ResponseStreamFactory()());
275 if (state->response->Body() != nullptr) {
276 state->recvBodyPos = state->response->Body()->tellp();
277 }
278 OSS_LOG(LogLevel::LogDebug, TAG, "request(%p) setResponseBody, recvBodyPos:%lld",
279 state->request, state->recvBodyPos);
280 }
281 else {
282 state->response->addBody(std::make_shared<std::stringstream>());
283 }
284 state->firstRecvData = false;
285 }
286
287 std::shared_ptr<std::iostream> &content = state->response->Body();
288 if (content == nullptr || content->fail()) {
289 printf("error!\r\n");
290 return -2;
291 }
292 // TODO: ethan
293 printf("write\r\n");
294 content->write(ptr, static_cast<std::streamsize>(wanted));
295 if (content->bad()) {
296 return -3;
297 }
298
299 state->transferred += wanted;
300 if (state->progress) {
301 state->progress(wanted, state->transferred, state->total, state->userData);
302 }
303
304 if (state->enableCrc64) {
305 state->recvCrc64Value = CRC64::CalcCRC(state->recvCrc64Value, (void *)ptr, wanted);
306 }
307
308 return wanted;
309 }
310
recvHeaders(char * buffer,size_t size,size_t nitems,void * userdata)311 static size_t recvHeaders(char *buffer, size_t size, size_t nitems, void *userdata)
312 {
313 TransferState *state = static_cast<TransferState*>(userdata);
314 const size_t length = nitems * size;
315
316 std::string line(buffer);
317 auto pos = line.find(':');
318 if (pos != line.npos)
319 {
320 size_t posEnd = line.rfind('\r');
321 if (posEnd != line.npos) {
322 posEnd = posEnd - pos - 2;
323 }
324 std::string name = line.substr(0, pos);
325 std::string value = line.substr(pos + 2, posEnd);
326 state->response->setHeader(name, value);
327 }
328
329 if (length == 2 && (buffer[0] == 0x0D) && (buffer[1] == 0x0A)) {
330 if (state->response->hasHeader(Http::CONTENT_LENGTH)) {
331 double dval;
332 curl_easy_getinfo(state->curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &dval);
333 state->total = (int64_t)dval;
334 }
335 }
336 return length;
337 }
338
debugCallback(void * handle,curl_infotype type,char * data,size_t size,void * userp)339 static int debugCallback(void *handle, curl_infotype type, char *data, size_t size, void *userp)
340 {
341 UNUSED_PARAM(userp);
342 switch (type) {
343 default:
344 break;
345 case CURLINFO_TEXT:
346 OSS_LOG(LogLevel::LogInfo, TAG, "handle(%p)=> Info: %.*s", handle, size, data);
347 break;
348 case CURLINFO_HEADER_OUT:
349 OSS_LOG(LogLevel::LogDebug, TAG, "handle(%p)=> Send header: %.*s", handle, size, data);
350 break;
351 case CURLINFO_HEADER_IN:
352 OSS_LOG(LogLevel::LogDebug, TAG, "handle(%p)=> Recv header: %.*s", handle, size, data);
353 break;
354 }
355 return 0;
356 }
357
progressCallback(void * userdata,double dltotal,double dlnow,double ultotal,double ulnow)358 static int progressCallback(void *userdata, double dltotal, double dlnow, double ultotal, double ulnow)
359 {
360 UNUSED_PARAM(dltotal);
361 UNUSED_PARAM(dlnow);
362 UNUSED_PARAM(ultotal);
363 UNUSED_PARAM(ulnow);
364 TransferState *state = static_cast<TransferState*>(userdata);
365 if (state == nullptr || state->owner == nullptr) {
366 return 0;
367 }
368
369 CurlHttpClient *thiz = static_cast<CurlHttpClient *>(state->owner);
370
371 //stop by upper caller
372 if (!thiz->isEnable()) {
373 return 1;
374 }
375
376 //for speed update
377 if (thiz->sendRateLimiter_ != nullptr) {
378 auto rate = thiz->sendRateLimiter_->Rate();
379 if (rate != state->sendSpeed) {
380 state->sendSpeed = rate;
381 auto speed = static_cast<curl_off_t>(rate);
382 speed = speed * 1024;
383 curl_easy_setopt(state->curl, CURLOPT_MAX_SEND_SPEED_LARGE, speed);
384 }
385 }
386
387 if (thiz->recvRateLimiter_ != nullptr) {
388 auto rate = thiz->recvRateLimiter_->Rate();
389 if (rate != state->recvSpeed) {
390 state->recvSpeed = rate;
391 auto speed = static_cast<curl_off_t>(rate);
392 speed = speed * 1024;
393 curl_easy_setopt(state->curl, CURLOPT_MAX_RECV_SPEED_LARGE, speed);
394 }
395 }
396
397 return 0;
398 }
399 }
400 }
401
initGlobalState()402 void CurlHttpClient::initGlobalState()
403 {
404 curl_global_init(CURL_GLOBAL_ALL);
405 }
406
cleanupGlobalState()407 void CurlHttpClient::cleanupGlobalState()
408 {
409 curl_global_cleanup();
410 }
411
CurlHttpClient(const ClientConfiguration & configuration)412 CurlHttpClient::CurlHttpClient(const ClientConfiguration &configuration) :
413 HttpClient(),
414 curlContainer_(new CurlContainer(configuration.maxConnections,
415 configuration.requestTimeoutMs,
416 configuration.connectTimeoutMs)),
417 userAgent_(configuration.userAgent),
418 proxyScheme_(configuration.proxyScheme),
419 proxyHost_(configuration.proxyHost),
420 proxyPort_(configuration.proxyPort),
421 proxyUserName_(configuration.proxyUserName),
422 proxyPassword_(configuration.proxyPassword),
423 verifySSL_(configuration.verifySSL),
424 caPath_(configuration.caPath),
425 caFile_(configuration.caFile),
426 networkInterface_(configuration.networkInterface),
427 sendRateLimiter_(configuration.sendRateLimiter),
428 recvRateLimiter_(configuration.recvRateLimiter)
429 {
430 }
431
~CurlHttpClient()432 CurlHttpClient::~CurlHttpClient()
433 {
434 if (curlContainer_) {
435 delete curlContainer_;
436 }
437 }
438
makeRequest(const std::shared_ptr<HttpRequest> & request)439 std::shared_ptr<HttpResponse> CurlHttpClient::makeRequest(const std::shared_ptr<HttpRequest> &request)
440 {
441 OSS_LOG(LogLevel::LogDebug, TAG, "request(%p) enter makeRequest", request.get());
442 curl_slist *list = nullptr;
443 auto& headers = request->Headers();
444 for (const auto &p : headers) {
445 if (p.second.empty())
446 continue;
447 std::string str = p.first;
448 str.append(": ").append(p.second);
449 list = curl_slist_append(list, str.c_str());
450 }
451 // Disable Expect: 100-continue
452 list = curl_slist_append(list, "Expect:");
453
454 auto response = std::make_shared<HttpResponse>(request);
455
456 std::iostream::pos_type requestBodyPos = -1;
457 if (request->Body() != nullptr) {
458 requestBodyPos = request->Body()->tellg();
459 }
460
461 CURL * curl = curlContainer_->Acquire();
462
463 OSS_LOG(LogLevel::LogDebug, TAG, "request(%p) acquire curl handle:%p", request.get(), curl);
464
465 uint64_t initCRC64 = 0;
466 #ifdef ENABLE_OSS_TEST
467 if (headers.find("oss-test-crc64") != headers.end()) {
468 initCRC64 = std::strtoull(headers.at("oss-test-crc64").c_str(), nullptr, 10);
469 }
470 #endif
471 TransferState transferState = {
472 this,
473 curl,
474 request.get(),
475 response.get(),
476 0, -1,
477 true, -1,
478 request->TransferProgress().Handler,
479 request->TransferProgress().UserData,
480 request->hasCheckCrc64(), initCRC64, initCRC64,
481 0, 0
482 };
483
484 if (request->hasHeader(Http::CONTENT_LENGTH)) {
485 transferState.total = std::atoll(request->Header(Http::CONTENT_LENGTH).c_str());
486 }
487
488 std::string url = request->url().toString();
489 curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
490 switch (request->method())
491 {
492 case Http::Method::Head:
493 curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
494 break;
495 case Http::Method::Put:
496 curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
497 break;
498 case Http::Method::Post:
499 curl_easy_setopt(curl, CURLOPT_POST, 1L);
500 break;
501 case Http::Method::Delete:
502 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
503 break;
504 case Http::Method::Get:
505 default:
506 break;
507 }
508
509 curl_easy_setopt(curl, CURLOPT_USERAGENT,userAgent_.c_str());
510
511 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
512 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &transferState);
513 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, recvHeaders);
514
515 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &transferState);
516 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, recvBody);
517
518 curl_easy_setopt(curl, CURLOPT_READDATA, &transferState);
519 curl_easy_setopt(curl, CURLOPT_READFUNCTION, sendBody);
520
521 if (verifySSL_) {
522 curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 1L);
523 curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 2L);
524 }
525 if(!caPath_.empty()) {
526 curl_easy_setopt(curl, CURLOPT_CAPATH, caPath_.c_str());
527 }
528 if(!caFile_.empty()){
529 curl_easy_setopt(curl, CURLOPT_CAINFO, caFile_.c_str());
530 }
531
532 if (!proxyHost_.empty()) {
533 std::stringstream ss;
534 ss << Http::SchemeToString(proxyScheme_) << "://" << proxyHost_;
535 curl_easy_setopt(curl, CURLOPT_PROXY, ss.str().c_str());
536 curl_easy_setopt(curl, CURLOPT_PROXYPORT, (long) proxyPort_);
537 curl_easy_setopt(curl, CURLOPT_PROXYUSERNAME, proxyUserName_.c_str());
538 curl_easy_setopt(curl, CURLOPT_PROXYPASSWORD, proxyPassword_.c_str());
539 }
540
541 if (!networkInterface_.empty()) {
542 curl_easy_setopt(curl, CURLOPT_INTERFACE, networkInterface_.c_str());
543 }
544
545 //debug
546 if (GetLogLevelInner() >= LogLevel::LogInfo) {
547 curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
548 curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, debugCallback);
549 }
550
551 //Error Buffer
552 char errbuf[CURL_ERROR_SIZE];
553 curl_easy_setopt(curl, CURLOPT_ERRORBUFFER, errbuf);
554 errbuf[0] = 0;
555
556 //progress Callback
557 curl_easy_setopt(curl, CURLOPT_PROGRESSFUNCTION, progressCallback);
558 curl_easy_setopt(curl, CURLOPT_PROGRESSDATA, &transferState);
559 curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
560
561 //Send bytes/sec
562 if (sendRateLimiter_ != nullptr) {
563 transferState.sendSpeed = sendRateLimiter_->Rate();
564 auto speed = static_cast<curl_off_t>(transferState.sendSpeed);
565 speed = speed * 1024;
566 curl_easy_setopt(curl, CURLOPT_MAX_SEND_SPEED_LARGE, speed);
567 }
568
569 //Recv bytes/sec
570 if (recvRateLimiter_ != nullptr) {
571 transferState.recvSpeed = recvRateLimiter_->Rate();
572 auto speed = static_cast<curl_off_t>(transferState.recvSpeed);
573 speed = speed * 1024;
574 curl_easy_setopt(curl, CURLOPT_MAX_RECV_SPEED_LARGE, speed);
575 }
576
577 CURLcode res = curl_easy_perform(curl);
578 long response_code= 0;
579 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
580 printf("[%s]response_code: %d\n", __func__, response_code);
581 if (res == CURLE_OK) {
582 response->setStatusCode(response_code);
583 } else {
584 response->setStatusCode(res + ERROR_CURL_BASE);
585 switch (res) {
586 case 23: //CURLE_WRITE_ERROR
587 {
588 std::string msg(curl_easy_strerror(res));
589 if (response->Body() == nullptr) {
590 msg.append(". Caused by content is null.");
591 }
592 else if (response->Body()->bad()) {
593 msg.append(". Caused by content is in bad state(Read/writing error on i/o operation).");
594 }
595 else if (response->Body()->fail()) {
596 msg.append(". Caused by content is in fail state(Logical error on i/o operation).");
597 }
598 response->setStatusMsg(msg);
599 }
600 break;
601 default:
602 {
603 std::string msg(curl_easy_strerror(res));
604 msg.append(".").append(errbuf);
605 response->setStatusMsg(msg);
606 }
607 break;
608 };
609 }
610
611 switch (request->method())
612 {
613 case Http::Method::Put:
614 case Http::Method::Post:
615 request->setCrc64Result(transferState.sendCrc64Value);
616 break;
617 default:
618 request->setCrc64Result(transferState.recvCrc64Value);
619 break;
620 }
621 request->setTransferedBytes(transferState.transferred);
622
623 curlContainer_->Release(curl, (res != CURLE_OK));
624
625 curl_slist_free_all(list);
626
627 auto & body = response->Body();
628 if (body != nullptr) {
629 body->flush();
630 if (res != CURLE_OK && transferState.recvBodyPos != static_cast<std::streampos>(-1)) {
631 OSS_LOG(LogLevel::LogDebug, TAG, "request(%p) setResponseBody, tellp:%lld, recvBodyPos:%lld",
632 request.get(), body->tellp(), transferState.recvBodyPos);
633 body->clear();
634 body->seekp(transferState.recvBodyPos);
635 }
636 }
637 else {
638 response->addBody(std::make_shared<std::stringstream>());
639 }
640
641 if (requestBodyPos != static_cast<std::streampos>(-1)) {
642 request->Body()->clear();
643 request->Body()->seekg(requestBodyPos);
644 }
645
646 OSS_LOG(LogLevel::LogDebug, TAG, "request(%p) leave makeRequest, CURLcode:%d, ResponseCode:%d",
647 request.get(), res, response_code);
648
649 return response;
650 }
651