Java – Concurrent pairing

I'm looking for a Java concurrency idiom for pairing up a large number of elements with the highest possible throughput.

Consider that I have "people" coming in from multiple threads. Each person is looking for a match. When a person finds another person who is waiting, the two are matched with each other and removed for processing.

I don't want to lock a big structure to change state. Consider that Person has getMatch and setMatch. Before being submitted, every person's #getMatch is null. By the time a person is unblocked (or fetched back), either it has expired because it waited too long for a match, or #getMatch is non-null.

Some of the problems with keeping throughput high: suppose PersonA and PersonB are submitted at the same time. They match each other, but PersonB also matches PersonC, who is already waiting. PersonB's state changes to "available" when it is submitted, but PersonA must not accidentally grab PersonB while PersonB is being matched with PersonC. Make sense? In addition, I want to do this asynchronously. In other words, I don't want every submitter to have to hold on to a Person on a thread with something like a waitForMatch call.

Again, I don't want requests to have to run on separate threads, but it's fine if there is one additional matcher thread.

It seems there should be an idiom for this, because it looks like a very common thing to do. But my Google searches have come up dry (I may be using the wrong terms).

UPDATE

There are a few things that make this problem hard for me. One is that I don't want to keep the objects in memory; I want all waiting candidates to live in Redis or Memcache or something similar. Another is that any person may have several possible matches. Consider the following interface:

person.getId();         // lets call this an Integer
person.getFriendIds();  // a collection of other person ids

Then I have a server that looks like this:

MatchServer:
   submit( personId,expiration ) -> void // non-blocking returns immediately
   isDone( personId ) -> boolean          // either expired or found a match
   getMatch( personId ) -> matchId        // also non-blocking

This is a REST interface, and it will use redirects until you get a result. My first idea is to have a cache in MatchServer that is backed by something like Redis, with a concurrent weak-value hash map for the objects that are currently locked and being operated on. Each personId would be wrapped by a persistent state object with the states submitted, matched, and expired.

Following so far? Pretty simple. The submit code does the initial work. It looks like this:

public void submit( Person p,long expiration ) {
    MatchStatus incoming = new MatchStatus( p.getId(),expiration );
    if ( !tryMatch( incoming,p.getFriendIds() ) )
        cache.put( p.getId(),incoming ); 
}

public boolean isDone( Integer personId ) {
    MatchStatus status = cache.get( personId );
    status.lock();
    try {
        return status.isMatched() || status.isExpired();

    } finally {
        status.unlock();
    }
}

public boolean tryMatch( MatchStatus incoming,Iterable<Integer> friends ) {
    for ( Integer friend : friends ) {
        if ( match( incoming,friend ) )
            return true;
    }

    return false;
}

private boolean match( MatchStatus incoming,Integer waitingId ) {
    MatchStatus waiting = cache.get( waitingId );
    if ( waiting == null )
        return false;

    waiting.lock();
    try {
        if ( waiting.isMatched() )
            return false;

        waiting.setMatch( incoming.getId() );
        incoming.setMatch( waiting.getId() );

        return true;
    } finally {
        waiting.unlock();
    }
}
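
The code above relies on a MatchStatus type that the question does not show. As a rough sketch only, it might look something like the following, assuming a per-object ReentrantLock and an absolute expiry timestamp in milliseconds. Both are assumptions, and any names beyond the methods already used above are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical per-person state object; not the asker's actual class.
class MatchStatus {
    private final Integer id;
    private final long expiresAtMillis;            // assumed: absolute deadline in ms
    private final ReentrantLock lock = new ReentrantLock();
    // volatile because match() sets it on "incoming" without holding incoming's lock
    private volatile Integer matchId;              // null until matched

    MatchStatus(Integer id, long expiresAtMillis) {
        this.id = id;
        this.expiresAtMillis = expiresAtMillis;
    }

    Integer getId() { return id; }

    void lock() { lock.lock(); }

    void unlock() { lock.unlock(); }

    boolean isMatched() { return matchId != null; }

    // Expired means: still unmatched and past the deadline.
    boolean isExpired() {
        return !isMatched() && System.currentTimeMillis() >= expiresAtMillis;
    }

    void setMatch(Integer otherId) { matchId = otherId; }

    Integer getMatch() { return matchId; }
}
```

With a type like this, isDone() above would still need a null check on the result of cache.get() for ids that were never submitted.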

So the problem here is that if two people come in at the same time and each is the other's only possible match, they won't find each other. That's a race condition, right? The only way I can see to solve it is to synchronize tryMatch(), but that would kill my throughput. I can't loop on tryMatch indefinitely either, because I need these calls to be very short.

So what is a better way to approach this? Every solution I come up with forces people through one at a time, which hurts throughput. For example, creating a background thread and using a blocking queue to put and take incoming people one at a time.

Any guidance would be appreciated.

Solution

You can use a ConcurrentHashMap. I'm assuming that your objects have keys that they can match on; for example, PersonA and PersonB would both have a "Person" key.

ConcurrentHashMap<String,Match> map = new ConcurrentHashMap<>();

void addMatch(Match match) {
    boolean success = false;
    while(!success) {
        Match oldMatch = map.remove(match.key);
        if(oldMatch != null) {
            match.setMatch(oldMatch);
            success = true;
        } else if(map.putIfAbsent(match.key,match) == null) {
            success = true;
        }
    }
}

The loop continues until you either add the match to the map, or remove an existing match and pair it with the new one. remove and putIfAbsent are both atomic.
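
A self-contained sketch of the same loop, in case it helps to see it run. The Match class here is hypothetical (the answer does not define it), and having setMatch link both sides of the pair is my assumption, not something stated above.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical element type: "key" is what two elements must share to pair up.
class Match {
    final String key;
    final String name;
    volatile Match partner; // set once this element has been paired

    Match(String key, String name) {
        this.key = key;
        this.name = name;
    }

    // Assumption: pairing links both sides.
    void setMatch(Match other) {
        this.partner = other;
        other.partner = this;
    }
}

class Pairing {
    private final ConcurrentHashMap<String, Match> waiting = new ConcurrentHashMap<>();

    // Returns the partner if one was waiting, or null if the element was parked.
    Match addMatch(Match match) {
        while (true) {
            Match old = waiting.remove(match.key);      // atomically claim a waiter
            if (old != null) {
                match.setMatch(old);
                return old;
            }
            if (waiting.putIfAbsent(match.key, match) == null) {
                return null;                            // parked; a later arrival will claim it
            }
            // Another thread parked under this key in between; go around again.
        }
    }
}
```

Because only one thread can win the remove for a given waiter, a parked element is never handed to two arrivals. Note that this only works when both sides share one key, which is why the friend-list case needs more machinery.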

Edit: since you want to offload the data to disk, you could use MongoDB, for example, with its findAndModify method. If an object with the key already exists, the command removes and returns it, so that you can pair the old object with the new one and presumably store the pair under a new key; if no object with the key exists, the command stores the object under that key. This is equivalent to the ConcurrentHashMap behavior, except that the data is stored on disk rather than in memory; you don't need to worry about two objects being written at the same time, because the findAndModify logic prevents them from inadvertently occupying the same key.
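
To make the "remove and return if present, otherwise store" behavior concrete, here is an in-memory illustration using ConcurrentHashMap.compute, which makes the whole decision atomically for a given key. This is only a stand-in for the database call, not MongoDB driver code, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

class TakeOrStore {
    private final ConcurrentHashMap<String, String> store = new ConcurrentHashMap<>();

    // If a value is stored under key, remove it and return it;
    // otherwise store newValue and return null.
    String takeOrStore(String key, String newValue) {
        String[] taken = new String[1];
        store.compute(key, (k, existing) -> {
            taken[0] = existing;
            // Returning null from compute removes the entry (or leaves it absent).
            return existing == null ? newValue : null;
        });
        return taken[0];
    }
}
```

A non-null return value means the caller has just claimed a waiting object and can pair it with the new one; a null return means the new object is now the one waiting.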

If you need to serialize your objects to JSON, use Jackson.

There are also alternatives to Mongo, such as DynamoDB, although Dynamo only offers a small amount of data for free.

Edit: since the friend lists are not reflexive, I think you can solve this with a combination of MongoDB (or another key-value database with atomic updates) and a ConcurrentHashMap.

1. People in MongoDB are either "matched" or "unmatched". (When I say "remove a person from MongoDB", I mean "set the person's state to 'matched'".)
2. When adding a new person, first create a ConcurrentHashMap<key, Boolean> for it, probably inside a global ConcurrentHashMap<key, ConcurrentHashMap<key, Boolean>>.
3. Iterate through the new person's friends:
   a. If the friend is in MongoDB, use findAndModify to atomically set it to "matched", then write the new person to MongoDB in the "matched" state, and finally add the pair to a "Pairs" collection in MongoDB that the end user can query. Remove the person's ConcurrentHashMap from the global map.
   b. If the friend is not in MongoDB, check whether that friend has written to the current person's ConcurrentHashMap. If it has, do nothing. If it hasn't, check whether the friend has a ConcurrentHashMap associated with it; if it does, set the value associated with the current person's key to "true". (Note that two friends can still end up writing to each other's hash maps, since the current person cannot check its own map and modify the friend's map in one atomic operation, but checking its own map first reduces the likelihood.)
4. If the person has not been matched, write it to MongoDB in the "unmatched" state, remove its ConcurrentHashMap from the global map, and create a delayed task that will iterate through the IDs of all the friends that wrote to the person's ConcurrentHashMap (that is, using ConcurrentHashMap#keySet()). The delay on this task should be random (for example, Thread.sleep(500 * rand.nextInt(30))), so that two friends don't always attempt to match at the same time. If the current person has no friends that it needs to recheck, don't create a delayed task for it.
5. When the delay is up, create a new ConcurrentHashMap for this person, remove the unmatched person from MongoDB, and loop back to step 1. If the person has already been matched, don't remove it from MongoDB; just terminate the delayed task.

In the common case, a person either matches with a friend, or fails to match while no friend is added to the system during its iteration through the friend list (that is, the person's ConcurrentHashMap will be empty). In the case where friends are written simultaneously:

Friend1 and Friend2 are added at the same time.

1. Friend1 writes to Friend2's ConcurrentHashMap to indicate that they missed each other.
2. Friend2 writes to Friend1's ConcurrentHashMap to indicate the same. (This only happens if Friend2 checks whether Friend1 has written to its map at the very moment Friend1 is writing to it. Ordinarily Friend2 would detect that Friend1 had written to its map, and so would not write to Friend1's map.)
3. Both Friend1 and Friend2 are written to MongoDB. Friend1 randomly gets a 5-second delay on its follow-up task, and Friend2 randomly gets a 15-second delay.
4. Friend1's task fires first and matches with Friend2.
5. Friend2's task fires second; Friend2 is no longer in MongoDB, so the task terminates immediately.

Some hiccups:

- Friend1 and Friend2 may not both have ConcurrentHashMaps associated with them, for example if Friend2 is still initializing its hash map when Friend1 checks whether the map is in memory. This is fine, because Friend2 will write to Friend1's hash map, so we are guaranteed that the match will eventually be attempted: at least one of them will have a hash map while the other is iterating, because the hash map is created before the iteration starts.
- The second iteration of a match may fail if both friends' tasks somehow fire at the same time. In that case, each person should remove from its list any friends that are in MongoDB in the "matched" state; it should then take the union of the resulting list with the list of friends that wrote to its ConcurrentHashMap, and use that as the new friend list for the next iteration. Eventually the person will either be matched, or its "recheck" friend list will become empty.
- You should increase the task delay on each subsequent iteration, to make it more likely that two friends' tasks will not run at the same time (for example, Thread.sleep(500 * rand.nextInt(30)) on the first iteration, Thread.sleep(500 * rand.nextInt(60)) on the second, Thread.sleep(500 * rand.nextInt(90)) on the third, and so on).
- On subsequent iterations, you must create a person's new ConcurrentHashMap before removing the person from MongoDB, otherwise you will have a data race. Similarly, you must remove a person from MongoDB while iterating over its potential matches, otherwise you may inadvertently match it twice.
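
The growing random delay described above can be written as a small helper. This is a sketch; the class and method names are hypothetical, and the iteration count is assumed to start at 1.

```java
import java.util.Random;

class RetryDelay {
    // Random delay in milliseconds for the given 1-based iteration,
    // following the 500 * nextInt(30 * iteration) pattern.
    static long delayMillis(Random rand, int iteration) {
        return 500L * rand.nextInt(30 * iteration);
    }
}
```

The upper bound grows by 15 seconds per iteration (just under 15 s, 30 s, 45 s, ...), so two tasks that collided once become progressively less likely to fire together again.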

Edit: some code:

The addUnmatchedToMongo(person1) method writes an "unmatched" person1 to MongoDB.

setToMatched(friend1) uses findAndModify to atomically set friend1 to "matched"; the method returns false if friend1 is already matched or does not exist, and returns true if the update succeeded.

isMatched(friend1) returns true if friend1 exists and is matched, and returns false if it does not exist or exists and is unmatched.
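
For experimenting without a database, the contract of these three helpers can be mimicked in memory. The sketch below is only a stand-in that uses the atomic three-argument ConcurrentHashMap.replace in place of findAndModify; it works on ids rather than Person objects, and all names in it are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for the MongoDB-backed helpers; illustrative only.
class PersonStates {
    enum State { UNMATCHED, MATCHED }

    private final ConcurrentHashMap<String, State> states = new ConcurrentHashMap<>();

    void addUnmatched(String personId) {
        states.put(personId, State.UNMATCHED);
    }

    // Atomic compare-and-set: succeeds only if the person exists and is UNMATCHED,
    // so two concurrent callers cannot both claim the same person.
    boolean setToMatched(String personId) {
        return states.replace(personId, State.UNMATCHED, State.MATCHED);
    }

    // False both for unknown ids and for ids that are still UNMATCHED.
    boolean isMatched(String personId) {
        return states.get(personId) == State.MATCHED;
    }
}
```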

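import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// static, because the static findMatch methods below use them
private static ConcurrentHashMap<String,ConcurrentHashMap<String,Person>> globalMap;
private static DelayQueue<DelayedRetry> delayQueue;
private static ThreadPoolExecutor executor;

// Dispatcher: hands each retry to the pool once its delay has elapsed
executor.execute(new Runnable() {
    public void run() {
        try {
            while(true) {
                Runnable runnable = delayQueue.take(); // blocks until a retry is due
                executor.execute(runnable);
            }
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt(); // stop dispatching when interrupted
        }
    }
});

public static void findMatch(Person person,Collection<Person> friends) {
    findMatch(person,friends,1);
}

public static void findMatch(Person person,Collection<Person> friends,int delayMultiplier) {
    // publish this person's map before iterating, so friends can record a missed match
    globalMap.put(person.id,new ConcurrentHashMap<String,Person>());
    for(Person friend : friends) {
        if(setToMatched(friend)) {
            // write person to MongoDB in "matched" state
            // write "Pair(person,friend)" to MongoDB so it can be queried by the end user
            globalMap.remove(person.id);
            return;
        } else {
            if(!isMatched(friend) && globalMap.get(person.id).get(friend.id) == null) {
                // the existence of "friendMap" indicates another thread is currently trying to match the friend
                ConcurrentHashMap<String,Person> friendMap = globalMap.get(friend.id);
                if(friendMap != null) {
                    friendMap.put(person.id,person);
                }
            }
        }
    }
    addUnmatchedToMongo(person);
    // friends that wrote to this person's map are the ones worth rechecking
    Collection<Person> retryFriends = globalMap.remove(person.id).values();
    if(retryFriends.size() > 0) {
        delayQueue.add(new DelayedRetry(500 * new Random().nextInt(30 * delayMultiplier),person,retryFriends,delayMultiplier));
    }
}

public class DelayedRetry implements Runnable,Delayed {
    private final long readyAt; // absolute time (ms) at which the retry becomes due
    private final Person person;
    private final Collection<Person> friends;
    private final int delayMultiplier;

    public DelayedRetry(long delay,Person person,Collection<Person> friends,int delayMultiplier) {
        this.readyAt = System.currentTimeMillis() + delay;
        this.person = person;
        this.friends = friends;
        this.delayMultiplier = delayMultiplier;
    }

    // must return the remaining delay: DelayQueue releases an element once this is <= 0
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAt - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    // required by Delayed; orders retries by remaining delay
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),other.getDelay(TimeUnit.MILLISECONDS));
    }

    public void run() {
        findMatch(person,friends,delayMultiplier + 1);
    }
}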
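
One detail worth seeing in isolation is how a Delayed element interacts with DelayQueue: take() only releases an element once its getDelay reports zero or less, so getDelay has to return the time remaining rather than the original delay, and Delayed also requires compareTo. A small sketch with hypothetical names (TimedTask, DelayQueueDemo):

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class TimedTask implements Delayed {
    final String label;
    private final long readyAtNanos; // absolute deadline

    TimedTask(String label, long delayMillis) {
        this.label = label;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time; shrinks toward zero as the deadline approaches.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueDemo {
    // Adds a slow task and then a fast one; returns the labels in release order.
    static String drainTwo() {
        DelayQueue<TimedTask> queue = new DelayQueue<>();
        queue.add(new TimedTask("slow", 60));
        queue.add(new TimedTask("fast", 10));
        try {
            return queue.take().label + "," + queue.take().label;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Here the queue releases the task with the shorter delay first, regardless of insertion order.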