PDA

View Full Version : [C][UNIX] Mergesort Multithread


teomatteo89
19-06-2012, 23:32
Ciao,
cercherò di esser più chiaro possibile:

tutto parte da un array non ordinato di MAXNUMBERS.

(Per facilità, il numero di elementi è un multiplo del numero dei threads.)

Il codice in questione deve ordinare questo array dividendo il lavoro in diversi threads.
Ogni thread copia dalla memoria condivisa una "partizione" dell'array disordinato, ordinando con un algoritmo bubblesort la piccola parte prelevata.

Prima di terminarsi, il thread copia il proprio array ordinato in un'altra zona di memoria, allocata tramite un array bidimensionale "biarray[thread][numbers]".

Successivamente, il main prende in mano la situazione, invocando (teoricamente) una mergesort sull'array bidimensionale, unendo le partizioni precedentemente ordinate in un'unico grande array ordinato.

La soluzione qui sotto proposta funziona, MA: l'ho brutalmente costruita con uno switch case. :oink: I thread possono essere 2, 4 o 8, definiti dal #define.

Sto cercando continuamente di sistemare il tutto in modo che possa diventare più dinamica possibile, ma non riesco ad "incollare i pezzi" per gestire il mergesort ed il merge dell'array bidimensionale.



#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <pthread.h>

// DEVE ESSERE UN MULTIPLO DEI THREAD
#define MAXNUMBERS 160

// POSSONO ESSERE 2 - 4 - 8 THREAD
#define NTHREADS 8


// array con numeri casuali
int array[MAXNUMBERS];

// dimensione array per ogni processo
int dimension = (MAXNUMBERS/NTHREADS);

// array bidimensionale contenente tutti i numeri ordinati (thread x grandezza array)
int threadsArrays[NTHREADS][MAXNUMBERS/NTHREADS];
int threadInUse = 0;

// Condivisa ed in mutua esclusione. Calcola la "partizione" dell'array principale da inviare ad ogni thread
int partition = 0;

//SEMAFORI && Funzioni di gestione (tutti semafori binari)
sem_t mutex;
sem_t rewriteArray;

void initialize_sem()
{
sem_init( &mutex, 0, 1 );
sem_init( &rewriteArray, 0, 1 );
}

void destroySem()
{
sem_destroy(&mutex);
sem_destroy(&rewriteArray);
}

//Gestione degli errori Thread
void die(char *s, int e){ printf("%s[%d]", s, e); exit(1);}

/*
Bubblesort standard: Viene passata alla funzione un array numbers[] e la dimensione dell'array stesso,
dim_array. L'array entrante viene ordinato effettuando uno swap tra i valori, se a[y] > a[y+1].
*/
void bubbleSort(int numbers[], int dim_array)
{
int i, j, temp;
int ordinato=1;
for (i = (dim_array - 1); i >= 0; i--)
{
ordinato=1;
for (j = 1; j <= i; j++)
{

if (numbers[j-1] > numbers[j])
{
temp = numbers[j-1];
numbers[j-1] = numbers[j];
numbers[j] = temp;
ordinato=0;
}
}
if (ordinato) { break; }
}

}

/*
Funzione Merge:
richiede in ingresso due array e la loro dimensione.
Ritorna un puntatore ad una zona in memoria contenente l'array ordinato di lunghezza dimA + dimB.
L'ordinamento parte confrontando gli indice 0 degli array, selezionando il minore. L'array da cui
verrà prelevato l'elemento aumenterà l'indice, ricominciando la selezione. Quando uno dei due array
in input termina, la parte rimanente del secondo viene copiata direttamente nell'array risultante.
*/

int * merge( int a[], int b[], int dimA, int dimB )
{
int i = 0, j = 0, k = 0;
int *c = malloc(( dimA+ dimB) * sizeof(int));
while (i < dimA && j < dimB)
{
if( a[i] < b[j] )
{
c[k++] = a[i++];
}
else
{
c[k++] = b[j++];
}
}

while ( i < dimA )
{
c[k++] = a[i++];
}

while ( j < dimB )
{
c[k++] = b[j++];
}
return c;
}


