/* diversity-curl.c - DiversityCurl * * Copyright 2008 OpenMoko, Inc. * Authored by Chia-I Wu * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ #include #include #include #include #include #include /* for mode_t flags */ /* TODO remove the limit */ #define TILE_MAX_SIZE (256 * 1024) #define JOB_ABORT(job) (((DiversityCurlJobInt *) (job))->abort = TRUE) #define JOB_ABORTED(job) (((DiversityCurlJobInt *) (job))->abort) typedef struct _DiversityCurlJobInt DiversityCurlJobInt; typedef struct _DiversityCurlHandle DiversityCurlHandle; enum { JOB_COMPLETED, LAST_SIGNAL }; enum { PROP_0, PROP_POOL_SIZE, PROP_QUEUE_SIZE, }; static void diversity_curl_job_free(DiversityCurlJob *job); static void process_jobs_threaded(gpointer data, gpointer user_data); #define DIVERSITY_CURL_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE((o), DIVERSITY_TYPE_CURL, DiversityCurlPrivate)) typedef struct _DiversityCurlPrivate DiversityCurlPrivate; struct _DiversityCurlPrivate { gboolean disposed; gint pool_size; gint queue_size; GThreadPool *pool; GMutex *mutex; GSList *handles; GSList *pending_jobs; GSList *active_jobs; guint active_id; }; struct _DiversityCurlJobInt { DiversityCurlJob pub; gboolean processing; gboolean abort; }; struct _DiversityCurlHandle { CURL *curl; GByteArray *buf; DiversityCurlJob *job; }; static guint curl_signals[LAST_SIGNAL] = { 0 }; G_DEFINE_TYPE(DiversityCurl, diversity_curl, G_TYPE_OBJECT); static void diversity_curl_dispose(GObject *obj) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(obj); priv->disposed = TRUE; ((GObjectClass *) diversity_curl_parent_class)->dispose(obj); } static void diversity_curl_finalize(GObject *obj) { DiversityCurl *curl = DIVERSITY_CURL(obj); DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(curl); GSList *tmp_list; gint num_jobs = 0; while (priv->pending_jobs) diversity_curl_job_cancel(curl, priv->pending_jobs->data); g_mutex_lock(priv->mutex); tmp_list = priv->active_jobs; while (tmp_list) { JOB_ABORT(tmp_list->data); tmp_list = tmp_list->next; num_jobs++; } g_mutex_unlock(priv->mutex); if (priv->pool) { if (num_jobs) g_debug("waiting for %d download jobs to cancel\n", num_jobs); g_thread_pool_free(priv->pool, TRUE, TRUE); } g_mutex_free(priv->mutex); g_assert(!priv->pending_jobs); if (priv->active_id) g_source_remove(priv->active_id); g_slist_foreach(priv->active_jobs, (GFunc) diversity_curl_job_free, NULL); g_slist_free(priv->active_jobs); tmp_list = priv->handles; while (tmp_list) { DiversityCurlHandle *handle = tmp_list->data; if (handle) { curl_easy_cleanup(handle->curl); g_byte_array_free(handle->buf, TRUE); } tmp_list = tmp_list->next; } g_slist_free(priv->handles); ((GObjectClass *) diversity_curl_parent_class)->finalize(obj); } static void diversity_curl_set_property(GObject *obj, guint prop_id, const GValue *value, GParamSpec *pspec) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(obj); switch (prop_id) { case PROP_POOL_SIZE: priv->pool_size = g_value_get_int(value); break; case PROP_QUEUE_SIZE: priv->queue_size = g_value_get_int(value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(obj, prop_id, pspec); break; } } static void diversity_curl_get_property(GObject *obj, guint prop_id, GValue *value, GParamSpec *pspec) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(obj); switch (prop_id) { case PROP_POOL_SIZE: g_value_set_int(value, priv->pool_size); break; case PROP_QUEUE_SIZE: g_value_set_int(value, priv->queue_size); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(obj, prop_id, pspec); break; } } static void diversity_curl_class_init(DiversityCurlClass *klass) { GObjectClass *o_class = (GObjectClass *) klass; o_class->dispose = diversity_curl_dispose; o_class->finalize = diversity_curl_finalize; o_class->set_property = diversity_curl_set_property; o_class->get_property = diversity_curl_get_property; g_type_class_add_private(klass, sizeof(DiversityCurlPrivate)); g_object_class_install_property(o_class, PROP_POOL_SIZE, g_param_spec_int("pool-size", "Pool size", "Size of thread pool", 1, 5, 2, G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(o_class, PROP_QUEUE_SIZE, g_param_spec_int("queue-size", "Queue size", "Size of queue", 1, G_MAXINT, 16, G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); curl_signals[JOB_COMPLETED] = g_signal_new("job-completed", G_TYPE_FROM_CLASS(klass), G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET(DiversityCurlClass, job_completed), NULL, NULL, g_cclosure_marshal_VOID__POINTER, G_TYPE_NONE, 1, G_TYPE_POINTER); } static void diversity_curl_init(DiversityCurl *curl) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(curl); priv->disposed = FALSE; priv->pool = NULL; priv->mutex = g_mutex_new(); priv->handles = NULL; priv->pending_jobs = NULL; priv->active_jobs = NULL; } DiversityCurl *diversity_curl_new(gint pool_size, gint queue_size) { DiversityCurl *curl; DiversityCurlPrivate *priv; GError *error = NULL; gint i; curl = g_object_new(DIVERSITY_TYPE_CURL, "pool-size", pool_size, "queue-size", queue_size, NULL); priv = DIVERSITY_CURL_GET_PRIVATE(curl); /* there is pool_size + 1 threads to always allow cancellation */ priv->pool = g_thread_pool_new(process_jobs_threaded, curl, priv->pool_size + 1, TRUE, &error); if (!priv->pool) { g_warning("failed to create thread pool: %s\n", error->message); g_error_free(error); g_object_unref(curl); return NULL; } for (i = 0; i < priv->pool_size; i++) priv->handles = g_slist_prepend(priv->handles, NULL); return curl; } static DiversityCurlJob *diversity_curl_job_new(void) { DiversityCurlJobInt *job; job = g_slice_new0(DiversityCurlJobInt); return (DiversityCurlJob *) job; } static void diversity_curl_job_free(DiversityCurlJob *job) { g_free(job->uri); g_free(job->path); g_slice_free(DiversityCurlJobInt, (DiversityCurlJobInt *) job); } DiversityCurlJob *diversity_curl_job_get(DiversityCurl *curl) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(curl); DiversityCurlJob *job; g_mutex_lock(priv->mutex); if (g_slist_length(priv->pending_jobs) + g_slist_length(priv->active_jobs) >= priv->queue_size) { g_mutex_unlock(priv->mutex); return NULL; } g_mutex_unlock(priv->mutex); job = diversity_curl_job_new(); return job; } void diversity_curl_job_submit(DiversityCurl *curl, DiversityCurlJob *job) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(curl); g_mutex_lock(priv->mutex); priv->pending_jobs = g_slist_append(priv->pending_jobs, job); g_mutex_unlock(priv->mutex); g_thread_pool_push(priv->pool, job, NULL); } void diversity_curl_job_cancel(DiversityCurl *curl, DiversityCurlJob *job) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(curl); GSList *l; g_mutex_lock(priv->mutex); l = g_slist_find(priv->pending_jobs, job); if (l) { DiversityCurlJob *job = l->data; job->status = DIVERSITY_ATLAS_TILE_STATUS_CANCELLED; priv->pending_jobs = g_slist_remove_link(priv->pending_jobs, l); priv->active_jobs = g_slist_concat(l, priv->active_jobs); } else { l = g_slist_find(priv->active_jobs, job); if (l) JOB_ABORT(l->data); } g_mutex_unlock(priv->mutex); if (l) g_thread_pool_push(priv->pool, job, NULL); } static gboolean diversity_curl_process_active_jobs(DiversityCurl *curl) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(curl); g_mutex_lock(priv->mutex); while (priv->active_jobs) { GSList *tmp_list; DiversityCurlJob *job; /* find the first completed job */ tmp_list = priv->active_jobs; while (tmp_list) { job = tmp_list->data; if (!((DiversityCurlJobInt *) job)->processing) break; tmp_list = tmp_list->next; } if (!tmp_list) break; priv->active_jobs = g_slist_delete_link(priv->active_jobs, tmp_list); g_mutex_unlock(priv->mutex); if (!priv->disposed) g_signal_emit(curl, curl_signals[JOB_COMPLETED], 0, job); diversity_curl_job_free(job); g_mutex_lock(priv->mutex); } priv->active_id = 0; g_mutex_unlock(priv->mutex); return FALSE; } static gboolean write_file(const gchar *path, const guint8 *buf, gssize len) { GError *error = NULL; g_file_set_contents(path, (gchar *) buf, len, &error); if (error && error->code == G_FILE_ERROR_NOENT) { char *dir; mode_t mode; g_error_free(error); dir = g_path_get_dirname(path); mode = S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH; if (g_mkdir_with_parents(dir, mode) == 0) { error = NULL; g_file_set_contents(path, (gchar *) buf, len, &error); if (error) g_error_free(error); } g_free(dir); } return (error == NULL); } static size_t write_data(void *buffer, size_t size, size_t nmemb, void *userp) { DiversityCurlHandle *handle = userp; size_t total; if (JOB_ABORTED(handle->job)) return 0; total = size * nmemb; if (handle->buf->len + total > TILE_MAX_SIZE) return 0; g_byte_array_append(handle->buf, buffer, total); return total; } static CURLcode _process_pending_job(DiversityCurlHandle *handle, DiversityCurlJob *job) { CURLcode ret; gboolean saved = FALSE; handle->job = job; curl_easy_setopt(handle->curl, CURLOPT_URL, job->uri); curl_easy_setopt(handle->curl, CURLOPT_WRITEDATA, handle); //printf("curl url: %s\n", job->uri); //curl_easy_setopt(handle, CURLOPT_VERBOSE, 1); ret = curl_easy_perform(handle->curl); if (ret == CURLE_OK) saved = write_file(job->path, handle->buf->data, handle->buf->len); if (JOB_ABORTED(job)) job->status = DIVERSITY_ATLAS_TILE_STATUS_CANCELLED; else if (saved) job->status = DIVERSITY_ATLAS_TILE_STATUS_DONE; else if (ret == CURLE_OPERATION_TIMEDOUT) job->status = DIVERSITY_ATLAS_TILE_STATUS_TIMEOUT; else job->status = DIVERSITY_ATLAS_TILE_STATUS_FAILED; if (handle->buf->len) g_byte_array_remove_range(handle->buf, 0, handle->buf->len); handle->job = NULL; return ret; } inline static GSList *_pop_handle(DiversityCurl *curl) { DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(curl); DiversityCurlHandle *handle; GSList *head; if (!priv->handles) return NULL; head = priv->handles; priv->handles = g_slist_remove_link(priv->handles, head); handle = head->data; if (!handle) { g_mutex_unlock(priv->mutex); handle = g_slice_new(DiversityCurlHandle); handle->curl = curl_easy_init(); curl_easy_setopt(handle->curl, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(handle->curl, CURLOPT_TIMEOUT, 120); curl_easy_setopt(handle->curl, CURLOPT_WRITEFUNCTION, write_data); curl_easy_setopt(handle->curl, CURLOPT_USERAGENT, "diversity/0.0"); handle->buf = g_byte_array_new(); handle->job = NULL; head->data = handle; g_mutex_lock(priv->mutex); } return head; } static void process_jobs_threaded(gpointer data, gpointer user_data) { DiversityCurl *curl = user_data; DiversityCurlPrivate *priv = DIVERSITY_CURL_GET_PRIVATE(curl); GSList *handle; g_mutex_lock(priv->mutex); handle = _pop_handle(curl); while (priv->pending_jobs) { GSList *tmp; DiversityCurlJob *job; if (!handle) break; tmp = priv->pending_jobs; job = tmp->data; priv->pending_jobs = g_slist_remove_link(priv->pending_jobs, tmp); priv->active_jobs = g_slist_concat(tmp, priv->active_jobs); ((DiversityCurlJobInt *) job)->processing = TRUE; g_mutex_unlock(priv->mutex); _process_pending_job(handle->data, job); g_mutex_lock(priv->mutex); ((DiversityCurlJobInt *) job)->processing = FALSE; if (!priv->active_id) priv->active_id = g_idle_add((GSourceFunc) diversity_curl_process_active_jobs, curl); } if (priv->active_jobs && !priv->active_id) priv->active_id = g_idle_add((GSourceFunc) diversity_curl_process_active_jobs, curl); if (handle) priv->handles = g_slist_concat(handle, priv->handles); g_mutex_unlock(priv->mutex); }