creating groups and synchronizing all tasks in calculations. The Manager node has its own tasks and can execute parallel programs.
The Manager is the Master of the group of all tasks and has the group identifier 0. Each node has its own identifier, unique across the whole calculation, called the physical id, or node id for short. All nodes are connected to each other, and these connections are established before the calculation starts. At this stage, the nodes exchange their physical node ids.
At the beginning, a user who wants to use PCJ for parallel execution has to call the static method PCJ.start(), providing the requested StartPoint and Storage classes and a list of nodes. The list of nodes is used to number the PCJ nodes and PCJ threads. Every PCJ node processes the list to locate the items that contain its hostname; the numbers of these items are then used to number its PCJ threads.
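The list-processing step can be sketched as follows. This is a minimal illustration, not PCJ's code: the class NodeListScan is hypothetical, and it assumes that a list item matches a node when it is exactly equal to the node's hostname.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a node scans the list passed to PCJ.start() and
// records the positions of the items naming its own host. ASSUMPTION: an
// item matches when it equals the hostname exactly.
class NodeListScan {

    /** Returns the 0-based item numbers that name {@code localHost}. */
    static List<Integer> localItemNumbers(List<String> nodeList, String localHost) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < nodeList.size(); i++) {
            if (nodeList.get(i).equals(localHost)) {
                items.add(i); // this position will number one PCJ thread
            }
        }
        return items;
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("hostA", "hostA", "hostB", "hostA", "hostB");
        System.out.println(localItemNumbers(nodes, "hostA")); // [0, 1, 3]
        System.out.println(localItemNumbers(nodes, "hostB")); // [2, 4]
    }
}
```

With this list, the JVM on hostA would run PCJ threads 0, 1 and 3, and the JVM on hostB threads 2 and 4.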
There is a special node, called node0, that coordinates the other nodes during startup. Node0 is the node given as the first item on the list. After processing the list, each node connects to node0 and reports the numbers of the list items that contain its hostname. When node0 has received this information from every node on the list, it numbers the nodes starting from 0, increasing the number by one for each distinct node; this number is called the physicalId. Node0 then responds to all other nodes with their physicalIds.
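A minimal sketch of node0's numbering step, under stated assumptions: nodes are keyed by hostname, and because the text does not say in which order the distinct nodes are numbered, the sketch orders them by their smallest item number, which gives node0 (the owner of item 0) physicalId 0. PhysicalIdAssignment and assign are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of node0 numbering the nodes that reported to it.
// `reports` maps each node (keyed by hostname here) to the item numbers it
// claimed. ASSUMPTION: nodes are ordered by their smallest item number.
class PhysicalIdAssignment {

    /** Maps each reporting node to its physicalId (0, 1, 2, ...). */
    static Map<String, Integer> assign(Map<String, List<Integer>> reports) {
        List<String> order = new ArrayList<>(reports.keySet());
        order.sort(Comparator.comparingInt(node -> Collections.min(reports.get(node))));
        Map<String, Integer> physicalIds = new LinkedHashMap<>();
        int next = 0;
        for (String node : order) {
            physicalIds.put(node, next++); // start at 0, add one per distinct node
        }
        return physicalIds;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> reports = new LinkedHashMap<>();
        reports.put("hostB", List.of(2, 4));
        reports.put("hostA", List.of(0, 1, 3));
        System.out.println(assign(reports)); // {hostA=0, hostB=1}
    }
}
```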
At this point every node is connected to node0 and knows its physicalId. The next step is to exchange information between nodes and to connect every node with every other node. To do that, node0 broadcasts information about each node. The broadcast uses a balanced tree structure in which each node has at most two children. At the beginning of the operation the tree has only one vertex, node0, which is the root. The broadcast message contains information about a new node in the tree: its physicalId, the physicalId of its parent, its threadIds and its hostname.
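One way to realize a balanced tree with at most two children per vertex is binary-heap indexing. The sketch below assumes that rule (node p > 0 is attached under node (p - 1) / 2); the text does not state the exact attachment rule, and BroadcastTree is a hypothetical name. Forwarding is shown depth-first only to make the delivery order deterministic; real nodes forward concurrently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical balanced broadcast tree with at most two children per vertex.
// ASSUMPTION: binary-heap layout by physicalId, with node0 as the root.
class BroadcastTree {

    static int parentOf(int physicalId) {
        if (physicalId == 0) throw new IllegalArgumentException("node0 is the root");
        return (physicalId - 1) / 2;
    }

    static List<Integer> childrenOf(int physicalId, int nodeCount) {
        List<Integer> children = new ArrayList<>(2);
        for (int c = 2 * physicalId + 1; c <= 2 * physicalId + 2 && c < nodeCount; c++) {
            children.add(c);
        }
        return children;
    }

    /** Order in which a message from node0 reaches the nodes (depth-first forwarding). */
    static List<Integer> deliveryOrder(int nodeCount) {
        List<Integer> order = new ArrayList<>();
        forward(0, nodeCount, order);
        return order;
    }

    private static void forward(int node, int nodeCount, List<Integer> order) {
        order.add(node);                              // node handles the message...
        for (int child : childrenOf(node, nodeCount)) {
            forward(child, nodeCount, order);         // ...then passes it down the tree
        }
    }

    public static void main(String[] args) {
        System.out.println(parentOf(5));              // 2
        System.out.println(childrenOf(1, 6));         // [3, 4]
        System.out.println(deliveryOrder(6));         // [0, 1, 3, 4, 2, 5]
    }
}
```

The heap layout keeps the tree depth near log2 of the number of nodes, so a broadcast needs only a logarithmic number of forwarding steps.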
When a node receives that data, it sends it down the tree and saves the information about the new node; if it is the parent of the new node, it also adds the new node as its child. After that, the node connects to the new node and sends information about itself (physicalId and threadIds). Finally, when the new node has received this information from all nodes with a physicalId lower than its own, it reports to node0; this completes its initialization step.
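The bookkeeping of the join step can be sketched as below. The types are illustrative and are not PCJ internals: NewNodeInfo mirrors the broadcast payload (physicalId, parent physicalId, threadIds, hostname), ExistingNode records newcomers and adopts its children, and JoiningNode decides when it has heard from every node with a lower physicalId. Network connections and forwarding are omitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the join step; names are illustrative, not PCJ's API.
class JoinProtocol {

    /** Mirrors the broadcast payload describing a new node. */
    static final class NewNodeInfo {
        final int physicalId;
        final int parentId;
        final int[] threadIds;
        final String hostname;

        NewNodeInfo(int physicalId, int parentId, int[] threadIds, String hostname) {
            this.physicalId = physicalId;
            this.parentId = parentId;
            this.threadIds = threadIds.clone();
            this.hostname = hostname;
        }
    }

    /** State kept by a node that is already part of the tree. */
    static final class ExistingNode {
        final int physicalId;
        final List<Integer> children = new ArrayList<>();
        final List<NewNodeInfo> known = new ArrayList<>();

        ExistingNode(int physicalId) { this.physicalId = physicalId; }

        /** Records the newcomer and adopts it if this node is its parent. */
        void onNewNode(NewNodeInfo info) {
            known.add(info);
            if (info.parentId == physicalId) {
                children.add(info.physicalId);
            }
            // Forwarding to children and connecting to the newcomer are omitted.
        }
    }

    /** State kept by the node that is joining. */
    static final class JoiningNode {
        final int physicalId;
        private final Set<Integer> greetedBy = new HashSet<>();

        JoiningNode(int physicalId) { this.physicalId = physicalId; }

        /** Called when an existing node connects and introduces itself. */
        void onHello(int senderId) {
            if (senderId < physicalId) greetedBy.add(senderId);
        }

        /** True once all nodes 0 .. physicalId - 1 have introduced themselves. */
        boolean initializationComplete() {
            return greetedBy.size() == physicalId;
        }
    }

    public static void main(String[] args) {
        ExistingNode node1 = new ExistingNode(1);
        node1.onNewNode(new NewNodeInfo(3, 1, new int[] {6, 7}, "hostD"));
        System.out.println(node1.children);                    // [3]

        JoiningNode newcomer = new JoiningNode(3);
        newcomer.onHello(0);
        newcomer.onHello(2);
        System.out.println(newcomer.initializationComplete()); // false
        newcomer.onHello(1);
        System.out.println(newcomer.initializationComplete()); // true
    }
}
```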
When all nodes have reported completion of the initialization step, node0 sends a message to start the user application. Each node then starts the appropriate number of PCJ threads using the provided StartPoint class.
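The final startup barrier can be simulated in one JVM with java.util.concurrent.CountDownLatch, with threads standing in for nodes. This is only a model of the handshake described above (StartupBarrier is a hypothetical name); in PCJ the messages travel over the network, and each node would then launch its PCJ threads from the StartPoint class, which the sketch omits.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical single-JVM model: nodes 1..n-1 report completion of their
// initialization, node0 (the calling thread) waits for all reports and
// only then releases the start signal.
class StartupBarrier {

    static List<String> run(int nodeCount) throws InterruptedException {
        CountDownLatch initialized = new CountDownLatch(nodeCount - 1); // reports to node0
        CountDownLatch start = new CountDownLatch(1);                   // node0's start message
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Thread> nodes = new ArrayList<>();

        for (int id = 1; id < nodeCount; id++) {
            final int physicalId = id;
            Thread t = new Thread(() -> {
                initialized.countDown();           // "initialization step completed"
                try {
                    start.await();                 // wait for node0's start message
                    log.add("node" + physicalId + " started");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            nodes.add(t);
            t.start();
        }

        initialized.await();                       // node0: every node has reported
        log.add("node0 sends start");
        start.countDown();
        for (Thread t : nodes) {
            t.join();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> log = run(4);
        System.out.println(log.get(0));            // node0 sends start
        System.out.println(log.size());            // 4
    }
}
```

Because the start latch is released only after node0 logs its message, no node can record "started" before the start signal.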
3.2 Communication
The communication between different PCJ threads has to be realized in different ways. If the communicating threads run within the same JVM, Java concurrency mechanisms can be used to synchronize and exchange information. If data has to be exchanged between different JVMs, network communication, for example using sockets, has to be used. The PCJ library handles both situations and hides the details from the user. It distinguishes between inter- and intranode communication and picks the proper data exchange mechanism. Moreover, nodes are organized in a graph, which allows global communication to be optimized.
The communication between tasks running on the same JVM is performed using Java methods for thread synchronization. One should note that from the PCJ user's point of view both mechanisms are transparent. The particular mechanism used depends on the ids of the tasks involved in the communication.
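The choice between the two mechanisms can be sketched as a lookup from a PCJ thread id to the physicalId of the node hosting it. TransportSelector and its LOCAL/NETWORK enum are hypothetical; the sketch shows only the decision rule, not PCJ's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical transport selection based on task (PCJ thread) ids.
class TransportSelector {

    enum Transport { LOCAL, NETWORK }

    private final Map<Integer, Integer> nodeOfThread = new HashMap<>(); // threadId -> physicalId

    /** Registers the PCJ thread ids hosted by one node (as learned during startup). */
    void registerNode(int physicalId, int... threadIds) {
        for (int threadId : threadIds) {
            nodeOfThread.put(threadId, physicalId);
        }
    }

    /** Same node: in-JVM synchronization; different nodes: network messages. */
    Transport select(int fromThread, int toThread) {
        Integer from = nodeOfThread.get(fromThread);
        Integer to = nodeOfThread.get(toThread);
        if (from == null || to == null) {
            throw new IllegalArgumentException("unknown PCJ thread id");
        }
        return from.equals(to) ? Transport.LOCAL : Transport.NETWORK;
    }

    public static void main(String[] args) {
        TransportSelector selector = new TransportSelector();
        selector.registerNode(0, 0, 1);               // node 0 hosts PCJ threads 0 and 1
        selector.registerNode(1, 2, 3);               // node 1 hosts PCJ threads 2 and 3
        System.out.println(selector.select(0, 1));    // LOCAL
        System.out.println(selector.select(1, 2));    // NETWORK
    }
}
```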
PCJ uses the TCP/IP protocol for connections. TCP was chosen because of its features: it provides reliable, ordered and error-checked delivery of data over an IP network. Of course, it has some drawbacks, especially with respect to performance, because TCP is optimized for accurate rather than timely delivery. Using other protocols, like UDP, would require additional work to implement the missing features: reordering out-of-order messages and retransmitting lost or corrupted messages.
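To make the UDP trade-off concrete, the sketch below implements one of the features that TCP already provides: delivering messages in sending order by means of sequence numbers. ReorderBuffer is a hypothetical illustration; retransmission of lost datagrams, the other feature mentioned, is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical receiver-side reordering that a UDP-based transport would need.
// Each datagram carries a sequence number; later datagrams are held back
// until every earlier one has arrived.
class ReorderBuffer {
    private final Map<Long, String> pending = new HashMap<>();
    private long nextExpected = 0;

    /** Accepts one datagram and returns the messages that can now be delivered in order. */
    List<String> receive(long seq, String payload) {
        List<String> deliverable = new ArrayList<>();
        if (seq < nextExpected) {
            return deliverable;                      // duplicate of an already delivered message
        }
        pending.put(seq, payload);
        while (pending.containsKey(nextExpected)) {
            deliverable.add(pending.remove(nextExpected));
            nextExpected++;
        }
        return deliverable;
    }

    public static void main(String[] args) {
        ReorderBuffer buffer = new ReorderBuffer();
        System.out.println(buffer.receive(1, "b"));  // []
        System.out.println(buffer.receive(0, "a"));  // [a, b]
        System.out.println(buffer.receive(2, "c"));  // [c]
    }
}
```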
The network communication takes place between nodes and is performed using the Java New I/O classes (java.nio.*). There is one thread per node for receiving incoming data and another one for processing messages. The communication is nonblocking and uses a 256 KB buffer by default [3]. The buffer size can be changed using a dedicated JVM parameter.
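The buffer configuration can be illustrated with standard java.nio calls. The system property name example.buffer.size is hypothetical, since the text only says that a dedicated JVM parameter exists; the 256 KB default is taken from the text.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Sketch using only standard java.nio. ASSUMPTION: "example.buffer.size" is a
// hypothetical property name standing in for PCJ's own JVM parameter.
class NioBufferConfig {
    static final int DEFAULT_BUFFER_SIZE = 256 * 1024; // 256 KB default

    /** Buffer size from -Dexample.buffer.size=..., or the default. */
    static int bufferSize() {
        return Integer.getInteger("example.buffer.size", DEFAULT_BUFFER_SIZE);
    }

    /** Opens a channel whose reads and writes return immediately instead of waiting. */
    static SocketChannel openNonBlocking() throws IOException {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        return channel;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize());
        System.out.println(buffer.capacity());     // 262144 unless the property is set
        try (SocketChannel channel = openNonBlocking()) {
            System.out.println(channel.isBlocking()); // false
        }
    }
}
```

Running with -Dexample.buffer.size=1048576 would change the capacity in this sketch. In a fuller design, the nonblocking channel would be registered with a java.nio.channels.Selector polled by the receiving thread, which hands complete messages to the processing thread.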
PCJ threads can exchange data asynchronously. Sending a value to another task's storage is performed using the put method, as presented in listing 1. Since the data transfer is asynchronous, the put method is accompanied by the waitFor statement executed by the PCJ thread receiving the data. The get method is used to get a value from another task's storage. In both methods the other task does not block while the value is put or fetched, but the task that initiated the exchange blocks. There is also the getFutureObject method, which works in a fully nonblocking manner: the initiating task can check whether the response has been received and do other calculations in the meantime.
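The waiting behaviour of these operations can be modelled in a single JVM with Java monitors and CompletableFuture. ToyStorage and its methods are hypothetical and are not the PCJ API; the model only mimics which side waits: waitFor blocks until a put has arrived, get returns the value synchronously to the caller, and the future-based read returns immediately so the caller can keep working.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy model of one shared double variable; NOT the PCJ API.
class ToyStorage {
    private double value;
    private int modifications; // puts not yet consumed by waitFor

    /** Remote write: stores the value and wakes any thread blocked in waitFor. */
    synchronized void put(double newValue) {
        value = newValue;
        modifications++;
        notifyAll();
    }

    /** Receiver side: blocks until a put has arrived, then consumes it. */
    synchronized void waitFor() throws InterruptedException {
        while (modifications == 0) {
            wait();
        }
        modifications--;
    }

    /** Synchronous read: the caller waits for the value. */
    synchronized double get() {
        return value;
    }

    /** Nonblocking read: returns at once; the caller may poll isDone() meanwhile. */
    CompletableFuture<Double> getFuture(ExecutorService executor) {
        return CompletableFuture.supplyAsync(this::get, executor);
    }

    public static void main(String[] args) throws Exception {
        ToyStorage storage = new ToyStorage();
        Thread sender = new Thread(() -> storage.put(3.5));
        sender.start();
        storage.waitFor();                      // returns only after the put
        System.out.println(storage.get());      // 3.5

        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Double> future = storage.getFuture(executor);
        // other calculations could run here while the read completes
        System.out.println(future.get());       // 3.5
        executor.shutdown();
        sender.join();
    }
}
```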
@Shared
double a;
WSEAS TRANSACTIONS on COMPUTERS
DOI: 10.37394/23205.2022.21.12
Marek Nowicki, Magdalena Ryczkowska,
Łukasz Gorski, Michał Szynkiewicz, Piotr Bała