/*
Codice eseguito dai vari thread:

Ogni thread crea un array privato sul quale copia parte dell'array condiviso (numeri disordinati).
L'azione di copia viene effettuata in mutua esclusione per evitare interferenze di accesso alla
memoria condivisa.
Al termine della copia, viene aumentata di 1 la variabile partition: tiene traccia della partizione
di memoria da copiare nell'array interno al thread.
Una volta rilasciato il semaforo mutex, viene richiamata la funzione bubbleSort, che ordina i
valori dell'array privato.
L'esecuzione continua con una wait ad un secondo semaforo, dove, all'interno della seconda sezione
critica, effettuerà la copia dell'array interno in un array bidimensionale in memoria condivisa.
*/
void *codice_thread(void *arg)
{
int privateArray[dimension];
int i, j;
int ptid = syscall(SYS_gettid);
//Sezione Critica
sem_wait(&mutex);
for(i=0; i<dimension; i++)
privateArray[i]=array[i+(dimension*partition)];
partition = partition+1;
sleep(1);

//Controllo di copia
printf("\n[Thread n° %i] Ho copiato: ", ptid);
for(i=0; i<dimension; i++)
printf("%d ", privateArray[i]);
sem_post(&mutex); // chiudo sezione critica

bubbleSort(privateArray, dimension);

sem_wait(&mutex); // Sez critica
printf("\n[Thread n° %i] Valori ordinati: ", ptid);
for(i=0; i<dimension; i++)
printf("%d ", privateArray[i]);
printf("\n");
sem_post(&mutex); //End sez critica

sem_wait(&rewriteArray);
for (j = 0; j < dimension; j++)
{ threadsArrays[threadInUse][j] = privateArray[j]; }

threadInUse = threadInUse + 1;
sem_post(&rewriteArray);

pthread_exit(NULL);
}

// ###### MAIN ######
/*
Il codice main inizializza le risorse necessarie (semafori, thread).
Successivamente viene eseguita in un ciclo while la generazione casuale di numeri interi,
immagazzinandoli nell'array in memoria condivisa.

Vengono poi creati i thread che eseguiranno il codice contenuto nella funzione codice_thread.
Il main continua richiamando una join, che aspetterà il termine di esecuzione dei thread.
*/
void main()
{
int i = 0;
int j, err;

// Array di Threads
pthread_t threads[NTHREADS];

//inizializzo i semafori
initialize_sem();

// Genera l'array arr contenente numeri casuali
printf("\nGenero i numeri casuali:");
while(i < MAXNUMBERS)
{
array[i] = ((int) (abs(rand()%MAXNUMBERS)));
printf(" %d", array[i++]);
}
printf("\n");

// Creazione Threads
for(j = 0; j < NTHREADS; j++)
if(err = pthread_create(&threads[j], NULL, codice_thread, NULL))
die("Errore creazione Thread", err);

printf("\nPthread_Create eseguita con successo\n");

// Join Threads
for (j = 0; j < NTHREADS; j++)
if(err = pthread_join(threads[j], NULL))
die("Errore join threads", err);

printf("\n");

printf("Elementi disordinati nell'array:");
for (i = 0; i < NTHREADS; i++)
{
for (j = 0; j < dimension; j++)
{ printf(" %d", threadsArrays[i][j]); }
}

printf("\n");

/*MERGE*/
// puntatori di ritorno
int length = dimension;
int *mergeRes24;
int *mergeRes23;
int *mergeRes22;
int *mergeRes21;
int *mergeRes11;
int *mergeRes12;
int *mergeRes1;
int *mergeRes2;
int *finalMerge2;
int *finalMerge4;
int *finalMerge8;

// array di puntatori
switch(NTHREADS)
{
case 2:
finalMerge2 = merge(threadsArrays[0], threadsArrays[1], dimension, dimension);
printf("\n#########################################################\nArray Finale = ");
for(i=0; i<MAXNUMBERS; i++) { printf("%d ", finalMerge2[i]); }

free(finalMerge2);

break;

case 4:
mergeRes1 = merge(threadsArrays[0], threadsArrays[1], dimension, dimension);
mergeRes2 = merge(threadsArrays[2], threadsArrays[3], dimension, dimension);
finalMerge4 = merge(mergeRes1, mergeRes2, (dimension*2), (dimension*2));
free(mergeRes1);
free(mergeRes2);
printf("\n#########################################################\nArray Finale = ");
for(i=0; i<MAXNUMBERS; i++) { printf("%d ", finalMerge4[i]); }

free(finalMerge4);

break;

case 8:
//first section
mergeRes21 = merge(threadsArrays[0], threadsArrays[1], length, length);
mergeRes22 = merge(threadsArrays[2], threadsArrays[3], length, length);
mergeRes23 = merge(threadsArrays[4], threadsArrays[5], length, length);
mergeRes24 = merge(threadsArrays[6], threadsArrays[7], length, length);
// second section
length = length*2;
mergeRes11 = merge(mergeRes21, mergeRes22, length, length);
mergeRes12 = merge(mergeRes23, mergeRes24, length, length);
free(mergeRes24);
free(mergeRes23);
free(mergeRes22);
free(mergeRes21);
// last section
length = length*2;
finalMerge8 = merge(mergeRes11, mergeRes12, length, length);
free(mergeRes11);
free(mergeRes12);
printf("\n#########################################################\nArray Finale = ");
for(i=0; i<MAXNUMBERS; i++) { printf("%d ", finalMerge8[i]); }

free(finalMerge8);

break;

default:
printf("########################################\n");
printf("########################################\n");
printf("############# ERROR THREADS ############\n");
printf("########################################\n");
printf("########################################\n");
break;
}
destroySem();
printf("\n\nProgramma Terminato\n\n");

}

Penso che il merge possa andar bene così com'è, è solo la suddivisione ed il passaggio di elementi nel mergeSort che non quadra. Fin'ora questo è un tentativo che ho provato a fare, non funzionante:

int * mergeSort(int arrT[], int numOfThreads)
{
if (NTHREADS <= 1)
return arrT;
else
{
int arrLeft[numOfThreads/2];
int arrRight[numOfThreads/2];
int middle = numOfThreads/2; // Dimezzo il numero di threads che si uniranno
int k;

for (k = 0; k < middle; k++)
{
arrLeft[k] = arrT[k];
}
for (k = middle; k < numOfThreads; k++)
{
arrRight[k] = arrT[k];
}
mergeSort(arrLeft, middle);
mergeSort(arrRight, middle);
return merge(arrLeft, arrRight, dimension, dimension);
}
}

Any suggestion?

Grazie.

__ZERO_UNO__
20-06-2012, 00:54
Cancellare il 90% di quel codice ed usare la libreria OpenMP, implementata in gcc. :D

banryu79
20-06-2012, 09:59
Cancellare il 90% di quel codice ed usare la libreria OpenMP, implementata in gcc. :D
Però se lo sta facendo per esercitarsi o imparare quella che gli hai suggerito non è una buona soluzione :D

teomatteo89
20-06-2012, 10:05
Cancellare il 90% di quel codice ed usare la libreria OpenMP, implementata in gcc. :D

Però se lo sta facendo per esercitarsi o imparare quella che gli hai suggerito non è una buona soluzione :D

Indeed :D Esame di Sistemi Operativi. Penso che quello che conti sia la gestione delle risorse tramite semafori, però alla consegna mi ha chiesto di cercare un modo per aver codice "dinamico", e non con soli 3 casi stabiliti.

Dovrei farlo anche in java, ma il problema resta proprio l'algoritmo e come gestire il passaggio di dati. :stordita: